In [42]:
# Imports 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

In [43]:
# Initialize Spark
# Key pain point here is making sure Spark can communicate with S3 (through the
# s3a filesystem configured below) and work with the Parquet file format.

# Read the AWS credentials up front and fail fast when either one is missing.
# os.getenv() returns None for an unset variable; passing that on to .config()
# hides the real problem, which would otherwise typically only surface later
# as an S3 access failure.
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
if not aws_access_key_id or not aws_secret_access_key:
    raise EnvironmentError(
        "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must both be set "
        "before creating the Spark session"
    )

spark = (
    SparkSession.builder
    .appName("RAWG Data Processing")
    # Credentials and endpoint used by the s3a:// filesystem.
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# Show the session as the cell output.
spark

In [44]:
# Read data from S3
# The CSV has a header row; column types are inferred from the file contents.
raw_games_path = "s3a://rawg-pyspark/raw/games/games_2024.csv"
df = spark.read.csv(raw_games_path, header=True, inferSchema=True)

# Preview the loaded rows.
df.show()

+------+--------------------+----------+------+-------------+----------+--------+--------------------+--------------------+--------------------+----------+
|    id|                name|  released|rating|ratings_count|metacritic|playtime|           platforms|                slug|    background_image|rating_top|
+------+--------------------+----------+------+-------------+----------+--------+--------------------+--------------------+--------------------+----------+
|804753|    Dragon’s Dogma 2|2024-03-21|  4.06|           44|      NULL|    28.0|PC, PlayStation 5...|     dragons-dogma-2|https://media.raw...|       5.0|
|398405|  Sons of the Forest|2024-02-22|  3.62|           87|      NULL|    11.0|                  PC|  sons-of-the-forest|https://media.raw...|       4.0|
|301516|          Last Epoch|2024-02-21|   3.6|           68|      NULL|     7.0|    PC, macOS, Linux|          last-epoch|https://media.raw...|       4.0|
|718135|            Palworld|2024-01-19|  3.48|           98|   

In [45]:
# Apply transformations
# Parse the release date once so every date-derived column is computed from
# the same value instead of mixing implicit casts with an explicit to_date().
released_date = to_date(col("released"))
platforms_col = col("platforms")

transformed_df = (
    df
    # Keep only games with more than 10 ratings.
    .filter(col("ratings_count") > 10)
    .withColumn("year", year(released_date))
    .withColumn("month", month(released_date))
    .withColumn("quarter", quarter(released_date))
    # Measured against the current date, so this value changes from run to run.
    .withColumn("days_since_release", datediff(current_date(), released_date))
    # size() of a NULL array returns -1 under Spark's default settings, which
    # would be a bogus count. Only count when the platform list is present;
    # rows without one keep a NULL platform_count.
    .withColumn(
        "platform_count",
        when(platforms_col.isNotNull(), size(split(platforms_col, ", "))),
    )
    # Basically an enum: bucket the rating into a label. Branches are checked
    # top to bottom, so each threshold is the lower bound of its bucket.
    # NOTE: a NULL rating matches no branch and therefore falls into "Poor".
    .withColumn(
        "rating_category",
        when(col("rating") >= 4.5, "Excellent")
        .when(col("rating") >= 3.5, "Good")
        .when(col("rating") >= 2.5, "Average")
        .otherwise("Poor"),
    )
    .fillna({"playtime": 0})  # Fill missing values
)

transformed_df.show()

+------+--------------------+----------+------+-------------+----------+--------+--------------------+--------------------+--------------------+----------+----+-----+-------+------------------+--------------+---------------+
|    id|                name|  released|rating|ratings_count|metacritic|playtime|           platforms|                slug|    background_image|rating_top|year|month|quarter|days_since_release|platform_count|rating_category|
+------+--------------------+----------+------+-------------+----------+--------+--------------------+--------------------+--------------------+----------+----+-----+-------+------------------+--------------+---------------+
|804753|    Dragon’s Dogma 2|2024-03-21|  4.06|           44|      NULL|    28.0|PC, PlayStation 5...|     dragons-dogma-2|https://media.raw...|       5.0|2024|    3|      1|               257|             3|           Good|
|398405|  Sons of the Forest|2024-02-22|  3.62|           87|      NULL|    11.0|                  P

In [46]:
# Finally -- save the transformed data as Parquet, partitioned by quarter.
# Any existing output at the destination is overwritten.
# Next step is Athena, to analyze the data now that it is saved, partitioned,
# and ready to query.
processed_games_path = "s3a://rawg-pyspark/processed/games/"
(
    transformed_df.write
    .mode("overwrite")
    .partitionBy("quarter")
    .parquet(processed_games_path)
)
print("Transformed data saved as Parquet by quarter to S3")

Transformed data saved as Parquet by quarter to S3
