In [1]:
!pip install psycopg2-binary



In [2]:
import os
import shlex

import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# PostgreSQL JDBC driver jar; Spark talks to Redshift through this driver
# (jdbc:postgresql URL + org.postgresql.Driver). The location can be overridden
# with the POSTGRES_JDBC_DRIVER_PATH environment variable; the default is the
# path this notebook was developed against.
driver_path = os.environ.get(
    "POSTGRES_JDBC_DRIVER_PATH",
    "/home/coder/working_dir/spark_drivers/postgresql-42.5.2.jar",
)

# Put the driver on the Spark classpath. PySpark splits PYSPARK_SUBMIT_ARGS
# shell-style when it launches the JVM, so quote the path in case it contains
# spaces. These must be set before the first SparkSession is created: if a
# session is already running in this kernel, getOrCreate() reuses it and the
# JVM classpath is not changed.
quoted_driver_path = shlex.quote(driver_path)
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    f'--driver-class-path {quoted_driver_path} --jars {quoted_driver_path} pyspark-shell'
)
# Legacy classpath variable, kept for older Spark setups that still read it.
os.environ['SPARK_CLASSPATH'] = driver_path

# Create (or reuse) the local SparkSession, also registering the driver jar
# through the session config.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Clean and Load data to Redshift")
    .config("spark.jars", driver_path)
    .config("spark.executor.extraClassPath", driver_path)
    .getOrCreate()
)

In [3]:
# Short alias (the live mapping, not a copy) for the process environment;
# the AWS_REDSHIFT_* connection settings used below are read through it.
env: "MutableMapping[str, str]" = os.environ

In [4]:
# psycopg2.connect() keyword -> environment variable that supplies its value.
REDSHIFT_CONN_ENV = (
    ("host", "AWS_REDSHIFT_HOST"),
    ("port", "AWS_REDSHIFT_PORT"),
    ("dbname", "AWS_REDSHIFT_DBNAME"),
    ("user", "AWS_REDSHIFT_USER"),
    ("password", "AWS_REDSHIFT_PASSWORD"),
)

# Open the psycopg2 connection to Redshift shared by the SQL cells below.
conn = psycopg2.connect(**{param: env[var] for param, var in REDSHIFT_CONN_ENV})


In [5]:
# Ensure the target table exists in the configured schema.
# `with conn:` commits on success and rolls back if execute() raises, so a
# failed statement does not leave the shared connection stuck in an aborted
# transaction for later cells; `with conn.cursor()` closes the cursor either way.
with conn:
    with conn.cursor() as cursor:
        cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.game_deals (
     internalName VARCHAR,
        title VARCHAR,
        metacriticLink VARCHAR,
        dealID VARCHAR,
        storeID INT,
        gameID INT,
        salePrice DOUBLE PRECISION,
        normalPrice DOUBLE PRECISION,
        isOnSale INT,
        savings DOUBLE PRECISION,
        metacriticScore INT,
        steamRatingText VARCHAR,
        steamRatingPercent INT,
        steamRatingCount INT,
        steamAppID INT,
        releaseDate INT,
        lastChange INT,
        dealRating DOUBLE PRECISION,
        thumb VARCHAR,
        execution_datetime VARCHAR
);
""")
print("Table created!")

Table created!


In [6]:
# List the tables in the target schema as a comma-separated line.
# information_schema.tables is used instead of PG_TABLE_DEF because
# PG_TABLE_DEF only returns tables in schemas on the session's search_path.
# The schema name is bound as a query parameter instead of being formatted
# into the SQL text. `with conn:` ends the read transaction when done.
with conn:
    with conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = %s
              AND table_type = 'BASE TABLE'
            ORDER BY table_name;
            """,
            (env['AWS_REDSHIFT_SCHEMA'],),
        )
        print(", ".join(row[0] for row in cursor.fetchall()))

game_deals


In [6]:
# Column layout of the landing CSVs, mirroring the game_deals CREATE TABLE
# statement earlier in the notebook. Declaring it explicitly keeps the Spark
# column types aligned with the Redshift columns regardless of which files are
# in the landing folder, and avoids the extra pass over the data that
# inferSchema needs.
game_deals_schema = (
    "internalName STRING, title STRING, metacriticLink STRING, dealID STRING, "
    "storeID INT, gameID INT, salePrice DOUBLE, normalPrice DOUBLE, "
    "isOnSale INT, savings DOUBLE, metacriticScore INT, "
    "steamRatingText STRING, steamRatingPercent INT, steamRatingCount INT, "
    "steamAppID INT, releaseDate INT, lastChange INT, dealRating DOUBLE, "
    "thumb STRING, execution_datetime STRING"
)
landing_path = "../../../data/landing/game_deals/"

# Import data from the game_deals landing CSVs.
df_s = (
    spark.read
    .option("header", True)
    # Validate each file's header against the schema's column names instead of
    # silently mapping columns by position.
    .option("enforceSchema", False)
    .schema(game_deals_schema)
    .csv(landing_path)
)
df_s.printSchema()
df_s.show()
df_s.count()

