In [0]:
# Import required PySpark SQL functions
from pyspark.sql import functions as F

# Location of the raw Steam catalogue export on S3
file_path = "s3://full-stack-bigdata-datasets/Big_Data/Project_Steam/steam_game_output.json"

# Load the raw JSON records into a DataFrame (schema is inferred by Spark)
df_raw = spark.read.format("json").load(file_path)

# Inspect the inferred schema, including nested structs, before any flattening
df_raw.printSchema()

# Quick visual sanity check on a handful of records
preview_rows = df_raw.limit(5)
display(preview_rows)

## 1. Data Preparation

Transformations:

Flatten the DataFrame by promoting every field of the `data` struct to the root level (alongside `id`).

Extract the `platforms` struct fields (windows, mac, linux) into distinct top-level boolean columns (`is_windows`, `is_mac`, `is_linux`).

Parse the `release_date` string and extract an integer `release_year` column for temporal analysis. The year is currently taken from the first four characters of the string, and empty dates become null.

Cast `price` and `discount` from strings to doubles, treating empty strings as null. Prices are assumed to be stored in cents (e.g. "999" means $9.99), so `price` is divided by 100 to produce `price_usd`.

Engineer two new metrics to evaluate game popularity and quality: `total_reviews` (positive + negative) and `positive_ratio` (percentage of positive reviews).

Create a separate DataFrame (`df_genres`) in which the comma-separated `genre` string is split and exploded into one row per genre, for accurate genre-level analysis.


In [0]:
# src/01-data_preparation.py (to be run in the Databricks notebook)
# Builds two DataFrames from df_raw:
#   - df_flat:   one row per game, flattened, with cleaned numeric/derived columns
#   - df_genres: df_flat exploded to one row per (game, genre)
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import col, split, explode, substring, round, when, trim

# 1. Flatten the main structure: keep `id` and promote every field of the `data` struct to a column
df_flat = df_raw.select("id", "data.*")

# 2. Extract platforms into distinct boolean columns
df_flat = (
    df_flat
    .withColumn("is_windows", col("platforms.windows"))
    .withColumn("is_mac", col("platforms.mac"))
    .withColumn("is_linux", col("platforms.linux"))
)

# 3. Release year = first 4 characters of release_date, cast to int.
# Empty strings are mapped to NULL before the cast.
# NOTE(review): assumes release_date starts with a 4-digit year (e.g. "2020/12/10") - confirm against the data.
df_flat = df_flat.withColumn(
    "release_year",
    when(col("release_date") == "", None)
    .otherwise(substring(col("release_date"), 1, 4))
    .cast(IntegerType())
)

# 4. Clean prices and cast to numeric.
# Empty strings are mapped to NULL first to avoid CAST errors.
# price is assumed to be expressed in cents, hence the division by 100.
df_flat = (
    df_flat
    .withColumn(
        "price_usd",
        when(col("price") == "", None).otherwise(col("price")).cast(DoubleType()) / 100
    )
    .withColumn(
        "discount_pct",
        when(col("discount") == "", None).otherwise(col("discount")).cast(DoubleType())
    )
)

# 5. Feature engineering: total reviews and positive-review percentage.
# positive_ratio is NULL when a game has no reviews (avoids division by zero).
df_flat = (
    df_flat
    .withColumn("total_reviews", col("positive") + col("negative"))
    .withColumn(
        "positive_ratio",
        when(col("total_reviews") == 0, None)
        .otherwise(round(col("positive") / col("total_reviews") * 100, 2))
    )
)

# 6. Exploded DataFrame for genre analysis: one row per (game, genre).
# Tokens are trimmed and empty tokens dropped, so an empty, padded or trailing-comma
# genre string does not create a spurious "" genre. explode() yields no rows for a
# NULL genre array. explode and trim are separate steps because Spark does not allow
# a generator nested inside another expression.
df_genres = (
    df_flat
    .withColumn("genre_array", split(col("genre"), r",\s*"))
    .withColumn("single_genre", explode(col("genre_array")))
    .withColumn("single_genre", trim(col("single_genre")))
    .filter(col("single_genre") != "")
)

# Display results for verification
display(df_flat.select("name", "release_year", "price_usd", "is_windows", "total_reviews", "positive_ratio").limit(10))

## 2. Macro Analysis

