In [1]:
# CODE 1: ENVIRONMENT SETUP AND DATA LOADING - CLEANED DATASET

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import warnings
warnings.filterwarnings('ignore')

spark = SparkSession.builder \
    .appName("SteamGameRecommendationSystem") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "512m") \
    .getOrCreate()

print("=" * 80)
print("STEAM GAME RECOMMENDATION SYSTEM - DATA LOADING")
print("=" * 80)
print(f"\nSpark Version: {spark.version}\n")

games_df = spark.read.csv("games.csv", header=True, inferSchema=True, escape='"', multiLine=True)
print(f"[1/4] Games: {games_df.count():,} rows")
print("Schema:")
games_df.printSchema()
print("\nSample:")
games_df.show(3, truncate=False)

users_df = spark.read.csv("users.csv", header=True, inferSchema=True, escape='"')
print(f"\n[2/4] Users: {users_df.count():,} rows")
print("Schema:")
users_df.printSchema()
print("\nSample:")
users_df.show(3, truncate=False)

recommendations_df = spark.read.csv("recommendations.csv", header=True, inferSchema=True, escape='"', multiLine=True)
print(f"\n[3/4] Recommendations: {recommendations_df.count():,} rows")
print("Schema:")
recommendations_df.printSchema()
print("\nSample:")
recommendations_df.show(3, truncate=False)

games_metadata_df = spark.read.json("games_metadata.json", multiLine=True)
print(f"\n[4/4] Games Metadata: {games_metadata_df.count():,} rows")
print("Schema:")
games_metadata_df.printSchema()
print("\nSample:")
games_metadata_df.show(3, truncate=False)

print("\n" + "=" * 80)
print("DATA QUALITY CHECK")
print("=" * 80)

print("\nGames Missing Values:")
games_df.select([count(when(col(c).isNull(), c)).alias(c) for c in games_df.columns]).show()

print("Users Missing Values:")
users_df.select([count(when(col(c).isNull(), c)).alias(c) for c in users_df.columns]).show()

print("Recommendations Missing Values:")
recommendations_df.select([count(when(col(c).isNull(), c)).alias(c) for c in recommendations_df.columns]).show()

print("\n" + "=" * 80)
print("DATASET STATISTICS")
print("=" * 80)

unique_users_in_rec = recommendations_df.select('user_id').distinct().count()
unique_games_in_rec = recommendations_df.select('app_id').distinct().count()
total_possible_interactions = unique_users_in_rec * unique_games_in_rec
actual_interactions = recommendations_df.count()
sparsity = (1 - (actual_interactions / total_possible_interactions)) * 100

print(f"\nUnique Users: {unique_users_in_rec:,}")
print(f"Unique Games: {unique_games_in_rec:,}")
print(f"Possible Interactions: {total_possible_interactions:,}")
print(f"Actual Interactions: {actual_interactions:,}")
print(f"Matrix Sparsity: {sparsity:.4f}%")

print("\nGames Statistics:")
games_df.select('price_final', 'positive_ratio', 'user_reviews').describe().show()

print("\nRecommendations Statistics:")
recommendations_df.select('hours', 'helpful', 'funny').describe().show()

games_df.cache()
users_df.cache()
recommendations_df.cache()
games_metadata_df.cache()

print("\n✓ All datasets cached successfully")
print("=" * 80)


STEAM GAME RECOMMENDATION SYSTEM - DATA LOADING

Spark Version: 3.5.3

[1/4] Games: 50,872 rows
Schema:
root
 |-- app_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- date_release: date (nullable = true)
 |-- win: boolean (nullable = true)
 |-- mac: boolean (nullable = true)
 |-- linux: boolean (nullable = true)
 |-- rating: string (nullable = true)
 |-- positive_ratio: integer (nullable = true)
 |-- user_reviews: integer (nullable = true)
 |-- price_final: double (nullable = true)
 |-- price_original: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- steam_deck: boolean (nullable = true)


Sample:
+------+---------------------------------+------------+----+-----+-----+-------------+--------------+------------+-----------+--------------+--------+----------+
|app_id|title                            |date_release|win |mac  |linux|rating       |positive_ratio|user_reviews|price_final|price_original|discount|steam_deck|
+------+-------------------

