In [0]:
# Student Name: Haotian Gong
# Analytics Goal: My goal is to provide customized game recommendations.
# ML Model and Algorithm:
# I am using the ALS (alternating least squares) model to implement
# collaborative filtering on user-item pairs, and recommend 10 games
# to each user in the training set.


In [0]:
import getpass
import os
from time import time
from urllib.parse import quote_plus

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.recommendation import ALS


In [0]:
# Spark validates that spark.network.timeout is strictly greater than
# spark.executor.heartbeatInterval (SPARK-22754). The two were previously set to
# the same value, which fails that check when a new SparkContext is created.
# NOTE(review): some Spark network code appears to convert the network timeout
# to a 32-bit millisecond int, so it is kept below ~2.1 million seconds.
NETWORK_TIMEOUT = "1000000s"   # ~11.5 days: effectively "never time out" for this job
HEARTBEAT_INTERVAL = "600s"    # must stay well below NETWORK_TIMEOUT

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.network.timeout", NETWORK_TIMEOUT)
    .config("spark.executor.heartbeatInterval", HEARTBEAT_INTERVAL)
    .config('spark.driver.maxResultSize', '32g')
    .config('spark.default.parallelism', 8)
    .config('spark.sql.shuffle.partitions', 8)
    .config('spark.driver.memory', '32g')
    .config('spark.kryoserializer.buffer.max', '2047m')
    .appName("Haotian_model")
    .getOrCreate()
)

In [0]:
# MongoDB Atlas connection settings. Credentials come from the environment
# (or an interactive prompt) instead of being hard-coded in the notebook.
# NOTE: the password that was previously committed here should be rotated.
database = 'steam'
collection = 'steam'
address = 'gogogo.chu66.mongodb.net'
user_name = os.environ.get('STEAM_MONGO_USER', 'haotian')
password = os.environ.get('STEAM_MONGO_PASSWORD') or getpass.getpass('MongoDB password: ')

# A user name or password containing characters such as '@', ':' or '/' must be
# percent-encoded inside a MongoDB URI; otherwise the URI is parsed incorrectly.
connection_string = (
    f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
    f"@{address}/{database}.{collection}"
)

In [0]:
# Load the whole collection through the MongoDB Spark connector.
df = spark.read.load(format="mongo", uri=connection_string)

In [0]:
# Show the schema of the collection loaded from MongoDB.
df.printSchema()

In [0]:
# 1. Keep two features, the user ID (steamid) and the game ID (appid),
#    plus the label voted_up, which is used as an explicit rating.
df_rating = df.select('steamid', 'appid', 'voted_up')

In [0]:
# Number of rating rows before any cleaning.
n_ratings_raw = df_rating.count()
n_ratings_raw

In [0]:
# 2. Keep exactly one rating per (UserID, GameID) pair.
#    distinct() alone only removes rows whose three columns are all equal. A
#    pair recorded with two different voted_up values would survive twice,
#    giving ALS contradictory labels and possibly putting one copy in the
#    training split and the other in the test split. Collapse each pair to a
#    single row instead. When copies disagree, max() keeps the larger value
#    (the positive vote if voted_up is boolean / 0-1), so the choice is
#    deterministic. Null voted_up values are ignored by max().
df_rating = (
    df_rating
    .groupBy('steamid', 'appid')
    .agg({'voted_up': 'max'})
    .withColumnRenamed('max(voted_up)', 'voted_up')
)

In [0]:
# Number of rating rows after removing duplicates.
n_ratings_dedup = df_rating.count()
n_ratings_dedup

In [0]:
# 3. Delete games which have fewer than MIN_RATINGS_PER_APP ratings, then
#    delete users who rated fewer than MIN_RATINGS_PER_USER of the remaining games.
MIN_RATINGS_PER_APP = 100
MIN_RATINGS_PER_USER = 10

# Per-game rating counts. No sort is needed because the result is only filtered.
small_app_df = df_rating.groupby('appid').count()
small_app = small_app_df.filter(col('count') < MIN_RATINGS_PER_APP).select('appid')
df_rating_no_small_app = df_rating.join(small_app, "appid", "left_anti")

# Users are counted on the game-filtered data. Counting on df_rating would let
# users whose ratings were mostly of removed games slip through with fewer than
# MIN_RATINGS_PER_USER ratings left.
# NOTE: this is a single pass. Removing users can push a few games back below
# MIN_RATINGS_PER_APP.
small_id_df = df_rating_no_small_app.groupby('steamid').count()
small_id = small_id_df.filter(col('count') < MIN_RATINGS_PER_USER).select('steamid')
df_rating_no_small_app_id = df_rating_no_small_app.join(small_id, "steamid", "left_anti")