In [0]:
# src/2a-macro_analysis.py
# Catalogue-wide aggregates computed from df_flat (one row per game).
from pyspark.sql.functions import col, desc, split, explode, trim, year, current_date

# 1. Top publishers by number of releases.
# Games with a NULL or blank publisher are excluded; otherwise they would be ranked as a
# pseudo-publisher. This matters because the genre cell reuses the top rows of this ranking.
df_top_publishers = (
    df_flat
    .filter(col("publisher").isNotNull() & (trim(col("publisher")) != ""))
    .groupBy("publisher")
    .count()
    .orderBy(desc("count"))
)

# 2. Best rated games.
# Only games with at least 500 reviews, so a single 100%-positive review cannot top the list.
df_best_rated = (
    df_flat
    .filter(col("total_reviews") >= 500)
    .select("name", "publisher", "positive_ratio", "total_reviews")
    .orderBy(desc("positive_ratio"), desc("total_reviews"))
)

# 3. Game releases per year (including Covid impact analysis).
# NULL years and anomalous future years (after the current calendar year) are filtered out.
df_releases_per_year = (
    df_flat
    .filter(col("release_year").isNotNull() & (col("release_year") <= year(current_date())))
    .groupBy("release_year")
    .count()
    .orderBy("release_year")
)

# 4. Pricing and discount distribution: number of games with / without a discount.
# Games with a NULL discount_pct end up in a NULL group.
df_discount_status = (
    df_flat
    .withColumn("is_discounted", col("discount_pct") > 0)
    .groupBy("is_discounted")
    .count()
)

# 5. Most represented languages.
# `languages` holds comma-separated strings (e.g. "English, French, German").
# Tokens are trimmed and empty tokens dropped so they are not counted as a language.
df_languages = (
    df_flat
    .withColumn("lang_array", split(col("languages"), r",\s*"))
    .withColumn("language", explode(col("lang_array")))
    .withColumn("language", trim(col("language")))
    .filter(col("language") != "")
    .groupBy("language")
    .count()
    .orderBy(desc("count"))
)

# 6. Age restriction distribution (raw required_age values)
df_age_restrictions = df_flat.groupBy("required_age").count().orderBy(desc("count"))

# Display commands to generate Databricks visualizations
# Tip: Click the "+" tab next to the table output in Databricks to create a visualization
display(df_top_publishers.limit(10))
display(df_best_rated.limit(20))
display(df_releases_per_year)
display(df_discount_status)
display(df_languages.limit(10))
display(df_age_restrictions)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# src/macro_analysis_update.py
# Normalises required_age into an integer column and builds age-distribution views.
from pyspark.sql.functions import col, regexp_extract, when, sum as _sum
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

# First run of digits in required_age ("" when there is none)
age_digits = regexp_extract(col("required_age"), r"(\d+)", 1)

# "" -> NULL, otherwise cast the digits to int
parsed_age = when(age_digits == "", None).otherwise(age_digits).cast(IntegerType())

# Integer age column; the "180" value is treated as a typo for 18
df_flat = df_flat.withColumn(
    "required_age_num",
    when(parsed_age == 180, 18).otherwise(parsed_age)
)

# Number of games per normalised age (all games, NULL ages included as their own group)
df_age_restrictions = (
    df_flat
    .groupBy("required_age_num")
    .count()
    .orderBy("required_age_num")
)

# Only ages that impose an actual restriction (strictly positive, non-NULL)
df_age_restricted_only = df_age_restrictions.filter(
    col("required_age_num").isNotNull() & (col("required_age_num") > 0)
)

# Running total ordered by age: for each age, how many games require that age or less
window_spec = Window.orderBy("required_age_num")
df_age_cumulative = (
    df_age_restrictions
    .filter(col("required_age_num").isNotNull())
    .withColumn("cumulative_games_allowed", _sum("count").over(window_spec))
)

# Display the updated DataFrames for verification
display(df_age_restricted_only)
display(df_age_cumulative)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

## 3. Price Analysis

In [0]:
# src/price_analysis.py
# Price-band distribution, per-year price samples, and average discount per band.
from pyspark.sql.functions import col, when, avg, round

# Only games with a known price take part in the distributions
df_valid_prices = df_flat.filter(col("price_usd").isNotNull())

