### Question 4 small

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, FloatType, StructType, StructField
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder \
    .appName("question4") \
    .getOrCreate()

24/05/11 21:54:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
ratings_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", IntegerType(), True)
])

train_small = spark.read.parquet("rec-small/train.parquet", header=True, schema=ratings_schema)
val_small = spark.read.parquet("rec-small/val.parquet", header=True, schema=ratings_schema)
test_small = spark.read.parquet("rec-small/test.parquet", header=True, schema=ratings_schema)
# Row(userId=503, movieId=68157, rating=4.5, timestamp=1335219485)

In [4]:
popular_movies = train_small.groupBy("movieId").agg(
                        avg("rating").alias("avg_rating"), 
                        count("rating").alias("num_ratings"))

damping_factor = 1000
popular_movies = popular_movies.withColumn("damped_popularity",
                                        (col("avg_rating")*col("num_ratings"))/(col("num_ratings") + damping_factor)
                                        )
popular_movies = popular_movies.orderBy(
                            col("damped_popularity").desc())

                                                                                
# Row(movieId=318, avg_rating=4.448339483394834, num_ratings=271, damped_popularity=0.9484657749803305)

In [5]:
test_small_joined = test_small.join(popular_movies, "movieId", "left")
# Row(movieId=68157, userId=503, rating=4.5, timestamp=1335219485, avg_rating=4.212328767123288, num_ratings=73, damped_popularity=0.28657968313140725)

In [6]:
window_spec = Window.partitionBy('userId').orderBy(F.desc('damped_popularity'))
ranked_test_small_joined = test_small_joined.withColumn('rank', F.rank().over(window_spec))
# Row(movieId=2959, userId=65, rating=4.5, timestamp=1494767045, avg_rating=4.3138297872340425, num_ratings=188, damped_popularity=0.6826599326599326, rank=1)

In [7]:
top_100_per_user = ranked_test_small_joined.filter(col("rank") <= 100)
top_100_per_user = top_100_per_user.drop(col("rank"))

In [8]:
top_100_per_user = top_100_per_user.rdd.map(
                        lambda row: (row["userId"], row["movieId"])).groupByKey().mapValues(list)
# (1, [356, 296, 2571, 260, 2959])

                                                                                

In [9]:
mean_ratings_per_user = test_small.groupBy("userId").agg(F.mean("rating").alias("mean_rating"))                                            
# Row(userId=148, mean_rating=3.7395833333333335)

movies_with_mean = test_small.join(mean_ratings_per_user, "userId", "inner")
# Row(userId=1, movieId=1, rating=4.0, timestamp=964982703, mean_rating=4.366379310344827)

movies_above_mean = movies_with_mean.filter(col("rating") > col("mean_rating"))
# Row(userId=1, movieId=1, rating=4.0, timestamp=964982703, mean_rating=4.366379310344827)



movies_above_mean_rdd = movies_above_mean.rdd.map(
                        lambda row: (row["userId"], row["movieId"])).groupByKey().mapValues(list)

# (21,[2376, 4545, 54286, 7570])

In [10]:
preds_and_labels = top_100_per_user.join(movies_above_mean_rdd).map(lambda row: (row[1][0], row[1][1])).collect()
preds_and_labels_par = spark.sparkContext.parallelize(preds_and_labels)

                                                                                

In [11]:
metrics = RankingMetrics(preds_and_labels_par)

MAP_score = metrics.meanAveragePrecision
ndcgAt5 = metrics.ndcgAt(5)
ndcgAt100 = metrics.ndcgAt(100)
recallAt5 = metrics.recallAt(5)
recallAt100 = metrics.recallAt(100)

print("MAP Score on full test dataset (users with > 10 movies watched): ", MAP_score)
print("NDCG at 5 on full test dataset (users with > 10 movies watched): ", ndcgAt5)
print("NDCG at 100 on full test dataset (users with > 10 movies watched): ", ndcgAt100)
print("Recall at 5 on full test dataset (users with > 10 movies watched): ", recallAt5)
print("Recall at 100 on full test dataset (users with > 10 movies watched): ", recallAt100)

MAP Score on full test dataset (users with > 10 movies watched):  0.6481317429645803
NDCG at 5 on full test dataset (users with > 10 movies watched):  0.7660117132601572
NDCG at 100 on full test dataset (users with > 10 movies watched):  0.8459597294943321
Recall at 5 on full test dataset (users with > 10 movies watched):  0.22462918722627528
Recall at 100 on full test dataset (users with > 10 movies watched):  0.9059719034064517


### Question 4 All

In [12]:
ratings_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", IntegerType(), True)
])

train = spark.read.parquet("rec/train.parquet", header=True, schema=ratings_schema)
val = spark.read.parquet("rec/val.parquet", header=True, schema=ratings_schema)
test = spark.read.parquet("rec/test.parquet", header=True, schema=ratings_schema)

