In [9]:
import os
import sys
import math
#Run with python 3.8

#Comentar as três linhas abaixo caso esteja a usar o portátil
spark_home = os.path.join(os.environ["CONDA_PREFIX"], "lib/python3.8/site-packages/pyspark")
os.environ["SPARK_HOME"] = "C:\spark\spark-3.5.5-bin-hadoop3"
os.environ["PATH"] = f"{spark_home}/bin:{os.environ['PATH']}"

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable 
#without this pyspark doesnt work on my computer

from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

from pyspark.ml.feature import CountVectorizer, MinHashLSH, HashingTF

  os.environ["SPARK_HOME"] = "C:\spark\spark-3.5.5-bin-hadoop3"


In [11]:
spark = SparkSession.builder.appName("Assignment3B").config("spark.driver.maxResultSize", "6g").config("spark.executor.memoryOverhead", "6g").config("spark.driver.memoryOverhead", "6g").config("spark.memory.fraction", "0.6").config("spark.sql.autoBroadcastJoinThreshold", -1).config("spark.memory.storageFraction", "0.3").config("spark.sql.shuffle.spill", "true").config("spark.driver.memory", "16g").config("spark.executor.memory", "6g").config("spark.sql.shuffle.partitions", "150").config("spark.memory.offHeap.enabled", "true").config("spark.memory.offHeap.size", "4g").getOrCreate()
sc = spark.sparkContext

### Pre-Processing

### Dataset
#https://grouplens.org/datasets/movielens/

#Using dataset recommended for education and development

In [15]:
data = spark.read.option("header", True).csv("ml-latest/ratings.csv")
data = data.drop("timestamp")
#limiting to 100000 samples
data = data.limit(1000000)

As the data format is UserID, MovieID, Rating. I will transform it into movieID, userId-rating as we are doing item-item CF this way i can create discreet sets of data, and apply a LSH to check users that rated the same for the same movie

In [18]:
data = data.withColumn("userId-rating", F.concat(F.col("userId"), F.lit("-"), F.col("rating")))
#Spliting the data 90% to implement and 10% for validation, seed for reproducibility
data, data_val = data.randomSplit([0.9, 0.1], seed=42)
#Will preprocess the data and validation data exactly the same way
data.limit(20).show()

+------+-------+------+-------------+
|userId|movieId|rating|userId-rating|
+------+-------+------+-------------+
|     1|      1|   4.0|        1-4.0|
|     1|   1036|   5.0|        1-5.0|
|     1|   1049|   3.0|        1-3.0|
|     1|   1066|   4.0|        1-4.0|
|     1|    110|   4.0|        1-4.0|
|     1|   1196|   3.5|        1-3.5|
|     1|   1210|   4.5|        1-4.5|
|     1|   1291|   5.0|        1-5.0|
|     1|   1293|   2.0|        1-2.0|
|     1|   1376|   3.0|        1-3.0|
|     1|   1396|   3.0|        1-3.0|
|     1|   1537|   4.0|        1-4.0|
|     1|    158|   4.0|        1-4.0|
|     1|   1909|   3.0|        1-3.0|
|     1|   1959|   4.0|        1-4.0|
|     1|   1960|   4.0|        1-4.0|
|     1|   2028|   5.0|        1-5.0|
|     1|   2085|   3.5|        1-3.5|
|     1|   2116|   4.0|        1-4.0|
|     1|   2336|   3.5|        1-3.5|
+------+-------+------+-------------+



In [19]:
#We will now trasform the movieId-ratings into the following format userId, [list of movieId-ratings]
feats = data.groupBy("movieId").agg(F.collect_list("userId-rating").alias("Id-Rates"))
feats.limit(20).show()
#feats_val = data_val.groupBy("movieId").agg(F.collect_list("userId-rating").alias("Id-Rates"))
#Now the format is ideal to construct the equivalent to a characteristics matrix

