Install and import the packages needed to load data from Google Cloud Storage (GCS) and to build a model-based collaborative filtering recommender system

In [None]:
pip install --upgrade google-cloud-storage # install necessary package to load data from GCP



In [None]:
pip install fsspec # install necessary package to load data from GCP



In [None]:
pip install gcsfs # install necessary package to load data from GCP



In [None]:
pip install pyspark # install necessary package to build model-based collaborative filtering recommender system



In [None]:
# import necessary packages and functions to load data from GCP
import os
from google.cloud import storage
import pandas as pd

In [None]:
# import necessary packages and functions to build model-based collaborative filtering recommender system
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

Spark Configuration

In [None]:
# Spark configuration: reuse the active SparkSession if there is one,
# otherwise build a new one with the default builder settings.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()
spark

Load Data from Google Cloud Storage

In [None]:
# Load the game-ratings CSV from Google Cloud Storage.
# Credentials: an already-set GOOGLE_APPLICATION_CREDENTIALS wins; the key-file
# path below is only a fallback. Never commit the key file itself.
os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', '/content/eloquent-life-328614-f56ab675c052.json')

GCS_BUCKET = 'data5006_batch'
storage_client = storage.Client()
# get_bucket() performs a lookup, so a missing bucket or insufficient access
# surfaces here instead of later inside read_csv.
bucket = storage_client.get_bucket(GCS_BUCKET)

# Read both id columns as strings: user_id contains free-text names such as
# "4:00 PM", "6.1" or "-9", and leaving the type to pandas' chunked inference
# can mix ints and strings within one column.
ratings = pd.read_csv(
    f'gs://{GCS_BUCKET}/Google_Game_Ratings.csv',
    dtype={'user_id': str, 'item_id': str},
)
ratings

Unnamed: 0,user_id,item_id,rating
0,-9,Game133,3
1,0,Game9,2
2,0,Game14,4
3,0,Game129,4
4,0,Game152,5
...,...,...,...
289582,zzz zzz,Game2,2
289583,Zzzdreaminq,Game78,5
289584,zzzlepy,Game37,5
289585,Zzzz Zzzz,Game138,5


In [None]:
# Convert the pandas frame into a Spark DataFrame: hand Spark the rows as
# plain Python lists plus the pandas column names as the schema.
pandas_rows = ratings.values.tolist()
pandas_header = [column for column in ratings.columns]
ratings = spark.createDataFrame(pandas_rows, schema=pandas_header)
ratings.show()

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|     -9|Game133|     3|
|      0|  Game9|     2|
|      0| Game14|     4|
|      0|Game129|     4|
|      0|Game152|     5|
|4:00 PM|Game123|     4|
|      1|Game112|     5|
|      2|Game107|     2|
|    6.1|Game161|     2|
|     69|Game129|     1|
|     77|  Game2|     2|
|    606| Game87|     5|
|    609|Game129|     5|
|    666| Game27|     5|
|    999|Game144|     5|
|   4284| Game10|     5|
|  29574| Game38|     3|
| Jun-99|Game154|     5|
|  39773| Game19|     3|
|  7-Mar| Game27|     2|
+-------+-------+------+
only showing top 20 rows



In [None]:
# Canonical column names, applied positionally to the Spark frame.
# newcolnames is reused later when naming the columns of the re-indexed frame.
newcolnames = ['user_id', 'item_id', 'rating']
for current_name, wanted_name in zip(ratings.columns, newcolnames):
    ratings = ratings.withColumnRenamed(current_name, wanted_name)
ratings

DataFrame[user_id: string, item_id: string, rating: bigint]

In [None]:
# Number of partitions the rating data is spread over
n_partitions = ratings.rdd.getNumPartitions()
n_partitions

8

In [None]:
# Make sure the rating column is floating point (it is read as an integer)
rating_as_float = ratings["rating"].cast("Float")
ratings = ratings.withColumn("rating", rating_as_float)
ratings.select("user_id", "item_id", "rating").show(10)

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|     -9|Game133|   3.0|
|      0|  Game9|   2.0|
|      0| Game14|   4.0|
|      0|Game129|   4.0|
|      0|Game152|   5.0|
|4:00 PM|Game123|   4.0|
|      1|Game112|   5.0|
|      2|Game107|   2.0|
|    6.1|Game161|   2.0|
|     69|Game129|   1.0|
+-------+-------+------+
only showing top 10 rows



In [None]:
# Summary statistics per column. For the string id columns only count/min/max
# are meaningful; Spark's mean/stddev over them come out as Infinity/NaN.
dataset_summary = ratings.describe()
dataset_summary.show()

+-------+--------+-------+------------------+
|summary| user_id|item_id|            rating|
+-------+--------+-------+------------------+
|  count|  289587| 289587|            289587|
|   mean|Infinity|   null| 3.462389540966963|
| stddev|     NaN|   null|1.4904903638326985|
|    min|! Riuny!|  Game1|               1.0|
|    max|히하라라| Game99|               5.0|
+-------+--------+-------+------------------+



In [None]:
# Map the string user_id / item_id values onto dense integer indices, as
# required for ALS matrix indexing. Index i corresponds to the i-th value in
# sorted order, so userids[i] / itemids[i] decode an index back to its id.
import numpy as np


def _dense_index(frame, column):
    """Return (sorted distinct values of `column`, value -> index mapping)."""
    distinct_values = np.sort([row[column] for row in frame.select(column).distinct().collect()])
    return distinct_values, {value: position for position, value in enumerate(distinct_values)}