In [2]:
# CODE 2: DATA PREPROCESSING FOR CLEANED DATASET

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler

print("=" * 80)
print("CODE 2: DATA PREPROCESSING")
print("=" * 80)

# Step 1: Clean and prepare recommendations
print("\n[STEP 1] Preparing recommendations...")
recommendations_clean = recommendations_df.select(
    'user_id',
    'app_id',
    'is_recommended',
    'hours',
    'helpful',
    'funny'
).fillna(0.0, subset=['hours', 'helpful', 'funny'])

# Create implicit rating
recommendations_clean = recommendations_clean.withColumn(
    'implicit_rating',
    when(col('is_recommended') == True,
         when(col('hours') > 0, 
              least(log(col('hours') + 1) + 1, lit(5.0))
         ).otherwise(lit(3.0))
    ).otherwise(
         when(col('hours') > 0,
              5.0 - least(log(col('hours') + 1) + 1, lit(5.0))
         ).otherwise(lit(2.0))
    )
)

print(f"✓ Recommendations cleaned: {recommendations_clean.count():,}")
recommendations_clean.select('user_id', 'app_id', 'is_recommended', 'hours', 'implicit_rating').show(10)

# Step 2: Prepare games features
print("\n[STEP 2] Preparing game features...")
games_features = games_df.select(
    'app_id',
    'title',
    'price_final',
    'positive_ratio',
    'user_reviews',
    'rating',
    'win',
    'mac',
    'linux',
    'steam_deck'
)

# Encode rating
rating_indexer = StringIndexer(
    inputCol='rating',
    outputCol='rating_encoded',
    handleInvalid='skip'
)
games_encoded = rating_indexer.fit(games_features).transform(games_features)

# Platform features
games_encoded = games_encoded.withColumn(
    'platform_count',
    (col('win').cast('int') + col('mac').cast('int') + col('linux').cast('int'))
).withColumn(
    'steam_deck_int',
    col('steam_deck').cast('int')
)

print(f"✓ Game features prepared: {games_encoded.count():,}")
games_encoded.select('app_id', 'title', 'price_final', 'rating_encoded', 'platform_count').show(10)

# Step 3: Scale numerical features
print("\n[STEP 3] Scaling numerical features...")
numerical_cols = ['price_final', 'positive_ratio', 'user_reviews']

assembler = VectorAssembler(
    inputCols=numerical_cols,
    outputCol='game_features_vector',
    handleInvalid='skip'
)
games_vectorized = assembler.transform(games_encoded)

scaler = StandardScaler(
    inputCol='game_features_vector',
    outputCol='scaled_game_features',
    withMean=True,
    withStd=True
)
scaler_model = scaler.fit(games_vectorized)
games_scaled = scaler_model.transform(games_vectorized)

print("✓ Features scaled")

# Step 4: Create user profiles
print("\n[STEP 4] Creating user profiles...")
user_profiles = recommendations_clean.groupBy('user_id').agg(
    count('*').alias('total_interactions'),
    avg('implicit_rating').alias('avg_rating_given'),
    sum('hours').alias('total_hours_played'),
    sum(when(col('is_recommended') == True, 1).otherwise(0)).alias('recommended_count')
).fillna({
    'avg_rating_given': 0.0,
    'total_hours_played': 0.0,
    'recommended_count': 0
})

user_profiles = user_profiles.withColumn(
    'engagement_level',
    when(col('total_interactions') > 100, 'very_high')
    .when(col('total_interactions') > 50, 'high')
    .when(col('total_interactions') > 10, 'medium')
    .otherwise('low')
)

# Encode engagement
engagement_indexer = StringIndexer(
    inputCol='engagement_level',
    outputCol='engagement_encoded'
)
user_profiles = engagement_indexer.fit(user_profiles).transform(user_profiles)

print(f"✓ User profiles created: {user_profiles.count():,}")
user_profiles.select('user_id', 'total_interactions', 'engagement_level').show(10)

