In [2]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m28.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=0d3f5629a813fbf01a4026d1109f5c6002411cb3729785906b799aa9d1fde79f
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import pandas as pd
import io
import matplotlib.pyplot as plt

from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import sum,avg,max,min,mean,count,sqrt
from pyspark.sql.functions import col,when
from pyspark.sql import functions as F 
from pyspark.sql.functions import col,isnan, when, count
from pyspark.ml.evaluation import RegressionEvaluator
import seaborn as sns

In [5]:
#Initialize a spark session.
def spark_intialization():
    spark = SparkSession \
        .builder \
        .appName("Pyspark Project") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [6]:
# Initialise spark object
spark = spark_intialization()
spark

In [7]:
from pyspark.sql import functions as pyspark_functions
from pyspark.sql.types import *
schema = StructType([ \
                     StructField("USER_ID", IntegerType(), True), \
                     StructField("Steam_Game", StringType(), True),\
                     StructField("Behaviour_Name", StringType(), True),\
                    StructField("Hours_played", FloatType(), True)])
dataframes = spark.read.schema(schema).csv("/content/drive/MyDrive/GOOGLE_COLAB/BigData/steam-200k.csv", header=False)
dataframes.show(10)

+---------+--------------------+--------------+------------+
|  USER_ID|          Steam_Game|Behaviour_Name|Hours_played|
+---------+--------------------+--------------+------------+
|151603712|The Elder Scrolls...|      purchase|         1.0|
|151603712|The Elder Scrolls...|          play|       273.0|
|151603712|           Fallout 4|      purchase|         1.0|
|151603712|           Fallout 4|          play|        87.0|
|151603712|               Spore|      purchase|         1.0|
|151603712|               Spore|          play|        14.9|
|151603712|   Fallout New Vegas|      purchase|         1.0|
|151603712|   Fallout New Vegas|          play|        12.1|
|151603712|       Left 4 Dead 2|      purchase|         1.0|
|151603712|       Left 4 Dead 2|          play|         8.9|
+---------+--------------------+--------------+------------+
only showing top 10 rows



In [8]:
dataframes = dataframes.withColumnRenamed("_c0","USER_ID").withColumnRenamed("_c1","Steam_Game").withColumnRenamed("_c2","Behaviour_Name").withColumnRenamed("_c3","Hours_played")
dataframes = dataframes.drop("_c4")
dataframes.show(10)

+---------+--------------------+--------------+------------+
|  USER_ID|          Steam_Game|Behaviour_Name|Hours_played|
+---------+--------------------+--------------+------------+
|151603712|The Elder Scrolls...|      purchase|         1.0|
|151603712|The Elder Scrolls...|          play|       273.0|
|151603712|           Fallout 4|      purchase|         1.0|
|151603712|           Fallout 4|          play|        87.0|
|151603712|               Spore|      purchase|         1.0|
|151603712|               Spore|          play|        14.9|
|151603712|   Fallout New Vegas|      purchase|         1.0|
|151603712|   Fallout New Vegas|          play|        12.1|
|151603712|       Left 4 Dead 2|      purchase|         1.0|
|151603712|       Left 4 Dead 2|          play|         8.9|
+---------+--------------------+--------------+------------+
only showing top 10 rows



In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, lag, sum

window_spec = Window.orderBy('USER_ID')

data_with_prev_value = dataframes.withColumn('prev_value', lag(col('Behaviour_Name')).over(window_spec))

combined_data = data_with_prev_value.withColumn('new_feature', when((col('prev_value') == 'purchase') & (col('Behaviour_Name') == 'play'), 2).otherwise(1))

grouped1 = combined_data.filter(((col('prev_value') == 'purchase') & (col('Behaviour_Name') == 'play')) | \
                                        ((col('prev_value') == 'purchase') & (col('Behaviour_Name') == 'purchase')) |\
                                        ((col('prev_value') == 'null') & (col('Behaviour_Name') == 'purchase')) |\
                                  (col('prev_value') == 'play') & (col('Behaviour_Name') == 'play'));


grouped1.show(50)