In [13]:
popular_movies = train.groupBy("movieId").agg(
                        avg("rating").alias("avg_rating"), 
                        count("rating").alias("num_ratings"))

damping_factor = 1000
popular_movies = popular_movies.withColumn("damped_popularity",
                                        (col("avg_rating")*col("num_ratings"))/(col("num_ratings") + damping_factor)
                                        )
popular_movies = popular_movies.orderBy(
                            col("damped_popularity").desc())

# Row(movieId=318, avg_rating=4.4110477257678395, num_ratings=98165, damped_popularity=4.366565824635708)

In [14]:
test_joined = test.join(popular_movies, "movieId", "left")
# Row(movieId=68157, userId=503, rating=4.5, timestamp=1335219485, avg_rating=4.212328767123288, num_ratings=73, damped_popularity=0.28657968313140725)

In [15]:
window_spec = Window.partitionBy('userId').orderBy(F.desc('damped_popularity'))
ranked_test_joined = test_joined.withColumn('rank', F.rank().over(window_spec))
# Row(movieId=2959, userId=65, rating=4.5, timestamp=1494767045, avg_rating=4.3138297872340425, num_ratings=188, damped_popularity=0.6826599326599326, rank=1)

In [16]:
top_100_per_user = ranked_test_joined.filter(col("rank") <= 100)
top_100_per_user = top_100_per_user.drop(col("rank"))

In [17]:
top_100_per_user = top_100_per_user.rdd.map(
                        lambda row: (row["userId"], row["movieId"])).groupByKey().mapValues(list)
# (1, [356, 296, 2571, 260, 2959])

In [18]:
mean_ratings_per_user = test.groupBy("userId").agg(F.mean("rating").alias("mean_rating"))                                            
# Row(userId=148, mean_rating=3.7395833333333335)

movies_with_mean = test.join(mean_ratings_per_user, "userId", "inner")
# Row(userId=1, movieId=1, rating=4.0, timestamp=964982703, mean_rating=4.366379310344827)

movies_above_mean = movies_with_mean.filter(col("rating") > col("mean_rating"))
# Row(userId=1, movieId=1, rating=4.0, timestamp=964982703, mean_rating=4.366379310344827)


movies_above_mean_rdd = movies_above_mean.rdd.map(
                        lambda row: (row["userId"], row["movieId"])).groupByKey().mapValues(list)

# (21,[2376, 4545, 54286, 7570])

In [19]:
preds_and_labels = top_100_per_user.join(movies_above_mean_rdd).map(lambda row: (row[1][0], row[1][1])).collect()
preds_and_labels_par = spark.sparkContext.parallelize(preds_and_labels)


                                                                                

In [20]:
metrics = RankingMetrics(preds_and_labels_par)

MAP_score = metrics.meanAveragePrecision
ndcgAt5 = metrics.ndcgAt(5)
ndcgAt100 = metrics.ndcgAt(100)
recallAt5 = metrics.recallAt(5)
recallAt100 = metrics.recallAt(100)

print("MAP Score on full test dataset (users with > 10 movies watched): ", MAP_score)
print("NDCG at 5 on full test dataset (users with > 10 movies watched): ", ndcgAt5)
print("NDCG at 100 on full test dataset (users with > 10 movies watched): ", ndcgAt100)
print("Recall at 5 on full test dataset (users with > 10 movies watched): ", recallAt5)
print("Recall at 100 on full test dataset (users with > 10 movies watched): ", recallAt100)

24/05/11 21:54:55 WARN TaskSetManager: Stage 29 contains a task of very large size (1439 KiB). The maximum recommended task size is 1000 KiB.
24/05/11 21:54:55 WARN TaskSetManager: Stage 30 contains a task of very large size (1439 KiB). The maximum recommended task size is 1000 KiB.
24/05/11 21:54:55 WARN TaskSetManager: Stage 31 contains a task of very large size (1439 KiB). The maximum recommended task size is 1000 KiB.
24/05/11 21:54:56 WARN TaskSetManager: Stage 32 contains a task of very large size (1439 KiB). The maximum recommended task size is 1000 KiB.
24/05/11 21:54:56 WARN TaskSetManager: Stage 33 contains a task of very large size (1439 KiB). The maximum recommended task size is 1000 KiB.
24/05/11 21:54:57 WARN TaskSetManager: Stage 34 contains a task of very large size (1439 KiB). The maximum recommended task size is 1000 KiB.


MAP Score on full test dataset (users with > 10 movies watched):  0.683826696502222
NDCG at 5 on full test dataset (users with > 10 movies watched):  0.7439731984274025
NDCG at 100 on full test dataset (users with > 10 movies watched):  0.8485859585672962
Recall at 5 on full test dataset (users with > 10 movies watched):  0.38240747502257816
Recall at 100 on full test dataset (users with > 10 movies watched):  0.9456506802535892


                                                                                