In [1]:
import requests
import os
import urllib.parse
import pandas as pd

In [2]:
# Obtener las variables de entorno
POSTGRES_DB = os.environ['POSTGRES_DB']
POSTGRES_USER = os.environ['POSTGRES_USER']
POSTGRES_PASSWORD = os.environ['POSTGRES_PASSWORD']
POSTGRES_HOST_AUTH_METHOD = os.environ['POSTGRES_HOST_AUTH_METHOD']
POSTGRES_PORT = os.environ['POSTGRES_PORT']
POSTGRES_HOST = os.environ['POSTGRES_HOST']

REDSHIFT_USER = os.environ['REDSHIFT_USER']
REDSHIFT_PASSWORD = os.environ['REDSHIFT_PASSWORD']
REDSHIFT_HOST = os.environ['REDSHIFT_HOST']
REDSHIFT_PORT = os.environ['REDSHIFT_PORT']
REDSHIFT_DB = os.environ['REDSHIFT_DB']
REDSHIFT_SCHEMA = os.environ['REDSHIFT_SCHEMA']

JUPYTER_ENABLE_LAB = os.environ['JUPYTER_ENABLE_LAB']
JUPYTER_TOKEN = os.environ['JUPYTER_TOKEN']
DRIVER_PATH = os.environ['DRIVER_PATH']


In [3]:
# Definir la API

def get_api_call(ids, **kwargs):
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))

In [4]:
# Obtener la URL de la API
api_call = get_api_call(["Automotriz_produccion_s2nqOo"])
result = requests.get(api_call).json()

In [5]:
# Convertir los datos de la API a un DataFrame de Pandas
data_pd = pd.DataFrame(result['data'], columns=["date_from", "unidades"])

In [6]:
# Eliminar duplicados durante la carga inicial
data_pd = data_pd.drop_duplicates(subset=['date_from'])

In [7]:
# Crear la sesión de Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col


os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {DRIVER_PATH} --jars {DRIVER_PATH} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = DRIVER_PATH

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", DRIVER_PATH) \
        .config("spark.executor.extraClassPath", DRIVER_PATH) \
        .getOrCreate()

In [8]:
%pip install psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


In [9]:
# Conectar a Redshift usando psycopg2
import psycopg2
conn = psycopg2.connect(
    host=REDSHIFT_HOST,
    port=REDSHIFT_PORT,
    dbname=REDSHIFT_DB,
    user=REDSHIFT_USER,
    password=REDSHIFT_PASSWORD
)

In [10]:
# Crear la tabla en Redshift si no existe
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {os.environ['REDSHIFT_SCHEMA']}.cantidad_de_vehiculos_producida(
    date_from VARCHAR(10) distkey,
    unidades decimal(10,2),
    frequency varchar(12)
) sortkey(date_from);
""")
conn.commit()
cursor.close()

In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Conexion entre Pyspark y Redshift") \
    .config("spark.jars", DRIVER_PATH) \
    .config("spark.executor.extraClassPath", DRIVER_PATH) \
    .getOrCreate()

# Lee la lista de tablas desde Redshift
table_list = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:redshift://{REDSHIFT_HOST}:{REDSHIFT_PORT}/{REDSHIFT_DB}") \
    .option("dbtable", f"{REDSHIFT_SCHEMA}.cantidad_de_vehiculos_producida") \
    .option("user", REDSHIFT_USER) \
    .option("password", REDSHIFT_PASSWORD) \
    .option("driver", DRIVER_PATH) \
    .load()

# Filtra las tablas por el esquema específico
filtered_tables = table_list.filter(f"table_schema = '{REDSHIFT_SCHEMA}'")

# Muestra la lista de tablas
filtered_tables.show()

Py4JJavaError: An error occurred while calling o40.load.
: java.lang.ClassNotFoundException: /home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [None]:
# Verificar la existencia de registros antes de la inserción
date_from_values = [row[0] for row in data_pd.values]
date_values_string = ",".join(["'{}'".format(value) for value in date_from_values])

sql_query = "SELECT * FROM " + REDSHIFT_SCHEMA + ".cantidad_de_vehiculos_producida" + " WHERE date_from IN (" + date_values_string + ")"
existing_data = spark.sql(sql_query)


existing_data = spark.sql(sql_query)
existing_data = existing_data.filter(col("date_from").isin(date_from_values))
existing_data = existing_data.select("*")

In [None]:
# Filtrar los registros duplicados
data_pd = data_pd[~data_pd['date_from'].isin(existing_data.toPandas()['date_from'])]

In [None]:
# Convertir el DataFrame de Pandas a un DataFrame de Spark
df = spark.createDataFrame(data_pd)


In [None]:
# Agregar una columna "frequency" al DataFrame de Spark
df_to_write = df.withColumn('frequency', lit('Mensual'))

In [None]:
# Escribir los datos en Redshift
df_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{REDSHIFT_HOST}:{REDSHIFT_PORT}/{REDSHIFT_DB}") \
    .option("dbtable", f"{REDSHIFT_SCHEMA}.cantidad_de_vehiculos_producida") \
    .option("user", REDSHIFT_USER) \
    .option("password", REDSHIFT_PASSWORD) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()