# Step 5: Combine all features
print("\n[STEP 5] Creating final training dataset...")
training_data = recommendations_clean.join(
    user_profiles.select('user_id', 'engagement_level', 'engagement_encoded', 'avg_rating_given', 'total_hours_played'),
    on='user_id',
    how='inner'
).join(
    games_scaled.select(
        'app_id', 'title', 'price_final', 'rating_encoded', 'platform_count', 'steam_deck_int'
    ),
    on='app_id',
    how='inner'
)

print(f"✓ Training dataset: {training_data.count():,} rows")
print(f"  Unique users: {training_data.select('user_id').distinct().count():,}")
print(f"  Unique games: {training_data.select('app_id').distinct().count():,}")

training_data.printSchema()
training_data.show(10, truncate=False)

# Step 6: Reduce data for faster ALS training
print("\n[STEP 6] Reducing data for manageable ALS training...")
sample_fraction = 0.25
sampled_users = training_data.select('user_id').distinct().sample(fraction=sample_fraction, seed=42)
training_data_reduced = training_data.join(sampled_users, on='user_id', how='inner')

reduced_count = training_data_reduced.count()
print(f"✓ Data reduced: {reduced_count:,} rows (from {training_data.count():,})")
print(f"  Reduction: {(1 - reduced_count/training_data.count())*100:.1f}%")

# Cache and use reduced data
training_data = training_data_reduced
training_data.cache()
training_data.count()

print("\n✓ Training data cached")
print("=" * 80)


CODE 2: DATA PREPROCESSING

[STEP 1] Preparing recommendations...
✓ Recommendations cleaned: 41,154,794
+-------+-------+--------------+-----+------------------+
|user_id| app_id|is_recommended|hours|   implicit_rating|
+-------+-------+--------------+-----+------------------+
|  51580| 975370|          true| 36.3|  4.61899332664977|
|   2586| 304390|         false| 11.5|1.4742713556917444|
| 253880|1085660|          true|336.5|               5.0|
| 259432| 703080|          true| 27.4|  4.34638914516716|
|  23869| 526870|          true|  7.9| 3.186051276738094|
|  45425| 306130|          true|  8.6|3.2617630984737906|
|  88282| 238960|          true|538.8|               5.0|
|  63209|    730|         false|157.5|               0.0|
| 354512| 255710|          true| 18.7|3.9806186357439426|
| 454422| 289070|          true|397.5|               5.0|
+-------+-------+--------------+-----+------------------+
only showing top 10 rows


[STEP 2] Preparing game features...
✓ Game features prepa

In [4]:
# CODE 3: ALS COLLABORATIVE FILTERING - FAST

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

print("=" * 80)
print("CODE 3: ALS COLLABORATIVE FILTERING")
print("=" * 80)

training_data_als = training_data.select(
    col('user_id').cast('int').alias('user'),
    col('app_id').cast('int').alias('item'),
    col('implicit_rating').cast('float').alias('rating')
).filter(col('rating') > 0.0).repartition(100)

print(f"ALS samples: {training_data_als.count():,}")
training_data_als.select('user', 'item', 'rating').describe().show()

train_als, test_als = training_data_als.randomSplit([0.8, 0.2], seed=42)

als = ALS(
    userCol='user',
    itemCol='item',
    ratingCol='rating',
    maxIter=15,
    rank=20,
    regParam=0.05,
    alpha=10.0,
    coldStartStrategy='drop',
    nonnegative=True,
    seed=42
)

print("\nTraining ALS...")
als_model = als.fit(train_als)

als_predictions = als_model.transform(test_als)
als_valid_preds = als_predictions.filter(col('prediction').isNotNull())

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
als_rmse = evaluator.evaluate(als_valid_preds)

print(f"✓ ALS RMSE: {als_rmse:.4f}")

user_recs = als_model.recommendForAllUsers(10)
print(f"✓ Recommendations for {user_recs.count():,} users")
user_recs.limit(3).select('user', 'recommendations').show(3, truncate=False)

als_model.cache()
print("✓ ALS model cached")
print("=" * 80)