+-------+--------------------+--------------+------------+----------+-----------+
|USER_ID|          Steam_Game|Behaviour_Name|Hours_played|prev_value|new_feature|
+-------+--------------------+--------------+------------+----------+-----------+
|   5250|     Cities Skylines|          play|       144.0|  purchase|          2|
|   5250|Deus Ex Human Rev...|          play|        62.0|  purchase|          2|
|   5250|            Portal 2|          play|        13.6|  purchase|          2|
|   5250|         Alien Swarm|          play|         4.9|  purchase|          2|
|   5250|     Team Fortress 2|          play|         0.8|  purchase|          2|
|   5250|              Dota 2|          play|         0.2|  purchase|          2|
|   5250|Counter-Strike So...|      purchase|         1.0|  purchase|          1|
|   5250|       Day of Defeat|      purchase|         1.0|  purchase|          1|
|   5250|  Deathmatch Classic|      purchase|         1.0|  purchase|          1|
|   5250|       

In [10]:
average = grouped1.groupBy("Steam_Game") \
            .agg(mean("Hours_played").alias("mean_Hourplayed")) \
             .select("Steam_Game", "mean_Hourplayed")
grouped = grouped1.join(average, on="Steam_Game", how="inner")
grouped.show(20)

+--------------------+-------+--------------+------------+----------+-----------+------------------+
|          Steam_Game|USER_ID|Behaviour_Name|Hours_played|prev_value|new_feature|   mean_Hourplayed|
+--------------------+-------+--------------+------------+----------+-----------+------------------+
|     Cities Skylines|   5250|          play|       144.0|  purchase|          2| 24.74000000804663|
|Deus Ex Human Rev...|   5250|          play|        62.0|  purchase|          2|22.032926833393372|
|            Portal 2|   5250|          play|        13.6|  purchase|          2|16.639169672806663|
|         Alien Swarm|   5250|          play|         4.9|  purchase|          2| 4.973856203331082|
|     Team Fortress 2|   5250|          play|         0.8|  purchase|          2|62.814682541611305|
|              Dota 2|   5250|          play|         0.2|  purchase|          2| 151.2374038764504|
|Counter-Strike So...|   5250|      purchase|         1.0|  purchase|          1| 84.066637

In [11]:
from pyspark.sql.functions import when
newfeature2 = grouped.withColumn("rating", 
                  when(grouped["Hours_played"] == 1.0 * grouped["mean_Hourplayed"] * grouped["new_feature"], 1)
                  .when(grouped["Hours_played"] >= 0.9 * grouped["mean_Hourplayed"] * grouped["new_feature"], 5)
                   .when((grouped["Hours_played"] >= 0.7 * grouped["mean_Hourplayed"] * grouped["new_feature"]) & (grouped["Hours_played"] < 0.9 * grouped["mean_Hourplayed"]*grouped["new_feature"]), 4)
                   .when((grouped["Hours_played"] >= 0.4 * grouped["mean_Hourplayed"] * grouped["new_feature"]) & (grouped["Hours_played"] < 0.7 * grouped["mean_Hourplayed"]*grouped["new_feature"]), 3)
                   .when((grouped["Hours_played"] >= 0.1 * grouped["mean_Hourplayed"] * grouped["new_feature"]) & (grouped["Hours_played"] < 0.4 * grouped["mean_Hourplayed"]*grouped["new_feature"]), 2)
                   .otherwise(0))
newfeature2.show()

+--------------------+-------+--------------+------------+----------+-----------+------------------+------+
|          Steam_Game|USER_ID|Behaviour_Name|Hours_played|prev_value|new_feature|   mean_Hourplayed|rating|
+--------------------+-------+--------------+------------+----------+-----------+------------------+------+
|     Cities Skylines|   5250|          play|       144.0|  purchase|          2| 24.74000000804663|     5|
|Deus Ex Human Rev...|   5250|          play|        62.0|  purchase|          2|22.032926833393372|     5|
|            Portal 2|   5250|          play|        13.6|  purchase|          2|16.639169672806663|     3|
|         Alien Swarm|   5250|          play|         4.9|  purchase|          2| 4.973856203331082|     3|
|     Team Fortress 2|   5250|          play|         0.8|  purchase|          2|62.814682541611305|     0|
|              Dota 2|   5250|          play|         0.2|  purchase|          2| 151.2374038764504|     0|
|Counter-Strike So...|   525

In [12]:
pandasdf = newfeature2.toPandas()

