In [1]:
import requests
import pandas as pd

def api_bcra(token, endpoints, timeout=30):
    """Download several series from api.estadisticasbcra.com into one long DataFrame.

    Each endpoint is requested with the given ``Authorization`` header. The
    JSON payload of every successful (HTTP 200) response is loaded into a
    DataFrame, its ``d``/``v`` columns are renamed to ``fecha``/``valor`` and a
    ``variable`` column holding the endpoint name is added. Endpoints that
    answer with any other status code are reported on stdout and skipped.

    Args:
        token: Value sent verbatim as the ``Authorization`` header.
        endpoints: Iterable of endpoint names appended to the API base URL.
        timeout: Seconds passed to ``requests.get`` so a stalled endpoint
            cannot block the run forever.

    Returns:
        A DataFrame whose first columns are ``fecha``, ``valor`` and
        ``variable``, followed by any extra columns present in the payloads.
        It is empty (with just those three columns) when nothing was fetched.
    """
    base_columns = ['fecha', 'valor', 'variable']
    headers = {"Authorization": token}
    frames = []

    for endpoint in endpoints:
        url = "https://api.estadisticasbcra.com/" + endpoint
        response = requests.get(url, headers=headers, timeout=timeout)

        # Report failed requests and move on to the next endpoint.
        if response.status_code != 200:
            print(f"Error {response.status_code} al obtener datos del endpoint: {endpoint}")
            continue

        data = pd.DataFrame(response.json()).rename(columns={'d': 'fecha', 'v': 'valor'})
        if data.empty:
            # Nothing to contribute; skipping keeps empty frames out of concat.
            continue
        data['variable'] = endpoint
        frames.append(data)

    if not frames:
        return pd.DataFrame(columns=base_columns)

    # Concatenate once instead of re-copying the accumulated frame per endpoint.
    combined = pd.concat(frames, ignore_index=True)

    # Keep the three base columns first, then payload-specific extras in the
    # order they appeared.
    extra_columns = [col for col in combined.columns if col not in base_columns]
    return combined.reindex(columns=base_columns + extra_columns)

  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [None]:
import pandas as pd

def transform_data(df: pd.DataFrame, start_date: str = '2024-01-01') -> pd.DataFrame:
    """Reshape the long frame into one column per variable.

    Rows whose ``variable`` is ``'milestones'`` are discarded, the ``e`` and
    ``t`` columns are dropped when present, and the remaining rows are pivoted
    so each distinct ``variable`` becomes a column indexed by ``fecha``.
    ``fecha`` is then parsed to datetime and only rows on or after
    ``start_date`` are kept.

    Args:
        df: Long-format frame with ``fecha``, ``valor`` and ``variable``
            columns; ``e`` and ``t`` columns are optional.
        start_date: Inclusive lower bound applied to ``fecha``.

    Returns:
        The wide frame restricted to ``fecha >= start_date``.

    Raises:
        ValueError: Propagated from ``DataFrame.pivot`` when the same
            ``(fecha, variable)`` pair occurs more than once.
    """
    series_rows = df[df["variable"] != 'milestones']

    # 'e' and 't' are absent when no payload supplied them, so a missing
    # column must not abort the transformation.
    series_rows = series_rows.drop(columns=['e', 't'], errors='ignore')

    wide = series_rows.pivot(index='fecha', columns='variable', values='valor').reset_index()
    wide['fecha'] = pd.to_datetime(wide['fecha'])

    return wide[wide['fecha'] >= start_date]

In [None]:
import logging
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text

# Configure the root logger: INFO and above are appended to app.log, with every
# record tagged as coming from the data-connection module.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s ::DataConnectionModule-> %(name)s - %(levelname)s - %(message)s',
    filename='app.log',
    filemode='a',
)
    
