# Entregable 1

## Consigna:

Script que extraiga datos de una API pública y crear la tabla en Redshift para posterior carga de sus datos.

### Objetivos generales

✓ Tener un código inicial que será usado
en el proyecto final como un script ETL
inicial.

### Objetivos específicos

✓ El script debería extraer datos en JSON
y poder leer el formato en un
diccionario de Python.

✓ La entrega involucra la creación de una
versión inicial de la tabla donde los
datos serán cargados posteriormente.

Esto lo vamos a llevar a cabo usando `requests`, `Spark` y un driver de conexión de `Postgres`

## 1) Bajar datos de una API en formato JSON

Para este ejemplo vamos a usar la API de [Datos Argentina](https://www.datos.gob.ar/)

Y nos vamos a traer los datos de: Exportaciones de productos primarios. En millones de dólares FOB

Para probar la API ir a: [API de Series de Tiempo AR: Generador de URLs](https://datosgobar.github.io/series-tiempo-ar-call-generator/)

In [1]:
import numpy as np
import pandas as pd
import json 
import requests
import math
from datetime import timedelta
from astral import LocationInfo 
from astral.location import Location
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from joblib import dump, load
from pyspark.sql.functions import col, substring

In [2]:
import requests
import urllib.parse

def get_api_call(ids, **kwargs):
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))

In [3]:
# Ejemplo: https://apis.datos.gob.ar/series/api/series?ids=75.3_IEC_0_M_26&start_date=2020-01-01
api_call = get_api_call(["74.3_IEPP_0_M_35"], start_date="1992-01-01")
print(api_call)

https://apis.datos.gob.ar/series/api/series?start_date=1992-01-01&ids=74.3_IEPP_0_M_35


In [4]:
result = requests.get(api_call).json()
print(result)

