# Conexión y Extracción de datos desde una API pública de Finanzas

In [1]:
import requests
import json
import os
from dotenv import load_dotenv

def download_data(api_url,params):
    response = requests.get(api_url,params=params)
    
    if response.status_code == 200:
        try:
            data = json.loads(response.text)
            errors = [(value['code'],value['message'],value['status']) for key, value in data.items() if 'code' in value and value['code'] in [400, 401, 403, 404, 414, 429, 500]]
            if errors:
                for error in errors:
                    code, message, status = error
                    print(f"{code}\n{message}\n{status}\n")
                return None
            else:
                print("Los datos se han extraído correctamente!")
                return data
        except json.JSONDecodeError as e:
            print(f"JSONDecodeError: {e}")
    else:
        print(f"Request failed with status code: {response.status_code}")
    
    return None

load_dotenv()

base_url = 'https://api.twelvedata.com' 
endpoint = '/time_series' 
params = {
    'symbol': 'AAPL,AMZN,TSLA,META,MSFT,GOOG,SPY,QQQ',
    'interval': '1day',
    'start_date': '2024-01-01',
    'apikey': os.getenv('APIKEY')
}

api_url = base_url + endpoint

data = download_data(api_url,params)

Los datos se han extraído correctamente!


# Obtención del DataFrame y Transformación de datos

In [2]:
import pandas as pd
from datetime import datetime

# Transformación de datos
for key, value in data.items():
    for val in value['values']:
        if 'datetime' in val:
            val['datetime'] = datetime.strptime(val['datetime'], "%Y-%m-%d")
        if 'open' in val:
            val['open'] = round(float(val['open']), 3)
        if 'high' in val:
            val['high'] = round(float(val['high']), 3)
        if 'low' in val:
            val['low'] = round(float(val['low']), 3)
        if 'close' in val:
            val['close'] = round(float(val['close']), 3)
        if 'volume' in val:
            val['volume'] = int(val['volume'])

# Creación del dataframe a partir de una lista de compresión
df = pd.DataFrame([
    {
        'symbol': value['meta']['symbol'],
        'currency': value['meta']['currency'],
        'exchange_timezone': value['meta']['exchange_timezone'],
        'exchange': value['meta']['exchange'],
        'mic_code': value['meta']['mic_code'],
        'type': value['meta']['type'],
        **val,
        'datetime_load': pd.Timestamp.now().round("1s"),
    }
    for key, value in data.items() 
        for val in value['values']
])

df = df.rename(columns={'open': 'open_value', 'high': 'high_value', 'low': 'low_value', 'close': 'close_value'})

df

Unnamed: 0,symbol,currency,exchange_timezone,exchange,mic_code,type,datetime,open_value,high_value,low_value,close_value,volume,datetime_load
0,AAPL,USD,America/New_York,NASDAQ,XNGS,Common Stock,2024-04-15,175.38,176.63,172.50,172.69,73096917,2024-04-15 18:28:31
1,AAPL,USD,America/New_York,NASDAQ,XNGS,Common Stock,2024-04-12,174.26,178.36,174.21,176.55,101593300,2024-04-15 18:28:31
2,AAPL,USD,America/New_York,NASDAQ,XNGS,Common Stock,2024-04-11,168.34,175.46,168.16,175.04,91070300,2024-04-15 18:28:31
3,AAPL,USD,America/New_York,NASDAQ,XNGS,Common Stock,2024-04-10,168.80,169.09,167.11,167.78,49709300,2024-04-15 18:28:31
4,AAPL,USD,America/New_York,NASDAQ,XNGS,Common Stock,2024-04-09,168.70,170.08,168.35,169.67,42451200,2024-04-15 18:28:31
...,...,...,...,...,...,...,...,...,...,...,...,...,...
571,QQQ,USD,America/New_York,NASDAQ,XNMS,ETF,2024-01-08,397.99,405.24,397.84,404.95,42473800,2024-04-15 18:28:31
572,QQQ,USD,America/New_York,NASDAQ,XNMS,ETF,2024-01-05,396.45,399.56,395.34,396.75,44867900,2024-04-15 18:28:31
573,QQQ,USD,America/New_York,NASDAQ,XNMS,ETF,2024-01-04,396.44,399.59,396.06,396.28,39432800,2024-04-15 18:28:31
574,QQQ,USD,America/New_York,NASDAQ,XNMS,ETF,2024-01-03,399.93,401.00,397.89,398.33,47002800,2024-04-15 18:28:31


# Exportación de datos a un archivo CSV

In [3]:
df.to_csv('finances.csv', index=False)

# Conexión a la Base de Datos en Amazon Redshift

In [4]:
import psycopg2

load_dotenv()

try:
    conn = psycopg2.connect(
        host = 'data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com',
        dbname = 'data-engineer-database',
        user = 'serenadituro_coderhouse',
        password = os.getenv('AR_PASSWORD'),
        port = '5439'
    )
    print("Se ha establecido la conexión con Amazon Redshift de manera exitosa!\n")
except Exception as e:
    print(f"No es posible establecer la conexión con Amazon Redshift\nError: {e}\n")

Se ha establecido la conexión con Amazon Redshift de manera exitosa!



# Creación de Tabla

In [5]:
try:
    with conn.cursor() as cur:
        create_table = f''' CREATE TABLE IF NOT EXISTS serenadituro_coderhouse.finances (
                    symbol VARCHAR(10) NOT NULL,
                    currency VARCHAR(30) NOT NULL,
                    exchange_timezone VARCHAR(50) NOT NULL,
                    exchange VARCHAR(20) NOT NULL,
                    mic_code VARCHAR(10) NOT NULL,
                    type VARCHAR(30) NOT NULL,
                    datetime DATE NOT NULL,
                    open_value FLOAT NOT NULL,
                    high_value FLOAT NOT NULL,
                    low_value FLOAT NOT NULL,
                    close_value FLOAT NOT NULL,
                    volume INT NOT NULL,
                    datetime_load DATETIME NOT NULL,
                    PRIMARY KEY(symbol,datetime)
                )'''
        cur.execute(create_table)
        conn.commit()
        print(f'Operación realizada con éxito!\n')
except Exception as e:
    print(f'Error al crear la tabla\n{e}\n')

Operación realizada con éxito!



# Carga de datos a la tabla desde un archivo CSV

In [6]:
import csv

try:
    with open('finances.csv', 'r') as csv_file:
        csv_reader = csv.reader(csv_file)
        try: 
            with conn.cursor() as cur:
                next(csv_reader) # para saltear el encabezado
                for row in csv_reader:
                    # verifico si el dato fue previamente insertado en la tabla
                    select_data = f'''SELECT COUNT(*) FROM serenadituro_coderhouse.finances WHERE symbol = %s AND datetime = %s'''
                    cur.execute(select_data, (row[0], row[6]))
                    result = cur.fetchone()
                    if result[0] == 0:  # si no existe un dato con el mismo símbolo y fecha asociada, se inserta en la tabla
                        insert_data = f'''INSERT INTO serenadituro_coderhouse.finances 
                                        (symbol,currency,exchange_timezone,exchange,mic_code,type,datetime,open_value,high_value,low_value,close_value,volume,datetime_load) 
                                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'''
                        data = (row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12])
                        cur.execute(insert_data, data)
            conn.commit()
            print(f'Los datos se han insertado correctamente!\n')
        except Exception as e:
            print(f'Error al insertar los datos\n{e}\n')
except FileNotFoundError:
    print("No se encontró el archivo")


Los datos se han insertado correctamente!



# Cierre de la conexión a la Base de Datos

In [7]:
conn.close()