In [None]:
#import libraries
import requests
import pandas as pd
import os
import json
import numpy as np
from datetime import datetime, timedelta
from datetime import date
from dotenv import load_dotenv
from configparser import ConfigParser
from sqlalchemy import create_engine, text
import psycopg2

In [None]:
# configuracion de pandas para visualizar todas las filas y columnas de un dataframe
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)


In [None]:
# Cargar las variables de entorno desde el archivo variables_entorno.env
load_dotenv("variables_entorno.env")

# Obtener la clave de API desde las variables de entorno
API_KEY_ALPHA = os.getenv("ALPHA_VANTAGE_API_KEY")

# Verificar que la clave de API se hayan cargado correctamente
if not all([API_KEY_ALPHA]):
    raise ValueError("Asegúrate que la API key este en el archivo .env")

In [None]:
# configuracion parametros para mi api 
# valores por defecto para extraccion de datos
simbolo = "IBM"
intervalo = "daily" 
today = str(datetime.today().strftime("%d-%m-%Y"))


In [None]:
def read_metadata():
# Leo el archivo de metadata
    try:
        with open('metadata.json', 'r') as file:
            metadata = json.load(file)
        # Obtener la fecha de la última extracción
        return (metadata['last_extraction_date'])
    except:
        # retorno una fecha dump que permita la ingesta de datos
        # sabiendo que no existe un archivo de metadata
        return ("01-01-1900")


In [None]:
def save_metadata(df, output_path):
# Creo un archivo con metadata cada vez que hago una ingesta
    df = df.dropna()
    last_extraction_date = pd.to_datetime(df.iloc[0]["Meta Data.3. Last Refreshed"])
    last_extraction_date = last_extraction_date.strftime("%Y-%m-%d")
    metadata = {
    "last_extraction_date": last_extraction_date
    # aca hay un casteo implicito a string para poder guardar el json
    }

# Guardo el archivo como metadata.json
    with open(f'{output_path}/metadata.json', 'w') as file:
        json.dump(metadata, file, indent=4)

In [None]:
def get_data(tipo_extraccion, simbolo=None,intervalo=None,data_field=None, params=None, headers=None):
    """
    Realiza una solicitud GET a una API para obtener datos.

    Parámetros:
    base_url (str): La URL base de la API.
    endpoint (str): El endpoint de la API al que se realizará la solicitud.
    data_field (str): Atribudo del json de respuesta donde estará la lista
    de objetos con los datos que requerimos
    params (dict): Parámetros de consulta para enviar con la solicitud.
    headers (dict): Encabezados para enviar con la solicitud.

    Retorna:
    dict: Los datos obtenidos de la API en formato JSON.
    """
    # Endpoint para extraccion full
    url_e_full = f'https://www.alphavantage.co/query?function=OVERVIEW&symbol={simbolo}&apikey={API_KEY_ALPHA}'
    # Endpoint para extraccion incremental
    url_e_inc = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={simbolo}&interval={intervalo}&apikey={API_KEY_ALPHA}'
    today = (datetime.now().strftime('%Y-%m-%d'))
    if tipo_extraccion == "full":
        try:
            url = url_e_full
        #endpoint_url = f"{base_url}/{endpoint}"
            response = requests.get(url, params=params, headers=headers)
            response.raise_for_status()  # Levanta una excepción si hay un error en la respuesta HTTP.

        # Verificar si los datos están en formato JSON.
            try:
                data = response.json()
                if data_field:
                    data = data[data_field]
            except:
                print("El formato de respuesta no es el esperado")
                return None
            return (data, tipo_extraccion)
        except requests.exceptions.RequestException as e:
        # Capturar cualquier error de solicitud, como errores HTTP.
            print(f"La petición ha fallado. Código de error : {e}")
            return None
    elif tipo_extraccion == "incremental":
        url = url_e_inc
        ultimo_dato_extraido = read_metadata()
        if ultimo_dato_extraido < today:
            print("Extrayendo los datos de hoy")
            try:
                response = requests.get(url, params=params, headers=headers)
                response.raise_for_status()  # Levanta una excepción si hay un error en la respuesta HTTP.

            # Verificar si los datos están en formato JSON.
                try:
                    data = response.json()
                    if data_field:
                        data = data[data_field]
                except:
                    print("El formato de respuesta no es el esperado")
                    return None
                return (data , tipo_extraccion)
                # Verificar si los datos están en formato JSON.
            except requests.exceptions.RequestException as e:
            # Capturar cualquier error de solicitud, como errores HTTP.
                print(f"La petición ha fallado. Código de error : {e}")
                return None
        else:
            print("no hay nuevos datos para extraer")



