# Deliverable 3

## Assignment

### General objectives

✓ The script from the 2nd deliverable must run in a Docker container and be embedded in an Airflow DAG inside that container.

### Specific objectives

✓ The container must be as lightweight as possible while still letting the script run without problems. Any user should be able to run the container and find the script ready to execute.

### Format

✓ Dockerfile and code with everything needed to run it (including, if necessary, an instruction manual or the steps to run it), uploaded to a GitHub repository or to Google Drive.

### Suggestions

✓ The database that holds this table does not need to live in the container; it is assumed to be a Redshift instance in the cloud.
✓ Look into Docker Compose to make the task easier.
✓ Review the evaluation rubric.


## 1) Downloading data from an API in JSON format

This example uses the [Datos Argentina](https://www.datos.gob.ar/) API.

We will fetch the series "Exportaciones de productos primarios. En millones de dólares FOB" (exports of primary products, in millions of US dollars FOB).

To try out the API, go to: [API de Series de Tiempo AR: Generador de URLs](https://datosgobar.github.io/series-tiempo-ar-call-generator/)

In [1]:
import numpy as np
import pandas as pd
import json 
import requests
import math
from datetime import timedelta,datetime
from astral import LocationInfo 
from astral.location import Location
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from joblib import dump, load
from pyspark.sql.functions import col, substring

In [2]:
import requests
import urllib.parse

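def get_api_call(ids, **kwargs):
    """Build the URL of a call to the `series` endpoint for the given series ids."""
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    # The ids are sent as a single comma-separated query parameter
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))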

In [3]:
# Example: https://apis.datos.gob.ar/series/api/series?ids=75.3_IEC_0_M_26&start_date=2020-01-01
api_call = get_api_call(["74.3_IEPP_0_M_35"], start_date="1992-01-01")
print(api_call)

https://apis.datos.gob.ar/series/api/series?start_date=1992-01-01&ids=74.3_IEPP_0_M_35
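
As a small illustration of how `get_api_call` behaves when several series are requested at once, the sketch below rebuilds the same helper and passes two ids plus a `limit` argument. The second id is the one from the example comment above; `limit` is an assumption about the API's query parameters, used here only to show that extra keyword arguments end up in the query string. Note that `urlencode` percent-encodes the comma that joins the ids (`%2C`); decoding the query string recovers the original list.

```python
import urllib.parse

API_BASE_URL = "https://apis.datos.gob.ar/series/api/"

def get_api_call(ids, **kwargs):
    # Same helper as in the notebook: ids are joined into one comma-separated parameter
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))

# Two series in one call; `limit` is an assumed optional parameter
url = get_api_call(
    ["74.3_IEPP_0_M_35", "75.3_IEC_0_M_26"],
    start_date="1992-01-01",
    limit=100,
)
print(url)

# Decode the query string to check what the server would receive
query = urllib.parse.parse_qs(urllib.parse.urlsplit(url).query)
requested_ids = query["ids"][0].split(",")
print(requested_ids)
```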


In [4]:
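# Fail early on network problems or HTTP errors instead of parsing an error page
response = requests.get(api_call, timeout=30)
response.raise_for_status()
result = response.json()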
print(result)

{'data': [['1992-01-01', 133.367679], ['1992-02-01', 188.034116], ['1992-03-01', 320.984466], ['1992-04-01', 375.242075], ['1992-05-01', 474.440656], ['1992-06-01', 442.905698], ['1992-07-01', 455.983718], ['1992-08-01', 346.11888], ['1992-09-01', 251.83012], ['1992-10-01', 180.399972], ['1992-11-01', 152.657852], ['1992-12-01', 170.560837], ['1993-01-01', 226.344594], ['1993-02-01', 218.95773], ['1993-03-01', 376.002446], ['1993-04-01', 375.657342], ['1993-05-01', 470.956346], ['1993-06-01', 402.156012], ['1993-07-01', 309.188763], ['1993-08-01', 245.806947], ['1993-09-01', 229.396576], ['1993-10-01', 144.645589], ['1993-11-01', 113.595346], ['1993-12-01', 160.927861], ['1994-01-01', 174.410652], ['1994-02-01', 198.24988], ['1994-03-01', 280.889311], ['1994-04-01', 387.628872], ['1994-05-01', 638.094045], ['1994-06-01', 502.878474], ['1994-07-01', 379.775061], ['1994-08-01', 310.702685], ['1994-09-01', 244.321473], ['1994-10-01', 194.560862], ['1994-11-01', 185.216891], ['1994-12-01',
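
The response carries the observations under the `data` key as `[date, value]` pairs. Below is a minimal sketch of how those pairs could be checked before handing them to Spark, using the first three observations printed above as a stand-in for the real response. `parse_observations` is a hypothetical helper, not part of the notebook, and it assumes every value is numeric.

```python
from datetime import date

def parse_observations(payload):
    """Return (date, float) tuples from an API payload; raise ValueError if malformed."""
    if "data" not in payload:
        raise ValueError("response has no 'data' key")
    rows = []
    for item in payload["data"]:
        if len(item) != 2:
            raise ValueError(f"expected [date, value], got {item!r}")
        day, value = item
        # fromisoformat rejects anything that is not a YYYY-MM-DD string
        rows.append((date.fromisoformat(day), float(value)))
    return rows

# Stand-in for `result`: the first three observations shown in the output above
sample = {
    "data": [
        ["1992-01-01", 133.367679],
        ["1992-02-01", 188.034116],
        ["1992-03-01", 320.984466],
    ]
}
rows = parse_observations(sample)
print(rows[0])
```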

## 2) Creating the table and the connections

In [5]:
# Create the Spark session
import os
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# Postgres and Redshift JDBCs
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

In [6]:
env = os.environ

In [7]:
# Connect to Redshift using psycopg2
conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)
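
Because the container receives the Redshift credentials through environment variables, a missing variable would surface as a `KeyError` in the middle of the `psycopg2.connect` call. One possible way to fail earlier with a clearer message is sketched below. `missing_redshift_vars` is a hypothetical helper, and the variable list is simply the set of names this notebook reads.

```python
import os

REQUIRED_VARS = [
    "AWS_REDSHIFT_HOST",
    "AWS_REDSHIFT_PORT",
    "AWS_REDSHIFT_DBNAME",
    "AWS_REDSHIFT_USER",
    "AWS_REDSHIFT_PASSWORD",
    "AWS_REDSHIFT_SCHEMA",
]

def missing_redshift_vars(environ=os.environ):
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]

# Example with a fake environment where only the host is defined
fake_env = {"AWS_REDSHIFT_HOST": "example-host"}
missing = missing_redshift_vars(fake_env)
print(missing)
```

In the notebook, the check could run right after `env = os.environ` and raise an error listing the missing names if the returned list is not empty.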

In [8]:
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.exportaciones_productos_primarios_2 (
    date_from VARCHAR(10) distkey,
    millones_dolares decimal(10,2),
    frequency varchar(12),
    month smallint,
    year smallint,
    tasa_variacion_mensual decimal(10,2)
) sortkey(date_from);
""")
conn.commit()
cursor.close()
print("Table created!")

Table created!


In [9]:
cursor = conn.cursor()
cursor.execute(f"""
SELECT
  distinct tablename
FROM
  PG_TABLE_DEF
WHERE
  schemaname = '{env['AWS_REDSHIFT_SCHEMA']}';
""")
# resultado = cursor.fetchall()
print(", ".join(map(lambda x: x[0], cursor.fetchall())))
cursor.close()

exportaciones_productos_primarios, exportaciones_productos_primarios_2
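
Both statements above interpolate the schema name into the SQL text with an f-string. Here is a minimal sketch of one way to guard that interpolation, assuming schema names are restricted to letters, digits and underscores (a stricter rule than Redshift itself imposes). `safe_identifier` is a hypothetical helper and `my_schema` is a placeholder.

```python
import re

_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def safe_identifier(name):
    """Return name unchanged if it is a plain identifier, otherwise raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name

schema = safe_identifier("my_schema")  # placeholder schema name
query = f"SELECT DISTINCT tablename FROM PG_TABLE_DEF WHERE schemaname = '{schema}';"
print(query)
```

psycopg2 also ships a `psycopg2.sql` module for composing identifiers into statements, which may be preferable when the schema name is not under your control.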


## 3) Transforming the data extracted from the API

In [10]:
# Create the DataFrame with the specified column names
df = spark.createDataFrame(result['data'], ["date_from", "millones_dolares"])

In [11]:
df.printSchema()
df.show()

root
 |-- date_from: string (nullable = true)
 |-- millones_dolares: double (nullable = true)

+----------+----------------+
| date_from|millones_dolares|
+----------+----------------+
|1992-01-01|      133.367679|
|1992-02-01|      188.034116|
|1992-03-01|      320.984466|
|1992-04-01|      375.242075|
|1992-05-01|      474.440656|
|1992-06-01|      442.905698|
|1992-07-01|      455.983718|
|1992-08-01|       346.11888|
|1992-09-01|       251.83012|
|1992-10-01|      180.399972|
|1992-11-01|      152.657852|
|1992-12-01|      170.560837|
|1993-01-01|      226.344594|
|1993-02-01|       218.95773|
|1993-03-01|      376.002446|
|1993-04-01|      375.657342|
|1993-05-01|      470.956346|
|1993-06-01|      402.156012|
|1993-07-01|      309.188763|
|1993-08-01|      245.806947|
+----------+----------------+
only showing top 20 rows



## 4) ETL with pandas

In [12]:
df = df.toPandas()

In [13]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 2 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   date_from         100 non-null    object 
 1   millones_dolares  100 non-null    float64
dtypes: float64(1), object(1)
memory usage: 1.7+ KB


In [None]:
# add the frequency of the data

In [14]:
df['frequency'] = "Mensual"
df.head()

    date_from  millones_dolares frequency
0  1992-01-01        133.367679   Mensual
1  1992-02-01        188.034116   Mensual
2  1992-03-01        320.984466   Mensual
3  1992-04-01        375.242075   Mensual
4  1992-05-01        474.440656   Mensual


In [None]:
# another transformation: extract the month from the date

In [15]:
df['date_from'] = pd.to_datetime(df['date_from'])
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 3 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   date_from         100 non-null    datetime64[ns]
 1   millones_dolares  100 non-null    float64       
 2   frequency         100 non-null    object        
dtypes: datetime64[ns](1), float64(1), object(1)
memory usage: 2.5+ KB


In [16]:
df['month'] = df['date_from'].dt.month
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 4 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   date_from         100 non-null    datetime64[ns]
 1   millones_dolares  100 non-null    float64       
 2   frequency         100 non-null    object        
 3   month             100 non-null    int32         
dtypes: datetime64[ns](1), float64(1), int32(1), object(1)
memory usage: 2.9+ KB


In [None]:
# also extract the year from the date

In [17]:
df['year'] = df['date_from'].dt.year
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 5 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   date_from         100 non-null    datetime64[ns]
 1   millones_dolares  100 non-null    float64       
 2   frequency         100 non-null    object        
 3   month             100 non-null    int32         
 4   year              100 non-null    int32         
dtypes: datetime64[ns](1), float64(1), int32(2), object(1)
memory usage: 3.2+ KB


In [18]:
df.groupby('year').millones_dolares.median()

year
1992    286.407293
1993    237.601762
1994    262.605392
1995    346.027128
1996    486.819529
1997    489.810426
1998    516.632990
1999    387.962121
2000    460.438146
Name: millones_dolares, dtype: float64

In [None]:
# add a new feature: the monthly rate of change relative to the previous period

In [19]:
df['tasa_variacion_mensual'] = df.millones_dolares.pct_change()

In [20]:
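# replace the null value in the first row with 0
# (assigning the result avoids the chained in-place fillna, which newer pandas versions warn about)
df['tasa_variacion_mensual'] = df['tasa_variacion_mensual'].fillna(0)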
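
`pct_change` computes, for each row, `(current - previous) / previous`, and leaves the first row empty because it has no predecessor. The plain-Python sketch below reproduces that arithmetic on the first three observations of the series, including the replacement of the first value by 0. It is only meant to make the formula explicit, not to replace the pandas call.

```python
values = [133.367679, 188.034116, 320.984466]  # first three observations of the series

# The first period has no previous value, so its rate is set to 0 (as the fillna step does)
rates = [0.0]
for previous, current in zip(values, values[1:]):
    rates.append((current - previous) / previous)

print([round(r, 6) for r in rates])  # → [0.0, 0.409893, 0.707054]
```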

In [23]:
print(df.head())
print(df.info())

   date_from  millones_dolares frequency  month  year  tasa_variacion_mensual
0 1992-01-01        133.367679   Mensual      1  1992                0.000000
1 1992-02-01        188.034116   Mensual      2  1992                0.409893
2 1992-03-01        320.984466   Mensual      3  1992                0.707054
3 1992-04-01        375.242075   Mensual      4  1992                0.169035
4 1992-05-01        474.440656   Mensual      5  1992                0.264359
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 6 columns):
 #   Column                  Non-Null Count  Dtype         
---  ------                  --------------  -----         
 0   date_from               100 non-null    datetime64[ns]
 1   millones_dolares        100 non-null    float64       
 2   frequency               100 non-null    object        
 3   month                   100 non-null    int32         
 4   year                    100 non-null    int32         
 5   tasa

In [None]:
# Check for duplicates

In [24]:
n_duplicates = df.duplicated().sum()
if n_duplicates == 0:
    print("No duplicates found")
else:
    print("Found {cantidad} duplicated rows".format(cantidad=n_duplicates))

No duplicates found


In [None]:
# convert the pandas DataFrame to a Spark DataFrame

In [25]:
df_to_write = spark.createDataFrame(df)

## 5) Loading the data into an Amazon Redshift table

In [26]:
df_to_write.show()

+-------------------+----------------+---------+-----+----+----------------------+
|          date_from|millones_dolares|frequency|month|year|tasa_variacion_mensual|
+-------------------+----------------+---------+-----+----+----------------------+
|1992-01-01 00:00:00|      133.367679|  Mensual|    1|1992|                   0.0|
|1992-02-01 00:00:00|      188.034116|  Mensual|    2|1992|    0.4098926922166801|
|1992-03-01 00:00:00|      320.984466|  Mensual|    3|1992|    0.7070544049570238|
|1992-04-01 00:00:00|      375.242075|  Mensual|    4|1992|   0.16903499934479704|
|1992-05-01 00:00:00|      474.440656|  Mensual|    5|1992|   0.26435889685345115|
|1992-06-01 00:00:00|      442.905698|  Mensual|    6|1992|  -0.06646765533517018|
|1992-07-01 00:00:00|      455.983718|  Mensual|    7|1992|  0.029527775458874306|
|1992-08-01 00:00:00|       346.11888|  Mensual|    8|1992|   -0.2409402653276318|
|1992-09-01 00:00:00|       251.83012|  Mensual|    9|1992|  -0.27241726888749895|
|199

In [31]:
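# Note: with mode("overwrite") Spark drops and recreates the table by default,
# so the column types, distkey and sortkey declared in the CREATE TABLE statement
# are replaced by the types Spark infers from the DataFrame.
df_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.exportaciones_productos_primarios_2") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()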

In [32]:
# Read the table back from Redshift through Spark's JDBC data source
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.exportaciones_productos_primarios_2"
data = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as tmp_table") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()
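
The write and read cells build the same JDBC URL and connection options twice. A possible refactor, sketched under the assumption that the options stay identical for both calls, collects them in one dictionary. `jdbc_options` is a hypothetical helper, and the environment values and schema name below are placeholders.

```python
def jdbc_options(env, dbtable):
    """Return the option dictionary shared by the JDBC read and write calls."""
    host = env["AWS_REDSHIFT_HOST"]
    port = env["AWS_REDSHIFT_PORT"]
    dbname = env["AWS_REDSHIFT_DBNAME"]
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{dbname}",
        "dbtable": dbtable,
        "user": env["AWS_REDSHIFT_USER"],
        "password": env["AWS_REDSHIFT_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }

# Placeholder values standing in for os.environ
fake_env = {
    "AWS_REDSHIFT_HOST": "example-host",
    "AWS_REDSHIFT_PORT": "5439",
    "AWS_REDSHIFT_DBNAME": "dev",
    "AWS_REDSHIFT_USER": "user",
    "AWS_REDSHIFT_PASSWORD": "secret",
}
options = jdbc_options(fake_env, "my_schema.exportaciones_productos_primarios_2")
print(options["url"])
```

With a live session the dictionary could be passed as `df_to_write.write.format("jdbc").options(**options).mode("overwrite").save()`, and likewise to `spark.read` with the subquery as `dbtable`.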

In [33]:
data.printSchema()
data.show()

root
 |-- date_from: timestamp (nullable = true)
 |-- millones_dolares: double (nullable = true)
 |-- frequency: string (nullable = true)
 |-- month: long (nullable = true)
 |-- year: long (nullable = true)
 |-- tasa_variacion_mensual: double (nullable = true)

+-------------------+----------------+---------+-----+----+----------------------+
|          date_from|millones_dolares|frequency|month|year|tasa_variacion_mensual|
+-------------------+----------------+---------+-----+----+----------------------+
|1992-01-01 00:00:00|      133.367679|  Mensual|    1|1992|                   0.0|
|1992-02-01 00:00:00|      188.034116|  Mensual|    2|1992|    0.4098926922166801|
|1992-03-01 00:00:00|      320.984466|  Mensual|    3|1992|    0.7070544049570238|
|1992-04-01 00:00:00|      375.242075|  Mensual|    4|1992|   0.16903499934479704|
|1992-05-01 00:00:00|      474.440656|  Mensual|    5|1992|   0.26435889685345115|
|1992-06-01 00:00:00|      442.905698|  Mensual|    6|1992|  -0.0664676553

In [34]:
conn.close()