In [1]:
import os
import time
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

# spark imports
import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

# data science imports
import math
import numpy as np
import pandas as pd

# recommenders imports

from recommenders.utils.timer import Timer
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.utils.notebook_utils import store_metadata

In [2]:
print(f"System version: {sys.version}")
print("Spark version: {}".format(pyspark.__version__))

System version: 3.9.20 (main, Oct  3 2024, 07:27:41) 
[GCC 11.2.0]
Spark version: 3.5.3


In [3]:
# top k items to recommend
TOP_K = 10

# Column names for the dataset
COL_USER = "user_id"
COL_TRACK = "track_id"
COL_COUNT = "playcount"

In [4]:
# the following settings work well for debugging locally on VM - change when running on a cluster
# set up a giant single executor with many threads and specify memory cap
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

24/11/03 15:49:14 WARN Utils: Your hostname, manuel-albino-asus resolves to a loopback address: 127.0.1.1; using 192.168.87.143 instead (on interface wlp1s0)
24/11/03 15:49:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/03 15:49:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# read in the dataset into pyspark DataFrame
song_ratings = spark.read.option("header", "true") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .csv("../remappings/data/Modified_Listening_History.txt")

#remapping
song_ratings = song_ratings.withColumn("track_id_temp", song_ratings.track_id).withColumn("user_id_temp", song_ratings.user_id)

song_ratings = song_ratings.withColumn("track_id", song_ratings.user_id_temp).withColumn("user_id", song_ratings.track_id_temp)

# key = old column, value = new column
mapping = {
    "track_id": COL_USER,
    "user_id": COL_TRACK,
    "playcount": COL_COUNT
}

song_ratings = song_ratings.select(*[F.col(old).alias(new) for old, new in mapping.items()])
sample = song_ratings.sample(False, 0.001, 0)

# show matrix (track, user, playcount)
song_ratings.show(2, truncate=False)

                                                                                

+-------+--------+---------+
|user_id|track_id|playcount|
+-------+--------+---------+
|0      |27758   |1        |
|0      |2350    |1        |
+-------+--------+---------+
only showing top 2 rows



In [6]:
train, test = spark_random_split(song_ratings, ratio=0.75, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

                                                                                

N train 7281001




N test 2430300


                                                                                

In [7]:
alpha = 1 

# Transform playcount to confidence using the current alpha
train_with_confidence = train.withColumn("confidence", 1 + alpha * F.log(1 + F.col(COL_COUNT)))

header = {
    "userCol": COL_USER,
    "itemCol": COL_TRACK,
    "ratingCol": "confidence",
}

als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=True,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [8]:
with Timer() as train_time:
    model = als.fit(train_with_confidence)

print("Took {} seconds for training.".format(train_time.interval))



                                                                                

Took 46.41544395599999 seconds for training.


In [9]:
with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_TRACK).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_TRACK] == train[COL_TRACK]),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_COUNT}"].isNull()).select('pred.' + COL_USER, 'pred.' + COL_TRACK, 'pred.' + "prediction")

    # In Spark, transformations are lazy evaluation
    # Use an action to force execute and measure the test time 
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))



24/11/03 15:50:24 WARN Column: Constructing trivially true equals predicate, 'user_id#47 = user_id#47'. Perhaps you need to use aliases.
24/11/03 15:50:24 WARN Column: Constructing trivially true equals predicate, 'track_id#48 = track_id#48'. Perhaps you need to use aliases.
24/11/03 15:54:04 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-b7a6da90-aa6a-4ffe-a71e-d6c2d013ab2c/02/temp_shuffle_88309633-36d2-4c94-84d4-75a36f5d182b
24/11/03 15:54:04 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-b7a6da90-aa6a-4ffe-a71e-d6c2d013ab2c/12/temp_shuffle_9b7727d8-82db-4157-9e83-2399a9bf5f8e
24/11/03 15:54:04 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-b7a6da90-aa6a-4ffe-a71e-d6c2d013ab2c/36/temp_shuffle_b359a879-1ee4-4d81-9124-4d17bf4a4a7b
24/11/03 15:54:04 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-b7a6da90-aa6a-4ffe-a71e-d6c2d013ab2c/27/temp_shuffle_ac161440-6d38-455a-9355-1516795dc391
24/11/03 15:54:04 WARN DiskBlockObjectWriter: Error deleting /tm

Py4JJavaError: An error occurred while calling o202.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 714.0 failed 1 times, most recent failure: Lost task 1.0 in stage 714.0 (TID 756) (192.168.87.143 executor driver): java.io.IOException: No space left on device
	at java.base/java.io.FileOutputStream.writeBytes(Native Method)
	at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:519)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:334)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.IOException: No space left on device
	at java.base/java.io.FileOutputStream.writeBytes(Native Method)
	at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:519)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:69)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:334)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
top_all.show()

In [None]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item=COL_TRACK, 
                                    col_rating=COL_COUNT, col_prediction="prediction", 
                                    relevancy_method="top_k")

In [None]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

In [None]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()

In [None]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_TRACK, 
                                    col_rating=COL_COUNT, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

In [None]:
# cleanup spark instance
spark.stop()