In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

In [None]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Game Recommendation System")
    .getOrCreate()
)

In [None]:
# Read the games and ratings CSV files, treating the first row of each as the header.
# Both DataFrames are cached because later cells reuse them several times.
game_df = spark.read.option('header', True).csv('games.csv').cache()
rating_df = spark.read.option('header', True).csv('ratings.csv').cache()

In [None]:
# Cast the id and rating columns to integers before they are used for joins and ALS.
game_df = game_df.withColumn('game_id', game_df.game_id.cast('int'))

for int_column in ('user_id', 'game_id', 'rating'):
    rating_df = rating_df.withColumn(int_column, rating_df[int_column].cast('int'))

In [None]:
# Preview the ratings table (first 20 rows, long values truncated - the show() defaults).
rating_df.show(n=20, truncate=True)

In [None]:
# Preview the games table (first 20 rows, long values truncated - the show() defaults).
game_df.show(n=20, truncate=True)

In [None]:
# Ask which user to build recommendations for.
# The value is kept as the raw string returned by input(); it is converted to int where it is used.
id_prompt = 'Enter ID: '
target_id = input(id_prompt)

In [None]:
# Expose both DataFrames to Spark SQL under the names 'ratings' and 'games'.
# createOrReplaceTempView replaces the deprecated registerTempTable and, because it
# overwrites an existing view of the same name, this cell can be re-run safely.
rating_df.createOrReplaceTempView('ratings')
game_df.createOrReplaceTempView('games')

In [None]:
# Alternating Least Squares - PySpark collaborative filtering.
# ALS uses explicit feedback by default.
# Building the ALS model.
als = ALS(
    maxIter=5,                 # maximum number of iterations to run
    regParam=0.01,             # regularization parameter
    userCol='user_id',         # column holding the user ids
    itemCol='game_id',         # column holding the item (game) ids
    ratingCol='rating',        # column holding the rating a user gave a game
    coldStartStrategy='drop',  # drop rows whose prediction would be NaN (users/games unseen in training)
)

In [None]:
# Training the model.
# Hold out 20% of the ratings for evaluation. The fixed seed keeps the split - and
# therefore the error reported below - repeatable from one run to the next.
SPLIT_SEED = 42
training_data, test_data = rating_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

als_model = als.fit(training_data)

In [None]:
# Prediction error on the held-out ratings.
predictions = als_model.transform(test_data)

# The model writes its output to the 'prediction' column (the ALS default, which this
# notebook does not override), so the evaluator has to read that exact column name.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Error = ", str(rmse))

In [None]:
# Join every rating with the details of the game it refers to, matching on game_id.
# The statement carries no trailing ';' because older Spark SQL parsers reject it.
games_ratings_df = spark.sql('select * from games inner join ratings using (game_id)')
# createOrReplaceTempView replaces the deprecated registerTempTable and is safe to re-run.
games_ratings_df.createOrReplaceTempView('games_ratings')

In [None]:
# Recommend games for the user whose id was entered earlier.
NUM_RECOMMENDATIONS = 5

# recommendForUserSubset only needs the user id column, so reduce the joined
# ratings to the single distinct id that was requested.
# int() raises ValueError if the entered id is not a whole number.
target_user_df = (
    games_ratings_df
    .where(games_ratings_df.user_id == int(target_id))
    .select('user_id')
    .distinct()
)

recommendations = als_model.recommendForUserSubset(target_user_df, NUM_RECOMMENDATIONS)
search = recommendations.collect()

if not search:
    # No row comes back when the id has no ratings or was not part of the training data.
    print('No recommendations available for user ' + str(target_id))
else:
    # Each recommendation is a (game id, predicted rating) pair; iterate over what was
    # actually returned, which can be fewer than NUM_RECOMMENDATIONS.
    for recommended_game_id, _predicted_rating in search[0]['recommendations']:
        print(game_df.where(game_df.game_id == recommended_game_id).select('name').collect())