# Mount

In [None]:
# Mount Google Drive at /content/drive; the dataset cell later in this notebook
# reads its CSV files from a directory under this mount point.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


# PySpark Setup

## Download Java Virtual Machine (JVM)

In [None]:
# Install the headless OpenJDK 8 package (JAVA_HOME is pointed at it further down).
# -qq and the redirect to /dev/null keep the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

## Download Spark (pre-built for Hadoop 3.2)

In [None]:
# Fetch the Spark 3.0.0 distribution built for Hadoop 3.2 from the Apache archive (-q: quiet).
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

## Install

In [None]:
# Unpack the downloaded archive in the current directory; SPARK_HOME is later set to the
# extracted spark-3.0.0-bin-hadoop3.2 folder.
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [None]:
# findspark is used below to locate the Spark installation from Python.
# NOTE(review): the version is not pinned, so a fresh runtime may pick up a different release.
!pip install -q findspark

## Find Spark

In [None]:
import os

# Tell PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
})

In [None]:
import findspark
# Initialise findspark so the `pyspark` imports in the next section resolve.
# NOTE(review): presumably this relies on the SPARK_HOME set in the previous cell — confirm.
findspark.init()
# Last expression of the cell: displays the Spark path that findspark resolved.
findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2'

# Imports

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np
from numpy import linalg as LA

# Spark Session

In [None]:
# Build (or reuse) the notebook's Spark session: local master, app name "Colab",
# and the Spark UI bound to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Display the session object to confirm it was created.
spark

# Datasets

In [None]:
# Folder on the mounted Drive that holds the two CSV inputs.
dataset_dir = '/content/drive/MyDrive/Colab Datasets/RecommenderSystem/'

# Both files have a header row; column types are inferred on read and the
# resulting DataFrames are cached because later cells reuse them.
game_df = spark.read.csv(
    f'{dataset_dir}games.csv', header=True, inferSchema=True
).cache()
rating_df = spark.read.csv(
    f'{dataset_dir}ratings.csv', header=True, inferSchema=True
).cache()

## Data preparation

In [None]:
# Rescale ratings by 5 and meta scores by 100 into new *_normal columns.
# The schema printed further down shows meta_score inferred as a string while
# meta_score_normal comes out as double, so the division relies on Spark
# coercing the string column to a number.
rating_df = rating_df.withColumn('rating_normal', col('rating') / 5)
game_df = game_df.withColumn('meta_score_normal', col('meta_score') / 100)

In [None]:
# Attach each game's metadata to its ratings via the shared game_id column.
join_df = rating_df.join(game_df, on='game_id')

In [None]:
# Preview the first five joined rows.
join_df.show(5)

+-------+-------+------+-------------+--------------------+------------+--------------------+----------+-----------------+
|game_id|user_id|rating|rating_normal|                name|release_date|             summary|meta_score|meta_score_normal|
+-------+-------+------+-------------+--------------------+------------+--------------------+----------+-----------------+
|      1|    314|     5|          1.0|The Legend of Zel...|   23-Nov-98|As a young boy, L...|        99|             0.99|
|      1|    439|     3|          0.6|The Legend of Zel...|   23-Nov-98|As a young boy, L...|        99|             0.99|
|      1|    588|     5|          1.0|The Legend of Zel...|   23-Nov-98|As a young boy, L...|        99|             0.99|
|      1|   1169|     4|          0.8|The Legend of Zel...|   23-Nov-98|As a young boy, L...|        99|             0.99|
|      1|   1185|     4|          0.8|The Legend of Zel...|   23-Nov-98|As a young boy, L...|        99|             0.99|
+-------+-------

In [None]:
# Inspect the inferred column types (note in the output that meta_score is read as a string).
join_df.printSchema()

root
 |-- game_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- rating_normal: double (nullable = true)
 |-- name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- meta_score: string (nullable = true)
 |-- meta_score_normal: double (nullable = true)



In [None]:
# Keep only the columns ALS needs: user, item and the normalised rating.
x = join_df.select('user_id', 'game_id', 'rating_normal')
# Split into 70% train / 30% test. The split is seeded (same seed as the ALS
# estimator below) so that the reported sizes and RMSE can be reproduced
# across runs instead of changing with every execution.
train_test_split = x.randomSplit([0.7, 0.3], seed=5)
# The label is called 'y' in the training set and 'y*' (ground truth) in the test set.
train = train_test_split[0].withColumnRenamed('rating_normal', 'y')
test = train_test_split[1].withColumnRenamed('rating_normal', 'y*')

In [None]:
# Count the rows that landed in each split and report them.
train_size, test_size = train.count(), test.count()