CODE 3: ALS COLLABORATIVE FILTERING
ALS samples: 9,896,021
+-------+-----------------+-----------------+------------------+
|summary|             user|             item|            rating|
+-------+-----------------+-----------------+------------------+
|  count|          9896021|          9896021|           9896021|
|   mean|7448383.135804987|601505.5614731415|3.8570932689966178|
| stddev|4016469.412926007|471804.4857823358|1.2242009086798202|
|    min|                2|               10|      0.0017992983|
|    max|         14306062|          2245890|               5.0|
+-------+-----------------+-----------------+------------------+


Training ALS...
✓ ALS RMSE: 1.5425
✓ Recommendations for 2,947,250 users
+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user|recommendations                                      

AttributeError: 'ALSModel' object has no attribute 'cache'

In [3]:
# DIAGNOSTIC CODE
print("Checking data quality before ALS:")
print(f"Total training records: {training_data.count():,}")
print(f"Null implicit_rating: {training_data.filter(col('implicit_rating').isNull()).count():,}")
print(f"Zero/negative ratings: {training_data.filter(col('implicit_rating') <= 0).count():,}")
print(f"Valid ratings: {training_data.filter(col('implicit_rating') > 0).count():,}")

print("\nRating distribution:")
training_data.filter(col('implicit_rating') > 0).select('implicit_rating').describe().show()


Checking data quality before ALS:
Total training records: 10,284,174
Null implicit_rating: 0
Zero/negative ratings: 388,153
Valid ratings: 9,896,021

Rating distribution:
+-------+--------------------+
|summary|     implicit_rating|
+-------+--------------------+
|  count|             9896021|
|   mean|  3.8570932650295293|
| stddev|  1.2242009099016957|
|    min|0.001799298330801...|
|    max|                 5.0|
+-------+--------------------+



In [7]:
# CODE 4: CONTENT-BASED FILTERING WITH GRADIENT BOOSTING

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.functions import col

print("=" * 80)
print("CODE 4: GRADIENT BOOSTING - CONTENT-BASED FILTERING")
print("=" * 80)

# Features that actually exist in training_data
feature_cols = ['price_final', 'rating_encoded', 'platform_count', 'steam_deck_int', 'engagement_encoded']

print(f"\nUsing features: {feature_cols}")

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol='features',
    handleInvalid='skip'
)

training_features = assembler.transform(training_data.fillna(0)).dropna(subset=['features'])

print(f"Training samples for GBT: {training_features.count():,}")

train_data, test_data = training_features.randomSplit([0.8, 0.2], seed=42)
print(f"Train: {train_data.count():,}, Test: {test_data.count():,}")

gbt = GBTRegressor(
    featuresCol='features',
    labelCol='implicit_rating',
    maxDepth=5,
    maxBins=32,
    minInstancesPerNode=1,
    subsamplingRate=0.8,
    seed=42
)

param_grid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 7]) \
    .addGrid(gbt.maxBins, [32, 64]) \
    .build()

evaluator_gbt = RegressionEvaluator(
    metricName='rmse',
    labelCol='implicit_rating',
    predictionCol='prediction'
)

tvs_gbt = TrainValidationSplit(
    estimator=gbt,
    estimatorParamMaps=param_grid_gbt,
    evaluator=evaluator_gbt,
    trainRatio=0.8,
    seed=42
)

print("\nTraining Gradient Boosting...")
gbt_model = tvs_gbt.fit(train_data)

gbt_predictions = gbt_model.transform(test_data)
gbt_rmse = evaluator_gbt.evaluate(gbt_predictions)
print(f"✓ Gradient Boosting RMSE: {gbt_rmse:.4f}")

print("\nFeature Importance:")
feature_importance = gbt_model.bestModel.featureImportances
for i, importance in enumerate(feature_importance):
    if importance > 0:
        print(f"  {feature_cols[i]}: {importance:.4f}")

gbt_model.bestModel.cache()
print("\n✓ GBT model cached")
print("=" * 80)


CODE 4: GRADIENT BOOSTING - CONTENT-BASED FILTERING

Using features: ['price_final', 'rating_encoded', 'platform_count', 'steam_deck_int', 'engagement_encoded']
Training samples for GBT: 10,284,174
Train: 8,225,831, Test: 2,058,343

