In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, rank, countDistinct, count
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
import pandas as pd

spark = SparkSession.builder.appName('video_games_review').config("spark.driver.memory", "8g").getOrCreate()
df_landing = pd.read_parquet('dataset/video_games_review_landing.parquet')
spark_df_landing =  spark.createDataFrame(df_landing)
spark_df_landing.show(vertical=True)

24/01/18 01:29:35 WARN Utils: Your hostname, rao.local resolves to a loopback address: 127.0.0.1; using 192.168.1.50 instead (on interface en0)
24/01/18 01:29:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/18 01:29:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/18 01:29:57 WARN TaskSetManager: Stage 0 contains a task of very large size (65804 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 1) / 1]

-RECORD 0--------------------------------
 Format_style     | null                 
 Edition_style    | null                 
 Platform_style   | null                 
 Color_style      | null                 
 number_of_images | 0                    
 reviewTime       | 2015-10-17 00:00:00  
 overall          | 5                    
 verified         | true                 
 reviewerID       | A1HP7NVNPFMA4N       
 asin             | 0700026657           
 reviewerName     | Ambrosia075          
 reviewText       | This game is a bi... 
 summary          | but when you do i... 
 unixReviewTime   | 1445040000           
 vote             | NaN                  
-RECORD 1--------------------------------
 Format_style     | null                 
 Edition_style    | null                 
 Platform_style   | null                 
 Color_style      | null                 
 number_of_images | 0                    
 reviewTime       | 2015-07-27 00:00:00  
 overall          | 4             

24/01/18 01:30:01 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

In [2]:
df_rec = spark_df_landing.select('reviewerID', 'asin', 'overall').withColumnRenamed("reviewerID","userId") \
    .withColumnRenamed("asin","itemId") \
    .withColumnRenamed("overall","rating")
df_rec = df_rec.orderBy("userId", "itemId")

In [3]:
popularity_df = df_rec.groupBy('itemId') \
    .agg(count('*').alias('popularity')) \
    .orderBy(col('popularity').desc())

# Select the top 500 most popular items
top_popular_items = popularity_df.limit(500)
df_rec_filtered = df_rec.join(top_popular_items, on='itemId', how='inner')

# Create a column with the count of items per user and filter the base to select 
# only users with 5 items or more
user_window = Window.partitionBy("userId").orderBy(col("itemId").desc())
df_rec_filtered = df_rec_filtered.withColumn("num_items", expr("count(*) over (partition by userId)"))
df_rec_filtered = df_rec_filtered.filter(col("num_items")>=5)

In [5]:
# Count the number of unique items
num_unique_items = df_rec_filtered.select('itemId').distinct().count()
print(f"Number of unique items: {num_unique_items}")

# Count the number of unique users
num_unique_users = df_rec_filtered.select('userId').distinct().count()
print(f"Number of unique users: {num_unique_users}")

24/01/18 01:27:24 WARN TaskSetManager: Stage 5 contains a task of very large size (65803 KiB). The maximum recommended task size is 1000 KiB.
24/01/18 01:27:26 WARN TaskSetManager: Stage 8 contains a task of very large size (65803 KiB). The maximum recommended task size is 1000 KiB.
24/01/18 01:27:28 WARN TaskSetManager: Stage 18 contains a task of very large size (65803 KiB). The maximum recommended task size is 1000 KiB.


Number of unique items: 500


Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniconda/base/envs/pyspark/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniconda/base/envs/pyspark/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniconda/base/envs/pyspark/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [ ]:
import pandas as pd
import matplotlib.pyplot as plt

items_per_user = df_rec_filtered.groupBy('userId').count().select('count').toPandas()
# Plot the histogram
plt.hist(items_per_user['count'], bins=10, range=(1,15), edgecolor='black')
plt.xlabel('Number of Items')
plt.ylabel('Number of Users')
plt.title('Distribution of Number of Items per User')
plt.show()

In [None]:
# For example, 30% of items will be masked
percent_items_to_mask = 0.3 
# Determine the number of items to mask for each user
df_rec_final = df_rec_filtered.withColumn("num_items_to_mask", (col("num_items") * percent_items_to_mask).cast("int"))
# Masks items for each user
df_rec_final = df_rec_final.withColumn("item_rank", rank().over(user_window))

# Create a StringIndexer model to index the user ID column
indexer_user = StringIndexer(inputCol='userId', outputCol='userIndex').setHandleInvalid("keep")
indexer_item = StringIndexer(inputCol='itemId', outputCol='itemIndex').setHandleInvalid("keep")

