## 1) CARGA DE VARIABLES DE ENTORNO Y LIBRERIAS

In [50]:
import requests
from dotenv import load_dotenv
import os
import json
import psycopg2
import pandas as pd

load_dotenv()

env = os.environ


## 2) FUNCION DE EXTRACCION DE DATOS

In [51]:
def extract_data(url):

    response = requests.get(url)
    resultados = {}
    a = 1

    if response.status_code == 200:
        response_json = json.loads(response.text)
        results = response_json['data']['results']
                
        for i in results:
            id_character = i['id']
            nombre = i['name']
            descripcion = i['description'][0:254]
            comic_disponibles = i['comics']['available']
            series_disponibles = i['series']['available']
            historias_disponibles = i['stories']['available']
            modificacion = i['modified']

            if descripcion == '':
                descripcion = 'Sin descripcion'
            
            dic = {
                'id_character':id_character,
                'nombre': nombre,
                'descripcion': descripcion,
                'cantidad_de_comics': comic_disponibles,
                'cantidad_de_series':series_disponibles,
                'cantidad_de_historias': historias_disponibles,
                'fecha_modificacion': modificacion
            }

            resultados[f'{a}'] = dic
            a = a + 1

    # with open('texto.json','w') as file:
    #     json.dump(lista,file,indent=4)

    return resultados

In [52]:
# Guardo lo datos en una variable
resultados = extract_data(url=env['URL'])

## 3) Creo la SparkSession

In [53]:
# Crear sesion de Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# Postgres and Redshift JDBCs
driver_path = env['DRIVER_PATH_JAVA']

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

In [54]:
env = os.environ

Revisar documentación:
* [AWS Redshift + Spark documentation](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-redshift.html)
* [Spark + Redshift connector](https://github.com/spark-redshift-community/spark-redshift#readme)

## 4) ME CONECTO CON AWS VIA PSYCOPG2

In [55]:
# Conexion a Redshift usando psycopg2
conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)

In [56]:
# query para creacion de tabla
schema = env['AWS_REDSHIFT_SCHEMA']

query_table = f'''
        CREATE TABLE IF NOT EXISTS {schema}.marvelCharacters(
            id_character int not null,
            nombre varchar(50) not null,
            descripcion varchar(255) null,
            cantidad_de_comics integer,
            cantidad_de_series integer,
            cantidad_de_historias integer,
            fecha_modificacion date,
            fecha_insercion_bd date
        )
        distkey(id_character)
        sortkey(id_character,fecha_insercion_bd);
'''

In [57]:
# Genero el cursor y luego ejecuto la query para crear la tabla y cierro la conexion
cursor = conn.cursor()
cursor.execute(query_table)
conn.commit()
cursor.close()
print("Table created!")

Table created!


In [58]:
# Query para controlar las tablas que tengo creadas en aws
query_table_create = f"""
SELECT
  distinct tablename
FROM
  PG_TABLE_DEF
WHERE
  schemaname = '{env['AWS_REDSHIFT_SCHEMA']}';
"""

In [59]:
# genero nueva conexion para ver cuantas tablas tengo en mi esquema
cursor = conn.cursor()
cursor.execute(query_table_create)
print(", ".join(map(lambda x: x[0], cursor.fetchall())))
cursor.close()

exportaciones_cereales, marvelcharacters


## 5) CREO UN DATAFRAME, VEO SUS CARACTERISTICAS, LO VISUALIZO Y LUEGO INSERTO DATOS.

In [60]:
# Creo el dataframe con la extraccion para subir los datos a AWS, aprovechando las bondades de pandas.
dff = pd.DataFrame.from_dict(resultados,orient='index')
df = spark.createDataFrame(dff)

In [61]:
# veo como estan las tablas y sus tipos de datos.
df.printSchema()
# visualizo la tabla
df.show()

root
 |-- id_character: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- descripcion: string (nullable = true)
 |-- cantidad_de_comics: long (nullable = true)
 |-- cantidad_de_series: long (nullable = true)
 |-- cantidad_de_historias: long (nullable = true)
 |-- fecha_modificacion: string (nullable = true)



+------------+--------------------+--------------------+------------------+------------------+---------------------+--------------------+
|id_character|              nombre|         descripcion|cantidad_de_comics|cantidad_de_series|cantidad_de_historias|  fecha_modificacion|
+------------+--------------------+--------------------+------------------+------------------+---------------------+--------------------+
|     1011334|             3-D Man|     Sin descripcion|                12|                 3|                   21|2014-04-29T14:18:...|
|     1017100|        A-Bomb (HAS)|Rick Jones has be...|                 4|                 2|                    7|2013-09-18T15:54:...|
|     1009144|              A.I.M.|AIM is a terroris...|                53|                36|                   57|2013-10-17T14:41:...|
|     1010699|         Aaron Stack|     Sin descripcion|                14|                 3|                   27|1969-12-31T19:00:...|
|     1009146|Abomination (Emil...

In [62]:
# Cargo los datos en la tabla
df.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.marvelCharacters") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

23/07/16 18:38:02 WARN PgConnection: Unsupported Server Version: 8.0.2
23/07/16 18:38:05 WARN PgConnection: Unsupported Server Version: 8.0.2 + 1) / 1]
23/07/16 18:38:07 WARN PgConnection: Unsupported Server Version: 8.0.2          


In [63]:
# Query Redshift using Spark SQL
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.marvelCharacters"
data = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as tmp_table") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()

23/07/16 18:38:09 WARN PgConnection: Unsupported Server Version: 8.0.2


In [64]:
data.printSchema()
data.show()

root
 |-- id_character: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- descripcion: string (nullable = true)
 |-- cantidad_de_comics: long (nullable = true)
 |-- cantidad_de_series: long (nullable = true)
 |-- cantidad_de_historias: long (nullable = true)
 |-- fecha_modificacion: string (nullable = true)



23/07/16 18:38:10 WARN PgConnection: Unsupported Server Version: 8.0.2 + 1) / 1]


+------------+--------------------+--------------------+------------------+------------------+---------------------+--------------------+
|id_character|              nombre|         descripcion|cantidad_de_comics|cantidad_de_series|cantidad_de_historias|  fecha_modificacion|
+------------+--------------------+--------------------+------------------+------------------+---------------------+--------------------+
|     1011334|             3-D Man|     Sin descripcion|                12|                 3|                   21|2014-04-29T14:18:...|
|     1017100|        A-Bomb (HAS)|Rick Jones has be...|                 4|                 2|                    7|2013-09-18T15:54:...|
|     1009144|              A.I.M.|AIM is a terroris...|                53|                36|                   57|2013-10-17T14:41:...|
|     1010699|         Aaron Stack|     Sin descripcion|                14|                 3|                   27|1969-12-31T19:00:...|
|     1009146|Abomination (Emil...

                                                                                

## 6) CIERRE DE CONEXION

In [65]:
cursor.close()