In [2]:
import requests
import urllib.parse

#consumo datos de la api datos.gob.ar
def get_api_call(ids, **kwargs):
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))   

In [3]:
# JSON de precios del petroleo 
api_call = get_api_call(["Precio_petroleo_RgR2vi"])
print(api_call)

https://apis.datos.gob.ar/series/api/series?ids=Precio_petroleo_RgR2vi


In [4]:
#extraigo el JSON
result = requests.get(api_call).json()
print(result)

{'data': [['1980-01-01', 37.0], ['1980-02-01', 37.0], ['1980-03-01', 39.5], ['1980-04-01', 39.5], ['1980-05-01', 39.5], ['1980-06-01', 39.5], ['1980-07-01', 39.5], ['1980-08-01', 38.0], ['1980-09-01', 36.0], ['1980-10-01', 36.0], ['1980-11-01', 36.0], ['1980-12-01', 37.0], ['1981-01-01', 38.0], ['1981-02-01', 38.0], ['1981-03-01', 38.0], ['1981-04-01', 38.0], ['1981-05-01', 36.0], ['1981-06-01', 36.0], ['1981-07-01', 35.0], ['1981-08-01', 35.0], ['1981-09-01', 35.0], ['1981-10-01', 35.0], ['1981-11-01', 35.0], ['1981-12-01', 35.0], ['1982-01-01', 35.0], ['1982-02-01', 34.0], ['1982-03-01', 33.0], ['1982-04-01', 32.0], ['1982-05-01', 32.0], ['1982-06-01', 32.0], ['1982-07-01', 32.0], ['1982-08-01', 32.0], ['1982-09-01', 32.0], ['1982-10-01', 33.0], ['1982-11-01', 33.0], ['1982-12-01', 32.0], ['1983-01-01', 32.0], ['1983-02-01', 30.0], ['1983-03-01', 29.2], ['1983-04-01', 30.5], ['1983-05-01', 30.2], ['1983-06-01', 31.0], ['1983-07-01', 31.6], ['1983-08-01', 31.9], ['1983-09-01', 31.2], 

In [7]:
!pip install psycopg2-binary



In [8]:
!pip install pyspark



In [9]:
# Crear sesion de Spark
import os
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# Postgres and Redshift JDBCs
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()


In [10]:
env = os.environ

In [11]:
connection = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD'],
)

In [12]:
# creacion de tabla en redshift           
cursor = connection.cursor()
cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.precio_petroleo (
    fecha VARCHAR(10) distkey,
    precio DECIMAL(6,1),
    frecuencia VARCHAR(12)
) sortkey(fecha);
""")
connection.commit()
cursor.close()
print("Table created!")

Table created!


In [14]:
# Creacion del DataFrame con los datos 
df = spark.createDataFrame(result['data'],["fecha", "precio"])
df.printSchema()
df.show()

root
 |-- fecha: string (nullable = true)
 |-- precio: double (nullable = true)

+----------+------+
|     fecha|precio|
+----------+------+
|1980-01-01|  37.0|
|1980-02-01|  37.0|
|1980-03-01|  39.5|
|1980-04-01|  39.5|
|1980-05-01|  39.5|
|1980-06-01|  39.5|
|1980-07-01|  39.5|
|1980-08-01|  38.0|
|1980-09-01|  36.0|
|1980-10-01|  36.0|
|1980-11-01|  36.0|
|1980-12-01|  37.0|
|1981-01-01|  38.0|
|1981-02-01|  38.0|
|1981-03-01|  38.0|
|1981-04-01|  38.0|
|1981-05-01|  36.0|
|1981-06-01|  36.0|
|1981-07-01|  35.0|
|1981-08-01|  35.0|
+----------+------+
only showing top 20 rows



In [19]:
df_to_write = df.withColumn('frecuencia', lit('Mensual'))
df_to_write.printSchema()
df_to_write.show()

root
 |-- fecha: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- frecuencia: string (nullable = false)

+----------+------+----------+
|     fecha|precio|frecuencia|
+----------+------+----------+
|1980-01-01|  37.0|   Mensual|
|1980-02-01|  37.0|   Mensual|
|1980-03-01|  39.5|   Mensual|
|1980-04-01|  39.5|   Mensual|
|1980-05-01|  39.5|   Mensual|
|1980-06-01|  39.5|   Mensual|
|1980-07-01|  39.5|   Mensual|
|1980-08-01|  38.0|   Mensual|
|1980-09-01|  36.0|   Mensual|
|1980-10-01|  36.0|   Mensual|
|1980-11-01|  36.0|   Mensual|
|1980-12-01|  37.0|   Mensual|
|1981-01-01|  38.0|   Mensual|
|1981-02-01|  38.0|   Mensual|
|1981-03-01|  38.0|   Mensual|
|1981-04-01|  38.0|   Mensual|
|1981-05-01|  36.0|   Mensual|
|1981-06-01|  36.0|   Mensual|
|1981-07-01|  35.0|   Mensual|
|1981-08-01|  35.0|   Mensual|
+----------+------+----------+
only showing top 20 rows



In [20]:
#carga el dataframe en la tabla de redshift
df_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.precio_petroleo") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [15]:
# chequeo con una consulta si los datos estan en la tabla creada en redshift
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.precio_petroleo"
data = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as tmp_table") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [16]:
data.printSchema()
data.show()

root
 |-- fecha: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- frecuencia: string (nullable = true)

+----------+------+----------+
|     fecha|precio|frecuencia|
+----------+------+----------+
|1980-01-01|  37.0|   Mensual|
|1980-02-01|  37.0|   Mensual|
|1980-03-01|  39.5|   Mensual|
|1980-04-01|  39.5|   Mensual|
|1980-05-01|  39.5|   Mensual|
|1980-06-01|  39.5|   Mensual|
|1980-07-01|  39.5|   Mensual|
|1980-08-01|  38.0|   Mensual|
|1980-09-01|  36.0|   Mensual|
|1980-10-01|  36.0|   Mensual|
|1980-11-01|  36.0|   Mensual|
|1980-12-01|  37.0|   Mensual|
|1981-01-01|  38.0|   Mensual|
|1981-02-01|  38.0|   Mensual|
|1981-03-01|  38.0|   Mensual|
|1981-04-01|  38.0|   Mensual|
|1981-05-01|  36.0|   Mensual|
|1981-06-01|  36.0|   Mensual|
|1981-07-01|  35.0|   Mensual|
|1981-08-01|  35.0|   Mensual|
+----------+------+----------+
only showing top 20 rows



In [17]:
connection.close()