In [1]:
# Importo librerias

import requests
import pandas as pd
import os
from configparser import ConfigParser
import datetime
import importlib
from time import sleep
import logging
import warnings

# Nombre de la biblioteca que deseo instalar
lib_name = 'sqlalchemy'

# Verifica si la biblioteca ya está instalada
try:
    importlib.import_module(lib_name)
    print(f"{lib_name} ya está instalada.")
except ImportError:
    print(f"{lib_name} no está instalada. Instalando...")
    %pip install -q "redshift_connector[full]" sqlalchemy-redshift

import sqlalchemy as sa

# Desactivar todos los warnings temporalmente
warnings.filterwarnings("ignore")


sqlalchemy ya está instalada.


In [2]:
# Seteo el path del archivo de configuracion
base_dir = "/Users/Bubu/Documents/REPOS/DE-CODER"
os.chdir(base_dir)

In [3]:
# Leo el archivo de configuracion
config = ConfigParser()
config_dir = "config/config.ini"
config.read(config_dir)

['config/config.ini']

### Obtengo información de las APIs

In [4]:
# Obtenego datos de Tickers por pais de : https://stockanalysis.com 
url_api_country = "https://stockanalysis.com/api/screener/s/d/country.json"

r1 = requests.get(url_api_country)

if r1.status_code == 200:
    print('Request successful')
else:
    print(r1.status_code, r1.content)

Request successful


In [5]:
# Genero un dataframe con los datos de los tickers por pais
tickers_countries = pd.DataFrame(r1.json()['data']['data'])

In [6]:
# Selecciono columnas del dataset tickers_countries
tickers_countries=tickers_countries[[0,1]]
tickers_countries.rename(columns={0:'Ticker',1:'Country'}, inplace=True)

In [7]:
tickers_countries.head()

Unnamed: 0,Ticker,Country
0,AAPL,United States
1,MSFT,United States
2,GOOG,United States
3,GOOGL,United States
4,AMZN,United States


In [8]:
# Genero una nueva primary key para la tabla tickers_countries basada en el ticker

tickers_countries['Ticker_id'] = tickers_countries['Ticker'].astype('category').cat.codes


In [9]:
tickers_countries.head()

Unnamed: 0,Ticker,Country,Ticker_id
0,AAPL,United States,14
1,MSFT,United States,3703
2,GOOG,United States,2458
3,GOOGL,United States,2459
4,AMZN,United States,329


In [10]:
tickers_countries[tickers_countries['Country']=='Argentina'][['Ticker','Ticker_id']]

Unnamed: 0,Ticker,Ticker_id
751,YPF,6117
1138,BMA,813
1160,PAM,4254
1227,GGAL,2362
1311,TGS,5434
1351,TEO,5412
1719,CEPU,1116
1826,BBAR,634
1955,LOMA,3339
2238,IRS,2952


In [11]:
# Selecciono solo los tickers de Argentina 

tickers_argentina=tickers_countries[tickers_countries['Country']=='Argentina']['Ticker']
tickers_argentina

751       YPF
1138      BMA
1160      PAM
1227     GGAL
1311      TGS
1351      TEO
1719     CEPU
1826     BBAR
1955     LOMA
2238      IRS
2278    CRESY
2296      EDN
2723     BIOX
3051     DESP
3224     SUPV
Name: Ticker, dtype: object

In [12]:
# Ahora obtengo los datos de cotización historica de los tickers de Argentina de polygon.io
# Obtuve el token de acceso a la API de polygon.io y lo guardé en config.ini

api_key = config['credenciales_api']['api_key']

In [13]:
# Armamos la url con el endpoint y especificando las credenciales como parámetros
url_base = "https://api.polygon.io"
endpoint = "v2/aggs/ticker"

# params de fecha para iteración
start='2023-01-01'
end='2023-11-14'
data_dict = []

for ticker in tickers_argentina:
    success = False
    while not success:
        url = f"{url_base}/{endpoint}/{ticker}/range/1/day/{start}/{end}?adjusted=true&sort=asc&apiKey={api_key}"
        r = requests.get(url)
        if r.status_code == 200:
            temp = [r.json()]
            data_dict.extend(temp)
            logging.info(f"Datos obtenidos con éxito para {ticker}")
            success = True
        else:
            logging.warning(f"Error {r.status_code} en {ticker}. Esperando 60 segundos antes de volver a intentar...")
            sleep(60)



In [14]:
# Genero un dataframe con los datos de cotización historica de los tickers de Argentina
df = pd.json_normalize(data_dict, record_path =['results'], meta=['ticker'])

In [15]:
df.ticker.unique() # Verifico que se hayan obtenido datos de todos los tickers

array(['YPF', 'BMA', 'PAM', 'GGAL', 'TGS', 'TEO', 'CEPU', 'BBAR', 'LOMA',
       'IRS', 'CRESY', 'EDN', 'BIOX', 'DESP', 'SUPV'], dtype=object)

In [16]:
# Transformo la columna timestamp a formato fecha y reordeno las columnas
df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
df = df[['timestamp', 'ticker', 'v', 'vw', 'o', 'c', 'h', 'l']]
df.head()

