In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Runtime note: on Databricks a ready-to-use `spark` session is injected automatically.
# Outside Databricks, create one yourself before running any other cell:
# spark = SparkSession.builder.appName("ETLPipeline").getOrCreate()

# --- 1. CONFIGURATION ---

# Table names for each layer of the pipeline, from the raw input through the two Gold outputs.
SOURCE_TABLE = "online_gaming_behavior_dataset"   # Bronze: raw input table
SILVER_TABLE = "silver_player_events"             # Silver: cleaned player records
GOLD_METRICS_TABLE = "gold_player_metrics"        # Gold: per-player aggregate metrics
GOLD_RECO_TABLE = "gold_player_recos"             # Gold: rule-based recommendations


In [0]:
# --- 2. BRONZE TO SILVER LAYER (Clean & Standardize) ---
# 2.1. Extract: pull the raw (Bronze) records straight from the catalog.
# NOTE: spark.table only resolves if the raw data is already registered as a table in this environment.
print("--- Starting Bronze to Silver Transformation ---")
df_bronze = spark.table(SOURCE_TABLE)

bronze_row_count = df_bronze.count()
print(f"Read {bronze_row_count} raw records.")

--- Starting Bronze to Silver Transformation ---
Read 40034 raw records.


# 2.2. Transform (Cleaning & Type Casting)

In [0]:

# - Cast numeric columns to the types observed in the source schema.
# - Trim categorical string columns. Values that are empty after trimming become null, so the
#   drop/fill steps below treat them as missing. Case is preserved on purpose: downstream rules
#   compare against exact values such as "High", "Hard" and "Action".
# - Drop rows where core identifying columns are null.
# - Fill missing optional categorical values with 'Unknown'.
SILVER_INT_COLUMNS = [
    "PlayerID",
    "Age",
    "InGamePurchases",
    "SessionsPerWeek",
    "AvgSessionDurationMinutes",
    "PlayerLevel",
    "AchievementsUnlocked",
]
SILVER_DOUBLE_COLUMNS = ["PlayTimeHours"]
SILVER_STRING_COLUMNS = ["GameGenre", "Location", "Gender", "GameDifficulty", "EngagementLevel"]
SILVER_REQUIRED_COLUMNS = ["PlayerID", "Age", "Gender", "GameGenre", "EngagementLevel"]
SILVER_FILL_DEFAULTS = {"Location": "Unknown", "GameDifficulty": "Unknown"}

# Start from the Bronze frame on every run, so re-executing this cell is idempotent.
df_silver = df_bronze
for col_name in SILVER_INT_COLUMNS:
    df_silver = df_silver.withColumn(col_name, F.col(col_name).cast(IntegerType()))
for col_name in SILVER_DOUBLE_COLUMNS:
    df_silver = df_silver.withColumn(col_name, F.col(col_name).cast(DoubleType()))
for col_name in SILVER_STRING_COLUMNS:
    # F.trim already yields a string column, so no extra cast is needed.
    trimmed = F.trim(F.col(col_name))
    df_silver = df_silver.withColumn(col_name, F.when(trimmed == "", F.lit(None)).otherwise(trimmed))

df_silver = (
    df_silver
    # Drop records where essential columns are missing (including blank-after-trim strings).
    .dropna(subset=SILVER_REQUIRED_COLUMNS)
    # Fill remaining nulls in the optional categorical columns.
    .fillna(SILVER_FILL_DEFAULTS)
)

In [0]:
# Configuration variable for the permanent table name. This reuses SILVER_TABLE from the
# configuration cell instead of a second, differently-cased spelling of the same name.
# NOTE(review): metastore table identifiers are normally case-insensitive, so this should target the
# same table as the previous "SILVER_PLAYER_EVENTS" literal. Confirm for your catalog.
PERMANENT_SILVER_TABLE_NAME = SILVER_TABLE

# 2.3. Load: Save the cleaned data as a permanent table.
# .write.mode("overwrite").saveAsTable() makes the table persistent and accessible across
# sessions and notebooks; 'overwrite' fully replaces it on every run.
df_silver.write.mode("overwrite").saveAsTable(PERMANENT_SILVER_TABLE_NAME)

