# CoderHouse - Data Engineering

## Entregable N° 2 - Páez Darío

# Importación de módulos

In [1]:
import os
import requests
import pandas as pd
import seaborn as sns
from os import environ as env
import matplotlib.pyplot as plt
import urllib.parse
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col, avg

# Seaborn theme
sns.set_theme()

In [2]:
# Psycopg
!pip install psycopg2-binary



In [3]:
import psycopg2

# Extracción de Datos

## API Datos Argentina

Mediante la API pública del gobierno de Argentina se extrae el **EMAE** (Estimador Mensual de Actividad Económica) para el período Marzo-2013 a Junio-2021, debido a que la llamada a la API solo permite extraer de a **100 registros**.

In [4]:
# Función para formatear la url de conexión con la API
def get_api_call(ids, **kwargs):
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))

In [5]:
# Formateo de la url de acuerdo a la variable y período deseados
api_call = get_api_call(["143.3_NO_PR_2004_A_21"], start_date="2013-03-01", end_date="2023-06-01")
print(api_call)

https://apis.datos.gob.ar/series/api/series?start_date=2013-03-01&end_date=2023-06-01&ids=143.3_NO_PR_2004_A_21


In [6]:
# Respuesta de la API
result = requests.get(api_call).json()

### A través de pandas se crea una tabla con los resultados obtenidos de la API

In [7]:
df_emae = pd.DataFrame(result['data'], columns=['date', 'EMAE'])

In [8]:
df_emae.head()

Unnamed: 0,date,EMAE
0,2013-03-01,149.403946
1,2013-04-01,155.941596
2,2013-05-01,167.973479
3,2013-06-01,156.271864
4,2013-07-01,150.731001


In [9]:
# Se incorpora una columna que permita ver la variación respecto a la media
df_emae['variacion'] = df_emae['EMAE'] - df_emae['EMAE'].mean()

In [10]:
df_emae.shape

(100, 3)

In [11]:
df_emae.head()

Unnamed: 0,date,EMAE,variacion
0,2013-03-01,149.403946,5.04377
1,2013-04-01,155.941596,11.58142
2,2013-05-01,167.973479,23.613304
3,2013-06-01,156.271864,11.911689
4,2013-07-01,150.731001,6.370826


## Canasta básica