In [13]:
pandasdf['Steam_Game'] = pandasdf['Steam_Game'].astype('category')
d = dict(enumerate(pandasdf['Steam_Game'].cat.categories))
pandasdf['GAME_ID'] = pandasdf['Steam_Game'].cat.codes
pandasdf

Unnamed: 0,Steam_Game,USER_ID,Behaviour_Name,Hours_played,prev_value,new_feature,mean_Hourplayed,rating,GAME_ID
0,Cities Skylines,5250,play,144.0,purchase,2,24.740000,5,850
1,Deus Ex Human Revolution,5250,play,62.0,purchase,2,22.032927,5,1243
2,Portal 2,5250,play,13.6,purchase,2,16.639170,3,3207
3,Alien Swarm,5250,play,4.9,purchase,2,4.973856,3,227
4,Team Fortress 2,5250,play,0.8,purchase,2,62.814683,0,4237
...,...,...,...,...,...,...,...,...,...
129506,Age of Empires II HD Edition,309626088,play,6.7,purchase,2,29.094937,2,174
129507,Robocraft,309812026,purchase,1.0,purchase,1,14.047447,0,3543
129508,Dota 2,309824202,purchase,1.0,purchase,1,151.237404,0,1331
129509,Dota 2,309824202,play,0.7,purchase,2,151.237404,0,1331


In [14]:
newfeature = spark.createDataFrame(pandasdf)

In [15]:
newfeature.show()

+--------------------+-------+--------------+-------------------+----------+-----------+------------------+------+-------+
|          Steam_Game|USER_ID|Behaviour_Name|       Hours_played|prev_value|new_feature|   mean_Hourplayed|rating|GAME_ID|
+--------------------+-------+--------------+-------------------+----------+-----------+------------------+------+-------+
|     Cities Skylines|   5250|          play|              144.0|  purchase|          2| 24.74000000804663|     5|    850|
|Deus Ex Human Rev...|   5250|          play|               62.0|  purchase|          2|22.032926833393372|     5|   1243|
|            Portal 2|   5250|          play| 13.600000381469727|  purchase|          2|16.639169672806663|     3|   3207|
|         Alien Swarm|   5250|          play|  4.900000095367432|  purchase|          2| 4.973856203331082|     3|    227|
|     Team Fortress 2|   5250|          play|  0.800000011920929|  purchase|          2|62.814682541611305|     0|   4237|
|              D

In [16]:
import random
'''
Generate Spark matrix Item for Model input

     item_id_1 -> [(userid_1, rating_1),
                   (userid_2, rating_2),...] 
'''

def ItemUsersPairs(line):
    '''
    Generate Item,(User,Rating) pairs
    '''
    #line = line.split(",")
    print(line)
    return line[0],(line[1],float(line[7]))

def ItemUserInteractions(item,users_with_rating,n):
    '''
    For users with # interactions > n,to subsample replace their interaction history
    with a sample of n items_with_rating
    '''
    if len(users_with_rating) > n:
        #return item,users_with_rating[:n]
        return item,random.sample(users_with_rating,n)
    else:
        #return item, users_with_rating[:n]
        return item, users_with_rating

user_item_pairs = newfeature2.rdd.map(lambda x : ItemUsersPairs(x)).groupByKey().map(lambda p: ItemUserInteractions(p[0],list(p[1]),100))

def f(x): return x
game_ratings_df_flatten = user_item_pairs.flatMapValues(f)
game_rating = game_ratings_df_flatten.map(lambda p:(p[0], p[1][0], p[1][1]))
# \                               .map(lambda p: Row(userId=int(p[1]), itemId=(p[0]),rating=float(p[2])))

print(" [Item => (User,Rating)..] RDD is shown below ")
user_item_pairs.take(1)

 [Item => (User,Rating)..] RDD is shown below 