In [None]:
def build_table(json_data, tipo_extraccion, record_path=None):
    """
    Construye un DataFrame de pandas a partir de datos en formato JSON.

    Parámetros:
    json_data (dict): Los datos en formato JSON obtenidos de una API.

    Retorna:
    DataFrame: Un DataFrame de pandas que contiene los datos. Y unas columnas añadidas para 
    controlar la fecha de extraccion de los datos, en caso de extraccion incremental
    """
    try:
        df = pd.json_normalize(
            json_data,
            record_path)
        if (tipo_extraccion == "incremental"):
             df["year"] = (pd.to_datetime(df["Meta Data.3. Last Refreshed"]).dt.strftime("%Y"))
             df["month"] = (pd.to_datetime(df["Meta Data.3. Last Refreshed"]).dt.strftime("%m"))
             df["day"] = (pd.to_datetime(df["Meta Data.3. Last Refreshed"]).dt.strftime("%d"))
             print(df)
             return df
        elif (tipo_extraccion == "full"):
            return df
    except:
        print("Los datos no están en el formato esperado")
        return None

In [None]:
def save_to_parquet(df, output_path, partition_cols=None):
    """
    Recibe un dataframe, se recomienda que haya sido convertido a un formato tabular,
    y lo guarda en formato parquet.

    Parametros:
    df (pd.DataFrame). Dataframe a guardar.
    output_path (str). Path + nombre del archivo
    partition_cols (list o str). Columna/s por las cuales particionar los datos.
    """
    
    # Crear el directorio si no existe
    directory = os.path.dirname(output_path)
    if directory and not os.path.exists(directory):
        os.makedirs(directory)

    df.to_parquet(
        output_path,
        engine="fastparquet",
        partition_cols=partition_cols
        )

In [None]:
def extraer_datos(tipo_extraccion,simbolo:None,intervalo:None):
                  partition_cols = ["year","month","day"]
                  if tipo_extraccion == "incremental":
                      output_path = "raw_" + simbolo
                      read_metadata()
                      json = get_data(tipo_extraccion,simbolo,intervalo)
                      df = build_table(json,tipo_extraccion)
                      save_to_parquet(df,output_path,partition_cols)
                      save_metadata(df,output_path)
                  elif tipo_extraccion == "full":
                        output_path = "informacion_general_" + simbolo
                        json = get_data(tipo_extraccion,simbolo,intervalo)
                        df = build_table(json,tipo_extraccion)
                        save_to_parquet(df,output_path)

In [None]:
# Extraccion de datos de varias empresas
def extraer_datos_multiple(tipo_extraccion,simbolos:list,intervalo:None):
    partition_cols = ["year","month","day"]
    if tipo_extraccion == "incremental":
                    output_path = "multiple_incremental"
                    read_metadata()
                    for simbolo in (simbolos):
                        json = get_data(tipo_extraccion,simbolo,intervalo)
                        globals()[simbolo] = build_table(json,tipo_extraccion)
                    dataframes = [globals()[simbolo] for simbolo in simbolos]
                    df = pd.concat(dataframes)
                    df = df.dropna()
                    save_to_parquet(df,output_path,partition_cols)
                    save_metadata(df,output_path)
    elif tipo_extraccion == "full":
                    output_path = "multiple_full"
                    for simbolo in (simbolos):
                        json = get_data(tipo_extraccion,simbolo,intervalo)
                        globals()[simbolo] = build_table(json,tipo_extraccion)
                    dataframes = [globals()[simbolo] for simbolo in simbolos]
                    df = pd.concat(dataframes)
                    df = df.dropna()
                    save_to_parquet(df,output_path)

In [None]:
# funcion de conexion a la base de datos relacional
def connect_to_db(config_file, section, driverdb):
    """
    Crea una conexión a la base de datos especificada en el archivo de configuración.

    Parámetros:
    config_file (str): La ruta del archivo de configuración.
    section (str): La sección del archivo de configuración que contiene los datos de la base de datos.
    driverdb (str): El driver de la base de datos a la que se conectará.

    Retorna:
    Un objeto de conexión a la base de datos.
    """
    # driverdb = postgresql
    # section = postgres
    # config_file = pipeline.config
    try:
        # Lectura del archivo de configuración
        parser = ConfigParser()
        parser.read(config_file)

        # Creación de un diccionario
        # donde cargaremos los parámetros de la base de datos
        db = {}
        if parser.has_section(section):
            params = parser.items(section)
            db = {param[0]: param[1] for param in params}

            # Creación de la conexión a la base de datos
            engine = create_engine(
                f"{driverdb}://{db['usr']}:{db['pwd']}@{db['host']}:{db['port']}/{db['dbname']}"
            )
            return engine

        else:
            print(
                f"Sección {section} no encontrada en el archivo de configuración.")
            return None
    except Exception as e:
        print(f"Error al conectarse a la base de datos: {e}")
        return None