Training Gradient Boosting...
✓ Gradient Boosting RMSE: 1.3231

Feature Importance:
  price_final: 0.5172
  rating_encoded: 0.2820
  platform_count: 0.0903
  steam_deck_int: 0.0004
  engagement_encoded: 0.1101


AttributeError: 'GBTRegressionModel' object has no attribute 'cache'

In [8]:
# CODE 5: RANDOM FOREST FOR RATING PREDICTION - WITH DATA REDUCTION

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.functions import col

print("=" * 80)
print("CODE 5: RANDOM FOREST - CONTENT-BASED FILTERING")
print("=" * 80)

# Reduce training data to ~1 million (10 lakh) for faster RF training
print("\n[STEP 1] Reducing training data for faster RF training...")
current_size = training_data.count()
target_size = 1000000  # 10 lakh
sample_fraction = target_size / current_size

training_data_rf_reduced = training_data.sample(fraction=sample_fraction, seed=42)
reduced_size = training_data_rf_reduced.count()

print(f"Original size: {current_size:,}")
print(f"Target size: {target_size:,}")
print(f"Reduced size: {reduced_size:,}")
print(f"Reduction: {(1 - reduced_size/current_size)*100:.1f}%")

# Create features
print("\n[STEP 2] Creating feature vectors...")
feature_cols = ['price_final', 'rating_encoded', 'platform_count', 'steam_deck_int', 'engagement_encoded']

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol='features',
    handleInvalid='skip'
)

training_features = assembler.transform(training_data_rf_reduced.fillna(0)).dropna(subset=['features'])

print(f"Features created: {training_features.count():,}")

# Split data
train_data_rf, test_data_rf = training_features.randomSplit([0.8, 0.2], seed=42)
print(f"Train: {train_data_rf.count():,}, Test: {test_data_rf.count():,}")

# Random Forest model
rf = RandomForestRegressor(
    featuresCol='features',
    labelCol='implicit_rating',
    numTrees=50,
    maxDepth=10,
    seed=42
)

# Hyperparameter grid
param_grid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [8, 10]) \
    .build()

evaluator_rf = RegressionEvaluator(
    metricName='rmse',
    labelCol='implicit_rating',
    predictionCol='prediction'
)

tvs_rf = TrainValidationSplit(
    estimator=rf,
    estimatorParamMaps=param_grid_rf,
    evaluator=evaluator_rf,
    trainRatio=0.8,
    seed=42
)

print("\n[STEP 3] Training Random Forest...")
rf_model = tvs_rf.fit(train_data_rf)

# Evaluate
rf_predictions = rf_model.transform(test_data_rf)
rf_rmse = evaluator_rf.evaluate(rf_predictions)
print(f"✓ Random Forest RMSE: {rf_rmse:.4f}")

print("\nFeature Importance:")
feature_importance_rf = rf_model.bestModel.featureImportances
for i, importance in enumerate(feature_importance_rf):
    if importance > 0:
        print(f"  {feature_cols[i]}: {importance:.4f}")

rf_model.bestModel.cache()
print("\n✓ RF model cached")
print("=" * 80)


CODE 5: RANDOM FOREST - CONTENT-BASED FILTERING

[STEP 1] Reducing training data for faster RF training...
Original size: 10,284,174
Target size: 1,000,000
Reduced size: 1,000,928
Reduction: 90.3%

[STEP 2] Creating feature vectors...
Features created: 1,000,928
Train: 800,728, Test: 200,200

[STEP 3] Training Random Forest...
✓ Random Forest RMSE: 1.3326

Feature Importance:
  price_final: 0.4631
  rating_encoded: 0.3319
  platform_count: 0.0328
  steam_deck_int: 0.0001
  engagement_encoded: 0.1720


AttributeError: 'RandomForestRegressionModel' object has no attribute 'cache'

In [9]:
# CODE 6: STACKING ENSEMBLE MODEL

from pyspark.ml.regression import LinearRegression

print("=" * 80)
print("CODE 6: STACKING ENSEMBLE")
print("=" * 80)