# Fit the indexer model to the data and transform the DataFrame
df_rec_final = indexer_user.fit(df_rec_final).transform(df_rec_final)
df_rec_final = indexer_item.fit(df_rec_final).transform(df_rec_final)

# Convert the userIndex column to integer type
df_rec_final = df_rec_final.withColumn('userIndex', df_rec_final['userIndex'].cast('integer'))\
               .withColumn('itemIndex', df_rec_final['itemIndex'].cast('integer'))

train_df_rec = df_rec_final.filter(col("item_rank") > col("num_items_to_mask"))
test_df_rec = df_rec_final.filter(col("item_rank") <= col("num_items_to_mask"))

In [None]:
# Configure the ALS model
als = ALS(userCol='userIndex', itemCol='itemIndex', ratingCol='rating',
          coldStartStrategy='drop', nonnegative=True)


param_grid = ParamGridBuilder()\
             .addGrid(als.rank, [1, 20, 30])\
             .addGrid(als.maxIter, [20])\
             .addGrid(als.regParam, [.05, .15])\
             .build()
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

cv = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3)

model = cv.fit(train_df_rec)

best_model = model.bestModel
print('rank: ', best_model.rank)
print('MaxIter: ', best_model._java_obj.parent().getMaxIter())
print('RegParam: ', best_model._java_obj.parent().getRegParam())

In [None]:
# Train the model using the training data
model = als.fit(train_df_rec)

# Generate predictions on the test data
predictions = best_model.transform(test_df_rec)
predictions = predictions.withColumn("prediction", expr("CASE WHEN prediction < 1 THEN 1 WHEN prediction > 5 THEN 5 ELSE prediction END"))

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print(f'Root Mean Squared Error (RMSE): {rmse}')

In [None]:
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql.functions import col, collect_list

# Convert the predictions DataFrame to include all predictions per user
# Generate top-k recommendations for each user
userRecs = best_model.recommendForAllUsers(100)  # Top-100 recommendations for each user

# Prepare the input for RankingMetrics
user_ground_truth = test_df_rec.groupby('userIndex').agg(collect_list('itemIndex').alias('ground_truth_items'))
user_train_items = train_df_rec.groupby('userIndex').agg(collect_list('itemIndex').alias('train_items'))

# Join the recommendations and ground truth data on the user ID
user_eval = userRecs.join(user_ground_truth, on='userIndex').join(user_train_items, on='userIndex') \
    .select('userIndex', 'recommendations.itemIndex', 'ground_truth_items', 'train_items', 'recommendations.rating')
user_eval = user_eval.toPandas()
user_eval['itemIndex_filtered'] = user_eval.apply(lambda x:[b for (b,z) in zip(x.itemIndex, x.rating) if b not in x.train_items], axis=1)
user_eval['rating_filtered'] = user_eval.apply(lambda x:[z for (b,z) in zip(x.itemIndex, x.rating) if b not in x.train_items], axis=1)

In [None]:
import numpy as np
import math
def score(predicted, actual, metric):
        """
        Parameters
        ----------
        predicted : List
            List of predicted apps.
        actual : List
            List of masked apps.
        metric : 'precision' or 'ndcg'
            A valid metric for recommendation.
        Raises
        -----
        Returns
        -------
        m : float
            score.
        """
        valid_metrics = ['precision', 'ndcg']
        if metric not in valid_metrics:
            raise Exception(f"Choose one valid baseline in the list: {valid_metrics}")
        if metric == 'precision':
            m = np.mean([float(len(set(predicted[:k]) 
                                               & set(actual))) / float(k) 
                                     for k in range(1,len(actual)+1)])
        if metric == 'ndcg':
            v = [1 if i in actual else 0 for i in predicted]
            v_2 = [1 for i in actual]
            dcg = sum([(2**i-1)/math.log(k+2,2) for (k,i) in enumerate(v)])
            idcg = sum([(2**i-1)/math.log(k+2,2) for (k,i) in enumerate(v_2)])
            m = dcg/idcg
        return m

user_eval['precision'] = user_eval.apply(lambda x: score(x.itemIndex_filtered, x.ground_truth_items, 'precision'), axis=1)
user_eval['NDCG'] = user_eval.apply(lambda x: score(x.itemIndex_filtered, x.ground_truth_items, 'ndcg'), axis=1)

MAP = user_eval.precision.mean()
avg_NDCG = user_eval.NDCG.mean()