# Step 2: ALS Game Recommendation
This notebook filters clean data, prepares user/item indices, trains ALS, evaluates it, and saves recommendations.

In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, when
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

from pyspark.ml.evaluation import RegressionEvaluator


In [2]:
import os
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ALS_Game_Recommender") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/09 23:31:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# ✅ Extra memory-friendly configs
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)


In [5]:

# ✅ Load cleaned reviews
df = spark.read.parquet("hdfs:///project/processed/steam_review_english.parquet")
df.printSchema()
df.show(5)


root
 |-- _c0: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- app_name: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: string (nullable = true)
 |-- timestamp_updated: string (nullable = true)
 |-- recommended: string (nullable = true)
 |-- votes_helpful: string (nullable = true)
 |-- votes_funny: string (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- steam_purchase: string (nullable = true)
 |-- received_for_free: string (nullable = true)
 |-- written_during_early_access: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: string (nullable = true)
 |-- author_num_reviews: string (nullable = true)
 |-- author_playtime_forever: string (nullable = true)
 |-- author_playtime_last_two_weeks: string (nullable = true)
 |-- author_playt

In [6]:
from pyspark.sql.types import FloatType, IntegerType, BooleanType

# ✅ Fix column types after loading the dataframe
df = df.withColumn("author_playtime_forever", col("author_playtime_forever").cast(FloatType())) \
       .withColumn("author_playtime_at_review", col("author_playtime_at_review").cast(FloatType())) \
       .withColumn("author_playtime_last_two_weeks", col("author_playtime_last_two_weeks").cast(FloatType())) \
       .withColumn("recommended", when(col("recommended") == "True", True).otherwise(False)) \
       .withColumn("votes_helpful", col("votes_helpful").cast(IntegerType())) \
       .withColumn("votes_funny", col("votes_funny").cast(IntegerType())) \
       .withColumn("author_num_games_owned", col("author_num_games_owned").cast(IntegerType())) \
       .withColumn("author_num_reviews", col("author_num_reviews").cast(IntegerType()))


In [7]:

# ✅ Select relevant columns
data_rec = df.select("app_id", "app_name", "review_id", "language", "author_steamid",
                     "timestamp_created", "author_playtime_forever", "recommended")


In [8]:

# ✅ Filter games with at least 200 reviews
app_counts = data_rec.groupBy("app_name").count()
popular_apps = app_counts.filter(col("count") >= 200)
data_rec = data_rec.join(popular_apps, on="app_name", how="inner")


In [9]:

# ✅ Remove outliers based on playtime
def remove_outliers(df, column):
    q1, q3 = df.approxQuantile(column, [0.25, 0.75], 0.01)
    iqr = q3 - q1
    lower_limit = q1 - 2 * iqr
    upper_limit = q3 + 2 * iqr
    return df.filter((col(column) >= lower_limit) & (col(column) <= upper_limit))

data_rec = remove_outliers(data_rec, "author_playtime_forever")


                                                                                

In [10]:

# ✅ Identify power users based on playtime
mean_playtime = data_rec.agg(mean("author_playtime_forever").alias("mean")).collect()[0]["mean"]
power_users = data_rec.filter(col("author_playtime_forever") >= 5 * mean_playtime)
power_users = power_users.filter(col("author_steamid") >= 76560000000000000)


                                                                                

In [11]:

# ✅ Prepare ALS training data
als_data = power_users.select("author_steamid", "app_id", "app_name", "recommended")


In [14]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# ✅ Generate unique author_index
author_ids = als_data.select("author_steamid").distinct()
author_ids = author_ids.withColumn("author_index", row_number().over(Window.orderBy("author_steamid")) - 1)

# ✅ Generate unique app_index
app_ids = als_data.select("app_name").distinct()
app_ids = app_ids.withColumn("app_index", row_number().over(Window.orderBy("app_name")) - 1)

# ✅ Join back into ALS data
als_data = als_data.join(author_ids, on="author_steamid", how="inner")
als_data = als_data.join(app_ids, on="app_name", how="inner")


In [15]:

# ✅ Convert recommended → rating (True = 5, False = 1)
als_data = als_data.withColumn("rating", when(col("recommended") == True, 5).otherwise(1))


In [16]:

# ✅ Split into train/test
(train, test) = als_data.randomSplit([0.8, 0.2], seed=42)


In [17]:

# ✅ Train ALS Model
als = ALS(
    userCol="author_index",
    itemCol="app_index",
    ratingCol="rating",
    rank=15,
    maxIter=10,
    regParam=0.1,
    coldStartStrategy="drop",
    nonnegative=True
)

model = als.fit(train)


25/06/09 23:36:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 2

In [18]:

# ✅ Evaluate ALS with RMSE
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"✅ ALS RMSE: {rmse:.4f}")


25/06/09 23:36:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 23:36:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/09 2

✅ ALS RMSE: 2.4438


                                                                                

In [19]:

# ✅ Generate recommendations
user_recs = model.recommendForAllUsers(5)


In [20]:

# ✅ Save to HDFS
output_path = "hdfs:///project/outputs/app_recommendations.parquet"
user_recs.write.mode("overwrite").parquet(output_path)
print("✅ Recommendations saved to:", output_path)




✅ Recommendations saved to: hdfs:///project/outputs/app_recommendations.parquet


                                                                                