# Read the persisted table back once. Counting it avoids re-executing the full Bronze -> Silver
# lineage, and the preview shows what was actually written.
df_silver_saved = spark.table(PERMANENT_SILVER_TABLE_NAME)
print(f"Successfully saved and persisted Silver table as: {PERMANENT_SILVER_TABLE_NAME} (Rows: {df_silver_saved.count()})")
display(df_silver_saved.limit(5))

Successfully saved and persisted Silver table as: SILVER_PLAYER_EVENTS (Rows: 40034)


PlayerID,Age,Gender,Location,GameGenre,PlayTimeHours,InGamePurchases,GameDifficulty,SessionsPerWeek,AvgSessionDurationMinutes,PlayerLevel,AchievementsUnlocked,EngagementLevel
9000,43,Male,Other,Strategy,16.271118760553215,0,Medium,6,108,79,25,Medium
9001,29,Female,USA,Strategy,5.525961380570566,0,Medium,5,144,11,10,Medium
9002,22,Female,USA,Sports,8.223755243499511,0,Easy,16,142,35,41,High
9003,35,Male,USA,Action,5.265351277318268,1,Easy,9,85,57,47,Medium
9004,33,Male,Europe,Action,15.53194452113429,0,Medium,2,131,95,37,Medium


In [0]:
# --------------------------------------------------------------------------------------------------
## 💎 GOLD LAYER (Business-Ready Tables)
# --------------------------------------------------------------------------------------------------

# --- 3. GOLD TABLE 1: Player_Metrics (Player Demographic and Core Stats) ---
print("\n--- Starting Silver to Gold (Player Metrics) Transformation ---")

# Output grain: one row per (PlayerID, Age, Gender, Location) combination.
player_grain_columns = ["PlayerID", "Age", "Gender", "Location"]

# Aggregate expressions, listed in output-column order.
# NOTE(review): "total_play_time_hours" and "total_achievements_unlocked" are MAX aggregates, not
# SUMs. That is only correct if those source columns are cumulative per-player values. Confirm.
player_metric_aggs = [
    F.count("PlayerID").alias("event_count"),
    F.max("PlayTimeHours").alias("total_play_time_hours"),
    F.avg("SessionsPerWeek").alias("avg_sessions_per_week"),
    F.avg("AvgSessionDurationMinutes").alias("avg_session_duration_minutes"),
    F.max("PlayerLevel").alias("max_player_level"),
    F.max("AchievementsUnlocked").alias("total_achievements_unlocked"),
    F.sum("InGamePurchases").alias("total_in_game_purchases"),
]

df_gold_metrics = df_silver.groupBy(*player_grain_columns).agg(*player_metric_aggs)

# Load Gold Table 1 as a session-scoped temporary view.
df_gold_metrics.createOrReplaceTempView(GOLD_METRICS_TABLE)
gold_metrics_row_count = df_gold_metrics.count()
print(f"Created Gold Metrics Table: {GOLD_METRICS_TABLE} (Rows: {gold_metrics_row_count})")
display(df_gold_metrics.limit(5))


--- Starting Silver to Gold (Player Metrics) Transformation ---
Created Gold Metrics Table: gold_player_metrics (Rows: 40034)


PlayerID,Age,Gender,Location,event_count,total_play_time_hours,avg_sessions_per_week,avg_session_duration_minutes,max_player_level,total_achievements_unlocked,total_in_game_purchases
9000,43,Male,Other,1,16.271118760553215,6.0,108.0,79,25,0
9001,29,Female,USA,1,5.525961380570566,5.0,144.0,11,10,0
9002,22,Female,USA,1,8.223755243499511,16.0,142.0,35,41,0
9003,35,Male,USA,1,5.265351277318268,9.0,85.0,57,47,1
9004,33,Male,Europe,1,15.53194452113429,2.0,131.0,95,37,0


In [0]:
# --------------------------------------------------------------------------------------------------
# --- 4. GOLD TABLE 2: Player_Recos (Ready for Recommendation/Churn Use Case) ---
print("\n--- Starting Silver to Gold (Player Recos) Transformation ---")