userids, userid_encode = _dense_index(ratings, "user_id")
itemids, itemid_encode = _dense_index(ratings, "item_id")
print(len(userids), len(itemids))

240669 170


In [None]:
# copy the integer indices into the ratings dataframe
rdd2=ratings.rdd.map(lambda x: (userid_encode[x[0]],itemid_encode[x[1]],float(x[2])))
ratings2 = rdd2.toDF()
for c,n in zip(ratings2.columns,newcolnames):
    ratings2=ratings2.withColumnRenamed(c,n)
ratings2.show(10)

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|    112|     38|   3.0|
|    155|    159|   2.0|
|    155|     45|   4.0|
|    155|     33|   4.0|
|    155|     59|   5.0|
|    614|     27|   4.0|
|    221|     15|   5.0|
|    393|      9|   2.0|
|    670|     69|   2.0|
|    686|     33|   1.0|
+-------+-------+------+
only showing top 10 rows



In [None]:
# Hold out 20% of the ratings for evaluation; the fixed seed makes the split repeatable
split_weights = [0.8, 0.2]
training, testing = ratings2.randomSplit(split_weights, seed=66)
print("trainset=", training.count(), "testing set=", testing.count())

trainset= 231481 testing set= 58106


In [None]:
# Build the model-based collaborative filtering recommender with ALS.
# The seed is pinned explicitly (same value as the train/test split) so that
# re-running the notebook reproduces the same factorisation.
# coldStartStrategy="drop" removes predictions for users/items unseen in training.
# NOTE(review): some recommended scores exceed the 1-5 rating scale (e.g. 13.36
# in the item recommendations output), which suggests regParam=0.01 is too weak;
# consider tuning rank/regParam with cross-validation.
als = ALS(
    maxIter=20,
    rank=15,
    regParam=0.01,
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    coldStartStrategy="drop",
    implicitPrefs=False,
    seed=66,
)
model = als.fit(training)

In [None]:
# Evaluate the model on the held-out ratings.
# With coldStartStrategy="drop", test rows that cannot be scored (user or game
# absent from training) are removed from `predictions`, so the MAE only covers
# the scored rows. Report that coverage next to the metric.
predictions = model.transform(testing)
evaluator = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
error = evaluator.evaluate(predictions)
print("Mean Absolute error = ", error)
print("Scored test rows = ", predictions.count(), "of", testing.count())

Mean Absolute error =  1.3197082397718838


Make Recommendations for Each User and Each Game

In [None]:
# Top-N recommendations in both directions. The ids in these tables are the
# dense integer indices from the encoding step, not the original user/game ids.
# NOTE: recommendForAllUsers scores every game, including ones the user has
# already rated; filter those out if only new games should be suggested.
TOP_N = 10
# Generate top-N game recommendations for each user
userRecs = model.recommendForAllUsers(TOP_N)
# Generate top-N user recommendations for each game
gameRecs = model.recommendForAllItems(TOP_N)
userRecs.show(10)
gameRecs.show(10)



+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|     26|[{154, 6.0566382}...|
|     27|[{140, 8.06922}, ...|
|     28|[{147, 5.0413127}...|
|     31|[{136, 9.275289},...|
|     53|[{140, 8.70141}, ...|
|     65|[{147, 4.03305}, ...|
|     76|[{76, 8.509389}, ...|
|     85|[{140, 9.699274},...|
|    101|[{136, 8.72545}, ...|
|    103|[{136, 1.482472},...|
+-------+--------------------+
only showing top 10 rows

+-------+--------------------+
|item_id|     recommendations|
+-------+--------------------+
|      0|[{88656, 5.140218...|
|      1|[{87804, 6.380102...|
|      2|[{40901, 6.152019...|
|      3|[{139480, 8.44949...|
|      4|[{136844, 8.00370...|
|      5|[{83112, 7.80784}...|
|      6|[{161599, 6.82634...|
|      7|[{143305, 6.41417...|
|      8|[{204621, 13.3616...|
|      9|[{184876, 6.47255...|
+-------+--------------------+
only showing top 10 rows



In [None]:
# Collect the recommendation tables to the driver as pandas frames.
# user_id / item_id here are the dense integer indices from the encoding step
# (userids[i] / itemids[i] give the original ids), so add decoded columns;
# the encoded columns are kept unchanged.
user_recommendations = userRecs.toPandas()
item_recommendations = gameRecs.toPandas()

user_recommendations['user'] = [str(userids[idx]) for idx in user_recommendations['user_id']]
# Each recommendation is a (item_id, rating) struct; key access works for Row and dict
user_recommendations['recommended_games'] = [
    [str(itemids[rec['item_id']]) for rec in recs]
    for recs in user_recommendations['recommendations']
]

item_recommendations['game'] = [str(itemids[idx]) for idx in item_recommendations['item_id']]
# Each recommendation is a (user_id, rating) struct
item_recommendations['recommended_users'] = [
    [str(userids[rec['user_id']]) for rec in recs]
    for recs in item_recommendations['recommendations']
]

In [None]:
# Persist both recommendation tables as CSV files in the working directory
for frame, csv_path in (
    (user_recommendations, 'user_recommendations.csv'),
    (item_recommendations, 'item_recommendations.csv'),
):
    frame.to_csv(csv_path)