# Entregable 1

Pasos a seguir:
* Bajar datos de una API en formato JSON
* Cargar datos en la tabla de Redshift

Esto lo haremos usando la libreria `requests`, `Spark` y un driver de conexión de `Postgres`

## 1) Bajar datos de una API en formato JSON
Usaremos la API de [Datos Argentina](https://www.datos.gob.ar/)

Y nos vamos a traer los datos de: De cantidad de automoviles producidos y exportados mensualmente

Para probar la API ir a: [API de Series de Tiempo AR: Generador de URLs](https://datosgobar.github.io/series-tiempo-ar-call-generator/)

In [1]:
#importamos la Libreria request y definimos una funcion para llamar a la api

import requests
import urllib.parse

def get_api_call(ids, **kwargs):
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))

In [2]:
# Ejemplo: https://apis.datos.gob.ar/series/api/series?ids=Automotriz_produccion_s2nqOo,Automotriz_expos_ItCfsr
api_call = get_api_call(["Automotriz_produccion_s2nqOo,Automotriz_expos_ItCfsr"])
print(api_call)

https://apis.datos.gob.ar/series/api/series?ids=Automotriz_produccion_s2nqOo%2CAutomotriz_expos_ItCfsr


In [3]:
#Una vez genereado el link usamos request para traernos los datos del la api e imprimimos
result = requests.get(api_call).json()
print(result)