# Get predictions from both models
print("\nGenerating base model predictions...")
gbt_preds = gbt_model.bestModel.transform(test_data).select(
    'user_id', 'app_id', 'implicit_rating', col('prediction').alias('gbt_pred')
)

rf_preds = rf_model.bestModel.transform(test_data_rf).select(
    'user_id', 'app_id', col('prediction').alias('rf_pred')
)

# Combine predictions
ensemble_data = gbt_preds.join(
    rf_preds.select('user_id', 'app_id', 'rf_pred'),
    on=['user_id', 'app_id'],
    how='inner'
)

print(f"Ensemble training samples: {ensemble_data.count():,}")

# Meta-learner features
meta_assembler = VectorAssembler(
    inputCols=['gbt_pred', 'rf_pred'],
    outputCol='meta_features'
)
meta_data = meta_assembler.transform(ensemble_data)

# Train meta-learner
meta_learner = LinearRegression(
    featuresCol='meta_features',
    labelCol='implicit_rating',
    maxIter=100
)

print("Training meta-learner...")
ensemble_model = meta_learner.fit(meta_data)

# Evaluate ensemble
ensemble_preds = ensemble_model.transform(meta_data)
evaluator_ensemble = RegressionEvaluator(
    metricName='rmse',
    labelCol='implicit_rating',
    predictionCol='prediction'
)
ensemble_rmse = evaluator_ensemble.evaluate(ensemble_preds)

print(f"✓ Ensemble RMSE: {ensemble_rmse:.4f}")
print(f"\nMeta-learner weights:")
print(f"  GBT coefficient: {ensemble_model.coefficients[0]:.4f}")
print(f"  RF coefficient: {ensemble_model.coefficients[1]:.4f}")
print(f"  Intercept: {ensemble_model.intercept:.4f}")

ensemble_model.cache()
print("\n✓ Ensemble model cached")
print("=" * 80)


CODE 6: STACKING ENSEMBLE

Generating base model predictions...
Ensemble training samples: 26,449
Training meta-learner...
✓ Ensemble RMSE: 1.3123

Meta-learner weights:
  GBT coefficient: 0.9359
  RF coefficient: 0.0220
  Intercept: 0.1359


AttributeError: 'LinearRegressionModel' object has no attribute 'cache'

In [11]:
# CODE 7: EXPLAINABILITY & XAI

import pandas as pd
import numpy as np

print("=" * 80)
print("CODE 7: MODEL EXPLAINABILITY (XAI)")
print("=" * 80)

# Get sample predictions
sample_size = 1000
# Get sample predictions
predictions_sample = ensemble_preds.select(
    'gbt_pred', 'rf_pred', 'prediction', 'implicit_rating'
).limit(sample_size).toPandas()

print(f"\nAnalyzing {len(predictions_sample)} predictions...")

# Calculate errors correctly with Pandas abs()
predictions_sample['error'] = predictions_sample['prediction'] - predictions_sample['implicit_rating']
predictions_sample['abs_error'] = predictions_sample['error'].abs()
predictions_sample['percentage_error'] = (predictions_sample['abs_error'] / (predictions_sample['implicit_rating'].abs() + 0.1)) * 100

print("\nPrediction Error Statistics:")
print(predictions_sample[['prediction', 'implicit_rating', 'error', 'abs_error']].describe())

# The rest remains unchanged...


print("\nError Distribution:")
print(f"  Mean Error: {predictions_sample['error'].mean():.4f}")
print(f"  Std Dev: {predictions_sample['error'].std():.4f}")
print(f"  Max Error: {predictions_sample['error'].max():.4f}")
print(f"  Min Error: {predictions_sample['error'].min():.4f}")
print(f"  Median Abs Error: {predictions_sample['abs_error'].median():.4f}")

# High error cases
high_error_threshold = predictions_sample['abs_error'].quantile(0.75)
high_errors = predictions_sample[predictions_sample['abs_error'] > high_error_threshold]

print(f"\nHigh Error Cases (Top 25%, threshold: {high_error_threshold:.4f}):")
print(f"  Count: {len(high_errors)}")
print(high_errors[['gbt_pred', 'rf_pred', 'prediction', 'implicit_rating', 'error']].head(10))