print(f" - [Train/Test size] - {train_size}, {test_size}")

 - [Train/Test size] - 686875, 294673


# ALS model

In [None]:
# ALS estimator: users are 'user_id', items are 'game_id', and the label is the
# normalised rating column 'y' of the training set.
als = ALS(
    maxIter=20,
    regParam=0.01,
    userCol='user_id',
    itemCol='game_id',
    ratingCol='y',
    coldStartStrategy='drop',
    seed=5,
)

# Last expression of the cell: display the configured estimator.
als

ALS_92f6e2a5e803

In [None]:
# Fit the ALS estimator on the training split; `model` is reused by every later section.
model = als.fit(train)
print(' - [X] ALS model training is complete.')

 - [X] ALS model training is complete.


In [None]:
# Score the held-out split; the result carries a 'prediction' column next to the true 'y*'.
prediction = model.transform(test)
# This step is prediction, not training, so the status message says so
# (the previous text was a copy of the training cell's message).
print(' - [X] ALS model prediction is complete.')

 - [X] ALS model training is complete.!


In [None]:
# Show ten predictions with the game name attached, alongside the true value y*.
prediction.join(game_df, 'game_id').select(
    'user_id', 'name', 'prediction', 'y*').show(n=10, truncate=False)

+-------+------------------------------+-----------+----+
|user_id|name                          |prediction |y*  |
+-------+------------------------------+-----------+----+
|20467  |Uncharted 3: Drake's Deception|0.022216773|0.03|
|588    |Uncharted 3: Drake's Deception|0.025167719|0.04|
|3922   |Uncharted 3: Drake's Deception|0.028978162|0.03|
|41282  |Uncharted 3: Drake's Deception|0.030626388|0.03|
|6630   |Uncharted 3: Drake's Deception|0.024680307|0.03|
|14603  |Uncharted 3: Drake's Deception|0.02730666 |0.04|
|15547  |Uncharted 3: Drake's Deception|0.021670384|0.04|
|33065  |Uncharted 3: Drake's Deception|0.027454996|0.03|
|18361  |Uncharted 3: Drake's Deception|0.028028283|0.04|
|11569  |Uncharted 3: Drake's Deception|0.026713159|0.02|
+-------+------------------------------+-----------+----+
only showing top 10 rows



# Evaluation

In [None]:
# Prediction count
pred_count = prediction.count()
# Drop rows with any missing data
prediction = prediction.dropna(how="any", subset=["prediction"])
# Clean prediction count
clean_pred_count = prediction.count()

print(f''' - [X] [Nan values] - {pred_count - clean_pred_count}''')

 - [X] [Nan values] - 0


In [None]:
# Root-mean-square error between the held-out labels (y*) and the ALS predictions.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='y*',
    predictionCol='prediction',
)
rmse = evaluator.evaluate(prediction)
print(f" - [X] [Root Mean Square Error] - {rmse}")

 - [X] [Root Mean Square Error] - 0.013039743453157025


# Recommendation System

## Cosine Similarity

In [None]:
def cosine_similarity(vector_1, vector_2):
    """Return the cosine of the angle between two equal-length 1-D vectors.

    Parameters
    ----------
    vector_1, vector_2 : array-like of numbers
        The vectors to compare; each is converted with ``np.asarray``.

    Returns
    -------
    numpy floating scalar
        ``v1 . v2 / (|v1| * |v2|)``, or 0.0 when either vector has zero norm.
    """
    v1 = np.asarray(vector_1)
    v2 = np.asarray(vector_2)
    norm_product = LA.norm(v1) * LA.norm(v2)
    # A zero-norm vector has no direction: the plain formula would evaluate
    # 0/0, emit a RuntimeWarning and return NaN, which Spark then sorts ahead
    # of every real score in a descending ordering. Report "no similarity".
    if norm_product == 0:
        return np.float64(0.0)
    return v1.dot(v2) / norm_product

In [None]:
def get_recommendations(similar_df, top_n=5):
    """Display the games with the highest similarity scores.

    Parameters
    ----------
    similar_df : DataFrame
        Must provide the columns ``item_index`` (a game id) and
        ``similarity_score``.
    top_n : int, default 5
        How many games to display.

    Reads the notebook-level ``train`` and ``game_df`` DataFrames. Nothing is
    returned; the table is printed with ``show()``.
    """
    by_score_desc = col('similarity_score').desc()
    top_df = (
        train.join(similar_df, train.game_id == similar_df.item_index)
        .select('game_id', 'similarity_score')
        .distinct()  # train has one row per rating, so each game repeats
        .orderBy(by_score_desc)
        .limit(top_n)
    )
    # Joining with the game metadata does not preserve the order established
    # above, so sort again to guarantee a best-first display.
    recom_df = top_df.join(game_df, on='game_id').orderBy(by_score_desc)
    recom_df.show()
 