[('Cities Skylines',
  [(50769696, 2.0),
   (1936551, 0.0),
   (112986636, 0.0),
   (64514291, 0.0),
   (64514291, 5.0),
   (163930591, 0.0),
   (64754418, 0.0),
   (261857176, 0.0),
   (122551425, 2.0),
   (77818807, 0.0),
   (11373749, 0.0),
   (83975567, 5.0),
   (23672423, 3.0),
   (275437638, 5.0),
   (24366790, 5.0),
   (228209477, 0.0),
   (298950, 0.0),
   (135012938, 0.0),
   (86055705, 2.0),
   (20704366, 3.0),
   (77980915, 4.0),
   (100070732, 5.0),
   (120237789, 2.0),
   (50744279, 2.0),
   (76296787, 3.0),
   (42005897, 3.0),
   (80154824, 0.0),
   (133422169, 2.0),
   (172518437, 0.0),
   (159365538, 2.0),
   (10253354, 2.0),
   (66255019, 0.0),
   (34919318, 2.0),
   (130460859, 5.0),
   (144071257, 0.0),
   (135400225, 5.0),
   (263936784, 2.0),
   (117370965, 3.0),
   (3449240, 2.0),
   (83671083, 0.0),
   (46028967, 2.0),
   (142475478, 3.0),
   (263936784, 0.0),
   (54826284, 2.0),
   (145825155, 2.0),
   (27262175, 2.0),
   (9823354, 0.0),
   (170377069, 5.0),
   

In [17]:
pip install numerize

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting numerize
  Downloading numerize-0.12.tar.gz (2.7 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: numerize
  Building wheel for numerize (setup.py) ... [?25l[?25hdone
  Created wheel for numerize: filename=numerize-0.12-py3-none-any.whl size=3174 sha256=816e6cd343304dda4c7a13a4b5800af37b37af5ba8991f3b54e128ea950c36e1
  Stored in directory: /root/.cache/pip/wheels/d3/85/3a/c063c399e65688038ecc09dd96a4ac5b188d98e10dcd7c51a8
Successfully built numerize
Installing collected packages: numerize
Successfully installed numerize-0.12


In [18]:
from numerize import numerize
# Model Train -Test Split


(training,test) = game_rating.randomSplit([0.8,0.2],2000)

game_ratings  =  training.map(lambda p: Row(userId=int(p[1]), itemId=(p[0]), rating=float(p[2])))
game_ratings_2  =  training.map(lambda p: Row(userId2=int(p[1]), itemId2=(p[0]), rating2=float(p[2])))
game_ratingsdf = spark.createDataFrame(game_ratings)
game_ratingsdf2 = spark.createDataFrame(game_ratings_2)

#Generating Item1,Item2 => UserRating1,UserRating2 combinations on train data
game_df = game_ratingsdf.join(game_ratingsdf2, ( \
                                                           (game_ratingsdf.itemId != game_ratingsdf2.itemId2) & \
                                                           (game_ratingsdf.userId == game_ratingsdf2.userId2)) \
                                        ,'left') \
                                  .select("itemId","itemId2","rating","rating2")
game_df1 = game_df.na.fill(0)
game_user_ratingrdd = game_df1.rdd

#Total Count of Item-Item Pair and their rating data by users in Training Data
ItemPairCount  = game_user_ratingrdd.count()
print("Total Item,item Pair record count in Training Data : ", numerize.numerize(ItemPairCount))

#Item-Item Pair and their rating data by users in Training Data
print("Item1-Item2=>Rating1,Rating2 Dataframe input to the model is shown below : ")
game_df1.show(50,truncate=False)

Total Item,item Pair record count in Training Data :  7.8M
Item1-Item2=>Rating1,Rating2 Dataframe input to the model is shown below : 
+---------------+----------------------------------------------------+------+-------+
|itemId         |itemId2                                             |rating|rating2|
+---------------+----------------------------------------------------+------+-------+
|Cities Skylines|Deus Ex Human Revolution                            |5.0   |5.0    |
|Cities Skylines|Alien Swarm                                         |5.0   |3.0    |
|Cities Skylines|Day of Defeat                                       |5.0   |2.0    |
|Cities Skylines|Half-Life 2 Episode Two                             |5.0   |2.0    |
|Cities Skylines|Half-Life                                           |0.0   |3.0    |
|Cities Skylines|Team Fortress Classic                               |0.0   |5.0    |
|Cities Skylines|Far Cry 3                                           |0.0   |2.0    |
|Citi

In [19]:
# Generating Cosine Distance for item-item pair for all user ratings
pairwise_items = game_user_ratingrdd.map(lambda p: ((p[0],p[1]),(p[2],p[3])))\
                                     .map(lambda p:(p[0],p[1],p[1][0]*p[1][0],p[1][1]*p[1][1],p[1][0]*p[1][1]))\
                                     .map(lambda p: Row(item_pair=p[0], rating_pair=p[1],cosim_x = p[2],cosim_y = p[3],cosim_xy = p[4] ))
pairwise_item_df = spark.createDataFrame(pairwise_items)

pairwise = pairwise_item_df.groupBy("item_pair").agg(sum("cosim_x").alias("Cosim_sumx"),\
                                                sum("cosim_y").alias("Cosim_sumy"),\
                                                sum("cosim_xy").alias("Cosim_sumxy"),\
                                               )
pairwsie_sqrt = pairwise.withColumn("Cosim_sumx_sqrt",sqrt("Cosim_sumx")).withColumn("Cosim_sumy_sqrt",sqrt("Cosim_sumy"))
pairwise_cosine = pairwsie_sqrt.withColumn("Cosine_Similarity", (pairwsie_sqrt.Cosim_sumxy /((pairwsie_sqrt.Cosim_sumx_sqrt * pairwsie_sqrt.Cosim_sumy_sqrt))))
pairwise_cosine = pairwise_cosine.na.fill(0)
pairwise_cosinerdd = pairwise_cosine.rdd

#Cosine Distance calculated for our rating data for each game-game combination
print("Cosine Distance calculated for our rating data for each game-game combination : ")
pairwise_cosine.show(truncate=False)

Cosine Distance calculated for our rating data for each game-game combination : 
+---------------------------------------------------------------------------------------------------+----------+----------+-----------+------------------+------------------+------------------+
|item_pair                                                                                          |Cosim_sumx|Cosim_sumy|Cosim_sumxy|Cosim_sumx_sqrt   |Cosim_sumy_sqrt   |Cosine_Similarity |
+---------------------------------------------------------------------------------------------------+----------+----------+-----------+------------------+------------------+------------------+
|{1... 2... 3... KICK IT! (Drop That Beat Like an Ugly Baby), Age of Empires II HD The Forgotten}   |4.0       |1.0       |2.0        |2.0               |1.0               |1.0               |
|{1... 2... 3... KICK IT! (Drop That Beat Like an Ugly Baby), Company of Heroes (New Steam Version)}|4.0       |4.0       |4.0        |2.0         

In [20]:
def keyOnFirstItem(item_pair,item_sim_data):
    '''
    For each item-item pair, make the first item's id the key
    '''
    (item1_id,item2_id) = item_pair
    return item1_id,(item2_id,item_sim_data)

def nearestNeighbors(item_id,items_and_sims,n):
    '''
    Sort the predictions list by similarity and select the top-N neighbors
    '''
    items_and_sims.sort(key=lambda x: x[1],reverse=True)
    return item_id, items_and_sims[:n]

In [21]:
# Generate Top K Neighbours based on Cosine Similarity Distance
pair_wise_nn = pairwise_cosinerdd.map(lambda p:keyOnFirstItem(p[0],p[6]))\
                  .groupByKey()\
                  .map( lambda p : (p[0], list(p[1])))\
                  .map( lambda p: nearestNeighbors(p[0],p[1],5))\
                  .map( lambda p: Row(item=p[0], item_rating_list=p[1]))

def f(x): return x
cosine_pairs = pair_wise_nn.flatMapValues(f)

cosine_pairsnn = cosine_pairs.map(lambda p:(p[0],p[1][0],p[1][1]))\
                             .map(lambda p: Row(item=p[0], item_nn=p[1], item_cosine = p[2]))

cosine_pairsnn_df = spark.createDataFrame(cosine_pairsnn)

topknn = game_ratingsdf.join(cosine_pairsnn_df, cosine_pairsnn_df.item == game_ratingsdf.itemId, 'left')


topknn_cosim = topknn.withColumn("totalratings",topknn.rating * topknn.item_cosine )\
                     .withColumn("CosimTotal", topknn.item_cosine + topknn.item_cosine )

topknn_cosine = topknn_cosim.groupBy("itemId","item_nn").agg(sum("totalratings").alias("total_ratings"),\
                                                sum("CosimTotal").alias("CosineTotal"),\
                                               )
topknn_cosimrdd = topknn_cosim.rdd
print("Generate Top 5 Neighbours based on Cosine Similarity Distance for USER_ID - 151603712 : ")
topknn_cosim.where("userId = 151603712").show(5)

Generate Top 5 Neighbours based on Cosine Similarity Distance for USER_ID - 151603712 : 
+---------+-------+------+-------+--------------------+-----------+------------+----------+
|   userId| itemId|rating|   item|             item_nn|item_cosine|totalratings|CosimTotal|
+---------+-------+------+-------+--------------------+-----------+------------+----------+
|151603712|Eldevin|   0.0|Eldevin|Age of Empires II...|        1.0|         0.0|       2.0|
|151603712|Eldevin|   0.0|Eldevin|             Loadout|        1.0|         0.0|       2.0|
|151603712|Eldevin|   0.0|Eldevin|Memories of a Vag...|        1.0|         0.0|       2.0|
|151603712|Eldevin|   0.0|Eldevin|The Ship Single P...|        1.0|         0.0|       2.0|
|151603712|Eldevin|   0.0|Eldevin|   The Ship Tutorial|        1.0|         0.0|       2.0|
+---------+-------+------+-------+--------------------+-----------+------------+----------+
only showing top 5 rows



In [22]:
#Evaluation
Testratings  =  test.map(lambda p: Row(TestItemID=str(p[0]),testrating=float(p[2])))\
                    .map(lambda p: Row(TestItemID=str(p[0]),testrating=(p[1])))
Predratings  =  topknn_cosimrdd.map(lambda p: Row(PredItemID=str(p[1]),Predictedrating=(p[2])))

Testratingsdf = spark.createDataFrame(Testratings)
Predratingsdf = spark.createDataFrame(Predratings)

Pred =  Predratingsdf.join(Testratingsdf,Testratingsdf.TestItemID ==  Predratingsdf.PredItemID,"inner")

Pred = Pred.na.fill(0)
preds = Pred.select("Predictedrating","testrating")

evaluator = RegressionEvaluator(metricName="rmse", labelCol="testrating",
                                predictionCol="Predictedrating")
rmse = evaluator.evaluate(preds)
print("RMSE of KNN Implementation using Cosine Similarity: ",rmse)

RMSE of KNN Implementation using Cosine Similarity:  2.075639044968428


In [23]:
##Pearson Coefficient Implementation for SteamGame Recommender System

#(training,test) = game_rating.randomSplit([0.8,0.2],2000)
pe_cf_game_ratings  =  training.map(lambda p: Row(userId=int(p[1]), itemId=str(p[0]),rating=float(p[2])))
pe_cf_game_ratings_2  =  training.map(lambda p: Row(userId2=int(p[1]), itemId2=str(p[0]),rating2=float(p[2])))
pe_cf_game_ratingsdf = spark.createDataFrame(pe_cf_game_ratings)
pe_cf_game_ratingsdf2 = spark.createDataFrame(pe_cf_game_ratings_2)

## Subtracting Mean User Game Ratings from the ratings data 
User_mean = pe_cf_game_ratingsdf.groupBy("userId").agg({'rating' : 'mean'}).withColumnRenamed("avg(rating)", "user_mean")\
                                                                            .withColumnRenamed("userId", "meanuserId")
pe_cf_game_ratingsdf_Umean = pe_cf_game_ratingsdf.join(User_mean, ( \
                                                           (pe_cf_game_ratingsdf.userId == User_mean.meanuserId)) \
                                        ,'left').select(pe_cf_game_ratingsdf.userId,pe_cf_game_ratingsdf.itemId,pe_cf_game_ratingsdf.rating,User_mean.user_mean)

pe_cf_game_ratingsdf2_Umean = pe_cf_game_ratingsdf2.join(User_mean, ( \
                                                           (pe_cf_game_ratingsdf2.userId2 == User_mean.meanuserId)) \
                                        ,'left').select(pe_cf_game_ratingsdf2.userId2,pe_cf_game_ratingsdf2.itemId2,pe_cf_game_ratingsdf2.rating2,User_mean.user_mean)                                                    
pe_cf_MeanDeviation= pe_cf_game_ratingsdf_Umean.withColumn("UserRatingDeviation",pe_cf_game_ratingsdf_Umean.rating - pe_cf_game_ratingsdf_Umean.user_mean)

pe_cf_MeanDeviation2= pe_cf_game_ratingsdf2_Umean.withColumn("UserRatingDeviation2",pe_cf_game_ratingsdf2_Umean.rating2 - pe_cf_game_ratingsdf2_Umean.user_mean)


#Generating Item1,Item2 => UserRating1,UserRating2 combinations on train data
pe_cf_game_df = pe_cf_MeanDeviation.join(pe_cf_MeanDeviation2, ( \
                                                           (pe_cf_MeanDeviation.itemId != pe_cf_MeanDeviation2.itemId2) & \
                                                           (pe_cf_MeanDeviation.userId == pe_cf_MeanDeviation2.userId2)) \
                                        ,'left') \
                                  .select("itemId","itemId2","UserRatingDeviation","UserRatingDeviation")
pe_cf_game_df1 = pe_cf_game_df.na.fill(0)
pe_cf_game_user_ratingrdd = pe_cf_game_df1.rdd

# Generating Cosine Distance for item-item pair for all user ratings
pe_cf_pairwise_items = pe_cf_game_user_ratingrdd.map(lambda p: ((p[0],p[1]),(p[2],p[3])))\
                                     .map(lambda p:(p[0],p[1],p[1][0]*p[1][0],p[1][1]*p[1][1],p[1][0]*p[1][1]))\
                                     .map(lambda p: Row(item_pair=p[0], rating_pair=p[1],cosim_x = p[2],cosim_y = p[3],cosim_xy = p[4] ))
pe_cf_pairwise_item_df = spark.createDataFrame(pe_cf_pairwise_items)
pe_cf_pairwise = pe_cf_pairwise_item_df.groupBy("item_pair").agg(sum("cosim_x").alias("Cosim_sumx"),\
                                                sum("cosim_y").alias("Cosim_sumy"),\
                                                sum("cosim_xy").alias("Cosim_sumxy"),\
                                               )
pe_cf_pairwsie_sqrt = pe_cf_pairwise.withColumn("Cosim_sumx_sqrt",sqrt("Cosim_sumx")).withColumn("Cosim_sumy_sqrt",sqrt("Cosim_sumy"))
pe_cf_pairwise_cosine = pe_cf_pairwsie_sqrt.withColumn("Cosine_Similarity", (pe_cf_pairwsie_sqrt.Cosim_sumxy /((pe_cf_pairwsie_sqrt.Cosim_sumx_sqrt * pe_cf_pairwsie_sqrt.Cosim_sumy_sqrt)+0.5))+0)
pe_cf_pairwise_cosine = pe_cf_pairwise_cosine.na.fill(0)
pe_cf_pairwise_cosinerdd = pe_cf_pairwise_cosine.rdd


# Generate Top K Neighbours based on Cosine Similarity Distance
pe_cf_pair_wise_nn = pe_cf_pairwise_cosinerdd.map(lambda p:keyOnFirstItem(p[0],p[6]))\
                  .groupByKey()\
                  .map( lambda p : (p[0], list(p[1])))\
                  .map( lambda p: nearestNeighbors(p[0],p[1],5))\
                  .map(lambda p: Row(item=p[0], item_rating_list=p[1]))

def f(x): return x
pe_cf_cosine_pairs = pe_cf_pair_wise_nn.flatMapValues(f)
pe_cf_cosine_pairsnn = pe_cf_cosine_pairs.map(lambda p:(p[0],p[1][0],p[1][1]))\
                             .map(lambda p: Row(item=p[0], item_nn=p[1], item_cosine = p[2]))
pe_cf_cosine_pairsnn_df = spark.createDataFrame(pe_cf_cosine_pairsnn)

pe_cf_topknn = pe_cf_game_ratingsdf.join(pe_cf_cosine_pairsnn_df, pe_cf_cosine_pairsnn_df.item == pe_cf_game_ratingsdf.itemId, 'left')
pe_cf_topknn_cosim = pe_cf_topknn.withColumn("totalratings",pe_cf_topknn.rating * pe_cf_topknn.item_cosine*2 )\
                     .withColumn("CosimTotal",pe_cf_topknn.item_cosine + pe_cf_topknn.item_cosine )
pe_cf_topknn_cosine = pe_cf_topknn_cosim.groupBy("itemId","item_nn").agg(sum("totalratings").alias("total_ratings"),\
                                                sum("CosimTotal").alias("CosineTotal"),\
                                               )
pe_cf_topknn_cosim = pe_cf_topknn_cosine.withColumn("PearsonDistance",(pe_cf_topknn_cosine.total_ratings / pe_cf_topknn_cosine.CosineTotal) )\
                .select("itemId","item_nn","PearsonDistance")
pe_cf_topknn_cosimrdd = pe_cf_topknn_cosim.rdd

In [24]:
#Pearson Coefficient Implementation Evaluation
Testratings  =  test.map(lambda p: Row(TestItemID=str(p[0]),testrating=float(p[2])))\
                    .map(lambda p: Row(TestItemID=str(p[0]),testrating=(p[1])))
Testratingsdf = spark.createDataFrame(Testratings)
pe_cf_Predratings  =  pe_cf_topknn_cosimrdd.map(lambda p: Row(PredItemID=str(p[1]),Predictedrating=(p[2])))
pe_cf_Predratingsdf = spark.createDataFrame(pe_cf_Predratings)

pe_cf_Pred =  pe_cf_Predratingsdf.join(Testratingsdf,Testratingsdf.TestItemID ==  pe_cf_Predratingsdf.PredItemID,"inner")
pe_cf_Pred = pe_cf_Pred.na.fill(0)
pe_cf_preds = pe_cf_Pred.select("Predictedrating","testrating")

pe_cf_evaluator = RegressionEvaluator(metricName="rmse", labelCol="testrating",
                                predictionCol="Predictedrating")
pe_cf_rmse = pe_cf_evaluator.evaluate(pe_cf_preds)
print("RMSE of KNN Implementation using Pearson Coefficient Distance ",pe_cf_rmse)

RMSE of KNN Implementation using Pearson Coefficient Distance  1.9758699000549214


In [36]:
##Recommend similar game for input game based on cosine similarity
def UserGameRecommenderPC(userId):
    '''
    #Recommend Top K nearest game for a input userID
    '''
    overall_avg_user_mean = game_ratingsdf.filter(game_ratingsdf['userId'] == userId).agg({'rating' : 'mean'}).collect()[0][0]
    overall_user_rated_game_count = game_ratingsdf.filter(game_ratingsdf['userId'] == userId).select("itemId").distinct().count()
    print("User -",userId,"rated a total of",overall_user_rated_game_count,"games")
    #print("\n")
    print("Overall Avg Rating by user -",userId,"is",overall_avg_user_mean)
    #print("\n")
    User_rated_game = game_ratingsdf.filter(game_ratingsdf['userId'] == userId).select("userId","itemId")\
                                  .withColumnRenamed("itemId", "UserItemId")
    
    user_nn_game = User_rated_game.join(pe_cf_topknn_cosim, User_rated_game.UserItemId == pe_cf_topknn_cosim.itemId)

    user_nn_game.join(newfeature,user_nn_game.UserItemId == newfeature.Steam_Game,'left')\
                                 .select("userId","UserItemId").distinct().limit(5)\
                                #  .show(truncate=False)

    user_nn = User_rated_game.join(pe_cf_topknn_cosim,User_rated_game.UserItemId == pe_cf_topknn_cosim.itemId)\
                              .select("userId","item_nn","PearsonDistance")

    usernn = user_nn.sort(col('PearsonDistance').desc())

    usernn = usernn.select("userId", "item_nn", "PearsonDistance").withColumnRenamed("userId", "USER_ID").withColumnRenamed("item_nn", "Recommended Game")
    
    return usernn

topNUserrecommender = UserGameRecommenderPC(113196318) #UserId : 5250, 151603712, 50769696, 163432200, 113196318
topNUserrecommender.show(5)

User - 113196318 rated a total of 33 games
Overall Avg Rating by user - 113196318 is 2.9655172413793105
+---------+--------------------+------------------+
|  USER_ID|    Recommended Game|   PearsonDistance|
+---------+--------------------+------------------+
|113196318|Rising Storm/Red ...|3.5535714285714297|
|113196318|Euro Truck Simula...|3.5535714285714297|
|113196318|              Arma 3| 3.553571428571429|
|113196318|RaceRoom Racing E...|3.5535714285714284|
|113196318|              Arma 2|3.5535714285714244|
+---------+--------------------+------------------+
only showing top 5 rows

