In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, expr, lit, split, explode, array, array_union, collect_list
from pyspark.ml.recommendation import ALS

In [2]:
def train_als_model(ratings):
    # Define ALS model
    als = ALS(
        maxIter=10,
        regParam=0.1,
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        coldStartStrategy="drop",
        nonnegative=True
    )
    # Train the model
    als_model = als.fit(ratings)
    return als_model

In [3]:
def get_top_n_recommendations(model, n_recommendations=100):
    # Generate top N movie recommendations for each user
    user_recs = model.recommendForAllUsers(n_recommendations)
    return user_recs

In [4]:
def get_movie_id(top_movies, n_recommendations=100):
    # Extract the top N movie IDs from the recommendations DataFrame
    top_movie_ids = top_movies.select(explode("recommendations.movieId").alias("movieId")).distinct().limit(n_recommendations).collect()
    return [row['movieId'] for row in top_movie_ids]

In [5]:
def compute_map(top_movies, ratings, n_recommendations=100):
    top_movie_id = get_movie_id(top_movies, n_recommendations)
    top_movie_id_expr = f"array({','.join([str(x) for x in top_movie_id])})"
    
    user_actual_movies = ratings.groupBy("userId").agg(
        expr("collect_list(movieId) as actual_movies")
    )
    
    precision_per_user = user_actual_movies.select(
        expr(f"size(array_intersect(actual_movies, {top_movie_id_expr})) as hits"),
        expr("size(actual_movies) as total_relevant"),
        lit(n_recommendations).alias("total_recommendations")
    ).selectExpr("hits / total_relevant as precision_at_k")
    
    mean_average_precision = precision_per_user.selectExpr("avg(precision_at_k) as MAP").first()['MAP']
    
    return mean_average_precision

In [6]:
def aggregate_genome_scores(genome_scores, genome_tags):
    # Join genome scores with genome tags to get tag names
    genome_scores_with_tags = genome_scores.join(genome_tags, on="tagId")
    
    # Aggregate the relevance scores for each movie
    movie_features = genome_scores_with_tags.groupBy("movieId").agg(
        collect_list(array("tag", "relevance")).alias("tag_relevance")
    )
    return movie_features

In [7]:
def process_data(spark):
    base_path = f'./ml-latest'
    train_path = f'{base_path}/train_ratings.parquet'
    val_path = f'{base_path}/val_ratings.parquet'
    test_path = f'{base_path}/test_ratings.parquet'
    movies_path = f'{base_path}/movies.parquet'
    genome_scores_path = f'{base_path}/genome-scores.parquet'
    genome_tags_path = f'{base_path}/genome-tags.parquet'
    
    train_ratings = spark.read.parquet(train_path, header=True, inferSchema=True)
    val_ratings = spark.read.parquet(val_path, header=True, inferSchema=True)
    test_ratings = spark.read.parquet(test_path, header=True, inferSchema=True)
    movies = spark.read.parquet(movies_path, header=True, inferSchema=True)
    genome_scores = spark.read.parquet(genome_scores_path, header=True, inferSchema=True)
    genome_tags = spark.read.parquet(genome_tags_path, header=True, inferSchema=True)
    
    # Aggregate genome scores to create additional movie features
    movie_features = aggregate_genome_scores(genome_scores, genome_tags)
    
    # Join ratings with movie features
    train_ratings = train_ratings.join(movie_features, on="movieId", how="left")
    val_ratings = val_ratings.join(movie_features, on="movieId", how="left")
    test_ratings = test_ratings.join(movie_features, on="movieId", how="left")
    
    # Train ALS model
    als_model = train_als_model(train_ratings)
    
    # Get top N recommendations
    top_recommendations = get_top_n_recommendations(als_model)
    
    # Compute MAP
    print("Computing MAP on Training data")
    train_map = compute_map(top_recommendations, train_ratings)
    print(f"Train MAP: {train_map}")
    
    print("Computing MAP on Validation data")
    val_map = compute_map(top_recommendations, val_ratings)
    print(f"Validation MAP: {val_map}")
    
    print("Computing MAP on Test data")
    test_map = compute_map(top_recommendations, test_ratings)
    print(f"Test MAP: {test_map}")


In [8]:
def main(spark):
    process_data(spark)

In [9]:
if __name__ == "__main__":
    spark = SparkSession.builder.appName('als_recommender').getOrCreate()
    