Unnamed: 0,timestamp,ticker,v,vw,o,c,h,l
0,2023-01-03 05:00:00,YPF,3595097.0,8.9449,9.23,8.71,9.5,8.64
1,2023-01-04 05:00:00,YPF,1694038.0,8.683,8.7,8.71,8.81,8.5
2,2023-01-05 05:00:00,YPF,2467675.0,9.1565,8.71,9.35,9.36,8.68
3,2023-01-06 05:00:00,YPF,2349927.0,9.564,9.54,9.45,9.72,9.42
4,2023-01-09 05:00:00,YPF,2298850.0,9.6229,9.55,9.75,9.77,9.355


In [17]:
# Crear una nueva columna en df de identificación única basada en el ticker y la fecha

df['id_quote'] = df['ticker'] + '_' + df['timestamp'].dt.strftime('%Y-%m-%d')


In [18]:
df.head()

Unnamed: 0,timestamp,ticker,v,vw,o,c,h,l,id_quote
0,2023-01-03 05:00:00,YPF,3595097.0,8.9449,9.23,8.71,9.5,8.64,YPF_2023-01-03
1,2023-01-04 05:00:00,YPF,1694038.0,8.683,8.7,8.71,8.81,8.5,YPF_2023-01-04
2,2023-01-05 05:00:00,YPF,2467675.0,9.1565,8.71,9.35,9.36,8.68,YPF_2023-01-05
3,2023-01-06 05:00:00,YPF,2349927.0,9.564,9.54,9.45,9.72,9.42,YPF_2023-01-06
4,2023-01-09 05:00:00,YPF,2298850.0,9.6229,9.55,9.75,9.77,9.355,YPF_2023-01-09


In [19]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3270 entries, 0 to 3269
Data columns (total 9 columns):
 #   Column     Non-Null Count  Dtype         
---  ------     --------------  -----         
 0   timestamp  3270 non-null   datetime64[ns]
 1   ticker     3270 non-null   object        
 2   v          3270 non-null   float64       
 3   vw         3270 non-null   float64       
 4   o          3270 non-null   float64       
 5   c          3270 non-null   float64       
 6   h          3270 non-null   float64       
 7   l          3270 non-null   float64       
 8   id_quote   3270 non-null   object        
dtypes: datetime64[ns](1), float64(6), object(2)
memory usage: 230.0+ KB


### Levanto los datos en Redshift

In [20]:
# Defino la cadena de conexión

def build_conn_string(config_path, config_section):
    """
    Construye la cadena de conexión a la base de datos
    a partir de un archivo de configuración.
    """

    # Lee el archivo de configuración
    config = ConfigParser()
    config.read(config_dir)

    # Lee la sección de configuración de PostgreSQL
    config = config[config_section]
    host = config['host']
    port = config['port']
    database = config['database']
    username = config['username']
    password = config['password']

    # Construye la cadena de conexión
    conn_string = f'postgresql://{username}:{password}@{host}:{port}/{database}?sslmode=require'
    
    return conn_string

In [21]:
# Defino la función para conectarme a la base de datos

def connect_to_db(conn_string):
    """
    Crea una conexión a la base de datos.
    """
    engine = sa.create_engine(conn_string)
    conn = engine.connect()
    return conn, engine

In [22]:
# Creo la conexión a la base de datos

conn_str = build_conn_string('config.ini', 'Credenciales_Redshift')
conn_str


'postgresql://florenciaortega_coderhouse:3jBQ7CMr7j@data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com:5439/data-engineer-database?sslmode=require'

In [23]:
#Conecto a la base de datos

conn, engine = connect_to_db(conn_str)

In [24]:
# Crea la tabla en Redshift

schema = "florenciaortega_coderhouse"

conn.execute(
    f"""
        CREATE TABLE IF NOT EXISTS {schema}.fct_tickers_argentina (
            Date TIMESTAMP,
            Ticker VARCHAR(10) distkey,
            Volume INT,
            Vol_weighted FLOAT,
            Open_price FLOAT,
            Close_price FLOAT,
            High_price FLOAT,
            Low_price FLOAT,
            Id_quote VARCHAR(50),
            primary key (Id_quote)
        )
        
        sortkey(Date, Id_quote, ticker);

        CREATE TABLE IF NOT EXISTS {schema}.dim_tickers (
            Ticker VARCHAR(10) distkey,
            Country VARCHAR(10),
            Ticker_id INT,
            primary key (Ticker_id)
        )
        
        sortkey(Ticker_id);

        CREATE TABLE IF NOT EXISTS {schema}.dim_fechas (
            Date TIMESTAMP distkey,
            Year INT,
            Month INT,
            Day INT,
            Weekday INT,
            Quarter INT,
            Semester INT,
            Day_of_year INT,
            primary key (Date)
        )
        
        sortkey(Date);

    """
)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x11c7b6190>

In [25]:
# Mapeo los nombres de las columnas con las que quiero que se suban a la base de datos

column_mapping = {
    'timestamp': 'Date',
    'ticker': 'Ticker',
    'v': 'Volume',
    'vw': 'Vol_weighted',
    'o': 'Open_price',
    'c': 'Close_price',
    'h': 'High_price',
    'l': 'Low_price',
    'id_quote': 'Id_quote'
}

