In [1]:
# Importamos los paquetes
!pip install requests
!pip install redshift_connector
!pip install psycopg2-binary

Collecting redshift_connector
  Downloading redshift_connector-2.0.911-py3-none-any.whl (112 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m112.6/112.6 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting scramp<1.5.0,>=1.2.0 (from redshift_connector)
  Downloading scramp-1.4.4-py3-none-any.whl (13 kB)
Collecting boto3<2.0.0,>=1.9.201 (from redshift_connector)
  Downloading boto3-1.26.152-py3-none-any.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.6/135.6 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
Collecting lxml>=4.6.5 (from redshift_connector)
  Downloading lxml-4.9.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl (6.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.8/6.8 MB[0m [31m18.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting botocore<2.0.0,>=1.12.201 (from redshift_connector)
  Downloading botocore-1.29.152-py3-none

In [2]:
# Importamos bibliotecas
import pandas as pd
import redshift_connector
from sqlalchemy import create_engine
import requests
from datetime import date
from os import environ as env
import os
import psycopg2
env = os.environ

In [108]:
# Importar Funciones de Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col, current_date, date_format, year, min, max


driver_path = '/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar'
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path


In [20]:
def create_spark_session():
    
    '''
    Función para crear la sesión de Spark.
    '''
    
    spark = SparkSession.builder \
            .master('local') \
            .appName('Conexion entre Pyspark y Redshift') \
            .config('spark.jars', driver_path) \
            .config('spark.executor.extraClassPath', driver_path) \
            .getOrCreate()



In [23]:
def connect_to_redshift():
    
    '''
    Función para conectar a Redshift.
    '''
    
    conn = psycopg2.connect(
      host=env['REDSHIFT_HOST'],
      port=env['REDSHIFT_PORT'],
      dbname=env['REDSHIFT_DB'],
      user=env['REDSHIFT_USER'],
      password=env['REDSHIFT_PASSWORD']
    )

In [118]:
create_spark_session()
connect_to_redshift()

In [10]:
def extract_data():
    
    '''
    Función para extraer los datos de la API(Top 250 de mejores pelicuas de IMDB)
    '''
    
    api_key_value = env['API_KEY']
    endpoint = 'https://imdb-api.com/en/API/Top250Movies/' + api_key_value

    response = requests.get(endpoint)
    data = response.json()
    data_items = data['items']
    data_spark = spark.createDataFrame(data_items)

    return data_spark

In [97]:
data_extracted = extract_data()

In [98]:
data_extracted.show(5)

+--------------------+--------------------+---------+----------+---------------+--------------------+----+--------------------+----+
|                crew|           fullTitle|       id|imDbRating|imDbRatingCount|               image|rank|               title|year|
+--------------------+--------------------+---------+----------+---------------+--------------------+----+--------------------+----+
|Frank Darabont (d...|The Shawshank Red...|tt0111161|       9.2|        2750709|https://m.media-a...|   1|The Shawshank Red...|1994|
|Francis Ford Copp...|The Godfather (1972)|tt0068646|       9.2|        1913342|https://m.media-a...|   2|       The Godfather|1972|
|Christopher Nolan...|The Dark Knight (...|tt0468569|       9.0|        2723486|https://m.media-a...|   3|     The Dark Knight|2008|
|Francis Ford Copp...|The Godfather Par...|tt0071562|       9.0|        1303106|https://m.media-a...|   4|The Godfather Par...|1974|
|Sidney Lumet (dir...| 12 Angry Men (1957)|tt0050083|       9.0|     

In [109]:
def transformation_data(data):
    
    '''
    Función para transformar los datos recibidos de la API.
    '''
  
    df_transformed = data.dropDuplicates()
    df_transformed = df_transformed.withColumn('created_date', date_format(current_date(), 'yyyy-MM-dd'))
    df_transformed = df_transformed.withColumn('imDbRatingCount', col('imDbRatingCount') / 1000)
    df_transformed = df_transformed.withColumn('age_of_movie', year(current_date()) - col('year'))

    min_value = df_transformed.select(min(col('imDbRating')).cast('double')).collect()[0][0]
    max_value = df_transformed.select(max(col('imDbRating')).cast('double')).collect()[0][0]

    df_transformed = df_transformed.withColumn('rating_scaled', (col('imDbRating') - min_value / (max_value - min_value)))

    df_transformed = df_transformed.withColumn('rating_category', when(col('imDbRating') <= 5, 'bajo')
                                      .when(col('imDbRating') <= 8, 'medio')
                                      .otherwise('alto'))

    return df_transformed


In [110]:
df_transformed = transformation_data(data_extracted)

In [112]:
df_transformed.show(5)

+--------------------+--------------------+---------+----------+---------------+--------------------+----+--------------------+----+------------+------------+------------------+---------------+
|                crew|           fullTitle|       id|imDbRating|imDbRatingCount|               image|rank|               title|year|created_date|age_of_movie|     rating_scaled|rating_category|
+--------------------+--------------------+---------+----------+---------------+--------------------+----+--------------------+----+------------+------------+------------------+---------------+
|Akira Kurosawa (d...|        Ikiru (1952)|tt0044741|       8.2|         81.989|https://m.media-a...|  99|               Ikiru|1952|  2023-06-13|        71.0|1.5333333333333288|          medio|
|Ron Howard (dir.)...|         Rush (2013)|tt1979320|       8.0|        491.828|https://m.media-a...| 220|                Rush|2013|  2023-06-13|        10.0|1.3333333333333295|          medio|
|Gillo Pontecorvo ...|The Batt

In [133]:
def load_data(data):
    
    '''
    Función para guardar los datos en una tabla. Si la tabla ya existe 
    volveremos agregar los datos con la fecha de consulta.
    '''
  
    data.write \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://{env['REDSHIFT_HOST']}:{env['REDSHIFT_PORT']}/{env['REDSHIFT_DB']}") \
        .option("dbtable", f"{env['REDSHIFT_SCHEMA']}.top_movies_imdb") \
        .option("user", env['REDSHIFT_USER']) \
        .option("password", env['REDSHIFT_PASSWORD']) \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()

In [134]:
load_data(df_transformed)

In [130]:
def fetch_data():
    
    '''
    Función para consultar una tabla de Redshift
    '''    

    query = f"select * from {env['REDSHIFT_SCHEMA']}.top_movies_imdb"

    df_fetched = spark.read \
                .format("jdbc") \
                .option("url", f"jdbc:postgresql://{env['REDSHIFT_HOST']}:{env['REDSHIFT_PORT']}/{env['REDSHIFT_DB']}") \
                .option("dbtable", f"({query}) as tmp_table") \
                .option("user", env['REDSHIFT_USER']) \
                .option("password", env['REDSHIFT_PASSWORD']) \
                .option("driver", "org.postgresql.Driver") \
                .load()

    return df_fetched

In [131]:
top_movies_imdb_date = fetch_data()

In [132]:
top_movies_imdb_date.show(5)

+--------------------+--------------------+---------+----------+---------------+--------------------+----+--------------------+----+------------+------------+------------------+---------------+
|                crew|           fulltitle|       id|imdbrating|imdbratingcount|               image|rank|               title|year|created_date|age_of_movie|     rating_scaled|rating_category|
+--------------------+--------------------+---------+----------+---------------+--------------------+----+--------------------+----+------------+------------+------------------+---------------+
|Akira Kurosawa (d...|        Ikiru (1952)|tt0044741|       8.2|         81.989|https://m.media-a...|  99|               Ikiru|1952|  2023-06-13|        71.0|1.5333333333333288|          medio|
|Ron Howard (dir.)...|         Rush (2013)|tt1979320|       8.0|        491.828|https://m.media-a...| 220|                Rush|2013|  2023-06-13|        10.0|1.3333333333333295|          medio|
|Gillo Pontecorvo ...|The Batt