# The downstream churn/recommendation rules are pre-calculated here so consumers can read them
# directly. The rule parameters are named so they can be tuned in one place.
ENGAGEMENT_WEIGHT_SESSIONS = 0.4    # weight of SessionsPerWeek in AvgEngagementScore
ENGAGEMENT_WEIGHT_DURATION = 0.3    # weight of AvgSessionDurationMinutes
ENGAGEMENT_WEIGHT_PURCHASES = 0.3   # weight of InGamePurchases
CHURN_HIGH_RISK_BELOW = 35          # score < 35 -> HIGH_RISK_CHURN
CHURN_MEDIUM_RISK_UPTO = 75         # 35 <= score <= 75 -> MEDIUM_RISK_MONITOR; above -> LOW_RISK_RETAINED
LOW_LEVEL_BELOW = 30                # PlayerLevel < 30 on "Hard" -> DECREASE_TO_MEDIUM
HIGH_LEVEL_ABOVE = 80               # PlayerLevel > 80 on "Easy" -> INCREASE_TO_MEDIUM
SHORT_SESSION_BELOW_MINUTES = 50    # mid-level players with sessions < 50 min -> OFFER_QUICK_WIN_BONUS

engagement_score = F.col("AvgEngagementScore")
player_level = F.col("PlayerLevel")
game_difficulty = F.col("GameDifficulty")

df_gold_reco = (
    df_silver
    .select(
        "PlayerID",
        "GameGenre",
        "GameDifficulty",
        "EngagementLevel",
        "PlayTimeHours",
        "SessionsPerWeek",
        "AvgSessionDurationMinutes",
        "PlayerLevel",
        "InGamePurchases"
    )

    # 4.1. Weighted engagement score. It is null when any of its three inputs is null; Silver does
    # not drop rows with missing numeric values.
    .withColumn(
        "AvgEngagementScore",
        (F.col("SessionsPerWeek") * ENGAGEMENT_WEIGHT_SESSIONS)
        + (F.col("AvgSessionDurationMinutes") * ENGAGEMENT_WEIGHT_DURATION)
        + (F.col("InGamePurchases") * ENGAGEMENT_WEIGHT_PURCHASES)
    )
    .withColumn(
        "Churn_Risk_Level",
        # A null score must not silently fall through to LOW_RISK_RETAINED via .otherwise().
        F.when(engagement_score.isNull(), F.lit("UNKNOWN_INSUFFICIENT_DATA"))
        .when(engagement_score < CHURN_HIGH_RISK_BELOW, F.lit("HIGH_RISK_CHURN"))
        .when(
            engagement_score.between(CHURN_HIGH_RISK_BELOW, CHURN_MEDIUM_RISK_UPTO),
            F.lit("MEDIUM_RISK_MONITOR"),
        )
        .otherwise(F.lit("LOW_RISK_RETAINED"))
    )

    # 4.2. Map each churn-risk level to an action. UNKNOWN_INSUFFICIENT_DATA gets the standard check-in.
    .withColumn(
        "Recommended_Action",
        F.when(F.col("Churn_Risk_Level") == "HIGH_RISK_CHURN", F.lit("Proactive Free Premium Gift"))
        .when(F.col("Churn_Risk_Level") == "MEDIUM_RISK_MONITOR", F.lit("Targeted Engagement Survey"))
        .otherwise(F.lit("Standard Check-in"))
    )

    # 4.3. Difficulty adjustment from player level, difficulty and session length (first match wins).
    .withColumn(
        "Difficulty_Adjustment",
        F.when((player_level < LOW_LEVEL_BELOW) & (game_difficulty == "Hard"), F.lit("DECREASE_TO_MEDIUM"))
        .when((player_level > HIGH_LEVEL_ABOVE) & (game_difficulty == "Easy"), F.lit("INCREASE_TO_MEDIUM"))
        .when(
            player_level.between(LOW_LEVEL_BELOW, HIGH_LEVEL_ABOVE)
            & (F.col("AvgSessionDurationMinutes") < SHORT_SESSION_BELOW_MINUTES),
            F.lit("OFFER_QUICK_WIN_BONUS"),
        )
        .otherwise(F.lit("NO_ADJUSTMENT"))
    )

    # Keep only the columns downstream consumers need; the intermediate score is dropped.
    .select(
        "PlayerID",
        "GameGenre",
        "EngagementLevel",
        "PlayerLevel",
        "GameDifficulty",
        "Churn_Risk_Level",
        "Recommended_Action",
        "Difficulty_Adjustment"
    )
)

# Load Gold Table 2 as a session-scoped temporary view.
df_gold_reco.createOrReplaceTempView(GOLD_RECO_TABLE)
print(f"Created Gold Reco Table: {GOLD_RECO_TABLE} (Rows: {df_gold_reco.count()})")
display(df_gold_reco.limit(5))