In [None]:
# Llamada a la extraccion incremental
extraer_datos("incremental","AAPL","daily")

In [None]:
# Llamada a la extraccion full
extraer_datos("full","AAPL","")

In [None]:
# Llamada a la extraccion multiple full
simbolos = ["GOOG","IBM","AAPL"]
extraer_datos_multiple("full",simbolos,"")

In [None]:
# Llamada a la extraccion multiple incremental
simbolos = ["MSFT","TSLA","NVDA"]
extraer_datos_multiple("incremental",simbolos,"")

In [None]:
## Segunda parte
# leo lo archivos de los resultados de las extraccion para primero transformar y luego insertar en la base de datos

In [None]:
df = pd.read_parquet("multiple_full")

In [None]:
# Transformacion : limpieza de duplicados
df = df.drop_duplicates()

In [None]:
# Transformacion: Conversión de tipos de datos de columnas
columnas = {
    'Symbol': "string",
    'AssetType': "category",
    'Description': "string",
    'Name': "string",
    'Exchange': "category",
    'Currency': "category",
    'Country': "string",
    'Sector': "category",
    'Industry': "string",
    'MarketCapitalization': "float",
    'EBITDA': "float",
    'PERatio': "float",
    'PEGRatio': "float",
    'DividendPerShare': "float",
    'DividendYield': "float",
    'EPS': "float",
    'ProfitMargin': "float",
    'AnalystTargetPrice': "float",
    'AnalystRatingStrongBuy': "int",
    'AnalystRatingBuy': "int",
    'AnalystRatingHold': "int",
    'AnalystRatingSell': "int",
    'AnalystRatingStrongSell': "int",
    'EVToRevenue': "float",
    'EVToEBITDA': "float",
    '52WeekHigh': "float",
    '52WeekLow': "float",
    '50DayMovingAverage': "float",
    'SharesOutstanding': "float",
    'FiscalYearEnd': "string",
    # conversion de columnas de tipo fecha que optimiza el almacenamiento
    'LatestQuarter': "datetime64[ns]",
    'DividendDate': "datetime64[ns]",
    'ExDividendDate': "datetime64[ns]"
    }

df = df.astype(columnas)

In [None]:
# Transformacion : Eliminar columnas, por ejemplo datos que no son relevantes para determinada logica de negocio

columnas_a_eliminar = [
"CIK",
"Address",
"BookValue",
"RevenuePerShareTTM",
"OperatingMarginTTM",
"ReturnOnAssetsTTM",
"ReturnOnEquityTTM",
"GrossProfitTTM",
"RevenueTTM",
"DilutedEPSTTM",
"QuarterlyEarningsGrowthYOY",
"QuarterlyRevenueGrowthYOY",
"TrailingPE",
"ForwardPE",
"PriceToSalesRatioTTM",
"PriceToBookRatio",
"Beta",
"200DayMovingAverage",
]

df = df.drop(columns=columnas_a_eliminar)

In [None]:
df = df.rename(columns={   
    "52WeekHigh": "YearHigh",
    "52WeekLow":"YearLow",
    "50DayMovingAverage": "LastFiftyDaysMovingAverage"
        })

In [None]:
# Transformacion : renombrar columnas (traduccion)
# Nota: Esta transformacion no voy a realizarla simplemente la comento
df.rename(columns={    
    'Symbol': "Simbolo",
    'AssetType': "Tipo_activo",
    'Description': "Descripcion",
    'Name': "Nombre",
    'Exchange': "Indice_Bolsa",
    'Currency': "Moneda",
    'Country': "Pais",
    'Sector': "Sector",
    'Industry': "Industria",
    'MarketCapitalization': "Capitalizacion_Bursatil",
    'DividendPerShare': "Dividendo_x_accion",
    'DividendYield': "Dividendo_porcentage",
    'ProfitMargin': "Margen_ganancias",
    'AnalystTargetPrice': "Precio_objetivo",
    'AnalystRatingStrongBuy': "Recomendacion_compra_fuerte",
    'AnalystRatingBuy': "Recomendacion_compra",
    'AnalystRatingHold': "Recomendacion_mantener",
    'AnalystRatingSell': "Recomendacion_venta",
    'AnalystRatingStrongSell': "Recomendacion_venta_fuerte",
    '52WeekHigh': "maximo_anual",
    '52WeekLow': "minimo_anual",
    '50DayMovingAverage': "movimiento_50_dias",
    'SharesOutstanding': "numero_acciones",
    'FiscalYearEnd': "fin_anio_fiscal",
    'LatestQuarter': "ultimo_cuatrimestre",
    'DividendDate': "fecha_dividendo",
    'ExDividendDate': "anuncio_dividendo"
    })