class DataConn:
    """Small helper around a SQLAlchemy engine for loading DataFrames into Redshift.

    The engine is created lazily by ``get_conn`` (or on first ``upload_data`` /
    ``check_table_exists`` call) from the ``REDSHIFT_*`` entries of ``config``.
    """

    def __init__(self, config: dict, schema: str):
        """Store the connection settings; no connection is opened here.

        Args:
            config: Mapping with ``REDSHIFT_USERNAME``, ``REDSHIFT_PASSWORD``,
                ``REDSHIFT_HOST``, ``REDSHIFT_DBNAME`` and optionally
                ``REDSHIFT_PORT`` (defaults to ``'5439'``).
            schema: Schema used for uploads and table-existence checks.
        """
        self.config = config
        self.schema = schema
        self.db_engine = None

    def _connection_url(self) -> str:
        """Build the ``postgresql+psycopg2`` URL from ``self.config``.

        Raises:
            ValueError: If any required ``REDSHIFT_*`` setting is missing.
        """
        required = ('REDSHIFT_USERNAME', 'REDSHIFT_PASSWORD', 'REDSHIFT_HOST', 'REDSHIFT_DBNAME')
        missing = [key for key in required if self.config.get(key) is None]
        if missing:
            raise ValueError(f"Missing connection settings: {', '.join(missing)}")

        # Percent-encode credentials so characters such as '@', ':' or '/'
        # cannot be mistaken for URL delimiters.
        username = quote(str(self.config['REDSHIFT_USERNAME']), safe='')
        password = quote(str(self.config['REDSHIFT_PASSWORD']), safe='')
        host = self.config['REDSHIFT_HOST']
        port = self.config.get('REDSHIFT_PORT') or '5439'
        dbname = self.config['REDSHIFT_DBNAME']

        return f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{dbname}"

    def get_conn(self):
        """Create the engine and verify it with a ``SELECT 1`` round trip.

        ``self.db_engine`` is only assigned once the probe succeeds, so a
        failed attempt never leaves an unusable engine behind.

        Raises:
            ValueError: If required connection settings are missing.
            Exception: Whatever the engine creation or probe query raised;
                it is logged and re-raised unchanged.
        """
        connection_url = self._connection_url()

        engine = None
        try:
            engine = create_engine(connection_url)
            with engine.connect() as connection:
                # Plain strings must be wrapped in text() for Connection.execute.
                connection.execute(text('SELECT 1'))
        except Exception as e:
            if engine is not None:
                engine.dispose()
            logging.error(f"Failed to create connection: {e}")
            raise

        self.db_engine = engine
        logging.info("Connection created")

    def check_table_exists(self, table_name: str) -> bool:
        """Check that ``table_name`` exists in ``self.schema``.

        Args:
            table_name: Unqualified table name to look up in
                ``information_schema.tables``.

        Returns:
            ``True`` when the table exists.

        Raises:
            ValueError: If the table is not present in the schema.
        """
        if self.db_engine is None:
            self.get_conn()

        # Bound parameters keep the table and schema names out of the SQL text.
        query_checker = text(
            """
            SELECT 1 FROM information_schema.tables
            WHERE  table_schema = :schema
            AND    table_name   = :table_name
            """
        )
        params = {"schema": self.schema, "table_name": table_name}

        with self.db_engine.connect() as connection:
            row = connection.execute(query_checker, params).fetchone()

        if row is None:
            logging.error(f"No {table_name} has been created")
            raise ValueError(f"No {table_name} has been created")

        logging.info(f"{table_name} exists")
        return True

    def upload_data(self, data: pd.DataFrame, table: str):
        """Append ``data`` to ``self.schema.table``, connecting first if needed.

        Args:
            data: Frame to upload; its index is not written.
            table: Destination table name inside ``self.schema``.

        Raises:
            Exception: Any error raised by ``DataFrame.to_sql``; it is logged
                and re-raised unchanged.
        """
        if self.db_engine is None:
            logging.warning("Execute it before")
            self.get_conn()

        try:
            data.to_sql(
                table,
                con=self.db_engine,
                schema=self.schema,
                if_exists='append',
                index=False
            )

            logging.info(f"Data from the DataFrame has been uploaded to the {self.schema}.{table} table in Redshift.")
        except Exception as e:
            logging.error(f"Failed to upload data to {self.schema}.{table}:\n{e}")
            raise

    def close_conn(self):
        """Dispose of the engine, if any, and forget it.

        Resetting ``self.db_engine`` to ``None`` makes a later ``upload_data``
        reconnect and makes a second ``close_conn`` report that nothing is open.
        """
        if self.db_engine is not None:
            self.db_engine.dispose()
            self.db_engine = None
            logging.info("Connection to Redshift closed.")
        else:
            logging.warning("No active connection to close.")