{'data': [['1992-01-01', 133.367679], ['1992-02-01', 188.034116], ['1992-03-01', 320.984466], ['1992-04-01', 375.242075], ['1992-05-01', 474.440656], ['1992-06-01', 442.905698], ['1992-07-01', 455.983718], ['1992-08-01', 346.11888], ['1992-09-01', 251.83012], ['1992-10-01', 180.399972], ['1992-11-01', 152.657852], ['1992-12-01', 170.560837], ['1993-01-01', 226.344594], ['1993-02-01', 218.95773], ['1993-03-01', 376.002446], ['1993-04-01', 375.657342], ['1993-05-01', 470.956346], ['1993-06-01', 402.156012], ['1993-07-01', 309.188763], ['1993-08-01', 245.806947], ['1993-09-01', 229.396576], ['1993-10-01', 144.645589], ['1993-11-01', 113.595346], ['1993-12-01', 160.927861], ['1994-01-01', 174.410652], ['1994-02-01', 198.24988], ['1994-03-01', 280.889311], ['1994-04-01', 387.628872], ['1994-05-01', 638.094045], ['1994-06-01', 502.878474], ['1994-07-01', 379.775061], ['1994-08-01', 310.702685], ['1994-09-01', 244.321473], ['1994-10-01', 194.560862], ['1994-11-01', 185.216891], ['1994-12-01',

## 2) Creación de la tabla y conexiones

In [5]:
!pip install psycopg2-binary



In [6]:
# Crear sesion de Spark
import os
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# Postgres and Redshift JDBCs
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

In [7]:
env = os.environ

In [8]:
# Connect to Redshift using psycopg2
conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)

In [9]:
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.exportaciones_productos_primarios (
    date_from VARCHAR(10) distkey,
    millones_dolares decimal(10,2),
    frequency varchar(12),
    month varchar(12)
) sortkey(date_from);
""")
conn.commit()
cursor.close()
print("Table created!")

Table created!


In [10]:
cursor = conn.cursor()
cursor.execute(f"""
SELECT
  distinct tablename
FROM
  PG_TABLE_DEF
WHERE
  schemaname = '{env['AWS_REDSHIFT_SCHEMA']}';
""")
# resultado = cursor.fetchall()
print(", ".join(map(lambda x: x[0], cursor.fetchall())))
cursor.close()

exportaciones_productos_primarios


## 3) Transformación de la información extraída de la API

In [11]:
# Create the DataFrame with the specified column names
df = spark.createDataFrame(result['data'], ["date_from", "millones_dolares"])

In [12]:
df.printSchema()
df.show()

root
 |-- date_from: string (nullable = true)
 |-- millones_dolares: double (nullable = true)

+----------+----------------+
| date_from|millones_dolares|
+----------+----------------+
|1992-01-01|      133.367679|
|1992-02-01|      188.034116|
|1992-03-01|      320.984466|
|1992-04-01|      375.242075|
|1992-05-01|      474.440656|
|1992-06-01|      442.905698|
|1992-07-01|      455.983718|
|1992-08-01|       346.11888|
|1992-09-01|       251.83012|
|1992-10-01|      180.399972|
|1992-11-01|      152.657852|
|1992-12-01|      170.560837|
|1993-01-01|      226.344594|
|1993-02-01|       218.95773|
|1993-03-01|      376.002446|
|1993-04-01|      375.657342|
|1993-05-01|      470.956346|
|1993-06-01|      402.156012|
|1993-07-01|      309.188763|
|1993-08-01|      245.806947|
+----------+----------------+
only showing top 20 rows



In [16]:
#se incorporá la frecuencia de la data

In [13]:
df_to_write = df.withColumn('frequency', lit('Mensual'))
df_to_write.printSchema()
df_to_write.show()

root
 |-- date_from: string (nullable = true)
 |-- millones_dolares: double (nullable = true)
 |-- frequency: string (nullable = false)

+----------+----------------+---------+
| date_from|millones_dolares|frequency|
+----------+----------------+---------+
|1992-01-01|      133.367679|  Mensual|
|1992-02-01|      188.034116|  Mensual|
|1992-03-01|      320.984466|  Mensual|
|1992-04-01|      375.242075|  Mensual|
|1992-05-01|      474.440656|  Mensual|
|1992-06-01|      442.905698|  Mensual|
|1992-07-01|      455.983718|  Mensual|
|1992-08-01|       346.11888|  Mensual|
|1992-09-01|       251.83012|  Mensual|
|1992-10-01|      180.399972|  Mensual|
|1992-11-01|      152.657852|  Mensual|
|1992-12-01|      170.560837|  Mensual|
|1993-01-01|      226.344594|  Mensual|
|1993-02-01|       218.95773|  Mensual|
|1993-03-01|      376.002446|  Mensual|
|1993-04-01|      375.657342|  Mensual|
|1993-05-01|      470.956346|  Mensual|
|1993-06-01|      402.156012|  Mensual|
|1993-07-01|      309.1

In [15]:
#otra transformación que se hará, es extraer el mes de la fecha

In [14]:
df_to_write = df_to_write.withColumn('month', substring(col('date_from'),6,2))
df_to_write.printSchema()
df_to_write.show()

root
 |-- date_from: string (nullable = true)
 |-- millones_dolares: double (nullable = true)
 |-- frequency: string (nullable = false)
 |-- month: string (nullable = true)

+----------+----------------+---------+-----+
| date_from|millones_dolares|frequency|month|
+----------+----------------+---------+-----+
|1992-01-01|      133.367679|  Mensual|   01|
|1992-02-01|      188.034116|  Mensual|   02|
|1992-03-01|      320.984466|  Mensual|   03|
|1992-04-01|      375.242075|  Mensual|   04|
|1992-05-01|      474.440656|  Mensual|   05|
|1992-06-01|      442.905698|  Mensual|   06|
|1992-07-01|      455.983718|  Mensual|   07|
|1992-08-01|       346.11888|  Mensual|   08|
|1992-09-01|       251.83012|  Mensual|   09|
|1992-10-01|      180.399972|  Mensual|   10|
|1992-11-01|      152.657852|  Mensual|   11|
|1992-12-01|      170.560837|  Mensual|   12|
|1993-01-01|      226.344594|  Mensual|   01|
|1993-02-01|       218.95773|  Mensual|   02|
|1993-03-01|      376.002446|  Mensual|   03

## 4) Carga de datos en tabla de Amazon Redshift

In [17]:
df_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.exportaciones_productos_primarios") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [18]:
# Query Redshift using Spark SQL
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.exportaciones_productos_primarios"
data = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as tmp_table") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [19]:
data.printSchema()
data.show()

root
 |-- date_from: string (nullable = true)
 |-- millones_dolares: double (nullable = true)
 |-- frequency: string (nullable = true)
 |-- month: string (nullable = true)

+----------+----------------+---------+-----+
| date_from|millones_dolares|frequency|month|
+----------+----------------+---------+-----+
|1992-01-01|      133.367679|  Mensual|   01|
|1992-02-01|      188.034116|  Mensual|   02|
|1992-03-01|      320.984466|  Mensual|   03|
|1992-04-01|      375.242075|  Mensual|   04|
|1992-05-01|      474.440656|  Mensual|   05|
|1992-06-01|      442.905698|  Mensual|   06|
|1992-07-01|      455.983718|  Mensual|   07|
|1992-08-01|       346.11888|  Mensual|   08|
|1992-09-01|       251.83012|  Mensual|   09|
|1992-10-01|      180.399972|  Mensual|   10|
|1992-11-01|      152.657852|  Mensual|   11|
|1992-12-01|      170.560837|  Mensual|   12|
|1993-01-01|      226.344594|  Mensual|   01|
|1993-02-01|       218.95773|  Mensual|   02|
|1993-03-01|      376.002446|  Mensual|   03|

In [20]:
conn.close()