In [None]:
# Transformacion:  conversión de columnas de tipo fecha al formato mas usual dia-mes-año
columnas_fecha = [
    'LatestQuarter',
    'DividendDate',
    'ExDividendDate'
    ]

for i in columnas_fecha:
    df[i] = df[i].dt.strftime("%d-%m-%Y")

In [None]:
# Transformacion: calculo de una columna como logica en base a otras dos columnas.
df['Recomendation'] = np.where((df['AnalystRatingStrongBuy'] + df['AnalystRatingBuy']) > (df['AnalystRatingStrongSell'] + df['AnalystRatingSell']), 'Buy', 'Not Buy')

In [None]:
# Guardar los datos procesados en la zona "curada" / procesada del data
output_path = "curated_multiple"
save_to_parquet(df,output_path)

In [None]:
# conexion a la base de datos 
# parametros archivo de configuracion, motor de base de datos y librerias python usadas para la conexion
engine = connect_to_db(
    "pipeline.conf",
    "postgres",
    "postgresql+psycopg2"
    )

In [None]:
# Guardo en la base de datos sql los datos de la extraccion full
df = pd.read_parquet("curated_multiple")

In [None]:
# creacion de la tabla para insercion de datos de la extraccion full
create_query = text(
    """
    BEGIN;
    CREATE SCHEMA IF NOT EXISTS stocks_general;
    CREATE TABLE IF NOT EXISTS stocks_general.full(   
    Symbol VARCHAR(4) PRIMARY KEY,
    AssetType VARCHAR(20),
    Name VARCHAR(100),
    Description VARCHAR(200),
    Exchange VARCHAR(10),
    Currency VARCHAR(3),
    Country VARCHAR(3),
    Sector VARCHAR(20),
    Industry VARCHAR(50),
    Recomendation VARCHAR(10),
    FiscalYearEnd VARCHAR(10),
    MarketCapitalization REAL,
    EBITDA REAL,
    PERatio REAL,
    PEGRatio REAL,
    DividendPerShare REAL,
    DividendYield REAL,
    EPS REAL,
    ProfitMargin REAL,
    AnalystTargetPrice REAL,
    AnalystRatingStrongBuy SMALLINT,
    AnalystRatingBuy SMALLINT,
    AnalystRatingHold SMALLINT,
    AnalystRatingSell SMALLINT,
    AnalystRatingStrongSell SMALLINT,
    EVToRevenue REAL,
    EVToEBITDA REAL,
    YearHigh REAL,
    YearLow REAL,
    LastFiftyDaysMovingAverage REAL,
    SharesOutstanding INTEGER,
    DividendDate DATE,
    ExDividendDate DATE,
    LatestQuarter DATE
    );
 COMMIT;
    """
    )

with engine.connect() as conn:
    conn.execute(create_query)

In [None]:
# Insertar datos de mi extraccion full
df.to_sql(name="full", con=engine, schema="stocks_general", index=False ,if_exists='append',method="multi")


In [None]:
# Leo los datos de la extraccion incremental
df = pd.read_parquet("raw_AAPL\year=2024\month=06\day=21")

In [None]:
# Hago unas pequeñas transformaciones a la informacion de mi extraccion incremental antes de guardar en BD
# Eliminar columnas de metadata
columnas_a_eliminar = [
"Meta Data.1. Information",
"Meta Data.3. Last Refreshed",
"Meta Data.4. Output Size",
"Meta Data.5. Time Zone"
]

df = df.drop(columns=columnas_a_eliminar)

In [None]:
# convertir columnas en filas
df = pd.melt(df,id_vars=['Meta Data.2. Symbol'])

In [None]:
# renombrar columna
df = df.rename(columns={    
    'Meta Data.2. Symbol': "Symbol"})

In [None]:
# genero una columna incremental para identificar cada operacion, esta columna sera mi PK luego en la BD
df['Id'] = range(1, len(df) + 1)

In [None]:
# creacion de la tabla para insercion de datos de la extraccion incremental
create_query = text(
    """
    BEGIN;
    CREATE SCHEMA IF NOT EXISTS stocks_general;
    CREATE TABLE IF NOT EXISTS stocks_general.incremental(   
    Symbol VARCHAR(4),
    variable VARCHAR(50),
    value REAL.
    Id INTEGER PRIMARY KEY
    );
 COMMIT;
    """
    )

with engine.connect() as conn:
    conn.execute(create_query)

In [None]:
# Insertar datos de mi extraccion incremental
df.to_sql(name="incremental", con=engine, schema="stocks_general", index=False ,if_exists='append',method="multi")