--- Starting Silver to Gold (Player Recos) Transformation ---
Created Gold Reco Table: gold_player_recos (Rows: 40034)


PlayerID,GameGenre,EngagementLevel,PlayerLevel,GameDifficulty,Churn_Risk_Level,Recommended_Action,Difficulty_Adjustment
9000,Strategy,Medium,79,Medium,HIGH_RISK_CHURN,Proactive Free Premium Gift,NO_ADJUSTMENT
9001,Strategy,Medium,11,Medium,MEDIUM_RISK_MONITOR,Targeted Engagement Survey,NO_ADJUSTMENT
9002,Sports,High,35,Easy,MEDIUM_RISK_MONITOR,Targeted Engagement Survey,NO_ADJUSTMENT
9003,Action,Medium,57,Easy,HIGH_RISK_CHURN,Proactive Free Premium Gift,NO_ADJUSTMENT
9004,Action,Medium,95,Medium,MEDIUM_RISK_MONITOR,Targeted Engagement Survey,NO_ADJUSTMENT


In [0]:
# Mean play time across all players; players above it count as "high play time" below.
# A global aggregate always returns exactly one row (its value is None for an empty frame).
avg_playtime = df_gold_metrics.agg(F.avg("total_play_time_hours")).first()[0]

# df_gold_metrics is grouped by PlayerID/Age/Gender/Location only, so it has no GameGenre or
# EngagementLevel column, and referencing them there cannot resolve. Bring them in from Silver.
player_preferences = (
    df_silver
    .select("PlayerID", "GameGenre", "EngagementLevel")
    .dropDuplicates()
)

AI_RECO_LOW_LEVEL_BELOW = 20  # max_player_level below this -> tutorial quest

reco_by_preference = (
    df_gold_metrics
    .join(player_preferences, on="PlayerID", how="left")
    # "High spender" here means any recorded purchase (total_in_game_purchases > 0).
    .withColumn("is_high_spender", F.col("total_in_game_purchases") > 0)
    .withColumn(
        "AI_Recommendation",
        F.when(
            # 1. High Engagement & High Spender
            (F.col("EngagementLevel") == "High") & F.col("is_high_spender"),
            F.lit("Elite Spender Cosmetic Pack")
        )
        .when(
            # 2. Above-average play time in the Action genre
            (F.col("total_play_time_hours") > F.lit(avg_playtime)) & (F.col("GameGenre") == "Action"),
            F.lit("Advanced Sniper Rifle Blueprint")
        )
        .when(
            # 3. Low Level Player
            F.col("max_player_level") < AI_RECO_LOW_LEVEL_BELOW,
            F.lit("Tutorial Completion Quest")
        )
        .otherwise(F.lit("Daily Login Reward Crate"))
    )
    .select(
        F.col("PlayerID"),
        F.col("GameGenre"),
        F.col("EngagementLevel"),
        F.col("AI_Recommendation")
    )
)

display(reco_by_preference.limit(5))



In [0]:
# Configuration for the permanent table name. This reuses GOLD_METRICS_TABLE from the configuration
# cell rather than repeating the literal.
PERSISTENT_GOLD_METRICS_TABLE = GOLD_METRICS_TABLE

# The Gold metrics cell registers a session temp view under this same name. Temp views are resolved
# before catalog tables, so spark.table()/SQL in this session would keep reading the un-persisted
# view. Drop it so the name resolves to the saved table. This is a no-op if the view does not exist.
spark.catalog.dropTempView(PERSISTENT_GOLD_METRICS_TABLE)

# 3. Load Gold Table 1 (Permanent Save)
# 'overwrite' ensures the table is fully updated every time the ETL runs.
df_gold_metrics.write.mode("overwrite").saveAsTable(PERSISTENT_GOLD_METRICS_TABLE)

# Count from the persisted table instead of re-executing the whole Silver -> Gold lineage.
gold_metrics_saved_count = spark.table(PERSISTENT_GOLD_METRICS_TABLE).count()
print(f"Created and permanently saved Gold Metrics Table: {PERSISTENT_GOLD_METRICS_TABLE} (Rows: {gold_metrics_saved_count})")



Created and permanently saved Gold Metrics Table: gold_player_metrics (Rows: 40034)
