<i>Adapted from Recommenders ALS example</i>

# Running ALS on MIND (with PySpark)

Matrix factorization by [ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS) (Alternating Least Squares) is a well-known collaborative filtering algorithm.

This notebook shows how to train and evaluate the ALS implementation in PySpark ML (the DataFrame-based API), which is designed for large-scale distributed datasets, using the MIND dataset.

In [1]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import os
import sys
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType

from recommenders.utils.timer import Timer
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark

from tempfile import TemporaryDirectory
from recommenders.datasets.mind import download_mind
from recommenders.datasets.download_utils import unzip_file

print(f"System version: {sys.version}")
print("Spark version: {}".format(pyspark.__version__))


System version: 3.12.3 (main, Feb  4 2025, 14:48:35) [GCC 13.3.0]
Spark version: 3.5.4


Set the default parameters.

In [None]:
# top k items to recommend
TOP_K = 10

# MIND sizes: "demo", "small", or "large"
mind_type = 'demo'

# Column names for the dataset
COL_USER = "user_id"
COL_ITEM = "news_id"
COL_RATING = "rating"

### 0. Set up Spark context & directory

The following settings work well for local debugging on a VM; change them when running on a cluster. We set up a single large executor with many threads and cap its memory.

In [3]:
# the following settings work well for debugging locally on VM - change when running on a cluster
# set up a giant single executor with many threads and specify memory cap
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")



In [4]:
# Setup data storage location

tmpdir = TemporaryDirectory()
data_path = tmpdir.name
train_zip, valid_zip = download_mind(size=mind_type, dest_path=data_path)
unzip_file(train_zip, os.path.join(data_path, 'train'), clean_zip_file=False)
unzip_file(valid_zip, os.path.join(data_path, 'valid'), clean_zip_file=False)
train_behaviors_path = os.path.join(data_path, "train", "behaviors.tsv")



### 1. Load and preprocess the MIND dataset
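
Each row of `behaviors.tsv` stores its impressions as a space-separated list of `newsID-label` tokens, where label `1` marks a click and `0` a non-click. The cell below keeps only the clicked tokens, then strips the `U` / `N` prefix to obtain integer IDs. A minimal plain-Python sketch of the same per-row logic, using a made-up row (the helper name `clicked_items` is hypothetical and not part of the notebook):

```python
import re

CLICKED = re.compile(r"^[a-zA-Z0-9_]+-1$")  # same pattern as the Spark filter


def clicked_items(user_id: str, impressions: str) -> list[tuple[int, int, int]]:
    """Return (user, item, rating) triples for the clicked impressions of one row."""
    user = int(re.search(r"\d+", user_id).group())
    rows = []
    for token in impressions.split(" "):
        if CLICKED.match(token):
            item = int(re.search(r"\d+", token.split("-")[0]).group())
            rows.append((user, item, 1))  # every click becomes rating 1
    return rows


# Made-up example row, not taken from the dataset.
print(clicked_items("U123", "N456-1 N789-0 N42-1"))
```

Non-clicked impressions are discarded entirely, so the resulting data contains only positive interactions.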

In [None]:
# Schema for behaviors.tsv
schema = StructType([
    StructField("impression_id", StringType(), True),  # Ignored for ALS
    StructField(COL_USER, StringType(), True),  # Will be converted later
    StructField("timestamp", StringType(), True),  # Convert to long if needed
    StructField("history", StringType(), True),  # List of past clicked news
    StructField("impressions", StringType(), True)  # Needs to be split into news_id + rating
])

# Load raw behaviors.tsv
data = (
    spark.read.option("sep", "\t").option("header", "false")
    .schema(schema)
    .csv(train_behaviors_path)
)
data = data.withColumn(
    "impressions",
    F.expr("filter(split(impressions, ' '), x -> x rlike '^[a-zA-Z0-9_]+-1$')")
)

# Explode and extract clicked news_id and rating
data = data.withColumn("impressions", F.explode("impressions"))
data = data.withColumn(COL_ITEM, F.split("impressions", "-")[0])
data = data.withColumn(COL_RATING, F.lit(1))

# Convert user_id and news_id to integers (ALS requires numeric IDs)
# by extracting the numeric part from IDs like "U123" or "N456"
data = data.withColumn(COL_USER, F.regexp_extract(F.col(COL_USER), r"\d+", 0).cast(IntegerType()))
data = data.withColumn(COL_ITEM, F.regexp_extract(F.col(COL_ITEM), r"\d+", 0).cast(IntegerType()))

# Drop unnecessary columns
data = data.select(COL_USER, COL_ITEM, COL_RATING)

# Count interactions per user and per article, keeping those with at least 10
user_counts = data.groupBy(COL_USER).count().filter(F.col("count") >= 10)
news_counts = data.groupBy(COL_ITEM).count().filter(F.col("count") >= 10)

# Inner joins drop the rows of infrequent users / articles
# (the two "count" columns are carried along into the result)
data = data.join(user_counts, COL_USER).join(news_counts, COL_ITEM)


# Show transformed data
data.show()
data.groupBy(COL_RATING).count().show()


                                                                                

+-------+-------+------+-----+-----+
|news_id|user_id|rating|count|count|
+-------+-------+------+-----+-----+
| 128367|    362|     1|   29| 1817|
|  38395|   1417|     1|   14|  400|
|  19530|   2601|     1|   15|   84|
| 128367|   5100|     1|   10| 1817|
|   1088|   5880|     1|   22|  585|
| 117994|   8654|     1|   27|  927|
|  19553|   8864|     1|   14|   38|
| 128367|  10862|     1|   27| 1817|
| 128367|  11876|     1|   29| 1817|
| 128367|  12384|     1|   20| 1817|
| 128367|  13261|     1|   22| 1817|
|  12027|  14477|     1|   13|  504|
|  47711|  15398|     1|   11| 1277|
| 128367|  15479|     1|   16| 1817|
|   1088|  15557|     1|   15|  585|
|   1088|  15557|     1|   15|  585|
| 128367|  16224|     1|   14| 1817|
|  12027|  16383|     1|   13|  504|
|  53691|  17172|     1|   70|  134|
| 128367|  17388|     1|   18| 1817|
+-------+-------+------+-----+-----+
only showing top 20 rows
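
The two `count` columns in the output come from joining `data` with `user_counts` and `news_counts`; the joins act as a filter that keeps only interactions whose user and article each have at least 10 clicks. Both counts are computed on the unfiltered data, so after the joins a kept user or article may be left with fewer than 10 rows. A plain-Python sketch of that filter on toy data (the helper `filter_min_interactions` is hypothetical):

```python
from collections import Counter

MIN_COUNT = 10  # same threshold as the Spark filter


def filter_min_interactions(rows, min_count=MIN_COUNT):
    """Keep (user, item, rating) rows whose user and item both reach min_count.

    Counts are taken once on the unfiltered rows, as in the Spark cell.
    """
    user_counts = Counter(user for user, _, _ in rows)
    item_counts = Counter(item for _, item, _ in rows)
    return [
        (user, item, rating)
        for user, item, rating in rows
        if user_counts[user] >= min_count and item_counts[item] >= min_count
    ]


# Toy data with a threshold of 2 so the effect is visible.
toy = [(1, 10, 1), (1, 11, 1), (2, 10, 1), (3, 12, 1)]
print(filter_min_interactions(toy, min_count=2))
```

In the toy run, user 1 passes the threshold with two rows but keeps only one of them. In PySpark, a `left_semi` join would apply the same filter without carrying the `count` columns into `data`.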




### 2. Split the data using the Spark random splitter provided in utilities

In [None]:
train, test = spark_random_split(data, ratio=0.75, seed=123)
print("N train", train.cache().count())
print("N test", test.cache().count())

                                                                                

N train 69472
N test 23065
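
These counts give a realized train share of 69472 / (69472 + 23065), about 0.751, which is close to the requested `ratio=0.75` but not exact, as one would expect when rows are assigned at random. The sketch below illustrates that behaviour in plain Python. It is not the implementation of `spark_random_split`, and the helper `random_split` is hypothetical:

```python
import random


def random_split(rows, ratio=0.75, seed=123):
    """Assign each row to train with probability `ratio`, otherwise to test."""
    rng = random.Random(seed)
    train_rows, test_rows = [], []
    for row in rows:
        (train_rows if rng.random() < ratio else test_rows).append(row)
    return train_rows, test_rows


train_rows, test_rows = random_split(list(range(10_000)))
share = len(train_rows) / 10_000
print(f"train share: {share:.3f}")  # near 0.75, but generally not exactly 0.75

# Realized share from the counts reported by the notebook.
print(round(69472 / (69472 + 23065), 4))
```

Fixing the seed makes the split reproducible across runs.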


                                                                                

### 3. Train the ALS model on the training data, and get the top-k recommendations for our testing data

MIND records clicks rather than explicit ratings, so we treat each click in the training set (rating 1) as implicit feedback and set `implicitPrefs=True`.

In [None]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=50,
    maxIter=15,
    implicitPrefs=True,
    regParam=0.01,
    coldStartStrategy='drop',
    nonnegative=True,
    seed=42,
    alpha=45,
    **header
)
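
With `implicitPrefs=True`, Spark's ALS follows the implicit-feedback formulation of Hu, Koren and Volinsky: each observed value r is turned into a binary preference p (1 if r > 0, else 0) and a confidence c = 1 + alpha * r that weights the squared error for that pair. Since every click here has rating 1 and `alpha=45`, observed pairs get confidence 46 while unobserved pairs keep confidence 1. A small sketch of that weighting (the function names are illustrative and not part of the PySpark API):

```python
def preference(rating: float) -> int:
    """Binary preference: 1 for an observed positive interaction, else 0."""
    return 1 if rating > 0 else 0


def confidence(rating: float, alpha: float) -> float:
    """Confidence weight c = 1 + alpha * r from the implicit-feedback model."""
    return 1.0 + alpha * rating


ALPHA = 45  # value passed to ALS in the cell above

for r in (0, 1):
    print(r, preference(r), confidence(r, ALPHA))
```

The model is therefore fit to preferences in {0, 1}, so its predictions are preference scores rather than reconstructed ratings.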

In [None]:
with Timer() as train_time:
    model = als.fit(train)

print(f"Took {train_time.interval} seconds for training.")

                                                                                

Took 23.36020694900003 seconds for training.


In [None]:
with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select(COL_USER).distinct()
    items = train.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items. Reference the columns through the aliases so that the
    # join condition is unambiguous (both frames derive from `train`).
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (F.col(f"pred.{COL_USER}") == F.col(f"train.{COL_USER}"))
        & (F.col(f"pred.{COL_ITEM}") == F.col(f"train.{COL_ITEM}")),
        how='outer'
    )

    top_all = dfs_pred_exclude_train.filter(F.col(f"train.{COL_RATING}").isNull()) \
        .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

    # Spark transformations are lazily evaluated,
    # so use an action to force execution and measure the prediction time
    top_all.cache().count()

print(f"Took {test_time.interval} seconds for prediction.")

Took 111.45384966400002 seconds for prediction.


                                                                                

In [None]:
top_all.show()

+-------+-------+------------+
|user_id|news_id|  prediction|
+-------+-------+------------+
|      6|   5652|   0.8108743|
|      6|  13259|   0.6352661|
|      6|  14713|         0.0|
|      6|  29003| 0.017788233|
|      6|  31370| 0.058910854|
|      6|  32891|         0.0|
|      6|  35767|  0.34826005|
|      6|  43595|  0.23864116|
|      6|  45422|  0.20360477|
|      6|  54055| 0.010976363|
|      6|  59252| 0.061958354|
|      6|  63319|  0.16164823|
|     19|   6330|  0.11190914|
|     19|   9163|  0.36127266|
|     19|  26649|0.0040073525|
|     19|  32854|  0.14088264|
|     19|  34799| 8.937518E-4|
|     19|  44737|    0.414568|
|     19|  59653| 0.038909324|
|     19|  61023| 0.065751165|
+-------+-------+------------+
only showing top 20 rows
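
The prediction cell scores every user-item pair and then discards the pairs already present in the training set, so only unseen items remain as candidates; the evaluation step then considers the top `TOP_K` of them per user. The same idea on toy data in plain Python (the helper `top_k_unseen` and all scores below are made up for illustration):

```python
from collections import defaultdict


def top_k_unseen(scores, seen, k):
    """Return the k highest-scoring unseen items per user.

    scores: dict mapping (user, item) -> predicted score
    seen:   set of (user, item) pairs from the training data
    """
    per_user = defaultdict(list)
    for (user, item), score in scores.items():
        if (user, item) not in seen:  # drop items the user already interacted with
            per_user[user].append((item, score))
    return {
        user: [item for item, _ in sorted(cands, key=lambda c: c[1], reverse=True)[:k]]
        for user, cands in per_user.items()
    }


toy_scores = {(1, "a"): 0.9, (1, "b"): 0.4, (1, "c"): 0.7, (2, "a"): 0.2, (2, "b"): 0.8}
toy_seen = {(1, "a"), (2, "b")}
print(top_k_unseen(toy_scores, toy_seen, k=2))
```

Scoring the full cross join grows with the number of users times the number of items, which is consistent with prediction taking longer than training in this run.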



### 4. Evaluate how well ALS performs

In [None]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction", 
                                    relevancy_method="top_k")

                                                                                

In [None]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')



Model:	ALS
Top K:	10
MAP:	0.024064
NDCG:	0.047498
Precision@K:	0.023703
Recall@K:	0.061861


                                                                                

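ALS results at Top K = 10 by MIND size (`mind_type`). The cell output above matches the `small` run.

| Metric      | demo     | small    |
|-------------|----------|----------|
| MAP         | 0.020436 | 0.024064 |
| NDCG        | 0.039740 | 0.047498 |
| Precision@K | 0.019577 | 0.023703 |
| Recall@K    | 0.050302 | 0.061861 |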
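
For reference, precision@k and recall@k for a single user can be computed as below, using the common definitions (hits divided by k, and hits divided by the number of relevant items). The notebook's figures are aggregates over all test users, and `SparkRankingEvaluation` may handle edge cases differently, so treat this as an illustration rather than the library's code:

```python
def precision_recall_at_k(recommended, relevant, k):
    """Precision@k and recall@k for one user.

    recommended: ranked list of item ids, best first
    relevant:    set of item ids the user interacted with in the test set
    """
    hits = sum(1 for item in recommended[:k] if item in relevant)
    precision = hits / k
    recall = hits / len(relevant) if relevant else 0.0
    return precision, recall


# Toy example: 1 of the top 4 recommendations is among 2 relevant items.
print(precision_recall_at_k(["a", "b", "c", "d"], {"c", "z"}, k=4))
```

With k = 10, a precision of about 0.02 corresponds to roughly one hit per five users' recommendation lists.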

### 5. Evaluate rating prediction

In [None]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()




+-------+-------+------+-----+-----+------------+
|news_id|user_id|rating|count|count|  prediction|
+-------+-------+------+-----+-----+------------+
|   4612|  42635|     1|   11|   24| 0.010642283|
|  24176|  31236|     1|   17|  554|  0.39615127|
|  24176|  42635|     1|   11|  554|  0.25372392|
|  49712|  64822|     1|   12|  507|   0.6526933|
|  60872|  64822|     1|   12|  140| 0.029460328|
|  56211|  80451|     1|   13|  608|  0.01811748|
|  17059|  40011|     1|   11|  480| 0.017920684|
|  18406|  37489|     1|   10|   78| 0.029273797|
|   1034|  55283|     1|   30|  864|    0.360135|
|  15855|  31236|     1|   17|  337|  0.04366128|
|  26227|  22097|     1|   21|  130|   1.3879542|
|  55943|   5803|     1|   15|  541|0.0076231123|
|  36226|  61793|     1|   27|  452|   0.6577224|
|  51048|  12027|     1|   14| 1875| 0.095313706|
|  51048|  43935|     1|   25| 1875|   0.5148454|
|   4642|  40653|     1|   10| 1189| 0.014704079|
|   4642|  55283|     1|   30| 1189|   0.9233556|


                                                                                

In [None]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

                                                                                

Model:	ALS rating prediction
RMSE:	0.755191
MAE:	0.692073
Explained variance:	-inf
R squared:	-inf


  return 1 - np.divide(var1, var2)
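
The `-inf` values follow from the data rather than from the model: every row in `test` has `rating = 1`, so the variance of the true ratings is zero and the ratio in `1 - np.divide(var1, var2)` divides by zero. Assuming `var1` is the variance of the residuals and `var2` the variance of the true ratings (the usual definition of explained variance, which the warning fragment suggests), the effect can be reproduced in plain Python:

```python
import math
from statistics import variance


def explained_variance(y_true, y_pred):
    """1 - Var(y_true - y_pred) / Var(y_true), with -inf when Var(y_true) is 0."""
    residual_var = variance([t - p for t, p in zip(y_true, y_pred)])
    true_var = variance(y_true)
    if true_var == 0:
        # Mirror floating-point division by zero instead of raising ZeroDivisionError.
        return -math.inf if residual_var > 0 else math.nan
    return 1 - residual_var / true_var


# All true ratings are 1, as in the click-only data; the predictions are made up.
print(explained_variance([1, 1, 1, 1], [0.4, 0.01, 0.65, 0.25]))  # -inf
```

For click-only data the ranking metrics in section 4 are therefore the more informative evaluation; RMSE and MAE here only measure how far the predicted scores are from 1.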


In [None]:
# cleanup spark instance and clear temp directory
spark.stop()
tmpdir.cleanup()