# 1. Releases per price band.
# Bands are [low, high) in USD; labels are numbered so that lexical order == price order.
price_col = col("price_usd")
price_bands = [
    (5, 10, "3. $5 to $9.99"),
    (10, 20, "4. $10 to $19.99"),
    (20, 40, "5. $20 to $39.99"),
    (40, 60, "6. $40 to $59.99"),
]

price_category_expr = (
    when(price_col == 0, "1. Free")
    .when((price_col > 0) & (price_col < 5), "2. Under $5")
)
for band_low, band_high, band_label in price_bands:
    price_category_expr = price_category_expr.when(
        (price_col >= band_low) & (price_col < band_high), band_label
    )
# Anything not matched above (including any value >= 60) falls into the top band
price_category_expr = price_category_expr.otherwise("7. $60 and above")

df_price_bins = df_valid_prices.withColumn("price_category", price_category_expr)

df_releases_by_price_bin = (
    df_price_bins
    .groupBy("price_category")
    .count()
    .orderBy("price_category")
)

# 2. Price distribution per year (input for a boxplot).
# Pre-2005 years (sparse) and prices above $150 (outliers) are excluded for readable scaling.
df_prices_per_year = (
    df_valid_prices
    .filter(
        col("release_year").isNotNull()
        & (col("release_year") >= 2005)
        & (col("price_usd") <= 150)
    )
    .select("release_year", "price_usd")
)

# 3. Average discount (rounded to 2 decimals) within each price band
df_discount_by_price = (
    df_price_bins
    .filter(col("discount_pct").isNotNull())
    .groupBy("price_category")
    .agg(round(avg("discount_pct"), 2).alias("avg_discount_pct"))
    .orderBy("price_category")
)

# Display commands to configure in Databricks
display(df_releases_by_price_bin)
display(df_prices_per_year)
display(df_discount_by_price)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# src/price_analysis_update.py
# Boxplot statistics of price per release year, plus a UI-sized random sample.
from pyspark.sql.functions import expr, min as _min, max as _max, avg, round

# 1. Boxplot summary statistics per release year, computed in Spark.
# NOTE: percentile_approx returns *approximate* quartiles/median. This is ample for
# visualisation, but the values are not exact.
df_prices_boxplot_stats = (
    df_prices_per_year
    .groupBy("release_year")
    .agg(
        _min("price_usd").alias("min_price"),
        expr("percentile_approx(price_usd, 0.25)").alias("Q1_price"),
        expr("percentile_approx(price_usd, 0.50)").alias("median_price"),
        expr("percentile_approx(price_usd, 0.75)").alias("Q3_price"),
        _max("price_usd").alias("max_price"),
        round(avg("price_usd"), 2).alias("mean_price"),
    )
    .orderBy("release_year")
)

# Display the aggregated statistics
display(df_prices_boxplot_stats)

# 2. Alternative: random sample for the native UI boxplot (which needs fewer than ~10k rows).
# The sampling fraction is derived from the actual row count instead of a hard-coded
# estimate, so the sample stays near TARGET_SAMPLE_ROWS whatever the data size.
# sample() draws each row independently, so the resulting row count is approximate.
TARGET_SAMPLE_ROWS = 9_000
SAMPLE_SEED = 42

n_price_rows = df_prices_per_year.count()
# Builtin min here (pyspark's min is imported as _min); keep everything when data is small
sample_fraction = min(1.0, TARGET_SAMPLE_ROWS / n_price_rows) if n_price_rows else 1.0

df_prices_sampled = df_prices_per_year.sample(
    withReplacement=False, fraction=sample_fraction, seed=SAMPLE_SEED
)

display(df_prices_sampled)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

## 4. Genre Analysis

In [0]:
# src/genre_analysis.py
# Genre-level aggregates computed from df_genres (one row per game x genre).
from pyspark.sql.functions import (
    col, count, avg, desc, regexp_replace, regexp_extract, round, sum as _sum, trim, when,
)
from pyspark.sql.types import DoubleType

# 1. Most represented genres
df_genre_counts = (
    df_genres
    .groupBy("single_genre")
    .agg(count("*").alias("game_count"))
    .orderBy(desc("game_count"))
)