In [0]:
# Mark the filtered ratings for caching, then show the first rows.
# (Preprocessing took about 2 minutes in an earlier run.)
df_rating_no_small_app_id.cache()
df_rating_no_small_app_id.show()

In [0]:
# Rows that will be used for the model (about 150 million in an earlier run).
n_ratings_model = df_rating_no_small_app_id.count()
n_ratings_model

In [0]:
# Check the column types of the filtered ratings before cleaning and encoding.
df_rating_no_small_app_id.printSchema()

In [0]:
# Drop every row that has a null in any column.
df_rating_no_small_app_id2 = df_rating_no_small_app_id.dropna(how="any")

In [0]:
# Cast both ID columns to strings before they are index-encoded below.
for id_col in ("steamid", "appid"):
    df_rating_no_small_app_id2 = df_rating_no_small_app_id2.withColumn(
        id_col, col(id_col).cast(StringType())
    )

In [0]:
# 4. Encode UserID and GameID as compact indices, so their value range shrinks.
def indexStringColumns(df, cols):
    """Replace each listed column with its StringIndexer encoding.

    For every name ``c`` in ``cols`` (in order), a StringIndexer is fitted on
    the current DataFrame. The encoded values are written to a temporary
    ``c-num`` column, the original ``c`` column is dropped, and the temporary
    column is renamed back to ``c``.

    Args:
        df: input DataFrame.
        cols: names of the columns to encode.

    Returns:
        A new DataFrame with the listed columns replaced by their encodings.

    NOTE: the fitted indexer models are not returned, so the encoded IDs cannot
    be mapped back to the original steamid/appid values from this result.
    """
    indexed = df
    for name in cols:
        tmp_name = name + "-num"
        indexer_model = StringIndexer(inputCol=name, outputCol=tmp_name).fit(indexed)
        indexed = (
            indexer_model.transform(indexed)
            .drop(name)
            .withColumnRenamed(tmp_name, name)
        )
    return indexed

df_rating2 = indexStringColumns(df_rating_no_small_app_id2, ['steamid', 'appid'])

In [0]:
# Check the column types after steamid and appid were replaced by their encodings.
df_rating2.printSchema()

In [0]:
# 5. Cast all three columns to integers. ALS expects integer user and item IDs,
#    and the voted_up label is turned into an integer rating at the same time.
for int_col in ("steamid", "appid", "voted_up"):
    df_rating2 = df_rating2.withColumn(int_col, col(int_col).cast(IntegerType()))

In [0]:
# Rename the label column to "rating", the name passed to ALS as ratingCol.
df_rating2 = df_rating2.withColumnRenamed(existing='voted_up', new='rating')

In [0]:
# 6. Training/test split (80% / 20%). Without a seed, PySpark draws a random
#    seed on every run, so the split and every metric computed from it change
#    each time the notebook is re-run. A fixed seed makes the results reproducible.
SPLIT_SEED = 42
(df_training, df_test) = df_rating2.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [0]:
# 7. Model training (took about 13 seconds in an earlier run).
# NOTE(review): the rating is the integer-cast voted_up label (presumably 0/1)
# fitted as an explicit rating. Comparing against implicitPrefs=True may be worthwhile.
start = time()
als = ALS(
    userCol="steamid",
    itemCol="appid",
    ratingCol="rating",
    maxIter=5,
    regParam=0.01,
)
model = als.fit(df_training)
print("second ", time() - start)

In [0]:
# 8. Score the test set and drop rows with missing values, so the RMSE below is
#    computed only over rows with a usable prediction.
#    (Presumably these are users or games absent from the training split.)
predictions = model.transform(df_test).na.drop()

In [0]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# The rating is the integer-cast voted_up label, so a generic RMSE
# "rule of thumb" says little about quality. Area under the ROC curve measures
# how well the predicted scores rank positive votes above negative ones.
# BinaryClassificationEvaluator accepts only a double (or vector) score column,
# so the ALS prediction is cast to double first.
auc_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC",
                                              labelCol="rating",
                                              rawPredictionCol="prediction_score")
auc = auc_evaluator.evaluate(
    predictions.withColumn("prediction_score", col("prediction").cast("double"))
)
print("Area under ROC = " + str(auc))

In [0]:
# 9. Generate the top 10 game recommendations for every user in the model.
#    The steamid/appid values here are the StringIndexer-encoded indices, not
#    the original IDs.
#    NOTE(review): games the user already rated are presumably not excluded — confirm.
userRecs = model.recommendForAllUsers(numItems=10)

In [0]:
# Display the first 20 rows of recommendations (show's default row count).
userRecs.show(n=20)