#     spark = SparkSession.builder \
#         .appName('als_recommender') \
#         .config("spark.sql.shuffle.partitions", "800") \
#         .config("spark.executor.memory", "16g") \
#         .config("spark.driver.memory", "16g") \
#         .config("spark.memory.fraction", "0.8") \
#         .config("spark.memory.storageFraction", "0.2") \
#         .getOrCreate()
    
    spark = SparkSession.builder \
    .appName('als_recommender') \
    .config('spark.executor.memory', '16g') \
    .config('spark.driver.memory', '16g') \
    .config('spark.executor.cores', '6') \
    .config('spark.executor.instances', '12') \
    .getOrCreate()
    
    main(spark)

24/05/15 19:11:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/15 19:11:42 WARN BlockManager: Block rdd_62_7 could not be removed as it was not found on disk or in memory
24/05/15 19:11:42 WARN BlockManager: Block rdd_63_7 could not be removed as it was not found on disk or in memory
24/05/15 19:11:42 ERROR Executor: Exception in task 7.0 in stage 15.0 (TID 425)
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.allocate(ALS.scala:1569)
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.allocate(ALS.scala:1518)
	at org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:960)
	at org.apache.spark.ut

Py4JJavaError: An error occurred while calling o90.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 15.0 failed 1 times, most recent failure: Lost task 7.0 in stage 15.0 (TID 425) (cm041 executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.allocate(ALS.scala:1569)
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.allocate(ALS.scala:1518)
	at org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:960)
	at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:708)
	at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:534)
	at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:462)
	at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
	at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
	at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.org$apache$spark$ml$recommendation$ALS$UncompressedInBlock$$sort(ALS.scala:1489)
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.compress(ALS.scala:1449)
	at org.apache.spark.ml.recommendation.ALS$.$anonfun$makeBlocks$5(ALS.scala:1633)
	at org.apache.spark.ml.recommendation.ALS$$$Lambda$3174/0x000000084108e040.apply(Unknown Source)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$mapValues$3(PairRDDFunctions.scala:751)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$3132/0x000000084133f040.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
	at org.apache.spark.storage.BlockManager$$Lambda$1284/0x000000084096b040.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.rdd.RDD$$Lambda$3108/0x0000000841330c40.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
	at org.apache.spark.storage.BlockManager$$Lambda$1284/0x000000084096b040.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:960)
	at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:709)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:691)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.allocate(ALS.scala:1569)
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.allocate(ALS.scala:1518)
	at org.apache.spark.util.collection.TimSort$SortState.ensureCapacity(TimSort.java:960)
	at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:708)
	at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:534)
	at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:462)
	at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
	at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
	at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.org$apache$spark$ml$recommendation$ALS$UncompressedInBlock$$sort(ALS.scala:1489)
	at org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.compress(ALS.scala:1449)
	at org.apache.spark.ml.recommendation.ALS$.$anonfun$makeBlocks$5(ALS.scala:1633)
	at org.apache.spark.ml.recommendation.ALS$$$Lambda$3174/0x000000084108e040.apply(Unknown Source)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$mapValues$3(PairRDDFunctions.scala:751)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$3132/0x000000084133f040.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
	at org.apache.spark.storage.BlockManager$$Lambda$1284/0x000000084096b040.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
	at org.apache.spark.rdd.RDD$$Lambda$3108/0x0000000841330c40.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
	at org.apache.spark.storage.BlockManager$$Lambda$1284/0x000000084096b040.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)


24/05/15 19:11:42 WARN BlockManager: Putting block rdd_63_4 failed due to exception org.apache.spark.TaskKilledException.
24/05/15 19:11:42 WARN BlockManager: Block rdd_63_4 could not be removed as it was not found on disk or in memory
24/05/15 19:11:42 WARN BlockManager: Putting block rdd_62_8 failed due to exception org.apache.spark.TaskKilledException.
24/05/15 19:11:42 WARN BlockManager: Block rdd_62_8 could not be removed as it was not found on disk or in memory
24/05/15 19:11:42 WARN BlockManager: Putting block rdd_63_8 failed due to exception org.apache.spark.TaskKilledException.
24/05/15 19:11:42 WARN BlockManager: Block rdd_63_8 could not be removed as it was not found on disk or in memory