# 2. Genres with the best positive-review percentage.
# Only games with >= 500 reviews count, and a genre needs >= 10 such games to avoid outliers.
df_genre_ratings = (
    df_genres
    .filter(col("total_reviews") >= 500)
    .groupBy("single_genre")
    .agg(
        round(avg("positive_ratio"), 2).alias("avg_positive_ratio"),
        count("*").alias("qualified_games"),
    )
    .filter(col("qualified_games") >= 10)
    .orderBy(desc("avg_positive_ratio"))
)

# 3. Publishers' favourite genres, restricted to the top 5 publishers for readability.
# df_top_publishers comes from the macro-analysis cell. NULL/blank publishers are skipped
# here because isin() never matches NULL; keeping them would silently drop a slot.
top_publisher_rows = (
    df_top_publishers
    .filter(col("publisher").isNotNull() & (trim(col("publisher")) != ""))
    .limit(5)
    .collect()
)
top_publishers = [row["publisher"] for row in top_publisher_rows]

df_publisher_genres = (
    df_genres
    .filter(col("publisher").isin(top_publishers))
    .groupBy("publisher", "single_genre")
    .count()
    .orderBy("publisher", desc("count"))
)

# 4. Most lucrative genres (proxy calculation).
# Revenue proxy = lower bound of the `owners` range x listed price_usd. It ignores
# discounts and sales volume within the range, so treat it as an order of magnitude.
# The leading "1,234,567"-style number is extracted and its commas removed. When nothing
# matches, regexp_extract yields "", which is mapped to NULL before the CAST, following the
# same defensive pattern as the price cleaning (avoids CAST errors under ANSI mode).
owners_lower_bound = regexp_replace(regexp_extract(col("owners"), r"^([\d,]+)", 1), ",", "")

df_lucrative = (
    df_genres
    .withColumn(
        "min_owners",
        when(owners_lower_bound == "", None).otherwise(owners_lower_bound).cast(DoubleType()),
    )
    .withColumn("estimated_revenue_usd", col("min_owners") * col("price_usd"))
)

df_genre_revenue = (
    df_lucrative
    .filter(col("estimated_revenue_usd").isNotNull())
    .groupBy("single_genre")
    .agg(
        round(_sum("estimated_revenue_usd"), 2).alias("total_estimated_revenue"),
        round(avg("estimated_revenue_usd"), 2).alias("avg_estimated_revenue_per_game"),
    )
    .orderBy(desc("total_estimated_revenue"))
)

# Display commands to configure in Databricks
display(df_genre_counts.limit(20))
display(df_genre_ratings.limit(20))
display(df_publisher_genres)
display(df_genre_revenue.limit(20))

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

## 5. Platform Analysis

In [0]:
# src/platform_analysis.py
# Platform support: overall counts and per-genre support percentages.
from pyspark.sql.functions import col, sum as _sum, round, count


def _flag_total(flag_name):
    """Aggregate counting rows where boolean column `flag_name` is True (cast to int; NULLs ignored by sum)."""
    return _sum(col(flag_name).cast("int"))


# (output column label, boolean source column), in display order
platform_totals_spec = [
    ("Windows", "is_windows"),
    ("Mac", "is_mac"),
    ("Linux", "is_linux"),
]

# 1. Overall platform availability: a single row holding one True-count per platform
df_platform_totals = df_flat.select(
    *[_flag_total(flag).alias(label) for label, flag in platform_totals_spec]
)

# Reshape the single wide row into (platform, game_count) rows for Bar/Pie charts
unpivot_expr = "stack(3, 'Windows', Windows, 'Mac', Mac, 'Linux', Linux) as (platform, game_count)"
df_platform_distribution = df_platform_totals.selectExpr(unpivot_expr)

# 2. Platform adoption by genre: share (%) of each genre's games supporting each OS.
# Only genres with at least 100 games are kept for statistical relevance.
platform_pct_spec = [
    ("windows_pct", "is_windows"),
    ("mac_pct", "is_mac"),
    ("linux_pct", "is_linux"),
]

df_genre_platforms = (
    df_genres
    .groupBy("single_genre")
    .agg(
        count("*").alias("total_games"),
        *[
            round((_flag_total(flag) / count("*")) * 100, 2).alias(pct_name)
            for pct_name, flag in platform_pct_spec
        ],
    )
    .filter(col("total_games") >= 100)
    .orderBy(col("mac_pct").desc(), col("linux_pct").desc())
)

# Display commands to configure in Databricks
display(df_platform_distribution)
display(df_genre_platforms.limit(20))

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.