def compute_cosine_similarity(itemFactors, game_id):
    """Score every other item against ``game_id`` by cosine similarity.

    Parameters
    ----------
    itemFactors : DataFrame
        Must provide the columns ``id`` and ``features`` (the call site in
        this notebook passes ``model.itemFactors``).
    game_id : int
        Id of the query item.

    Returns
    -------
    DataFrame
        Columns ``item_index`` and ``similarity_score``, one row per item
        other than ``game_id``.

    Raises
    ------
    ValueError
        If ``game_id`` has no row in ``itemFactors``.
    """
    target_row = itemFactors.where(col('id') == game_id).select('features').first()
    if target_row is None:
        # Previously this surfaced as an opaque "RDD is empty" ValueError.
        raise ValueError(f'game_id {game_id} has no factors in itemFactors')
    item_features = target_row['features']

    # Stream rows to the driver one partition at a time and skip the query
    # item before doing any arithmetic on it.
    scores = [
        (row['id'], float(cosine_similarity(row['features'], item_features)))
        for row in itemFactors.rdd.toLocalIterator()
        if row['id'] != game_id
    ]

    # An explicit schema keeps the column types that inference gave before
    # (Python int -> bigint, float -> double) and also works when `scores`
    # is empty, where schema inference would fail.
    return spark.createDataFrame(
        scores, schema='item_index bigint, similarity_score double')
  
# Ask for the query game, show its catalogue entry, then list the games whose
# ALS item factors are closest to it.
in_game_id = int(input('[INP] Enter item index to generate similar recommendations: '))

print("Showing games similar to:")
game_df.where(col("game_id") == in_game_id).show()

similar_itmes_df = compute_cosine_similarity(model.itemFactors, in_game_id)
get_recommendations(similar_itmes_df)


[INP] Enter item index to generate similar recommendations: 4
Showing games similar to:
+-------+-----------+------------+--------------------+----------+-----------------+
|game_id|       name|release_date|             summary|meta_score|meta_score_normal|
+-------+-----------+------------+--------------------+----------+-----------------+
|      4|SoulCalibur|    8-Sep-99|This is a tale of...|        98|             0.98|
+-------+-----------+------------+--------------------+----------+-----------------+

+-------+------------------+--------------------+------------+--------------------+----------+-----------------+
|game_id|  similarity_score|                name|release_date|             summary|meta_score|meta_score_normal|
+-------+------------------+--------------------+------------+--------------------+----------+-----------------+
|      5|0.9999999983794955|  Super Mario Galaxy|   12-Nov-07|[Metacritic's 200...|        97|             0.97|
|    178|0.9999999978130983|Thief:

## ALS recommender system

In [None]:
# Ten recommended games for every user known to the model.
user_recsys = model.recommendForAllUsers(10)

# Preview five users; each row holds a user_id and its list of recommendations.
user_recsys.show(5)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|    148|[[5207, 0.0344616...|
|    463|[[5207, 0.0348015...|
|    471|[[5207, 0.0288380...|
|    496|[[5207, 0.0405066...|
|    833|[[5207, 0.0329119...|
+-------+--------------------+
only showing top 5 rows



In [None]:
# Ten recommendations for just three users taken from the joined ratings data.
# NOTE(review): limit(3) has no ordering, so which users are picked is arbitrary;
# this also overwrites the user_recsys built in the previous cell.
user_recsys = model.recommendForUserSubset(join_df.select('user_id').distinct().limit(3), 10)
user_recsys.show(10)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|  32592|[[5207, 0.0368112...|
|  35982|[[5207, 0.0344293...|
|  19984|[[5207, 0.0454410...|
+-------+--------------------+



In [None]:
# Ten recommended users for every game known to the model.
game_recsys = model.recommendForAllItems(10)

# Last expression: displays the DataFrame's schema summary rather than its rows.
game_recsys

DataFrame[game_id: int, recommendations: array<struct<user_id:int,rating:float>>]

In [None]:
# Ten recommended users for just three games taken from the joined ratings data.
# NOTE(review): limit(3) has no ordering, so which games are picked is arbitrary;
# this also overwrites the game_recsys built in the previous cell.
game_recsys = model.recommendForItemSubset(join_df.select('game_id').distinct().limit(3), 10)
game_recsys.show(10)

+-------+--------------------+
|game_id|     recommendations|
+-------+--------------------+
|    471|[[21791, 0.039841...|
|    463|[[21791, 0.042800...|
|    148|[[21791, 0.037504...|
+-------+--------------------+

