# ETL: Video Game Sales Data from Kaggle to PostgreSQL with PySpark

In [1]:
import os

import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import execute_values
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Load key=value pairs from a .env file into the process environment.
# The POSTGRES_* settings read with os.getenv() when connecting to Postgres
# are presumably supplied this way — confirm against the project's .env file.
load_dotenv()

True

### Connect to Kaggle API

In [2]:
# NOTE(review): this import sits below load_dotenv() instead of in the import
# cell — presumably so that Kaggle credentials defined in .env are already in
# the environment when the client is set up; confirm before moving it.
from kaggle.api.kaggle_api_extended import KaggleApi
# Create the Kaggle client and authenticate it; `api` is reused by the
# download step.
api = KaggleApi()
api.authenticate()

### Download file to Directory

In [3]:
# Create the download directory when it is missing; an existing directory is
# left untouched (exist_ok=True), so re-running this cell is safe.
target_dir = "raw/vgsales"
os.makedirs(target_dir, exist_ok=True)

# Download the dataset archive into target_dir and unzip it there, with the
# client's progress output enabled.
dataset_slug = "gregorut/videogamesales"
api.dataset_download_files(dataset_slug, path=target_dir, unzip=True, quiet=False)

Dataset URL: https://www.kaggle.com/datasets/gregorut/videogamesales
Downloading videogamesales.zip to raw/vgsales


100%|██████████| 381k/381k [00:00<00:00, 1.86MB/s]







### Initiate Spark Session

In [4]:
# Maven coordinate of the PostgreSQL JDBC driver that Spark resolves at start-up.
postgres_jdbc_package = "org.postgresql:postgresql:42.2.16"

# Build the session for this job, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("VGSales")
    .config("spark.jars.packages", postgres_jdbc_package)
    .getOrCreate()
)

# Restrict Spark's log output to errors.
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/Users/bryanburzon/miniforge3/envs/etl_portfolio/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/bryanburzon/.ivy2/cache
The jars for the packages stored in: /Users/bryanburzon/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e5fa5181-5522-4daa-826f-1e868e39d686;1.0
	confs: [default]
	found org.postgresql#postgresql;42.2.16 in central
	found org.checkerframework#checker-qual;3.5.0 in central
:: resolution report :: resolve 70ms :: artifacts dl 3ms
	:: modules in use:
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.postgresql#postgresql;42.2.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	--------------------------------

### Load CSV to DF

In [5]:
# Location of the extracted CSV inside the download directory.
vgsales_csv_path = os.path.join(target_dir, "vgsales.csv")

# Read the file with its first line as the header and let Spark infer the
# column types from the data.
df_raw = spark.read.csv(vgsales_csv_path, header=True, inferSchema=True)

### Clean Column Names

In [6]:
def _normalize_column_name(column_name):
    """Strip surrounding whitespace, lower-case, and replace spaces with underscores."""
    return column_name.strip().lower().replace(" ", "_")


# Normalized header names, in the same order as the columns of df_raw.
cols_clean = [_normalize_column_name(column_name) for column_name in df_raw.columns]

### Convert Data to String 

In [7]:
# Columns that are re-typed as strings for the staging frame.
string_cast_columns = (
    "rank",
    "year",
    "na_sales",
    "eu_sales",
    "jp_sales",
    "other_sales",
    "global_sales",
)

# Rename the raw columns to their cleaned names, then replace each listed
# column with its string-cast version; column order is unchanged.
df_stage = df_raw.toDF(*cols_clean).withColumns(
    {name: col(name).cast("string") for name in string_cast_columns}
)

### Connect to Postgres

In [8]:
# Open a psycopg2 connection using settings taken from the environment.
# os.getenv() returns None for an unset variable, so a missing setting is
# passed to connect() as None rather than raising here.
conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    port=os.getenv("POSTGRES_PORT"),
    dbname=os.getenv("POSTGRES_DB"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD")
)

### Load Data to PostgreSQL

In [9]:
# Columns of raw.videogamesales, in the order used both for the INSERT column
# list and for each row tuple. Selecting by name (instead of relying on the
# CSV's column order) keeps values aligned with their target columns.
target_columns = [
    "rank",
    "name",
    "platform",
    "year",
    "genre",
    "publisher",
    "na_sales",
    "eu_sales",
    "jp_sales",
    "other_sales",
    "global_sales",
]

# Collect the staged Spark DataFrame to the driver as a pandas DataFrame.
pdf = df_stage.toPandas()

# Build plain tuples up front: a missing column raises KeyError here, before
# the target table has been truncated.
rows = list(pdf[target_columns].itertuples(index=False, name=None))

# Identifiers come from the constant list above, never from user input.
insert_sql = (
    f"INSERT INTO raw.videogamesales ({', '.join(target_columns)}) VALUES %s"
)

# `with conn` wraps the block in one transaction: it commits when the block
# finishes and rolls back if anything raises, so a failed load restores the
# previous table contents and leaves the connection usable for a re-run.
# It does not close the connection.
with conn:
    with conn.cursor() as cur:
        # Full refresh: empty the table, then reload every row.
        cur.execute("TRUNCATE TABLE raw.videogamesales")

        # Send rows in multi-row INSERT statements instead of one
        # round trip per row.
        execute_values(cur, insert_sql, rows, page_size=1000)

### Close connection to Postgres

In [10]:
# The cursor is opened with `with conn.cursor() as cur`, which already closes
# it, so only the connection needs closing here. Not referencing `cur` also
# means this cell still releases the connection when the load cell failed
# before a cursor was created.
conn.close()

### Stop the Spark Session

In [11]:
# Stop the Spark session that was created earlier in the notebook.
spark.stop()