# Renombro las columnas del dataframe

df.rename(columns=column_mapping, inplace=True)


In [26]:
# Genero un dataset de periodos de tiempo para subir como tabla dimensional de tiempos

# Defino la función para generar un dataset de periodos de tiempo

def generate_date_range(start_date, end_date, freq='D'):
    """
    Genera un dataset de periodos de tiempo.
    """
    date_range = pd.date_range(start=start_date, end=end_date, freq=freq)
    date_range = pd.DataFrame(date_range).rename(columns={0: 'Date'})
    date_range['Date'] = pd.to_datetime(date_range['Date'])
    date_range['Year'] = date_range['Date'].dt.year
    date_range['Month'] = date_range['Date'].dt.month
    date_range['Day'] = date_range['Date'].dt.day
    date_range['Weekday'] = date_range['Date'].dt.weekday
    date_range['Quarter'] = date_range['Date'].dt.quarter
    date_range['Semester'] = date_range['Quarter'].apply(lambda x: 1 if x <= 2 else 2)
    date_range['Day_of_year'] = date_range['Date'].dt.dayofyear

    return date_range   

In [27]:
dim_fechas = generate_date_range('2023-01-01', '2024-12-01', freq='D')
dim_fechas.head()

Unnamed: 0,Date,Year,Month,Day,Weekday,Quarter,Semester,Day_of_year
0,2023-01-01,2023,1,1,6,1,1,1
1,2023-01-02,2023,1,2,0,1,1,2
2,2023-01-03,2023,1,3,1,1,1,3
3,2023-01-04,2023,1,4,2,1,1,4
4,2023-01-05,2023,1,5,3,1,1,5


In [28]:
# Subo los dataframes a la base de datos Redshift

def load_table(df, table_name, conn, schema, if_exists='replace', method='multi', index=False):
    """
    Sube un dataframe a una tabla de Redshift.
    """
    # Verificar si la tabla ya existe en la base de datos
    if if_exists == 'replace' or (if_exists == 'append' and not table_exists(conn, table_name, schema)):
        # Cargar el DataFrame en la tabla
        df.to_sql(
            name=table_name,
            con=conn,
            schema=schema,
            if_exists=if_exists,
            method=method,
            index=index
        )
        print(f'DataFrame cargado en la tabla {schema}.{table_name}.')
    else:
        # Verificar la fecha máxima antes de cargar datos si la opción es 'append'
        if if_exists == 'append':
            # Obtener la fecha máxima de la tabla existente en la base de datos
            existing_max_date = pd.read_sql(
                f"""
                    SELECT MAX(Date) AS max_date
                    FROM {schema}.{table_name}
                """,
                conn
            )['max_date'][0]

            # Obtener la fecha máxima del DataFrame
            df_max_date = df['Date'].max()

            # Verificar si la fecha máxima de la tabla existente es menor que la del DataFrame
            if existing_max_date is None or existing_max_date < df_max_date:
                # Cargar datos solo si la fecha máxima es menor
                df.to_sql(
                    name=table_name,
                    con=conn,
                    schema=schema,
                    if_exists='append',
                    method=method,
                    index=index
                )
                print(f'DataFrame cargado en la tabla {schema}.{table_name}.')
            else:
                print(f'No se ha realizado ninguna carga en la tabla {schema}.{table_name}. La fecha máxima en la base de datos ({existing_max_date}) es mayor o igual a la fecha máxima en el DataFrame ({df_max_date}).')

def table_exists(conn, table_name, schema):
    """
    Verifica si una tabla existe en la base de datos.
    """
    query = f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '{schema}' AND table_name = '{table_name}';"
    result = conn.execute(query).scalar()
    return result > 0


In [29]:
# Subo los dataframes a la base de datos Redshift

load_table(df, 'fct_tickers_argentina', conn, schema, if_exists='append', method='multi', index=False)
load_table(tickers_countries, 'dim_tickers', conn, schema, if_exists='replace', method='multi', index=False)
load_table(dim_fechas, 'dim_fechas', conn, schema, if_exists='replace', method='multi', index=False)

    

DataFrame cargado en la tabla florenciaortega_coderhouse.fct_tickers_argentina.
DataFrame cargado en la tabla florenciaortega_coderhouse.dim_tickers.
DataFrame cargado en la tabla florenciaortega_coderhouse.dim_fechas.


In [30]:
# Verifico que se hayan subido los datos a la base de datos

tables = ['fct_tickers_argentina', 'dim_tickers', 'dim_fechas']

for i in tables:
    table_exists(conn, i, schema)
    if table_exists(conn, i, schema):     
        #conteo registros por tabla
        print(f"La tabla {i} existe y tiene {pd.read_sql(f'select count(*) from {schema}.{i}', conn)['count'][0]} registros")
    else:
        print(f"La tabla {i} no existe")

La tabla fct_tickers_argentina existe y tiene 3270 registros
La tabla dim_tickers existe y tiene 6169 registros
La tabla dim_fechas existe y tiene 701 registros