A través del explorador de la [API de series de tiempo del Ministerio de Modernización](https://datosgobar.github.io/series-tiempo-ar-explorer/#/series/?ids=444.1_CANASTA_BATAL_0_0_20_94), se obtiene en formato **.csv** los datos de la **canasta básica alimentaria** y **canasta básica total** en pesos de la Ciudad Autónoma de Buenos Aires para el período **Enero-2013/Noviembre-2018** (frecuencia mensual)

In [12]:
df_canasta = pd.read_csv('data/canastas-basicas-ciudad-de-buenos-aires.csv')

In [13]:
# Cambio del nombre de la columna indice_tiempo
df_canasta.rename(columns={'indice_tiempo': 'date'}, inplace=True)

In [14]:
df_canasta.head()

Unnamed: 0,date,canasta_basica_alimentaria,canasta_basica_total
0,2013-01-01,2634.91,5813.43
1,2013-02-01,2645.91,5852.52
2,2013-03-01,2676.89,5974.26
3,2013-04-01,2702.55,6074.8
4,2013-05-01,2713.25,6150.23


In [15]:
df_canasta.shape

(71, 3)

## Tipo de Cambio 

De la misma manera, se obtiene los datos de tipo de cambio (Dólar-Peso) vendedor del Banco de la Nación Argentina para el período **2014-11-03 a 2022-11-30**. En este caso, la frecuencia de los datos es diaria  

In [16]:
df_cambio = pd.read_csv('data/datos-tipo-cambio-usd-futuro-dolar-frecuencia-diaria.csv')

In [17]:
df_cambio.rename(columns={'indice_tiempo': 'date'}, inplace=True)

In [18]:
df_cambio.head()

Unnamed: 0,date,tipo_cambio_bna_vendedor,tipo_cambio_a3500,tipo_cambio_mae,volumen_mae,tipo_cambio_implicito_en_adrs,futuro_rofex_usd1m,interes_abierto_1m,futuro_rofex_usd2m,interes_abierto_2m,futuro_rofex_usd3m,interes_abierto_3m,futuro_rofex_usd4m,interes_abierto_4m,futuro_rofex_usd5m,interes_abierto_5m,futuro_rofex_usd6m,interes_abierto_6m
0,2002-03-05,,1.9917,,,2.180124,,,,,,,,,,,,
1,2002-03-06,,2.0508,,,2.222222,,,,,,,,,,,,
2,2002-03-07,,2.1375,,,2.343949,,,,,,,,,,,,
3,2002-03-08,,2.2033,,,2.227778,,,,,,,,,,,,
4,2002-03-09,,2.2033,,,2.227778,,,,,,,,,,,,


In [19]:
df_cambio.shape

(7727, 18)

Vemos que se obtiene una gran cantidad de datos con varios valores nulos y que arranca desde el año 2002. Al analizar los valores nulos se puede ver que la columna de interés (**tipo_cambio_bna_vendedor**) tiene gran cantidad de valores nulos.

In [20]:
df_cambio.isna().sum()

date                                0
tipo_cambio_bna_vendedor         4626
tipo_cambio_a3500                   0
tipo_cambio_mae                  2311
volumen_mae                      4393
tipo_cambio_implicito_en_adrs       2
futuro_rofex_usd1m               2496
interes_abierto_1m               2496
futuro_rofex_usd2m               2496
interes_abierto_2m               2496
futuro_rofex_usd3m               2496
interes_abierto_3m               2496
futuro_rofex_usd4m               2496
interes_abierto_4m               2496
futuro_rofex_usd5m               2496
interes_abierto_5m               2496
futuro_rofex_usd6m               2496
interes_abierto_6m               2496
dtype: int64

In [21]:
# Filtrado por fecha y se selecciona la columna de interés
df_cambio2 = df_cambio[df_cambio.date > '2014-11-03'][['date', 'tipo_cambio_bna_vendedor']]

In [22]:
df_cambio2.shape

(3100, 2)

In [23]:
# Comprobación de valores nulos
df_cambio2.isna().sum()

date                        0
tipo_cambio_bna_vendedor    0
dtype: int64

In [24]:
df_cambio2.head()

Unnamed: 0,date,tipo_cambio_bna_vendedor
4627,2014-11-04,8.51
4628,2014-11-05,8.51
4629,2014-11-06,8.51
4630,2014-11-07,8.51
4631,2014-11-08,8.51


# Creación de Tablas en Redshift

## Conexión a través de psycopg

In [64]:
# Connect to Redshift using psycopg2
conn = psycopg2.connect(
    host=env['REDSHIFT_URL'],
    port=env['REDSHIFT_PORT'],
    dbname=env['REDSHIFT_DATABASE'],
    user=env['REDSHIFT_USER'],
    password=env['REDSHIFT_PASSWORD']
)

## Tabla de Actividad Economica

Esta tabla fue creada en el entregable anterior por lo tanto no es necesario volver a crearla

## Tabla de Canasta Básica

In [53]:
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['REDSHIFT_SCHEMA']}.canasta_basica (
    date VARCHAR(10) distkey,
    canasta_basica_alimentaria decimal(6,2),
    canasta_basica_total decimal(6,2)
) sortkey(date);
""")
conn.commit()
cursor.close()
print("Table created!")

Table created!


## Tabla de Tipo de Cambio

In [72]:
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['REDSHIFT_SCHEMA']}.tipo_de_cambio (
    date VARCHAR(10) distkey,
    tipo_cambio_bna_vendedor decimal(3,2)
) sortkey(date);
""")
conn.commit()
cursor.close()
print("Table created!")

Table created!


# Carga de Tablas

## Iniciar Sesión de Spark

In [65]:
# Postgres and Redshift JDBCs
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

# Seteo de variables de entorno para que Pyspark cargue los drivers correspondientes
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

## Tabla de Actividad económica

Crear dataframe de Pyspark para guardar los resultados de la consulta a la API

In [26]:
df_emae_to_write = spark.createDataFrame(df_emae, ["date", "EMAE", "variacion"])

