In [128]:
#    **** TRABAJO FINAL INTEGRADOR ****
# Alumno: Alejandro Barquinero
# Curso: Data Engineering
# #Año: 2024
# UTN BA
# La API Seleccionada es https://argentinadatos.com/docs/, esta API contiene la cotizacion historica del dolar de la Argentina
# La casa corresponde a los diferentes tipos del dolar. Ej: Dolar oficial, Dolar liqui, Dolar blue, etc.

# Se procede a instalar las líbrerías para su posterior uso.
!pip install requests
!pip install -q fastparquet
!pip install fastparquet
!pip install -q ydata-profiling
!pip install sqlalchemy
!pip install psycopg2-binary



In [129]:
# Se importan las líbrerias que se van a utilizar
import requests
import json
import os
import fastparquet
import pandas as pd
import sqlalchemy as sa
from ydata_profiling import ProfileReport
from pprint import pprint
from datetime import datetime, timedelta
from sqlalchemy import create_engine
from sqlalchemy.sql import text

In [130]:
## Se definen los métodos que se van a utilizar. ##

# Método para consultar a una API y obtenter los datos.
def get_data(base_url, endpoint, params=None, headers=None):
    """
    Realiza una solicitud GET a una API para obtener datos.

    Parámetros:
    base_url (str): La URL base de la API.
    endpoint (str): El endpoint de la API al que se realizará la solicitud.
    params (dict): Parámetros de consulta para enviar con la solicitud.
    data_field (str): El nombre del campo en el JSON que contiene los datos.
    headers (dict): Encabezados para enviar con la solicitud.

    Retorna:
    dict: Los datos obtenidos de la API en formato JSON.
    """
    try:
        endpoint_url = f"{base_url}/{endpoint}"
        #print(endpoint_url)
        response = requests.get(endpoint_url, params=params, headers=headers)
        response.raise_for_status()  # Levanta una excepción si hay un error en la respuesta HTTP.

        # Verificar si los datos están en formato JSON.
        try:
            data = response.json()
            #data = data[data_field]
        except:
            print("El formato de respuesta no es el esperado")
            return None
        return data

    except requests.exceptions.RequestException as e:
        # Capturar cualquier error de solicitud, como errores HTTP.
        print(f"La petición ha fallado. Código de error : {e}")
        return None

# Datos obtenidos en formato JSON a Dataframe.
def build_table(json_data):
    """
    Construye un DataFrame de pandas a partir de datos en formato JSON.

    Parámetros:
    json_data (dict): Los datos en formato JSON obtenidos de una API.

    Retorna:
    DataFrame: Un DataFrame de pandas que contiene los datos.
    """
    try:
        df = pd.json_normalize(json_data)
        return df
    except:
        print("Los datos no están en el formato esperado")
        return None

# Se guarda el Dataframe en un archivo con formato .parquet
def save_to_parquet(df, output_path, partition_cols=None):
    """
    Recibe un dataframe, se recomienda que haya sido convertido a un formato tabular,
    y lo guarda en formato parquet.

    Parametros:
    df (pd.DataFrame). Dataframe a guardar.
    output_path (str). Ruta donde se guardará el archivo. Si no existe, se creará.
    partition_cols (list o str). Columna/s por las cuales particionar los datos.
    """

    # Crear el directorio si no existe
    directory = os.path.dirname(output_path)
    pprint(directory)
    if directory and not os.path.exists(directory):
        os.makedirs(directory)

    df.to_parquet(
        output_path,
        engine="fastparquet",
        partition_cols=partition_cols
    )

In [131]:
# Una vez definido las líbrerías y los métodos, se procede a realizar las consignas del Trabajo Final Integrador.
# Se utiliza la api para consultar las cotizaciones historicas del dolar.
# Página: https://argentinadatos.com/docs/

# URL base
base_url = "https://api.argentinadatos.com"

# Ejecicio 1 - Extracción - Full
endpoint_dolar = "/v1/cotizaciones/dolares"
json_data_dolar = get_data(base_url, endpoint_dolar)
pprint(json_data_dolar)