In [None]:

import os
import logging
from dotenv import load_dotenv
from modules import api_bcra , transform_data , DataConn


# Configure the root logger for the entry-point script: INFO and above are
# appended to app.log with a MainModule tag.
# NOTE(review): logging.basicConfig does nothing when the root logger already
# has handlers; if importing `modules` above already configured logging, this
# call (and its MainModule format) has no effect -- confirm.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s ::MainModule-> %(name)s - %(levelname)s - %(message)s',
    filename='app.log',
    filemode='a',
)

# Load variables from a .env file so the os.getenv lookups in main() can see them.
load_dotenv()


def main():
    """Fetch the BCRA series, reshape them and append them to the target table.

    Connection settings come from the ``REDSHIFT_*`` environment variables and
    the API credential from ``BCRA_API_TOKEN``. The token must not live in
    source control; a ``BEARER `` prefix is added when the variable holds only
    the raw token.

    Upload failures are logged (with traceback) rather than re-raised, and the
    connection is always closed afterwards.

    Raises:
        RuntimeError: If ``BCRA_API_TOKEN`` is not set.
    """
    user_credentials = {
        "REDSHIFT_USERNAME": os.getenv('REDSHIFT_USERNAME'),
        "REDSHIFT_PASSWORD": os.getenv('REDSHIFT_PASSWORD'),
        "REDSHIFT_HOST": os.getenv('REDSHIFT_HOST'),
        "REDSHIFT_PORT": os.getenv('REDSHIFT_PORT', '5439'),
        "REDSHIFT_DBNAME": os.getenv('REDSHIFT_DBNAME')
    }

    schema: str = "marcoscervera_coderhouse"
    table: str = "prueba"
    endpoints = ['milestones', 'usd', 'usd_of', 'usd_of_minorista', 'base', 'reservas', 'circulacion_monetaria',
                 'depositos', 'cuentas_corrientes', 'cajas_ahorro', 'plazo_fijo', 'cer', 'uva',
                 'inflacion_mensual_oficial', 'inflacion_interanual_oficial']

    # The API credential is a secret: read it from the environment (or .env)
    # instead of embedding it in the source.
    token = os.getenv('BCRA_API_TOKEN')
    if not token:
        logging.error("BCRA_API_TOKEN is not set; cannot query the BCRA API")
        raise RuntimeError("BCRA_API_TOKEN environment variable is required")
    if not token.upper().startswith('BEARER '):
        token = f"BEARER {token}"

    data_conn = DataConn(user_credentials, schema)
    df = api_bcra(token, endpoints)

    if df.empty:
        # Every endpoint failed or returned nothing; there is nothing to load.
        logging.warning("No data retrieved from the BCRA API; nothing to upload")
        return

    df_bcra = transform_data(df)

    try:
        data_conn.upload_data(df_bcra, table)
        logging.info(f"Data uploaded to -> {schema}.{table}")

    except Exception as e:
        # Best-effort run: record the failure with its traceback and finish.
        logging.exception(f"Not able to upload data\n{e}")

    finally:
        data_conn.close_conn()


if __name__ == "__main__":
    main()


