# Slope One Algorithm in PySpark

Brendan Hawk  
2022-06-21

### A Formal Definition of the problem: Predicting Recommendations

Given a set of known ratings from Users A and B, and items I and J, can we predict what User B will rate Item J?

| | Item 1 | Item 2 | Item 3 | ... | Item _i_ |
|:-|-:|-:|-:|:-:|-:
| User 1 | 1.5 | 4 | 2 | ... | 4 |
| User 2 | 2 | **?** | 2 | ... | 3 |
| User 3 | 5 | 3 | 3 | ... | **?** |
| ... | ... | ... | ... | ... | ... |
| User u | 1 | 2 | **?** | ... | 3.5 |


The formal approach to this problem is often described in the context of applying direct mathematical solutions using _Matrix Decomposition_, where the full matrix (above) is decomposed or factorised into a set of two matrices of latent factors, one for the Users $H$ and one for the Items $W$ such that:

$$ \tilde{R} = HW $$

These models, however they're generated, have one major drawback: even generating the full matrix of combinations above may be impossible or prohibitively slow due to the steep increase in size as the number of Users and Items grows. Even the smallest of the datasets included in this project would yield a matrix of 5,400,000 cells. The largest would be as large as 16,240,000,000 cells! This takes a considerable amount of operating memory and time.

### Slope One

In a paper first published in 2005, two researchers[1] introduce the Slope One algorithm. In effect, the basic idea is to use prior knowledge of a user's previously rated items and other users' ratings for items in common with the first user and target item to generate a sort of matrix of deviations that can then be averaged into an approximation of the desired target prediction.

For example:

