# Deliverable 1

For this deliverable, I use a public API from [Datos Argentina](https://datos.gob.ar/) to fetch the series `458.1_PATIO_COMIDAS_ABRI_M_13_59` (food-court sales at shopping centers, judging by the series ID and the target table name) starting from January 1, 2019.

I import the required modules.

In [1]:
import requests
import urllib.parse

I define the function that builds the request URL.

In [2]:
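def get_api_call(ids, **kwargs):
    """Build a series-API URL for the given series IDs and query parameters."""
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    # The IDs are sent as a single comma-separated `ids` parameter.
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))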

I build the URL.

In [3]:
api_call = get_api_call(["458.1_PATIO_COMIDAS_ABRI_M_13_59"],start_date="2019-01-01")
print(api_call)

https://apis.datos.gob.ar/series/api/series?start_date=2019-01-01&ids=458.1_PATIO_COMIDAS_ABRI_M_13_59
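
As a quick sanity check, a URL built this way can be parsed back into its parameters with the standard library. This is only a minimal sketch: `build_series_url` and `query_params` are hypothetical helpers that mirror `get_api_call`, and the second series ID is a made-up placeholder used only to show how several IDs are joined.

```python
import urllib.parse

API_BASE_URL = "https://apis.datos.gob.ar/series/api/"

def build_series_url(ids, **params):
    """Hypothetical helper mirroring get_api_call without mutating its kwargs."""
    query = dict(params)
    query["ids"] = ",".join(ids)  # several IDs travel as one comma-separated value
    return f"{API_BASE_URL}series?{urllib.parse.urlencode(query)}"

def query_params(url):
    """Decode the query string of a URL into a plain dict."""
    return dict(urllib.parse.parse_qsl(urllib.parse.urlparse(url).query))

url = build_series_url(
    ["458.1_PATIO_COMIDAS_ABRI_M_13_59", "PLACEHOLDER_ID"],  # second ID is made up
    start_date="2019-01-01",
)
params = query_params(url)
print(params["start_date"])
print(params["ids"].split(","))
```

Note that `urlencode` percent-encodes the comma between IDs as `%2C`; parsing the query string decodes it again.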


I fetch the JSON response.

In [4]:
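response = requests.get(api_call, timeout=30)
response.raise_for_status()  # fail early on HTTP errors instead of parsing an error body
result = response.json()
print(result)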

{'data': [['2019-01-01', 1698016633.0], ['2019-02-01', 1522825858.0], ['2019-03-01', 1691796925.0], ['2019-04-01', 1621048103.9999998], ['2019-05-01', 1695702429.0], ['2019-06-01', 2123808021.0], ['2019-07-01', 2597287631.9999995], ['2019-08-01', 2084611571.0], ['2019-09-01', 1922844552.0], ['2019-10-01', 2156089695.0000005], ['2019-11-01', 2113445608.0], ['2019-12-01', 2695757975.0], ['2020-01-01', 2517475806.0000005], ['2020-02-01', 2582165442.0], ['2020-03-01', 1139797040.0], ['2020-04-01', 73371637.00000001], ['2020-05-01', 207465311.0], ['2020-06-01', 350476964.0], ['2020-07-01', 495231924.0], ['2020-08-01', 557869407.0], ['2020-09-01', 504654749.00000006], ['2020-10-01', 712014068.9999999], ['2020-11-01', 1116205106.0000002], ['2020-12-01', 1915016864.0], ['2021-01-01', 2042929616.0], ['2021-02-01', 2056192092.0], ['2021-03-01', 2308494267.0], ['2021-04-01', 1703331046.0], ['2021-05-01', 1157837784.0], ['2021-06-01', 1657915412.0], ['2021-07-01', 3537280618.0], ['2021-08-01', 319
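
The response keeps the observations under the `data` key as `[date, value]` pairs. Below is a minimal sketch of turning that into typed rows, assuming every payload has that shape. `parse_series_payload` is a hypothetical helper, and the sample reuses the first two rows printed above.

```python
from datetime import date

def parse_series_payload(payload):
    """Convert [iso_date, value] pairs into (date, float or None) tuples."""
    rows = []
    for raw_date, value in payload["data"]:
        parsed_value = None if value is None else float(value)
        rows.append((date.fromisoformat(raw_date), parsed_value))
    return rows

# First two observations from the printed response.
sample = {"data": [["2019-01-01", 1698016633.0], ["2019-02-01", 1522825858.0]]}
rows = parse_series_payload(sample)
for day, value in rows:
    print(day.isoformat(), value)
```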

I install psycopg2.

In [5]:
!pip install psycopg2-binary



I import the modules needed for the database and Spark steps.

In [6]:
import os
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

I point Spark at the PostgreSQL JDBC driver.

In [7]:
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH']=driver_path

I create the Spark session.

In [8]:
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars",driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

I store the environment variables in a variable called `env`.

In [9]:
env= os.environ
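
Because the following cells index `env` directly, a missing variable would surface as a bare `KeyError` in the middle of the notebook. Below is a minimal sketch of an up-front check, assuming the `AWS_REDSHIFT_*` names used in this notebook are the complete set. `REQUIRED_VARS` and `missing_env_vars` are hypothetical names.

```python
import os

# Variable names read by the connection, DDL, and JDBC cells of this notebook.
REQUIRED_VARS = (
    "AWS_REDSHIFT_HOST",
    "AWS_REDSHIFT_PORT",
    "AWS_REDSHIFT_DBNAME",
    "AWS_REDSHIFT_USER",
    "AWS_REDSHIFT_PASSWORD",
    "AWS_REDSHIFT_SCHEMA",
)

def missing_env_vars(environ, required=REQUIRED_VARS):
    """Return the required variable names that are absent or empty."""
    return [name for name in required if not environ.get(name)]

missing = missing_env_vars(os.environ)
if missing:
    print("Missing environment variables:", ", ".join(missing))
```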

I open a connection using psycopg2.

In [10]:
conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)

I create a cursor to run queries on the connection and use it to create the target table if it does not exist.

In [11]:
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.ventas_centros_compras_comida (
    -- Columns mirror the DataFrame written to this table.
    date_from VARCHAR(10) distkey,
    ventas_pesos double precision,
    month_year varchar(6),
    iso_week_year varchar(8)
) sortkey(date_from);
""")
conn.commit()
cursor.close()
print("Table created!")

Table created!
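
The schema name is interpolated into the SQL text with an f-string, so a malformed value would end up inside the statement. Below is a minimal sketch of a guard, assuming schema names are plain unquoted identifiers (letters, digits, and underscores). `safe_identifier` and `my_schema` are hypothetical.

```python
import re

# Plain unquoted identifier: a letter or underscore, then letters, digits, underscores.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def safe_identifier(name):
    """Return name unchanged if it is a plain SQL identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name

schema = safe_identifier("my_schema")  # placeholder schema name
qualified_table = f"{schema}.ventas_centros_compras_comida"
print(qualified_table)
```

The validated value could then be used wherever the notebook interpolates `env['AWS_REDSHIFT_SCHEMA']`.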


I verify that the table was actually created.

In [12]:
cursor = conn.cursor()
cursor.execute(f"""
SELECT
    distinct tablename
FROM
    PG_TABLE_DEF
WHERE
    schemaname='{env['AWS_REDSHIFT_SCHEMA']}';
""")
resultado = cursor.fetchall()
print(", ".join(map(lambda x: x[0],resultado)))
cursor.close()

ventas_centros_compras_comida


With the table confirmed, I move on to the data. I create a DataFrame from the JSON data.

In [13]:
df=spark.createDataFrame(result['data'],['date_from','ventas_pesos'])
df.printSchema()
df.show()

root
 |-- date_from: string (nullable = true)
 |-- ventas_pesos: double (nullable = true)

+----------+--------------------+
| date_from|        ventas_pesos|
+----------+--------------------+
|2019-01-01|       1.698016633E9|
|2019-02-01|       1.522825858E9|
|2019-03-01|       1.691796925E9|
|2019-04-01|1.6210481039999998E9|
|2019-05-01|       1.695702429E9|
|2019-06-01|       2.123808021E9|
|2019-07-01|2.5972876319999995E9|
|2019-08-01|       2.084611571E9|
|2019-09-01|       1.922844552E9|
|2019-10-01|2.1560896950000005E9|
|2019-11-01|       2.113445608E9|
|2019-12-01|       2.695757975E9|
|2020-01-01|2.5174758060000005E9|
|2020-02-01|       2.582165442E9|
|2020-03-01|        1.13979704E9|
|2020-04-01| 7.337163700000001E7|
|2020-05-01|        2.07465311E8|
|2020-06-01|        3.50476964E8|
|2020-07-01|        4.95231924E8|
|2020-08-01|        5.57869407E8|
+----------+--------------------+
only showing top 20 rows



I add two new columns: one with the ISO year and week number, and another with the month and year. For this I need to import `date_format`, together with `expr`, `concat_ws`, `lpad`, and `weekofyear`, from `pyspark.sql.functions`.

In [14]:
from pyspark.sql.functions import date_format,expr,concat_ws,lpad,weekofyear
df_MonthYear = df.withColumn("month_year", date_format(col("date_from"), "MMM-yy"))
df_to_write = df_MonthYear.withColumn("iso_week_year", concat_ws("-",expr("EXTRACT(YEAROFWEEK FROM date_from)"), lpad(weekofyear('date_from'), 3, "W0")))
df_to_write.printSchema()
df_to_write.show()

root
 |-- date_from: string (nullable = true)
 |-- ventas_pesos: double (nullable = true)
 |-- month_year: string (nullable = true)
 |-- iso_week_year: string (nullable = false)

+----------+--------------------+----------+-------------+
| date_from|        ventas_pesos|month_year|iso_week_year|
+----------+--------------------+----------+-------------+
|2019-01-01|       1.698016633E9|    Jan-19|     2019-W01|
|2019-02-01|       1.522825858E9|    Feb-19|     2019-W05|
|2019-03-01|       1.691796925E9|    Mar-19|     2019-W09|
|2019-04-01|1.6210481039999998E9|    Apr-19|     2019-W14|
|2019-05-01|       1.695702429E9|    May-19|     2019-W18|
|2019-06-01|       2.123808021E9|    Jun-19|     2019-W22|
|2019-07-01|2.5972876319999995E9|    Jul-19|     2019-W27|
|2019-08-01|       2.084611571E9|    Aug-19|     2019-W31|
|2019-09-01|       1.922844552E9|    Sep-19|     2019-W35|
|2019-10-01|2.1560896950000005E9|    Oct-19|     2019-W40|
|2019-11-01|       2.113445608E9|    Nov-19|     2019-
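
The ISO labels can be cross-checked outside Spark with the standard library. This is a minimal sketch rather than the notebook's method, and `iso_week_label` is a hypothetical helper. It also shows why the ISO year is taken from `YEAROFWEEK` rather than the calendar year: a date in early January can belong to the last ISO week of the previous year.

```python
from datetime import date

def iso_week_label(day):
    """Format a date as ISO year and zero-padded ISO week, e.g. 2019-W01."""
    iso_year, iso_week, _ = day.isocalendar()
    return f"{iso_year}-W{iso_week:02d}"

for raw in ["2019-01-01", "2019-02-01", "2019-04-01", "2021-01-01"]:
    print(raw, iso_week_label(date.fromisoformat(raw)))
# 2019-01-01 2019-W01
# 2019-02-01 2019-W05
# 2019-04-01 2019-W14
# 2021-01-01 2020-W53
```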

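To write the data, I use the JDBC writer. With `mode("overwrite")`, Spark by default drops the existing table and recreates it from the DataFrame's schema: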

In [15]:
df_to_write.write \
    .format("jdbc")\
    .option("url",f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}")\
    .option("dbtable",f"{env['AWS_REDSHIFT_SCHEMA']}.ventas_centros_compras_comida") \
    .option("user",env['AWS_REDSHIFT_USER'])\
    .option("password",env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver","org.postgresql.Driver") \
    .mode("overwrite") \
    .save()
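
The JDBC URL and credentials are assembled inline here and would be needed again for any read against the same database. Below is a minimal sketch of collecting them in one place, assuming the same `AWS_REDSHIFT_*` settings. `redshift_jdbc_url`, `redshift_jdbc_options`, and the values in `example_env` are hypothetical placeholders.

```python
def redshift_jdbc_url(env):
    """Build the JDBC URL from a mapping holding the AWS_REDSHIFT_* settings."""
    return (
        f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}"
        f":{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}"
    )

def redshift_jdbc_options(env):
    """Collect the connection options shared by JDBC reads and writes."""
    return {
        "url": redshift_jdbc_url(env),
        "user": env["AWS_REDSHIFT_USER"],
        "password": env["AWS_REDSHIFT_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }

# Placeholder settings for illustration only.
example_env = {
    "AWS_REDSHIFT_HOST": "example-host",
    "AWS_REDSHIFT_PORT": "5439",
    "AWS_REDSHIFT_DBNAME": "dev",
    "AWS_REDSHIFT_USER": "example_user",
    "AWS_REDSHIFT_PASSWORD": "example_password",
}
options = redshift_jdbc_options(example_env)
print(options["url"])
```

The resulting dictionary could be passed with `.options(**options)` on a Spark reader or writer, leaving only `dbtable` to set per call.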

I verify that the data was loaded.

In [16]:
query= f" SELECT * FROM {env['AWS_REDSHIFT_SCHEMA']}.ventas_centros_compras_comida"
data = spark.read \
    .format("jdbc") \
    .option("url",f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}")\
    .option("dbtable",f"({query}) as tmp_table") \
    .option("user",env['AWS_REDSHIFT_USER'])\
    .option("password",env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver","org.postgresql.Driver") \
    .load()
data.printSchema()
data.show()

root
 |-- date_from: string (nullable = true)
 |-- ventas_pesos: double (nullable = true)
 |-- month_year: string (nullable = true)
 |-- iso_week_year: string (nullable = true)

+----------+--------------------+----------+-------------+
| date_from|        ventas_pesos|month_year|iso_week_year|
+----------+--------------------+----------+-------------+
|2019-01-01|       1.698016633E9|    Jan-19|     2019-W01|
|2019-02-01|       1.522825858E9|    Feb-19|     2019-W05|
|2019-03-01|       1.691796925E9|    Mar-19|     2019-W09|
|2019-04-01|1.6210481039999998E9|    Apr-19|     2019-W14|
|2019-05-01|       1.695702429E9|    May-19|     2019-W18|
|2019-06-01|       2.123808021E9|    Jun-19|     2019-W22|
|2019-07-01|2.5972876319999995E9|    Jul-19|     2019-W27|
|2019-08-01|       2.084611571E9|    Aug-19|     2019-W31|
|2019-09-01|       1.922844552E9|    Sep-19|     2019-W35|
|2019-10-01|2.1560896950000005E9|    Oct-19|     2019-W40|
|2019-11-01|       2.113445608E9|    Nov-19|     2019-W