In [27]:
df_emae_to_write.printSchema()
df_emae_to_write.show()

root
 |-- date: string (nullable = true)
 |-- EMAE: double (nullable = true)
 |-- variacion: double (nullable = true)

+----------+------------------+-------------------+
|      date|              EMAE|          variacion|
+----------+------------------+-------------------+
|2013-03-01|149.40394566736484|  5.043770324548916|
|2013-04-01|155.94159584157654| 11.581420498760622|
|2013-05-01| 167.9734791225302| 23.613303779714272|
|2013-06-01|156.27186443024067| 11.911689087424747|
|2013-07-01|150.73100129825897|  6.370825955443053|
|2013-08-01|148.42144953050052|  4.061274187684603|
|2013-09-01| 147.0048441881334|  2.644668845317483|
|2013-10-01| 148.6939593959755|  4.333784053159576|
|2013-11-01|145.69589762827002| 1.3357222854540964|
|2013-12-01| 142.5768791271264|-1.7832962156895178|
|2014-01-01|137.96969739537636| -6.390477947439564|
|2014-02-01|132.48630687243056| -11.87386847038536|
|2014-03-01|144.53782808347017|0.17765274065425274|
|2014-04-01|152.34143546147135| 7.981260118655427

In [28]:
# Insertar datos en la tabla de Redshift
df_emae_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['REDSHIFT_URL']}:{env['REDSHIFT_PORT']}/{env['REDSHIFT_DATABASE']}") \
    .option("dbtable", f"{env['REDSHIFT_SCHEMA']}.actividad_economica") \
    .option("user", env['REDSHIFT_USER']) \
    .option("password", env['REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

## Tabla de Canasta Básica

In [29]:
df_canasta_to_write = spark.createDataFrame(df_canasta, ["date", "canasta_basica_alimentaria", "canasta_basica_total"])
df_canasta_to_write.printSchema()
df_canasta_to_write.show()

root
 |-- date: string (nullable = true)
 |-- canasta_basica_alimentaria: double (nullable = true)
 |-- canasta_basica_total: double (nullable = true)

+----------+--------------------------+--------------------+
|      date|canasta_basica_alimentaria|canasta_basica_total|
+----------+--------------------------+--------------------+
|2013-01-01|                   2634.91|             5813.43|
|2013-02-01|                   2645.91|             5852.52|
|2013-03-01|                   2676.89|             5974.26|
|2013-04-01|                   2702.55|              6074.8|
|2013-05-01|                   2713.25|             6150.23|
|2013-06-01|                   2773.94|              6272.0|
|2013-07-01|                    2879.6|              6478.7|
|2013-08-01|                   3016.21|             6653.45|
|2013-09-01|                   3090.68|             6805.64|
|2013-10-01|                   3203.54|             6992.68|
|2013-11-01|                   3260.61|             712

In [30]:
# Insertar datos en la tabla de Redshift
df_canasta_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['REDSHIFT_URL']}:{env['REDSHIFT_PORT']}/{env['REDSHIFT_DATABASE']}") \
    .option("dbtable", f"{env['REDSHIFT_SCHEMA']}.canasta_basica") \
    .option("user", env['REDSHIFT_USER']) \
    .option("password", env['REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

## Tabla de Tipo de Cambio

In [31]:
df_cambio_to_write = spark.createDataFrame(df_cambio2, ["date", "tipo_cambio_bna_vendedor"])
df_cambio_to_write.printSchema()
df_cambio_to_write.show()

root
 |-- date: string (nullable = true)
 |-- tipo_cambio_bna_vendedor: double (nullable = true)

+----------+------------------------+
|      date|tipo_cambio_bna_vendedor|
+----------+------------------------+
|2014-11-04|                    8.51|
|2014-11-05|                    8.51|
|2014-11-06|                    8.51|
|2014-11-07|                    8.51|
|2014-11-08|                    8.51|
|2014-11-09|                    8.51|
|2014-11-10|                    8.51|
|2014-11-11|                    8.51|
|2014-11-12|                    8.51|
|2014-11-13|                    8.51|
|2014-11-14|                    8.51|
|2014-11-15|                    8.51|
|2014-11-16|                    8.51|
|2014-11-17|                    8.51|
|2014-11-18|                    8.51|
|2014-11-19|                    8.51|
|2014-11-20|                    8.52|
|2014-11-21|                    8.52|
|2014-11-22|                    8.52|
|2014-11-23|                    8.52|
+----------+----------------