# Model contribution analysis
predictions_sample['gbt_weight'] = 0.5
predictions_sample['rf_weight'] = 0.5
predictions_sample['gbt_contribution'] = predictions_sample['gbt_pred'] * predictions_sample['gbt_weight']
predictions_sample['rf_contribution'] = predictions_sample['rf_pred'] * predictions_sample['rf_weight']

print(f"\nAverage Model Contributions:")
print(f"  GBT avg contribution: {predictions_sample['gbt_contribution'].mean():.4f}")
print(f"  RF avg contribution: {predictions_sample['rf_contribution'].mean():.4f}")

print("\n✓ Explainability analysis complete")
print("=" * 80)


CODE 7: MODEL EXPLAINABILITY (XAI)

Analyzing 1000 predictions...

Prediction Error Statistics:
        prediction  implicit_rating        error    abs_error
count  1000.000000      1000.000000  1000.000000  1000.000000
mean      3.670801         3.655151     0.015649     1.056508
std       0.440082         1.398482     1.321994     0.794094
min       2.431049         0.000000    -2.430318     0.000740
25%       3.378209         2.791759    -0.933366     0.523196
50%       3.721290         3.949675    -0.362464     0.888497
75%       4.037504         5.000000     0.756439     1.347650
max       4.733809         5.000000     4.510799     4.510799

Error Distribution:
  Mean Error: 0.0156
  Std Dev: 1.3220
  Max Error: 4.5108
  Min Error: -2.4303
  Median Abs Error: 0.8885

High Error Cases (Top 25%, threshold: 1.3477):
  Count: 250
    gbt_pred   rf_pred  prediction  implicit_rating     error
7   3.971173  3.923575    3.939050         2.410987  1.528063
16  4.478852  4.310445    4.42271

In [None]:
# CODE 8: MODEL COMPARISON & FINAL RECOMMENDATIONS

from pyspark.sql.functions import desc, row_number, col
from pyspark.sql.window import Window

print("=" * 80)
print("CODE 8: MODEL COMPARISON & FINAL RECOMMENDATIONS")
print("=" * 80)

# Model comparison dictionary (fill with your actual RMSE values)
model_comparison = {
    'ALS': als_rmse,
    'Gradient Boosting': gbt_rmse,
    'Random Forest': rf_rmse,
    'Stacking Ensemble': ensemble_rmse
}

print("\n" + "=" * 60)
print("MODEL PERFORMANCE COMPARISON")
print("=" * 60)

# Convert values to list for max()
# Use built-in max (ensure pyspark.sql.functions.max is not imported)
max_rmse = __builtins__.max(list(model_comparison.values()))

for i, (model, rmse) in enumerate(sorted(model_comparison.items(), key=lambda x: x[1]), 1):
    improvement = ((max_rmse - rmse) / max_rmse) * 100
    print(f"{i}. {model:25} RMSE: {rmse:.4f}  (Improvement: {improvement:.2f}%)")


best_model_name = __builtins__.min(model_comparison, key=model_comparison.get)
print(f"\n✓ Best Model: {best_model_name}")

# Generate final recommendations from ensemble
print("\n" + "=" * 60)
print("GENERATING FINAL RECOMMENDATIONS")
print("=" * 60)

# Features to use for prediction
feature_cols = ['price_final', 'rating_encoded', 'platform_count', 'steam_deck_int', 'engagement_encoded']

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features', handleInvalid='skip')

all_features = assembler.transform(training_data.fillna(0)).dropna(subset=['features'])

# Get predictions from GBT and RF models
gbt_all_preds = gbt_model.bestModel.transform(all_features).select(
    'user_id', 'app_id', 'implicit_rating', col('prediction').alias('gbt_pred')
)

rf_all_preds = rf_model.bestModel.transform(all_features).select(
    'user_id', 'app_id', col('prediction').alias('rf_pred')
)

# Join predictions to prepare for ensemble
ensemble_all = gbt_all_preds.join(
    rf_all_preds.select('user_id', 'app_id', 'rf_pred'),
    on=['user_id', 'app_id'],
    how='inner'
)