[1;30;43mSe truncaron las últimas líneas 5000 del resultado de transmisión.[0m
  'venta': 410.32},
 {'casa': 'tarjeta', 'compra': 238.68, 'fecha': '2023-05-29', 'venta': 435.19},
 {'casa': 'blue', 'compra': 488, 'fecha': '2023-05-30', 'venta': 493},
 {'casa': 'bolsa', 'compra': 462.32, 'fecha': '2023-05-30', 'venta': 462.32},
 {'casa': 'contadoconliqui',
  'compra': 463.14,
  'fecha': '2023-05-30',
  'venta': 463.14},
 {'casa': 'cripto', 'compra': 483.23, 'fecha': '2023-05-30', 'venta': 483.23},
 {'casa': 'mayorista', 'compra': 238.6, 'fecha': '2023-05-30', 'venta': 239},
 {'casa': 'oficial', 'compra': 240, 'fecha': '2023-05-30', 'venta': 250},
 {'casa': 'solidario',
  'compra': 239.79,
  'fecha': '2023-05-30',
  'venta': 412.15},
 {'casa': 'tarjeta', 'compra': 239.79, 'fecha': '2023-05-30', 'venta': 437.13},
 {'casa': 'blue', 'compra': 485, 'fecha': '2023-05-31', 'venta': 490},
 {'casa': 'bolsa', 'compra': 466.98, 'fecha': '2023-05-31', 'venta': 466.98},
 {'casa': 'contadoconliqui',

In [132]:
# Ejecicio 2 - JSON to Dataframe - Full
df_full = build_table(json_data_dolar)
df_full.head()

Unnamed: 0,casa,compra,venta,fecha
0,blue,4.0,4.0,2011-01-03
1,mayorista,3.97,3.98,2011-01-03
2,oficial,4.0,4.0,2011-01-03
3,blue,4.0,4.0,2011-01-04
4,mayorista,3.97,3.98,2011-01-04


In [133]:
# Ejecicio 3 - Guardar Dataframe en formata .parquet - Full
full_dir = "datalake/dolar"

save_to_parquet(
    df_full,
    f"{full_dir}/full/cotizacionFull.parquet"
    )

'datalake/dolar/full'


In [134]:
# Incremental (con respecto a cotizaciones sobre el dolar).
# Ejecicio 1 - Extracción - Incremental
endpoint_dolar_casa_fecha = "/v1/cotizaciones/dolares/blue/2024/02/01"
json_data_casa_fecha = get_data(base_url, endpoint_dolar_casa_fecha)
pprint(json_data_casa_fecha)

{'casa': 'blue', 'compra': 1175, 'fecha': '2024-02-01', 'venta': 1195}


In [135]:
# Ejecicio 2 - JSON to Dataframe - Incremental
df_incremental = build_table(json_data_casa_fecha)
df_incremental.head()

Unnamed: 0,casa,compra,venta,fecha
0,blue,1175,1195,2024-02-01


In [136]:
# Ejecicio 3 - Guardar Dataframe en formata .parquet - Incremental
incremental_dir = "datalake/dolar"

save_to_parquet(
    df_incremental,
    f"{incremental_dir}/incremental/cotizacionInc.parquet"
    )

'datalake/dolar/incremental'


In [137]:
# Parte 2
# Eliminación de duplicados - Full
full_path = 'datalake/dolar/full/cotizacionFull.parquet'
df_full = pd.read_parquet(full_path)
df_full = df_full.drop_duplicates()
df_full.head()


Unnamed: 0,casa,compra,venta,fecha
0,blue,4.0,4.0,2011-01-03
1,mayorista,3.97,3.98,2011-01-03
2,oficial,4.0,4.0,2011-01-03
3,blue,4.0,4.0,2011-01-04
4,mayorista,3.97,3.98,2011-01-04


In [138]:
# Eliminación de duplicados - Incremental
full_path = 'datalake/dolar/incremental/cotizacionInc.parquet'
df_incremental = pd.read_parquet(full_path)
df_incremental = df_incremental.drop_duplicates(subset=["fecha"])
df_incremental.head()

Unnamed: 0,casa,compra,venta,fecha
0,blue,1175,1195,2024-02-01


In [None]:
# Eliminación o reemplazo de nulos - Full
df_full = df_full.dropna()
df_full.head()

In [140]:
# Eliminación o reemplazo de nulos - Incremental
df_incremental = df_incremental.dropna()
df_incremental.head()

Unnamed: 0,casa,compra,venta,fecha
0,blue,1175,1195,2024-02-01


In [141]:
# Convertir la columna fecha a datatime
df_full["fecha"] = pd.to_datetime(df_full["fecha"])
# Formatear columnas de tipo fecha a d/m/Y
df_full['fecha'] = df_full['fecha'].dt.strftime('%d/%m/%Y')
df_full.head()


Unnamed: 0,casa,compra,venta,fecha
0,blue,4.0,4.0,03/01/2011
1,mayorista,3.97,3.98,03/01/2011
2,oficial,4.0,4.0,03/01/2011
3,blue,4.0,4.0,04/01/2011
4,mayorista,3.97,3.98,04/01/2011


In [142]:
#Renombrar columnas
df_full = df_full.rename(columns={'compra': 'compra_dolar'})
df_full = df_full.rename(columns={'venta': 'venta_dolar'})
df_full = df_full.rename(columns={'fecha': 'fecha_cotizacion'})

df_full.head()

Unnamed: 0,casa,compra_dolar,venta_dolar,fecha_cotizacion
0,blue,4.0,4.0,03/01/2011
1,mayorista,3.97,3.98,03/01/2011
2,oficial,4.0,4.0,03/01/2011
3,blue,4.0,4.0,04/01/2011
4,mayorista,3.97,3.98,04/01/2011


In [143]:
# Crear una nueva columna 'venta_alcanza_1000' basada en la condición de venta
df_full['venta_alcanza_1000'] = df_full['venta_dolar'] >= 1000

# Mostrar el DataFrame con la nueva columna
print(df_full)

                  casa  compra_dolar  venta_dolar fecha_cotizacion  \
0                 blue          4.00         4.00       03/01/2011   
1            mayorista          3.97         3.98       03/01/2011   
2              oficial          4.00         4.00       03/01/2011   
3                 blue          4.00         4.00       04/01/2011   
4            mayorista          3.97         3.98       04/01/2011   
...                ...           ...          ...              ...   
24582            bolsa       1269.80      1276.40       20/06/2024   
24583  contadoconliqui       1262.50      1263.90       20/06/2024   
24584        mayorista        904.50       907.50       20/06/2024   
24585           cripto       1304.00      1346.00       20/06/2024   
24586          tarjeta       1415.20      1479.20       20/06/2024   

       venta_alcanza_1000  
0                   False  
1                   False  
2                   False  
3                   False  
4                  

In [144]:
## Se crea la tabla cotizacion y se establece la conexión en la base de datos.
print(f"Versión de Pandas: {pd.__version__}")
print(f"Versión de SQLAlchemy: {sa.__version__}")

# Definimos el string de conexión
host = "pg-366fd5eb-alebarka35-4fe8.d.aivencloud.com"
port = "11054"
database = "defaultdb"
username = "avnadmin"
password = "AVNS_DMVVYunbdbUQ6ps-b1_"

conn_string = f"postgresql://{username}:{password}@{host}:{port}/{database}"
pprint(conn_string)

# Creamos la conexión y establecemos la conexión a la db
engine = create_engine(conn_string)

# Se recomienda crear la tabla antes de insertar los datos
# para definir un esquema adecuado

query = text("""
-- Se crea la tabla cotizacion
CREATE TABLE IF NOT EXISTS public.cotizacion (
    casa VARCHAR(100), -- Nombre de la casa
    compra_dolar REAL, -- Compra dolar
    venta_dolar REAL, -- Venta dolar
    fecha_cotizacion VARCHAR(50),
    venta_alcanza_1000 VARCHAR(20)
);
""")

# Ejecutamos la query
with engine.connect() as conn, conn.begin():
    conn.execute(query)
    print("Tabla creada correctamente")

with engine.connect() as conn, conn.begin():
    df_full.head(10).to_sql(
        "cotizacion",
        schema="public",
        con=conn,
        if_exists="append", # opcionse: "replace" o "append"
       method="multi",
       index=False
    )

 #Se realiza la consulta a la tabla y se procede a proyectar las columnas
query_sql = text("""
SELECT *
FROM public.cotizacion
""")

with engine.connect() as conn, conn.begin():
    df_check = pd.read_sql(query_sql, conn)

df_check

Versión de Pandas: 2.0.3
Versión de SQLAlchemy: 2.0.30
'postgresql://avnadmin:AVNS_DMVVYunbdbUQ6ps-b1_@pg-366fd5eb-alebarka35-4fe8.d.aivencloud.com:11054/defaultdb'
Tabla creada correctamente


Unnamed: 0,casa,compra_dolar,venta_dolar,fecha_cotizacion,venta_alcanza_1000
0,blue,4.0,4.0,03/01/2011,False
1,mayorista,3.97,3.98,03/01/2011,False
2,oficial,4.0,4.0,03/01/2011,False
3,blue,4.0,4.0,04/01/2011,False
4,mayorista,3.97,3.98,04/01/2011,False
5,oficial,4.0,4.0,04/01/2011,False
6,blue,4.0,4.0,05/01/2011,False
7,mayorista,3.97,3.98,05/01/2011,False
8,oficial,4.0,4.0,05/01/2011,False
9,blue,4.0,4.0,06/01/2011,False


In [145]:
#Cruzar dataframes usando JOINS
# INNER JOIN
inner_join_df = pd.merge(df_full, df_incremental, on='casa', how='inner')
print("\nINNER JOIN:")
print(inner_join_df)

# LEFT JOIN
left_join_df = pd.merge(df_full, df_incremental, on='casa', how='left')
print("\nLEFT JOIN:")
print(left_join_df)

# RIGHT JOIN
right_join_df = pd.merge(df_full, df_incremental, on='casa', how='right')
print("\nRIGHT JOIN:")
print(right_join_df)


INNER JOIN:
      casa  compra_dolar  venta_dolar fecha_cotizacion  venta_alcanza_1000  \
0     blue           4.0          4.0       03/01/2011               False   
1     blue           4.0          4.0       04/01/2011               False   
2     blue           4.0          4.0       05/01/2011               False   
3     blue           4.0          4.0       06/01/2011               False   
4     blue           4.0          4.0       07/01/2011               False   
...    ...           ...          ...              ...                 ...   
4913  blue        1260.0       1280.0       16/06/2024                True   
4914  blue        1260.0       1280.0       17/06/2024                True   
4915  blue        1285.0       1305.0       18/06/2024                True   
4916  blue        1280.0       1300.0       19/06/2024                True   
4917  blue        1280.0       1300.0       20/06/2024                True   

      compra  venta       fecha  
0       1175   1

In [146]:
#Aplicar agregaciones por medio de GROUP BY y funciones como MAX, MIN, AVG, etc.
# Múltiples agregaciones por casa
agg_result = df_full.groupby('casa').agg({
    'compra_dolar': ['min', 'max', 'mean'],
    'venta_dolar': ['min', 'max', 'mean']
})
print("\nMúltiples agregaciones por casa:")
print(agg_result)


Múltiples agregaciones por casa:
                compra_dolar                      venta_dolar           \
                         min      max        mean         min      max   
casa                                                                     
blue                    4.00  1285.00  133.854616        4.00  1305.00   
bolsa                  35.55  1279.40  297.801028       35.55  1286.70   
contadoconliqui         6.67  1306.90  158.246941        6.67  1319.80   
cripto                365.19  1304.00  801.182320      365.19  1370.00   
mayorista               3.97   904.50   80.976228        3.98   922.15   
oficial                 4.00   918.00   83.500527        4.00   924.50   
solidario              57.01   357.37  123.741487       81.85   642.90   
tarjeta                57.01  1468.80  290.464160       81.85  1479.20   

                             
                       mean  
casa                         
blue             136.711671  
bolsa            298.656756  
c