![a](https://upload.wikimedia.org/wikipedia/commons/c/cb/Simplicity_diagram.png)[2]

To calculate $Pred_{(B, J)}$ we first find the deviation of user A's ratings from item I to item J, then apply it to the known rating for user B, item I. Since both User A and B have rated item I, we can use that deviation to predict our target rating:

$$ Pred_{(B, J)} = R_{(B, I)} + (R_{(B, J)} - R_{(B, I)}) $$
$$ Pred_{(B, J)} = 2 + (1.5 - 1) $$
$$ Pred_{(B, J)} = 2.5 $$

To go a step further, we can use the averages of this differences when there is more data to aggregate for our prediction. This approach uses a weighted average, giving more weight to the differences that are accumulated from a larger number of users who have rated both items:

| | $i_1$ | $i_2$ | $i_3$ |
|:-|-:|-:|-:|
| $u_1$ | 3 | 5 | 4 |
| $u_2$ | 3 | 2 | **?** |
| $u_3$ | **?** | 2 | 4 |


$$ Pred_{u_3,i_1} = \frac{\sum{((R_{u_3,i_n} + b) * support)}}{\sum{support}} $$

where $b$ is the average difference in ratings between items, and $support$ is the numerator for these averages (ie the number of times two items were both rated by a user):

$$ b_{i_2, i_1} = \frac{\sum_n{R_{i_2} - R_{i_1}}}{n} $$

In our example above we have two sets of $b$ and $support$:

| Co-Rated Item | Target Item | $b$ | $support$ |
|:-:|:-:|:-:|:-:|
| $i_2$ | $i_1$ | ((5-3) + (2-3)) / 2 = 0.5 | 2 |
| $i_3$ | $i_1$ | (4-3) / 1 = 1 | 1 |

Finally, we can apply these averages and their weights to our known ratings for the target user:

$$ Pred_{u_3,i_1} = \frac{((R_{u_3,i_2} + b_{i_2,i_1}) * support_{i_2,i_1}) + ((R_{u_3,i_3} + b_{i_3,i_1}) * support_{i_3,i_1})}{support_{i_2,i_1}+support_{i_3,i_1}} $$
$$ Pred_{u_3,i_1} = \frac{((2+0.5)*2) + ((4+1)*1)}{2+1} $$
$$ Pred_{u_3,i_1} = 3.33 $$


[1]: Lemire, Daniel and Maclachlan, Anna (2007). Slope One Predictors for Online Rating-Based Collaborative Filtering. https://doi.org/10.48550/arXiv.cs/0702144  
[2]: https://en.wikipedia.org/wiki/Slope_One

### Advantages of Slope One

There are three main advantages to this algorithm that I find particularly compelling. First is the simplicity of its math. Matrix Decomposition approaches require based on much higher order math that can be difficult to understand and implement effectively. Current Big Data implementations also rely on a lot of iterative approximation, as in Gradient Descent, which requires time to train.

Second is its understandability. You can summarise an explanation of this algorithm by describing that it takes all common comparisons where users have rated similar items and uses an average difference between them to approximate the target rating. This would be understandable and is explainable to anyone, even people well outside the realm of Data Science or other analytics fields.

Lastly, and as someone with a software engineering background this is of particular interest to me: this model is easily updatable. the entire model is summarised in the values for $b$ and $support$, and if new data comes in (which it does, constantly!) you can update the model by simply accessing the model rows for the given item reconstitute new averages for the difference. This is only possible due to the storage of the $support$ value, so that you can always reverse the quotient of the average, add the new value to the numerator, increment the divisor, and calculate a new quotient. Other models mentioned here would be nearly impossible to update so succinctly, and would in effect require recalculating from scratch.

### Caveats

There are two main caveats to this approach. First, this approach still has relatively high memory requirements. While you no longer need to accomodate the entire matrix as described above, you need to have a matrix of all the differences as described. This is hard to calculate up front, as every user will have a different number of ratings and therefore combinations, but it should still be considerably less than a complete matrix as it is effectively like a dense variance matrix derived from a sparse matrix source.

Second, it is possible that there is no data to build a prediction on. If a user has given no ratings, or if a user has given no ratings that can be tied to our model by combinations of other ratings with the target item, the prediction for this model will be blank. As a somewhat naive approach here I will calculate the mean score for each item to impute any missing predictions.

### Applying Slope One to large datasets

For this example, we will use the Latest Small MovieLens dataset from GroupLens[3]. This dataset contains 100,000 ratings on 9000 movies by 600 users

There are two major steps to consider: building the model and formulating predictions. First, we will build our model.

While Slope One effectively requires the storage of a full matrix of differences between all combinations of movies, it is easy to store this in a "long" format that is far easier to parallelize, rather than a true "matrix" object. The task is then a simple aggregation step to build the average of the differences and the count of ratings used to create that average.

The first part of this is the expensive part. This is a partial cross join, where we take every user and find every combination of their ratings. In this implementation, order matters resulting in duplications of these pairs such as $(Item_1, Item_2)$ separate from $(Item_2, Item_1)$. There is room for improvement here in terms of memory or storage space used if you refactor the model to only store the "lower half" matrix of these differences (effectively, permutation instead of combination) and alter the final prediction algorithm accordingly as well.

[3]: F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1–19:19. https://doi.org/10.1145/2827872

In [None]:
from __future__ import print_function

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import avg, col, sum, count

import numpy as np

# Get our Spark contexts ready
conf = SparkConf().setAppName("Slope-One").setMaster("local[*]")
sc = SparkContext(conf=conf)
sparkSql = SparkSession.builder.getOrCreate()

In [None]:
# Read the raw ratings data
ratings = sparkSql.read.csv(
    "./data/100k.csv.bz2",
    header=False,
    schema=StructType([
        StructField("userId", IntegerType(), False),
        StructField("movieId", IntegerType(), False),
        StructField("rating", FloatType(), False),
        StructField("timestamp", StringType(), False)
    ])
).select("userId", "movieId", "rating")

# Split the training and test sets
(training, test) = ratings.randomSplit([0.8, 0.2], seed=777)

# We'll use this a couple times, so caching it will help the performance
training.cache()

training.show(5)

In [None]:
# Calculate the movie means.
# There's a possibility the Slope One algorithm will not have enough data to make a prediction
# for a given (user, movie), and the mean score for a movie becomes a simple naive fallback.
movie_means = training.groupBy("movieId").agg(avg("rating").alias("mean"))

In [None]:
# Now, build the model.

# First, create two references to the training data with columns renamed
model_data_left = training.select(
    "userId",
    col("movieId").alias("movieId1"),
    col("rating").alias("rating1")
)
model_data_right = training.select(
    "userId",
    col("movieId").alias("movieId2"),
    col("rating").alias("rating2")
)

# This join will create rows for every combination of movies a user has rated,
# for every user in the training dataset. In development, this has performed MUCH faster
# than creating these combinations by hand using itertools.combinations or similar functions.
model = (
    model_data_left
    .join(model_data_right, on = "userId", how = "left")
    .filter("movieId1 != movieId2")
)

model.show(5)

In [None]:

# Now we can aggregate the final model values for (b, support) for every combination created
model = (
    model
    .withColumn("b", col("rating2") - col("rating1"))
    .groupBy("movieId1", "movieId2")
    .agg(sum("b").alias("b"), count("userId").alias("support"))
    .withColumn("b", col("b") / col("support"))
)

model.cache()
model.show(10)

### Generating the Predictions

This step is actually the slow part. Slope One would be a much better choice for an application that doesn't try to compute entire datasets at once. It would have an easy implementation for a database-driven application, where you could query and calculate very small sets of data (ie for a single user and item) in parallel.

However, I was able to implement this using PySpark to generate the entire set of missing ratings in our test dataset as follows. The aggregation of the data is once again a single easy step - the tricks are all in the joins beforehand, which can be very expensive in a distributed system.

First, I've implemented the process for a single example. You could imaging replacing the filter steps on the `training` and `model` values with sql `SELECT ...` statements instead.

In [None]:
# To make understanding the prediction process easier, the following implementation creates one prediction.
sample = (
    ratings
    .rdd
    .takeSample(False, 1, seed = 2022)[0]
)
prediction_user = sample["userId"]
prediction_movie = sample["movieId"]

# First, get a reference to the model data for this movie
prediction_model = model.filter("movieId2 = {}".format(prediction_movie))

(prediction_b, prediction_support) = (
    training
    # Get all ratings that this user has given for OTHER movies
    .filter("userId = {} and movieId != {}".format(prediction_user, prediction_movie))
    # Join all model values for those movies
    .withColumnRenamed("movieId", "movieId1")
    .join(prediction_model, on="movieId1")
    # Aggregate them into a single value for (b, support)
    # For every row, calculate sum((user_rating + b) * support) and sum(support)
    .agg(sum((col("rating") + col("b")) * col("support")).alias("b"), sum(col("support")).alias("support"))
    .collect()[0]
)

final_prediction = prediction_b / prediction_support

print("True Rating {:0.1f}".format(sample["rating"]))
print("Predicted Rating {:0.1f}".format(final_prediction))
print("Loss {:0.4f}".format(final_prediction - sample["rating"]))

Now, this only takes a couple seconds, but even for the test sample of the small dataset (20,000 rows) this would mean a full runtime of nearly a day.

Instead, we can replace the filter statements above with two more clever joins. These can be a bit complicated to reason and understand, but hopefully the comments on each join make the logic make sense.

In [None]:
# Making predictions for the entire test set relies on a series of somewhat complicated joins
# First: Join the movie means (our fallback for missing predictions) and alias a few of the columns
prediction_data = (
    test
    .join(movie_means, on="movieId")
    .select(
        col("userId").alias("user_pred"),
        col("movieId").alias("movie_pred"),
        col("rating").alias("rating_true"),
        "mean"
    )
)

print("Test data joined to movie means:")
prediction_data.show(5)

In [None]:
# This step joins the original training data (the co-ratings) for each user for all movies that are NOT the target movie
training_join = [training.userId == prediction_data.user_pred, training.movieId != prediction_data.movie_pred]
prediction_data = prediction_data.join(training, on=training_join, how="left")

print("Test data and means joined with original training data (co-ratings):")
prediction_data.show(5)

In [None]:
# This step joins the model data for the given (user, co-rating movie) combinations
model_join = [model.movieId2 == prediction_data.movie_pred, model.movieId1 == prediction_data.movieId]
prediction_data = prediction_data.join(model, on=model_join, how="left")

# Print out what this final table looks like before aggregation
print("Test data, means, and co-ratings joined with model data - Complete dataFrame for generating predictions:")
print(prediction_data.show(5))

In [None]:
# Now aggregate all the predictions as above.
predictions = (
    prediction_data
    .groupBy("user_pred", "movie_pred", "rating_true", "mean")
    # pred = sum((rating + b) * support) / sum(support)
    .agg((sum((col("rating") + col("b")) * col("support")) / sum("support")).alias("rating_pred"))
)

# Show some of our predictions
print("Sample of Predictions:")
print(predictions.show(5))

In [None]:
# If any of our predictions are null, let's check them out
print("Inspecting for missing predictions:")
print(predictions.filter("rating_pred IS NULL").show(25))

In [None]:
# calculate loss for the entire test set, here using RMSE as our loss function.
loss = np.array(
    predictions
    .rdd
    .map(lambda p: (p["rating_pred"] - p["rating_true"]) if p["rating_pred"] else (p["mean"] - p["rating_true"]))
    .collect()
)
loss = np.sqrt(np.sum(loss**2) / len(loss))
print("RMSE: {:0.4f}".format(loss))

# Results

The Slope One algorithm has its advantages, but in truth increasing accuracy over other models is not one of them. In development, accuracy (RMSE) ranges from ~0.85 to ~0.88.

For comparison, the `ALS` algorithm provided by PySpark ML has been implemented below. This uses all the defaults with no effort to tune the algorithm, and it performs just slightly worse than the Slope One algorithm above.

Given this, and other examples I have worked with in the past, Slope One seems to perform equal with other single models and is wide open for improvements through the use of other data work (such as sweeping other means or effects out of the original data) or as part of an ensemble model (perhaps the aggregations could be weighted by commonalities between users, such as how many genres they both like, etc.). This, coupled with the other benefits above, make it a sound option for collaborative filtering and recommendation applications.

I was able to run the included non-notebook version of this Slope One implementation on Google Compute using an `n1-highmem-2` for the master (2 vCPU, 13 GB memory) and 2 `n1-highmem-4` (4 vCPU, 26 GB memory) workers. Runtimes and Loss measures are listed below.

| Dataset | Runtime | Loss |
|:-|-:|-:|
| 100k | ~1 min | 0.8782 |
| 10M | ~15 min | 0.8562 |
| 27M | ~60 min | 0.8624 |

In [None]:

# For comparison, a quick and basic usage of pysaprk.ml.recommendation.ALS 
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import coalesce

# Build the recommendation model on the training data
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
model_als = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model_als.transform(test)

# Calculate the final loss for comparison
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
loss_als = evaluator.evaluate(predictions)
print("RMSE (ALS): {:0.4f}".format(loss_als))

In [None]:
# Stop the cluster
sc.stop()