root
 |-- internalName: string (nullable = true)
 |-- title: string (nullable = true)
 |-- metacriticLink: string (nullable = true)
 |-- dealID: string (nullable = true)
 |-- storeID: integer (nullable = true)
 |-- gameID: integer (nullable = true)
 |-- salePrice: double (nullable = true)
 |-- normalPrice: double (nullable = true)
 |-- isOnSale: integer (nullable = true)
 |-- savings: double (nullable = true)
 |-- metacriticScore: integer (nullable = true)
 |-- steamRatingText: string (nullable = true)
 |-- steamRatingPercent: integer (nullable = true)
 |-- steamRatingCount: integer (nullable = true)
 |-- steamAppID: integer (nullable = true)
 |-- releaseDate: integer (nullable = true)
 |-- lastChange: integer (nullable = true)
 |-- dealRating: double (nullable = true)
 |-- thumb: string (nullable = true)
 |-- execution_datetime: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------+-------+------+---------+-----------+--------+-

60

In [7]:
# Discard rows whose execution_datetime is null.
df_s = df_s.dropna(subset=['execution_datetime'])
# Replace null metacriticLink values with the '/game/pc' placeholder.
df_to_write = df_s.fillna('/game/pc', subset=['metacriticLink'])

In [9]:
def get_installed_jdbc_drivers(keywords=("jdbc", "postgresql", "redshift"), conf=None):
    """Return the JDBC driver jars registered with the Spark application.

    Checks both places this notebook registers the driver:
    ``spark.driver.extraClassPath`` (set by ``--driver-class-path``) and
    ``spark.jars`` (set by ``--jars`` and the builder config). An entry is
    reported when any of ``keywords`` occurs in its lower-cased path; matching
    only on "jdbc" missed jars such as ``postgresql-42.5.2.jar``.

    Args:
        keywords: Case-insensitive substrings identifying a JDBC driver jar.
        conf: Object with a ``get(key, default)`` method, e.g. a ``SparkConf``
            or a plain dict. Defaults to the active session's SparkConf.

    Returns:
        De-duplicated list of matching entries: classpath entries first, then
        ``spark.jars`` entries, each in their original order.
    """
    if conf is None:
        conf = SparkSession.builder.getOrCreate().sparkContext.getConf()
    class_path = conf.get("spark.driver.extraClassPath", "") or ""
    jars = conf.get("spark.jars", "") or ""
    # The classpath uses the platform path separator; spark.jars is comma-separated.
    entries = class_path.split(os.pathsep) + jars.split(",")
    needles = [keyword.lower() for keyword in keywords]
    drivers = []
    for entry in (raw.strip() for raw in entries):
        if entry and entry not in drivers and any(n in entry.lower() for n in needles):
            drivers.append(entry)
    return drivers

# Get the installed JDBC drivers.
installed_drivers = get_installed_jdbc_drivers()

# Print the installed JDBC drivers, one per line. Say so explicitly when none
# were found, since an empty output is indistinguishable from a cell that
# did not run.
if installed_drivers:
    for driver in installed_drivers:
        print(driver)
else:
    print("No JDBC driver jars found.")

In [9]:
# JDBC URL for the Redshift database, reached through the PostgreSQL driver.
redshift_url = (
    f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}"
    f":{env['AWS_REDSHIFT_PORT']}"
    f"/{env['AWS_REDSHIFT_DBNAME']}"
)

# Connection properties for DataFrameWriter.jdbc; "dbtable" holds the
# schema-qualified target table name.
redshift_properties = dict(
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD'],
    driver="org.postgresql.Driver",
    dbtable=f"{env['AWS_REDSHIFT_SCHEMA']}.game_deals",
)

In [11]:
# Write the cleaned rows with DataFrameWriter.jdbc. The `table` argument takes
# precedence over the "dbtable" entry in the properties, so pass the
# schema-qualified name from redshift_properties rather than a bare
# 'game_deals' (which would resolve against the session's search_path).
# NOTE(review): mode="overwrite" drops and recreates the table with
# Spark-derived column types, replacing the CREATE TABLE DDL above, and the
# next cell appends the same DataFrame again, so running both cells leaves
# every row duplicated. Keep only one of the two writes.
df_to_write.write.jdbc(
    url=redshift_url,
    table=redshift_properties["dbtable"],
    mode="overwrite",
    properties=redshift_properties,
)

In [12]:
# Append the cleaned rows to <schema>.game_deals through Spark's generic JDBC
# data source. Append mode: each run of this cell inserts the rows again.
(
    df_to_write.write
    .format("jdbc")
    .options(
        url=f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}",
        dbtable=f"{env['AWS_REDSHIFT_SCHEMA']}.game_deals",
        user=env['AWS_REDSHIFT_USER'],
        password=env['AWS_REDSHIFT_PASSWORD'],
        driver="org.postgresql.Driver",
    )
    .mode("append")
    .save()
)