+-------+--------------------+
|movieId|            Id-Rates|
+-------+--------------------+
|      1|[1000-3.5, 1001-5...|
|     10|[1001-4.0, 1003-3...|
|    100|[1109-5.0, 1324-3...|
|   1000|[2401-4.0, 3078-4...|
| 100008|[4074-3.0, 4880-3...|
| 100017|[6001-3.5, 7629-3.5]|
| 100032|          [9401-4.5]|
| 100034|          [2651-2.5]|
| 100038|          [9068-3.5]|
| 100044|[2589-4.0, 305-4....|
| 100046|[3469-3.0, 3917-3...|
| 100054|          [8833-4.0]|
| 100058|           [305-2.0]|
| 100062|          [8204-3.0]|
| 100081|          [8833-2.0]|
| 100083|[1117-4.0, 1195-2...|
| 100087|           [389-3.0]|
| 100091|          [8833-2.5]|
| 100106|[4202-4.5, 4249-5...|
| 100108|[2004-3.5, 2172-1...|
+-------+--------------------+



In [20]:
#Count Vectorizer pretty much constructs the characteristic matrix, if there is too much data will substitute by HashingTF
#Countvectorizer substituted by HashingTF
htf = HashingTF(inputCol="Id-Rates", outputCol="vectorized_rates", numFeatures=2**16)
feat_vector = htf.transform(feats) #vector.transform caso use o countvectorizer
#feat_vector_val = htf.transform(feats_val)

In [21]:
#now we have our characteristics matrix interpreted as a list of vectors for ALL users
#The data format here is a list of [Indexes of Existing movieIc-rating on a user]
#As an example we have | (userId)1|(movieId-ratings)[1-4.0, 1036-5.0,...|(Vectorized)(28932,[8,9,23,36...|


### LSH

"""
We can now proceed to minHashing
Probability of candidate is 1-(1-s^k)^b, where k is the number of hashed rows and b is number of bands
and s is the threshold similarity
I will choose s = 0.5 because we are looking into identical behaviour in movie ratings
If User1 has a similarity rating of 0.5 with user2 then we can recommend movies for both users
As i dont know how many hash funtions to use ill just use the default value for pyspark's MinHashLSH 
(cant use the default value as it is not working too much memory)
"""

In [24]:
#MinHashLSH is a integrated minhashing function on pyspark
minhash_movie = MinHashLSH(inputCol="vectorized_rates", outputCol="hashed_rates", numHashTables=64)
minhash_model = minhash_movie.fit(feat_vector)
signatures = minhash_model.transform(feat_vector)
signatures.limit(20).show() #forcing system to calculate it
#MinHashLSH already bands and selects the number of rows per band, so no manually selection of bands and rows will be chosen 

+-------+--------------------+--------------------+--------------------+
|movieId|            Id-Rates|    vectorized_rates|        hashed_rates|
+-------+--------------------+--------------------+--------------------+
|      1|[1000-3.5, 1001-5...|(65536,[65,71,78,...|[[431617.0], [606...|
|     10|[1001-4.0, 1003-3...|(65536,[99,308,35...|[[1380108.0], [18...|
|    100|[1109-5.0, 1324-3...|(65536,[681,717,2...|[[1.3880476E7], [...|
|   1000|[2401-4.0, 3078-4...|(65536,[24495,302...|[[1.214088191E9],...|
| 100008|[4074-3.0, 4880-3...|(65536,[3865,3283...|[[7.62221094E8], ...|
| 100017|[6001-3.5, 7629-3.5]|(65536,[26398,405...|[[1.136325078E9],...|
| 100032|          [9401-4.5]|(65536,[34386],[1...|[[1.69828445E9], ...|
| 100034|          [2651-2.5]|(65536,[53851],[1...|[[5.5908619E8], [...|
| 100038|          [9068-3.5]|(65536,[31590],[1...|[[8.99163005E8], ...|
| 100044|[2589-4.0, 305-4....|(65536,[2549,4009...|[[5.204915E8], [2...|
| 100046|[3469-3.0, 3917-3...|(65536,[25537,315...|

In [25]:
#approxSimilarityJoin is a pyspark function to perform a LSH and filter by distance
sim_items = minhash_model.approxSimilarityJoin(feat_vector,feat_vector, 0.3, distCol="JaccardDistance")
#Avoiding repeated items
sim_items = sim_items.filter(F.col("datasetA.movieId") < F.col("datasetB.movieId"))
sim_items.limit(20).show()
#sim_items.show()
#sim_items is a dataframe that has the info on the distance and movies that i might want to check

+--------------------+--------------------+---------------+
|            datasetA|            datasetB|JaccardDistance|
+--------------------+--------------------+---------------+
|{109151, [7056-3....|{116989, [7056-3....|            0.0|
|{109151, [7056-3....|{146421, [7056-3....|            0.0|
|{109151, [7056-3....|{60585, [7056-3.0...|            0.0|
|{109151, [7056-3....|{174655, [7056-3....|            0.0|
|{109151, [7056-3....|{185537, [7056-3....|            0.0|
|{109151, [7056-3....|{186339, [7056-3....|            0.0|
|{109151, [7056-3....|{191815, [7056-3....|            0.0|
|{109151, [7056-3....|{194785, [7056-3....|            0.0|
|{109151, [7056-3....|{203419, [7056-3....|            0.0|
|{109151, [7056-3....|{204282, [7056-3....|            0.0|
|{109151, [7056-3....|{206533, [7056-3....|            0.0|
|{109151, [7056-3....|{227698, [7056-3....|            0.0|
|{109151, [7056-3....|{232475, [7056-3....|            0.0|
|{116989, [7056-3....|{146421, [7056-3..

In [26]:
#pair_sim is a friendly way to check if X movie is similar to Y movie, and how similar
pair_sim = sim_items.withColumn("movieA", F.col("datasetA.movieId")).withColumn("movieB", F.col("datasetB.movieId")).withColumn("Similarity", 1 - F.col("JaccardDistance"))
pair_sim = pair_sim.drop("datasetA", "datasetB", "JaccardDistance")
pair_sim.limit(20).show()

+------+------+----------+
|movieA|movieB|Similarity|
+------+------+----------+
|109151|116989|       1.0|
|109151|146421|       1.0|
|109151| 60585|       1.0|
|109151|174655|       1.0|
|109151|185537|       1.0|
|109151|186339|       1.0|
|109151|191815|       1.0|
|109151|194785|       1.0|
|109151|203419|       1.0|
|109151|204282|       1.0|
|109151|206533|       1.0|
|109151|227698|       1.0|
|109151|232475|       1.0|
|116989|146421|       1.0|
|116989| 60585|       1.0|
|116989|174655|       1.0|
|116989|185537|       1.0|
|116989|186339|       1.0|
|116989|191815|       1.0|
|116989|194785|       1.0|
+------+------+----------+




""" 
There is a problem of unnexisting data for example user 1 has the following:
+--------+------------------+---------------------+
|val_user|actual_rated_movie|actual_rating_by_user|
+--------+------------------+---------------------+
|       1|              1200|                  3.5|
|       1|              1214|                  4.0|
|       1|               596|                  4.0|
|       1|              7706|                  3.5|

But when we try to compare with the predictions
+----+------+-------------+----------------+
|user| movie|actual_rating|Predicted_rating|
+----+------+-------------+----------------+
|  22| 74727|          4.0|             4.0|
| 123|  6663|          5.0|             4.5|
| 305|130452|          4.0|             4.0|
+----+------+-------------+----------------+

1 doesn't exist at all, in fact for 100 000 samples only users 22, 123, 305 can be compared

That happens because during the prediction dataframe creation because we are "calculating"
the predicted ratings based on similar movies that the USER saw, so if the user didnt see any 
similar movies to the ones they still haven't seen there is no predictions to be done

Now to fill the utility matrix we need a strategy to handle these sorts of cases

As a solution i propose to instead calculate a average of the rating of similar movies 
and by default every user's predicted rating to be that average and to those users who
the predicted rating can be calculated by similar seen movies, it is changed
"""

In [28]:
#defaulting the predicted values for the ratings (similarity = 0.5 defined above)
default_rate_a = data.join(pair_sim, (F.col("movieId") == F.col("movieA")), "inner").select("userId","movieId","rating","movieB","Similarity")
default_rate_a = default_rate_a.alias("a").join(data.alias("b"), (F.col("a.movieB") == F.col("b.movieId")), "inner").select(F.col("a.userId").alias("userId"),F.col("a.movieId").alias("movieId"),F.col("a.movieB").alias("movieB"),F.col("Similarity"), F.col("b.rating").alias("ratingB"))
#Calculating the average rating
default_rate_a = default_rate_a.groupBy("userId","movieId").agg((F.sum(F.col("Similarity") * F.col("ratingB"))/F.sum(F.col("Similarity"))).alias("default_rating"))
#We start by checking what movies in the original data are present on the first column of the similar movies 
#Then we do the same for movies on the second column, this way no movies are unchecked
default_rate_b = data.join(pair_sim, (F.col("movieId") == F.col("movieB")), "inner").select("userId","movieId","rating","movieA","Similarity")
default_rate_b = default_rate_b.alias("a").join(data.alias("b"), (F.col("a.movieA") == F.col("b.movieId")), "inner").select(F.col("a.userId").alias("userId"),F.col("a.movieId").alias("movieId"),F.col("a.movieA").alias("movieB"),F.col("Similarity"), F.col("b.rating").alias("ratingB"))
#calculating the average rating
default_rate_b = default_rate_b.groupBy("userId","movieId").agg((F.sum(F.col("Similarity") * F.col("ratingB"))/F.sum(F.col("Similarity"))).alias("default_rating"))
#Then we unite both dataframes, using distinct to avoid duplicates
default_rate = default_rate_a.union(default_rate_b).distinct()
default_rate.limit(20).show()

+------+-------+--------------+
|userId|movieId|default_rating|
+------+-------+--------------+
|  6116|   5229|           3.0|
|  6823| 110173|           3.5|
|  3083| 133729|           2.5|
|  7010| 270578|           2.0|
|  5957| 128862|           3.5|
|  7644| 238108|           5.0|
|  6232| 173445|           3.5|
|  6116|   8621|           3.5|
|  3951|  59053|           3.5|
|  8833|  92204|           3.5|
|  7959| 132824|           2.0|
|  8833|  39305|           2.5|
|  7010| 224024|           2.0|
|  8833| 198919|           2.5|
|  8833| 151741|           3.0|
|  8833| 143065|           2.5|
|  9453| 169052|           2.0|
|  3884| 132159|           4.5|
|  8978| 134093|           3.0|
|  8374| 151347|           2.5|
+------+-------+--------------+



In [29]:
#The idea is to now create a dataframe of users and movies user hasn't rated(seen)
#dataframe data has userId, movieId, ratings and userId-ratings(unnecessary for this part)
#First we get all the movies available
all_movies = data.select("movieId").distinct()
all_movies = data.select("userId").distinct().crossJoin(all_movies)
#Second we get all the movies rated, by user
rated_movies = data.select("userId", "movieId")
#Lastly we join left_anti to exclude all the rated movies from all movies
#not_rated is now a dataframe with all the users and movies they havent rated
not_rated = all_movies.join(rated_movies,on=["userId", "movieId"], how="left_anti")
#Will now integrate the default rating into the unrated movies
not_rated = not_rated.alias("a").join(default_rate.alias("b"), F.col("a.movieId") == F.col("b.movieId")).select(F.col("a.userId").alias("userId"), F.col("a.movieId").alias("movieId"), "default_rating").distinct()
not_rated.limit(20).show()

+------+-------+--------------+
|userId|movieId|default_rating|
+------+-------+--------------+
|  1001| 100591|           3.0|
|  1010| 100591|           3.0|
|  1088| 100591|           3.0|
|  1093| 100591|           3.0|
|  1097| 100591|           3.0|
|  1108| 100591|           3.0|
|  1148| 100591|           3.0|
|  1302| 100591|           3.0|
|   131| 100591|           3.0|
|  1382| 100591|           3.0|
|  1418| 100591|           3.0|
|  1430| 100591|           3.0|
|  1447| 100591|           3.0|
|  1468| 100591|           3.0|
|  1469| 100591|           3.0|
|  1505| 100591|           3.0|
|  1537| 100591|           3.0|
|  1615| 100591|           3.0|
|  1646| 100591|           3.0|
|   165| 100591|           3.0|
+------+-------+--------------+



In [30]:
#We will now expand the not_rated dataframe to include a prediction of the rating of each user on each unrated film
#The idea is to verify similar movies to a unrated movie and see what similar movies
#a user rated and ponder a average based on this criteria
#dataframe that has the similar movies to the ones the user didnt rate on MovieB column
prediction_b = not_rated.join(pair_sim, (F.col("movieId") == F.col("movieA")), "inner").drop("movieA").select("userId", "movieId","movieB", "Similarity", "default_rating")
#dataframe that has the similar movie to the ones the user didnt rate on MovieA column, calling movieA as movieB for a successfull union
prediction_a = not_rated.join(pair_sim, (F.col("movieId") == F.col("movieB")), "inner").drop("movieB").select("userId", "movieId", F.col("movieA").alias("movieB"), "Similarity", "default_rating")
#Union of both dataframes without repetitions(.distinct())
prediction = prediction_b.union(prediction_a).distinct() #.persist(StorageLevel.DISK_ONLY)
prediction.limit(20).show()

+------+-------+------+----------+--------------+
|userId|movieId|movieB|Similarity|default_rating|
+------+-------+------+----------+--------------+
|  1007| 100591|144618|       1.0|           3.0|
|  1070| 100591| 83577|       1.0|           3.0|
|   113| 100591|124160|       1.0|           3.0|
|   113| 100591| 43626|       1.0|           3.0|
|   113| 100591| 66686|       1.0|           3.0|
|  1064| 100591| 25783|       1.0|           3.0|
|  1064| 100591|  7583|       1.0|           3.0|
|  1170| 100591|147028|       1.0|           3.0|
|  1170| 100591| 53916|       1.0|           3.0|
|  1170| 100591| 80661|       1.0|           3.0|
|  1183| 100591| 87595|       1.0|           3.0|
|  1257| 100591|175135|       1.0|           3.0|
|  1257| 100591| 72344|       1.0|           3.0|
|  1224| 100591|108799|       1.0|           3.0|
|  1224| 100591| 83577|       1.0|           3.0|
|  1306| 100591| 82467|       1.0|           3.0|
|  1264| 100591|279530|       1.0|           3.0|


"""
We join prediction with the original data to be able to access it's rating to calculate the predictions(not the defaulted ones)
The predicted rating is calculated and saved in a different dataframe because 
prediction_c is grouped by userId and unrated_movie and that eliminates the default rating that was calculated beforeb
The later join between final_prediction and prediction_c is just to integrate the
default rating into the predicted rating(named Rate_prediction(with_default))

Note that joining the default rating and the predicted rating into the same column will not
have any collision as predicted rating is exclusive of the existence of default rating
"""

In [32]:
#Here we join the prediction with the original data in order to calculate the predicted rating based on similar ratings
prediction = prediction.alias("a").join(data.alias("b"), ((F.col("movieB")==F.col("b.movieId")) & (F.col("a.userId") == F.col("b.userId"))), "left")
#Selecting the only needed columns
prediction = prediction.select(F.col("a.userId"), F.col("a.movieId").alias("unrated_movie"), F.col("movieB").alias("sim_movie"),F.col("Similarity"), F.col("rating").alias("rating_sim"), "default_rating")
#Created a new df to avoid eliminating the default rating
prediction_c = prediction.groupBy("userId", "unrated_movie").agg((F.sum(F.col("Similarity")*F.col("rating_sim"))/F.sum(F.col("Similarity"))).alias("Predicted_rating")).orderBy(F.asc("userId"))

#formatting to the only needed columns
final_prediction = prediction.select("userId", "unrated_movie", "default_rating").distinct()
#Join to fit the default rating calculated before into a column
final_prediction = final_prediction.alias("a").join(prediction_c.alias("b"), (F.col("a.userId")==F.col("b.userId"))&(F.col("a.unrated_movie") == F.col("b.unrated_movie")), how = "left")
#Joining the predicted rating and the default rating into the same column
final_prediction = final_prediction.withColumn("Rate_prediction(with_default)", F.when(F.col("Predicted_rating").isNotNull(), F.col("Predicted_rating")).otherwise(F.col("default_rating")))
#keeping this df in cache because it is a BIG dataframe
final_prediction = final_prediction.select("a.userId","a.unrated_movie","Rate_prediction(with_default)").distinct()#.persist(StorageLevel.DISK_ONLY)#.repartition(400)
final_prediction.limit(20).show()

+------+-------------+-----------------------------+
|userId|unrated_movie|Rate_prediction(with_default)|
+------+-------------+-----------------------------+
|     1|       103576|                          3.5|
|     1|       109812|                          4.0|
|     1|       111624|                          2.5|
|     1|       112735|                          3.5|
|     1|       113843|                          4.0|
|     1|       116945|                          3.5|
|     1|       122270|                          2.5|
|     1|       122270|           2.1666666666666665|
|     1|       123282|                          2.5|
|     1|       135761|                          3.5|
|     1|       135785|                          2.0|
|     1|       135787|                          4.5|
|     1|       139795|                          3.0|
|     1|       141137|                          1.5|
|     1|       146190|                          2.0|
|     1|       146358|                        

In [34]:
#This is the validation set where we check if some movies were actually rated by the users
#The objective of this dataframe is to check if our "averaged" rating is correct
#Join between the validation data and unrated movies dataframe to check the movies that 
#were thought to be unrated but were actually rated
val_df = data_val.alias("a").join(not_rated.alias("b"), (F.col("a.userId")==F.col("b.userId")) & (F.col("a.movieId") == F.col("b.movieId")), "left").select(F.col("a.userId").alias("val_user"), F.col("a.movieId").alias("actual_rated_movie"), F.col("a.rating").alias("actual_rating_by_user")).distinct()
val_df.limit(20).show()

+--------+------------------+---------------------+
|val_user|actual_rated_movie|actual_rating_by_user|
+--------+------------------+---------------------+
|       1|              1200|                  3.5|
|       1|              1214|                  4.0|
|       1|               596|                  4.0|
|       1|              7706|                  3.5|
|      10|              1148|                  3.5|
|      10|              2716|                  3.0|
|      10|             34150|                  2.5|
|      10|             34162|                  3.0|
|      10|             44022|                  3.0|
|      10|             59369|                  3.5|
|      10|             63113|                  4.0|
|      10|             76251|                  3.5|
|      10|             79293|                  3.5|
|      10|             87222|                  3.5|
|      10|             94864|                  4.5|
|     100|              1285|                  4.0|
|     100|  

In [38]:
#Here we create a dataframe that displays the ratings the movies that were predicted compared to the same movies
#left in the actual validation data
pred_actual_df = val_df.join(final_prediction, (F.col("val_user") == F.col("userId")) & (F.col("actual_rated_movie") == F.col("unrated_movie")), how="inner") #inner
pred_actual_df = pred_actual_df.select(F.col("val_user").alias("user"), F.col("actual_rated_movie").alias("movie"), F.col("actual_rating_by_user").alias("actual_rating"), F.col("Rate_prediction(with_default)")).distinct()
pred_actual_df.limit(20).show()
#pred_actual_df.write.option("header", True).csv("pred_vs_actual1M.csv")

+----+------+-------------+-----------------------------+
|user| movie|actual_rating|Rate_prediction(with_default)|
+----+------+-------------+-----------------------------+
|2402| 74582|          2.0|                          3.0|
| 265|  7626|          2.0|                          2.5|
|2639|101425|          4.0|                          3.5|
|3689|154470|          2.5|                          4.5|
|9404|173845|          5.0|                          2.5|
|4190| 26746|          2.0|                          3.5|
|4733|  4669|          3.0|                          3.0|
|4934|  6854|          4.0|                          3.0|
|6386|171461|          1.5|                          3.0|
|6416|  2509|          3.5|                          1.0|
|9521|  7583|          3.0|                          3.0|
|2935| 92923|          2.0|                          1.0|
| 301|256075|          3.0|                          3.5|
|8833| 87878|          3.0|                          4.0|
|9401|275173| 

"""
I was getting an extremely small dataframe for the predicted rating vs actual ratings such as this one
+----+------+-------------+----------------+
|user| movie|actual_rating|Predicted_rating| ###This dataframe was for 10K lines
+----+------+-------------+----------------+
|  22| 74727|          4.0|             4.0|
| 123|  6663|          5.0|             4.5|
| 305|130452|          4.0|             4.0|
+----+------+-------------+----------------+
So due to these small dataframes it was decided to include the default rating system
It's not available but even with the default rating included the dataframes were still small
Example: (prints counting the number of elements of a given dataframe)
validation dataframe size 10062
pred_actual_df count 365

Then after getting pred_actual_df to be the join between val_df and final_prediction to be "left"
the following  dataframe was the result
+----+------+-------------+-----------------------------+
|user| movie|actual_rating|Rate_prediction(with_default)|
+----+------+-------------+-----------------------------+
|  33| 48516|          4.5|                         NULL|
|  50|122900|          1.5|                         NULL|
|  53|  1206|          4.5|                         NULL|
|  11| 91529|          4.5|                         NULL|
|  82|  1093|          2.0|                         NULL|
|  91|   500|          3.0|                         NULL|
|  82|   968|          5.0|                          5.0|

We have plenty of movies rate_prediction as NULL, that requires the question even with the 
default_rating why does the rate prediction keep being null?

One of the hypothesis is that these movies only appear in the validation data 
hence they cannot be rated  

this dataframe below, movie_check, is to check the hypothesis above
If the following dataframe prints empty that would mean that there is no movie whose
Rate_prediction(with_default) is NULL present in the original dataframe named datab
"""

In [40]:
#filtering the pred_actual_df by Rate_prediction being Null
movie_check = pred_actual_df.filter(F.col("Rate_prediction(with_default)").isNull())
#joining with data to check if movie that has a Null predicted_rating is available in the original data
movie_check = movie_check.join(data, (F.col("movie") == F.col("movieId")) & (F.col("user")==F.col("userId")), "inner")
movie_check.take(1)

[]


"""
Empty dataframe, this means that all the Null ratings are due to movies not being
available in the original data
+----+-----+-------------+-----------------------------+------+-------+------+-------------+
|user|movie|actual_rating|Rate_prediction(with_default)|userId|movieId|rating|userId-rating|
+----+-----+-------------+-----------------------------+------+-------+------+-------------+
+----+-----+-------------+-----------------------------+------+-------+------+-------------+

To avoid big calculations ill keep the dataframe that compares actual vs predicted rating as
"inner" join so this dataframe will always appear empty

Also with the adding of more data the number of unavailable movies on the "training" data
compared to the validation data are expected to get relatively lower and lower because
this dataset features 86k movies, and it is expected with the increasing of the data more users
rating more movies.
"""


In [None]:
# 1. RMSE (Root Mean Square Error)
rmse = pred_actual_df.select(
    F.sqrt(F.avg(F.pow(F.col("actual_rating") - F.col("Rate_prediction(with_default)"), 2))).alias("RMSE"))
rmse.show()

# 2. MAE (Mean Absolute Error)
mae = pred_actual_df.select(
    F.avg(F.abs(F.col("actual_rating") - F.col("Rate_prediction(with_default)"))).alias("MAE"))
mae.show()

# 4. MAPE (Mean Absolute Percentage Error)
def calculate_ape(actual, predicted):
    return F.abs((actual - predicted) / actual) * 100

mape = pred_actual_df.withColumn("ape", 
    calculate_ape(F.col("actual_rating"), F.col("Rate_prediction(with_default)"))) \
    .select(F.avg("ape"))
mape.show()

# 5. Acurácia para classificações inteiras (se aplicável)
accuracy = pred_actual_df.withColumn("correct", 
    when(F.round(F.col("Rate_prediction(with_default)")) == F.col("actual_rating"), 1).otherwise(0)) \
    .select(F.avg("correct"))
accuracy.show()

+--------------------------------------------------------------------+
|SQRT(avg(POWER((actual_rating - Rate_prediction(with_default)), 2)))|
+--------------------------------------------------------------------+
|                                                   1.326080173877682|
+--------------------------------------------------------------------+



In [None]:
threshold = 3.5

precision = pred_actual_df.withColumn("relevant_pred", 
    col("Rate_prediction(with_default)") >= threshold) \
    .withColumn("relevant_actual", 
    col("actual_rating") >= threshold) \
    .withColumn("true_positive", 
    (col("relevant_pred") & col("relevant_actual")).cast("int")) \
    .withColumn("false_positive", 
    (col("relevant_pred") & ~col("relevant_actual")).cast("int")) \
    .agg(
        F.sum("true_positive").alias("tp"),
        F.sum("false_positive").alias("fp")
    ).withColumn("precision", col("tp") / (col("tp") + col("fp"))))
precision.show()

In [28]:
###This section is just to make sure the dimensions are correct when moving from dataframe to dataframe
#as it is easy to filter data unwantingly
"""
3966 movies that user 1 hasn't rated for 10k obs
3966 movies not rated by user 1
checks out, dimensions are correct for the unrated_movies for user 1, will assume correctness for all other users
"""
print(not_rated.filter(F.col("userId")==1).select("movieId").distinct().count())
print(prediction.filter(F.col("userId")==1).select("unrated_movie").distinct().count())

"""
prediction size at union 134925998
prediction size after left join with data, should be equal to the previous number 134925998
Checks out, dimensions are the same
"""
print(f"prediction size at union {prediction.count()}")
print(f"prediction size after left join with data, should be equal to the previous number {prediction.count()}")

#print(f"prediction_c size after grouping {prediction_c.count()}")
#print(f"final prediction dataframe size {final_prediction.count()}")

"""
validation data size 10062
validation dataframe size 10062
checks out
"""
print(f"validation data size {data_val.count()}")
print(f"validation dataframe size {val_df.count()}")

"""
pred_actual_df count 10072 (intersection between unrated movies and validation data)
How many NULL ratings 9707 ()
movie_check count 0
As stated earlier this is expected when pred_actual_df is created using a left join
"""
print(f"pred_actual_df count {pred_actual_df.count()}")
print(f"How many NULL ratings {pred_actual_df.filter(F.col('Rate_prediction(with_default)').isNull()).count()}")
print(f"movie_check count {movie_check.count()}")


10306
10306
prediction size at union 9115745967
prediction size after left join with data, should be equal to the previous number 9115745967
validation data size 200092
validation dataframe size 200092
pred_actual_df count 740
How many NULL ratings 0
movie_check count 0