In [32]:
# Insertar datos en la tabla de Redshift
df_cambio_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['REDSHIFT_URL']}:{env['REDSHIFT_PORT']}/{env['REDSHIFT_DATABASE']}") \
    .option("dbtable", f"{env['REDSHIFT_SCHEMA']}.tipo_de_cambio") \
    .option("user", env['REDSHIFT_USER']) \
    .option("password", env['REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

# Análisis comparativo

El objetivo es tener en una tabla la comparación de el índice de actividad económica, el valor de la canasta básica en pesos, el tipo de cambio y el valor de la canasta básica en dólares para analizar la variación de estas variables mes a mes.

## Preparación de columna date para hacer el merge

In [43]:
# La tabla de tipo de cambio tiene frecuencia diaria, es necesario pasarla a registros mensuales para poder comparar 
# con las otras tablas
import datetime as dt

# Se convierte la columna date en clase datetime y luego se convierte a formato mensual
df_cambio3 = df_cambio2.copy()
df_cambio3['date'] = [dt.datetime.strptime(fecha, '%Y-%m-%d') for fecha in df_cambio2.date]
df_cambio3['date'] = df_cambio3['date'].dt.to_period('M')
df_cambio3.head()

Unnamed: 0,date,tipo_cambio_bna_vendedor
4627,2014-11,8.51
4628,2014-11,8.51
4629,2014-11,8.51
4630,2014-11,8.51
4631,2014-11,8.51


In [44]:
# Ahora se puede agrupar por mes y calcular el promedio del tipo de cambio
df_cambio_final = df_cambio3.groupby(['date']).mean()
df_cambio_final

Unnamed: 0_level_0,tipo_cambio_bna_vendedor
date,Unnamed: 1_level_1
2014-11,8.515185
2014-12,8.556452
2015-01,8.602903
2015-02,8.684107
2015-03,8.780323
...,...
2022-12,178.790323
2023-01,188.798387
2023-02,198.401786
2023-03,209.580645


In [49]:
# Modificamos la columna date de canasta básica para que tenga el mismo formato que en la tabla tipo de cambio
df_canasta_final = df_canasta.copy()
df_canasta_final['date'] = [dt.datetime.strptime(fecha, '%Y-%m-%d') for fecha in df_canasta['date']]
df_canasta_final['date'] = df_canasta_final['date'].dt.to_period('M')
df_canasta_final.head()

Unnamed: 0,date,canasta_basica_alimentaria,canasta_basica_total
0,2013-01,2634.91,5813.43
1,2013-02,2645.91,5852.52
2,2013-03,2676.89,5974.26
3,2013-04,2702.55,6074.8
4,2013-05,2713.25,6150.23


In [50]:
# Modificamos la columna date de actividad económica para que tenga el mismo formato que en la tabla tipo de cambio
df_emae_final = df_emae.copy()
df_emae_final['date'] = [dt.datetime.strptime(fecha, '%Y-%m-%d') for fecha in df_emae['date']]
df_emae_final['date'] = df_emae_final['date'].dt.to_period('M')
df_emae_final.head()

Unnamed: 0,date,EMAE,variacion
0,2013-03,149.403946,5.04377
1,2013-04,155.941596,11.58142
2,2013-05,167.973479,23.613304
3,2013-06,156.271864,11.911689
4,2013-07,150.731001,6.370826


## Merge de Tablas

In [56]:
analisis_comparativo = pd.merge(df_cambio_final, 
                                df_canasta_final,
                                on='date').merge(df_emae_final, on='date')
analisis_comparativo.head()
  