{'data': [['2003-01-01', 8752.0, 9504.0], ['2003-02-01', 9907.0, 7216.0], ['2003-03-01', 13906.0, 9454.0], ['2003-04-01', 15558.0, 10656.0], ['2003-05-01', 14134.0, 8671.0], ['2003-06-01', 13514.0, 8715.0], ['2003-07-01', 12134.0, 8633.0], ['2003-08-01', 15350.0, 9012.0], ['2003-09-01', 16342.0, 9999.0], ['2003-10-01', 16714.0, 8838.0], ['2003-11-01', 19403.0, 7646.0], ['2003-12-01', 13462.0, 9713.0], ['2004-01-01', 12016.0, 5323.0], ['2004-02-01', 14152.0, 10250.0], ['2004-03-01', 22190.0, 10857.0], ['2004-04-01', 20542.0, 11559.0], ['2004-05-01', 21513.0, 11539.0], ['2004-06-01', 22476.0, 8567.0], ['2004-07-01', 21203.0, 13011.0], ['2004-08-01', 23568.0, 12648.0], ['2004-09-01', 25635.0, 14900.0], ['2004-10-01', 25459.0, 16473.0], ['2004-11-01', 28861.0, 16448.0], ['2004-12-01', 22787.0, 14661.0], ['2005-01-01', 21543.0, 10804.0], ['2005-02-01', 12871.0, 9007.0], ['2005-03-01', 27948.0, 15134.0], ['2005-04-01', 27593.0, 14291.0], ['2005-05-01', 26675.0, 14137.0], ['2005-06-01', 26573

##### (Ya con los datos de la api seguimos al segundo paso que seria cargar los datos en la tabla de Redshift)
## 2) Cargar datos en la tabla de Redshift

```SQL
create table if not exists fabiolecce93_coderhouse.vehiculos_producidos_vs_exportados (
    date_from VARCHAR(10) distkey,
    vehiculos_producidos decimal(10),
    vehiculos_exportados decimal(10),
    frequency VARCHAR(12)
) sortkey(date_from);
```

In [4]:
#intalamos psycopg2-binary
!pip install psycopg2-binary



In [5]:
# Crear sesion de Spark
import os
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# Postgres and Redshift JDBCs
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

In [6]:
#creamos la variable env con os
env = os.environ

In [7]:
# Nos conectamos a Redshift usando psycopg2 y antes debemos tener el archivo .env creado con las credenciales
conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)

In [8]:
#Creamos la tabla si no existe en el schema e imprimimos, si se creo la tabla se muestra "Tabla creada!"
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.vehiculos_producidos_vs_exportados (
    date_from VARCHAR(10) distkey,
    vehiculos_producidos decimal(10),
    vehiculos_exportados decimal(10),
    frequency varchar(12)
) sortkey(date_from);
""")
conn.commit()
cursor.close()
print("Tabla creada!")

Tabla creada!


In [9]:
#comprobamos si se creo la tabla revisando los diferentes nombre de las tablas que tiene el schema
cursor = conn.cursor()
cursor.execute(f"""
SELECT
  distinct tablename
FROM
  PG_TABLE_DEF
WHERE
  schemaname = '{env['AWS_REDSHIFT_SCHEMA']}';
""")
# resultado = cursor.fetchall()
print(", ".join(map(lambda x: x[0], cursor.fetchall())))
cursor.close()

vehiculos_exportados, vehiculos_producidos_vs_exportados


In [10]:
# Creamos el dataFrame con las diferentes columnas
df = spark.createDataFrame(result['data'], ["date_from", "vehiculos_producidos", "vehiculos_exportados"])

In [27]:
#imprimimos dataframe para ver los datos
df.printSchema()
df.show()

root
 |-- date_from: string (nullable = true)
 |-- vehiculos_producidos: double (nullable = true)
 |-- vehiculos_exportados: double (nullable = true)

+----------+--------------------+--------------------+
| date_from|vehiculos_producidos|vehiculos_exportados|
+----------+--------------------+--------------------+
|2003-01-01|              8752.0|              9504.0|
|2003-02-01|              9907.0|              7216.0|
|2003-03-01|             13906.0|              9454.0|
|2003-04-01|             15558.0|             10656.0|
|2003-05-01|             14134.0|              8671.0|
|2003-06-01|             13514.0|              8715.0|
|2003-07-01|             12134.0|              8633.0|
|2003-08-01|             15350.0|              9012.0|
|2003-09-01|             16342.0|              9999.0|
|2003-10-01|             16714.0|              8838.0|
|2003-11-01|             19403.0|              7646.0|
|2003-12-01|             13462.0|              9713.0|
|2004-01-01|            

In [33]:
#Haremos una columna que sera el calculo del porcentaje de exportados sobre los producidos: 100*(exportados/producidos)(%)
df2 = df.withColumn('%_Porcentaje_exportacion', 100*df.vehiculos_exportados/df.vehiculos_producidos)
df2.printSchema()
df2.show()

root
 |-- date_from: string (nullable = true)
 |-- vehiculos_producidos: double (nullable = true)
 |-- vehiculos_exportados: double (nullable = true)
 |-- %_Porcentaje_exportacion: double (nullable = true)

+----------+--------------------+--------------------+------------------------+
| date_from|vehiculos_producidos|vehiculos_exportados|%_Porcentaje_exportacion|
+----------+--------------------+--------------------+------------------------+
|2003-01-01|              8752.0|              9504.0|      108.59232175502743|
|2003-02-01|              9907.0|              7216.0|       72.83738770566266|
|2003-03-01|             13906.0|              9454.0|       67.98504242772904|
|2003-04-01|             15558.0|             10656.0|       68.49209409949864|
|2003-05-01|             14134.0|              8671.0|       61.34852129616527|
|2003-06-01|             13514.0|              8715.0|       64.48867840757732|
|2003-07-01|             12134.0|              8633.0|       71.147189714

In [32]:
#Agregaremos otra columna literaria donde esrcibiremos la frecuencia que sera mensual
df_to_write = df2.withColumn('frequency', lit('Mensual'))
df_to_write.printSchema()
df_to_write.show()

root
 |-- date_from: string (nullable = true)
 |-- vehiculos_producidos: double (nullable = true)
 |-- vehiculos_exportados: double (nullable = true)
 |-- %_Porcentaje_exportacion: double (nullable = true)
 |-- frequency: string (nullable = false)

+----------+--------------------+--------------------+------------------------+---------+
| date_from|vehiculos_producidos|vehiculos_exportados|%_Porcentaje_exportacion|frequency|
+----------+--------------------+--------------------+------------------------+---------+
|2003-01-01|              8752.0|              9504.0|      108.59232175502743|  Mensual|
|2003-02-01|              9907.0|              7216.0|       72.83738770566266|  Mensual|
|2003-03-01|             13906.0|              9454.0|       67.98504242772904|  Mensual|
|2003-04-01|             15558.0|             10656.0|       68.49209409949864|  Mensual|
|2003-05-01|             14134.0|              8671.0|       61.34852129616527|  Mensual|
|2003-06-01|             13514.

In [24]:
df_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.vehiculos_producidos_vs_exportados") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [25]:
# Query Redshift usando Spark SQL
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.vehiculos_producidos_vs_exportados"
data = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as tmp_table") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [26]:
#imprimimos como que la data en la tabla
data.printSchema()
data.show()

root
 |-- date_from: string (nullable = true)
 |-- vehiculos_producidos: double (nullable = true)
 |-- vehiculos_exportados: double (nullable = true)
 |-- %_porcentaje_exportacion: double (nullable = true)
 |-- frequency: string (nullable = true)

+----------+--------------------+--------------------+------------------------+---------+
| date_from|vehiculos_producidos|vehiculos_exportados|%_porcentaje_exportacion|frequency|
+----------+--------------------+--------------------+------------------------+---------+
|2003-01-01|              8752.0|              9504.0|      108.59232175502743|  Mensual|
|2003-02-01|              9907.0|              7216.0|       72.83738770566266|  Mensual|
|2003-03-01|             13906.0|              9454.0|       67.98504242772904|  Mensual|
|2003-04-01|             15558.0|             10656.0|       68.49209409949864|  Mensual|
|2003-05-01|             14134.0|              8671.0|       61.34852129616527|  Mensual|
|2003-06-01|             13514.0

In [31]:
#Cerramos la conexión
conn.close()