# Model 3: user-based/collaborative filtering (ALS)
- Recommend games to users based on similar users’ preferences.
- input: purchased_games
- This code runs successfully on Google Colab.

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

spark_version = "3.5.5"
!wget -q https://dlcdn.apache.org/spark/spark-{spark_version}/spark-{spark_version}-bin-hadoop3.tgz

!tar xf spark-{spark_version}-bin-hadoop3.tgz

!pip install findspark

!pip install pyspark==3.5.5

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import os

import findspark

# Tell Spark where the JDK and the unpacked Spark distribution live.
# The SPARK_HOME directory name must match the Spark version downloaded above.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.5.5-bin-hadoop3",
    }
)

# Initialise findspark now that the environment variables are in place.
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook, with the
# "spark.sql.repl.eagerEval.enabled" option switched on.
spark = (
    SparkSession.builder
    .appName("SteamGameRecommender")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

In [None]:
from google.colab import drive

# Directory inside the mounted Drive that input files are read from.
mount_point = "/content/drive/MyDrive/colab_mount/"

# Attach Google Drive to the Colab filesystem.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Read the demo purchases CSV: the first row is the header and column types
# are inferred from the data.
purchases_csv_path = f'{mount_point}purchased_games_demo.csv'
purchased_games = spark.read.csv(
    purchases_csv_path,
    header=True,
    inferSchema=True,
)

In [None]:
# Preview the first 5 rows to check the loaded columns.
purchased_games.show(5)

+-----------------+--------------------+
|         playerid|             library|
+-----------------+--------------------+
|76561198060698936|[60, 1670, 3830, ...|
|76561198287452552|[10, 80, 100, 240...|
|76561198040436563|[10, 80, 100, 300...|
|76561198042412488|[300, 240, 220, 3...|
|76561198119605821|[47870, 108600, 5...|
+-----------------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col, regexp_replace, split

# The library column is text such as "[60, 1670, 3830, ...]". Strip the square
# brackets and all whitespace, then split on commas to get an array of ID strings.
library_without_brackets = regexp_replace("library", r"[\[\]\s]", "")
purchased_games = purchased_games.withColumn(
    "library", split(library_without_brackets, ",")
)

In [None]:
from pyspark.sql.functions import explode, split

# Turn each player's library array into one row per (playerid, gameid) pair,
# then cast the game ID strings to integers.
exploded_game = explode(col("library")).alias("gameid")
user_game_pairs = (
    purchased_games
    .select(col("playerid"), exploded_game)
    .withColumn("gameid", col("gameid").cast("int"))
)

In [None]:
# Keep only the pairs that have a non-null gameid.
user_game_pairs = user_game_pairs.where(col("gameid").isNotNull())

In [None]:
from pyspark.ml.feature import StringIndexer

# playerid values (e.g. 76561198060698936) do not fit in a 32-bit integer, so
# each player gets a compact "userIndex" that is later used as the ALS user column.
player_indexer = StringIndexer(
    inputCol="playerid",
    outputCol="userIndex",
    handleInvalid="skip",
)
player_indexer_model = player_indexer.fit(user_game_pairs)
indexed_pairs = player_indexer_model.transform(user_game_pairs)

# Store the index as an integer column.
user_game_pairs = indexed_pairs.withColumn(
    "userIndex", col("userIndex").cast("int")
)

In [None]:
# Shape the data for ALS with implicit feedback.
from pyspark.sql.functions import lit

# Every purchase counts as the same implicit signal: a constant rating of 1.
implicit_rating = lit(1)
user_game_pairs = user_game_pairs.withColumn("rating", implicit_rating)

In [None]:
from pyspark.ml.recommendation import ALS

# ALS configuration: which columns hold users, items and ratings, plus the
# training hyper-parameters. implicitPrefs=True because the ratings are
# implicit feedback (purchases), not explicit scores.
als_settings = {
    "userCol": "userIndex",
    "itemCol": "gameid",
    "ratingCol": "rating",
    "implicitPrefs": True,
    "maxIter": 10,
    "regParam": 0.1,
}
als = ALS(**als_settings)

In [None]:
# Sanity check before training: list any rows that still have a null gameid
# (an empty table is the expected result).
user_game_pairs.where(col("gameid").isNull()).show()

+--------+------+---------+------+
|playerid|gameid|userIndex|rating|
+--------+------+---------+------+
+--------+------+---------+------+



In [None]:
# Train the ALS model on the (userIndex, gameid, rating) rows.
model = als.fit(user_game_pairs)

In [None]:
# recommendation = model.recommendForAllUsers(10)
# recommendation.show(5, truncate=False)

In [None]:
# Build the playerid -> userIndex lookup table (one row per distinct pair).
# It is cached because get_user_index() filters this DataFrame on every call;
# without caching, each lookup would recompute the whole upstream pipeline.
player_index_mapping = (
    user_game_pairs
    .select("playerid", "userIndex")
    .distinct()
    .cache()
)

In [None]:
def get_user_index(playerid):
    """Return the ``userIndex`` mapped to ``playerid``, or ``None`` if absent.

    The lookup filters ``player_index_mapping`` on the ``playerid`` column and
    reads ``userIndex`` from the first matching row.
    """
    matching_players = player_index_mapping.filter(
        player_index_mapping.playerid == playerid
    )
    index_rows = matching_players.select("userIndex").collect()
    if not index_rows:
        return None
    return index_rows[0]["userIndex"]

# playerid = 76561198049101130  # Example input
# user_index = get_user_index(playerid)

# if user_index is None:
#     print("Player ID not found in training data.")

# if user_index is not None:
#     top_recommendations = model.recommendForAllUsers(5).filter(f"userIndex == {user_index}").select("recommendations").collect()

#     if top_recommendations:
#         recommended_games = [row["gameid"] for row in top_recommendations[0]["recommendations"]]
#         print(f"Top 10 recommended game IDs for player {playerid}: {recommended_games}")
#     else:
#         print("No recommendations found for this user.")

In [None]:
# Compute the top 10 recommendations for every user and persist them as
# Parquet, replacing any previous output at the same path.
user_recommendations = model.recommendForAllUsers(10)
user_recommendations.write.parquet(
    "user_recommendations.parquet",
    mode="overwrite",
)

In [None]:
# Reload the saved recommendations from the "user_recommendations.parquet"
# output so the lookups below read the stored result.
user_recommendations = spark.read.parquet("user_recommendations.parquet")

# Get recommendations for a specific user
def get_recommendations(playerid):
    """Return the recommended game IDs stored for ``playerid``.

    Resolves the player to a ``userIndex`` via ``get_user_index`` and reads the
    matching row from ``user_recommendations``.

    Returns:
        A list of ``gameid`` values on success. If the player has no index,
        the string "User not found in training data." is returned; if no
        recommendation row matches, "No recommendations available." is returned.
    """
    user_index = get_user_index(playerid)
    if user_index is None:
        return "User not found in training data."

    matching_rows = (
        user_recommendations
        .filter(f"userIndex == {user_index}")
        .select("recommendations")
        .collect()
    )
    if not matching_rows:
        return "No recommendations available."

    # Each entry in the "recommendations" array carries a gameid field.
    return [entry["gameid"] for entry in matching_rows[0]["recommendations"]]

In [None]:
# Example: print the recommendations for one player ID.
example_playerid = 76561199081881137
print(get_recommendations(example_playerid))

[730, 578080, 1172470, 304930, 596350, 444090, 218620, 291550, 431960, 230410]