Unnamed: 0,date,tipo_cambio_bna_vendedor,canasta_basica_alimentaria,canasta_basica_total,EMAE,variacion
0,2014-11,8.515185,4354.29,9821.03,140.877597,-3.482578
1,2014-12,8.556452,4406.42,9940.26,140.435938,-3.924238
2,2015-01,8.602903,4535.51,10140.82,133.998189,-10.361986
3,2015-02,8.684107,4569.9,10226.35,132.627887,-11.732288
4,2015-03,8.780323,4624.77,10399.31,149.409013,5.048838


In [57]:
analisis_comparativo.shape

(49, 6)

In [58]:
# Se descarta la columna variacion
analisis_comparativo.drop(columns=['variacion'], inplace=True)

In [62]:
# Se agregan las columnas con la canasta básica expresada en dólares
analisis_comparativo['canasta_basica_alimentaria_usd'] = round(analisis_comparativo['canasta_basica_alimentaria'] / analisis_comparativo['tipo_cambio_bna_vendedor'], 2)
analisis_comparativo['canasta_basica_total_usd']= round(analisis_comparativo['canasta_basica_total'] / analisis_comparativo['tipo_cambio_bna_vendedor'], 2)
analisis_comparativo.head()

Unnamed: 0,date,tipo_cambio_bna_vendedor,canasta_basica_alimentaria,canasta_basica_total,EMAE,canasta_basica_alimentaria_usd,canasta_basica_total_usd
0,2014-11,8.515185,4354.29,9821.03,140.877597,511.36,1153.35
1,2014-12,8.556452,4406.42,9940.26,140.435938,514.98,1161.73
2,2015-01,8.602903,4535.51,10140.82,133.998189,527.21,1178.77
3,2015-02,8.684107,4569.9,10226.35,132.627887,526.24,1177.59
4,2015-03,8.780323,4624.77,10399.31,149.409013,526.72,1184.39


## Creación de tabla y carga de datos en Redshift

In [67]:
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['REDSHIFT_SCHEMA']}.analisis_comparativo (
    date VARCHAR(10) distkey,
    tipo_cambio_bna_vendedor decimal(3,2),
    canasta_basica_alimentaria decimal(6,2),
    canasta_basica_total decimal(6,2),
    emae decimal(5,2),
    canasta_basica_alimentaria_usd decimal(4,2),
    canasta_basica_total_usd decimal(4,2)
) sortkey(date);
""")
conn.commit()
cursor.close()
print("Table created!")

Table created!


In [72]:
# Al crear el dataframe de Spark no toma bien la clase del datetime
# Por lo tanto, se convierte la columna date de vuelta a string para poder cargarlo en Redshift
analisis_comparativo['date'] = [time.strftime('%Y-%m') for time in analisis_comparativo['date']]

In [74]:
analisis_comparativo_to_write = spark.createDataFrame(analisis_comparativo, [column for column in analisis_comparativo.columns])
analisis_comparativo_to_write.printSchema()
analisis_comparativo_to_write.show()

root
 |-- date: string (nullable = true)
 |-- tipo_cambio_bna_vendedor: double (nullable = true)
 |-- canasta_basica_alimentaria: double (nullable = true)
 |-- canasta_basica_total: double (nullable = true)
 |-- EMAE: double (nullable = true)
 |-- canasta_basica_alimentaria_usd: double (nullable = true)
 |-- canasta_basica_total_usd: double (nullable = true)

+-------+------------------------+--------------------------+--------------------+------------------+------------------------------+------------------------+
|   date|tipo_cambio_bna_vendedor|canasta_basica_alimentaria|canasta_basica_total|              EMAE|canasta_basica_alimentaria_usd|canasta_basica_total_usd|
+-------+------------------------+--------------------------+--------------------+------------------+------------------------------+------------------------+
|2014-11|       8.515185185185185|                   4354.29|             9821.03| 140.8775968982631|                        511.36|                 1153.35|
|2014-

In [75]:
# Insertar datos en la tabla de Redshift
analisis_comparativo_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['REDSHIFT_URL']}:{env['REDSHIFT_PORT']}/{env['REDSHIFT_DATABASE']}") \
    .option("dbtable", f"{env['REDSHIFT_SCHEMA']}.analisis_comparativo") \
    .option("user", env['REDSHIFT_USER']) \
    .option("password", env['REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()