meta_assembler = VectorAssembler(inputCols=['gbt_pred', 'rf_pred'], outputCol='meta_features')
meta_all = meta_assembler.transform(ensemble_all)

final_preds = ensemble_model.transform(meta_all)

# Filter top recommendations with predicted rating threshold
recs = final_preds.select(
    col('user_id').cast('int'),
    col('app_id').cast('int'),
    col('prediction').alias('predicted_rating')
).filter(col('predicted_rating') > 3.5)

print(f"\nRecommendations with predicted rating > 3.5: {recs.count():,}")

# Rank top 10 recommendations per user
window_spec = Window.partitionBy('user_id').orderBy(desc('predicted_rating'))
top_recs = recs.withColumn('rank', row_number().over(window_spec)) \
    .filter(col('rank') <= 10)

# Join with game titles for display
final_recommendations = top_recs.join(
    games_df.select('app_id', col('title').alias('game_title')),
    on='app_id',
    how='left'
)

print(f"\nFinal recommendations with titles: {final_recommendations.count():,}")
print("\nSample Top Game Recommendations:")
final_recommendations.select(
    'user_id', 'game_title', 'predicted_rating', 'rank'
).orderBy('user_id', 'rank').limit(50).show(50, truncate=False)

# Save recommendations for downstream use
# final_recommendations.coalesce(1).write.mode('overwrite').parquet('steam_recommendations_final')
# print("\n✓ Recommendations saved to 'steam_recommendations_final' directory")

# # Print summary stats
# print("\n" + "=" * 60)
# print("RECOMMENDATION SYSTEM SUMMARY")
# print("=" * 60)
# print(f"Total Unique Users: {training_data.select('user_id').distinct().count():,}")
# print(f"Total Unique Games: {training_data.select('app_id').distinct().count():,}")
# print(f"Training Interactions: {training_data.count():,}")
# print(f"Final Recommendations Generated: {final_recommendations.count():,}")
# print(f"Best Model: {best_model_name} (RMSE: {model_comparison[best_model_name]:.4f})")
# print("\n✓ Recommendation System Complete!")
# print("=" * 80)


CODE 8: MODEL COMPARISON & FINAL RECOMMENDATIONS

MODEL PERFORMANCE COMPARISON
1. Stacking Ensemble         RMSE: 1.3123  (Improvement: 14.92%)
2. Gradient Boosting         RMSE: 1.3231  (Improvement: 14.22%)
3. Random Forest             RMSE: 1.3326  (Improvement: 13.61%)
4. ALS                       RMSE: 1.5425  (Improvement: 0.00%)

✓ Best Model: Stacking Ensemble

GENERATING FINAL RECOMMENDATIONS

Recommendations with predicted rating > 3.5: 7,054,384

Final recommendations with titles: 6,675,817

Sample Top Game Recommendations:
+-------+-------------------------------------+------------------+----+
|user_id|game_title                           |predicted_rating  |rank|
+-------+-------------------------------------+------------------+----+
|2      |Subnautica: Below Zero               |4.299773105641976 |1   |
|2      |GTFO                                 |4.037504495999813 |2   |
|2      |Among Us                             |3.8260133887510923|3   |
|2      |Brawlhalla        

Py4JJavaError: An error occurred while calling o4389.parquet.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [17]:
# Print summary stats
print("\n" + "=" * 60)
print("RECOMMENDATION SYSTEM SUMMARY")
print("=" * 60)
print(f"Total Unique Users: {training_data.select('user_id').distinct().count():,}")
print(f"Total Unique Games: {training_data.select('app_id').distinct().count():,}")
print(f"Training Interactions: {training_data.count():,}")
print(f"Final Recommendations Generated: {final_recommendations.count():,}")
print(f"Best Model: {best_model_name} (RMSE: {model_comparison[best_model_name]:.4f})")
print("\n✓ Recommendation System Complete!")
print("=" * 80)


RECOMMENDATION SYSTEM SUMMARY
Total Unique Users: 3,445,406
Total Unique Games: 35,837
Training Interactions: 10,284,174
Final Recommendations Generated: 6,675,817
Best Model: Stacking Ensemble (RMSE: 1.3123)

✓ Recommendation System Complete!
