# This workshop demonstrates how to apply the ALS algorithm from the PySpark ML library to the MovieLens dataset and to the Deskdrop dataset.


In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=8c31e0de5dac1d86f9e16352c6086446eb0b2c343c57a9f2926e1a3ad1373089
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [2]:
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
# Start (or attach to) the Spark session that every later Spark cell uses via `spark`.
# To name the app or pin the number of local cores, build it explicitly instead, e.g.
#   SparkSession.builder.appName('workshop4B').master("local[4]").getOrCreate()
spark = (
    SparkSession.builder
    .getOrCreate()
)
spark  # display the session summary

In [None]:
# Stop the Spark session only when you are completely finished with the notebook.
# The call is guarded by a flag so that "Run All" does not kill the session that
# every later cell depends on; set STOP_SPARK = True and re-run this cell to stop it.
STOP_SPARK = False
if STOP_SPARK:
    spark.stop()

In [4]:
# Colab only: the data files are uploaded to Google Drive, so mount the drive
# to make them readable under the mount point below.
from google.colab import drive

MOUNT_POINT = '/content/drive'
drive.mount(MOUNT_POINT)

Mounted at /content/drive


We begin by demonstrating how to use PySpark ALS on explicit ratings data (MovieLens).

## Step1: Load and prepare the Movielens data

In [5]:
# Read the MovieLens 100K ratings into a PySpark (distributed) DataFrame.
# No schema is inferred, so every column arrives as a string; rating is cast later.
# FYI: more info on spark dataframes: https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
file = "/content/drive/My Drive/recsys/u_data.csv"
newcolnames = ['userid', 'itemid', 'rating', 'datetime']

ratings_spdf = spark.read.csv(file, header=True)
# give the columns standard names, pairing them positionally with newcolnames
for old_name, new_name in zip(ratings_spdf.columns, newcolnames):
    ratings_spdf = ratings_spdf.withColumnRenamed(old_name, new_name)

ratings_spdf.printSchema()
ratings_spdf.show(10)

root
 |-- userid: string (nullable = true)
 |-- itemid: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- datetime: string (nullable = true)

+------+------+------+---------+
|userid|itemid|rating| datetime|
+------+------+------+---------+
|     1|     1|     5|874965758|
|     1|     2|     3|876893171|
|     1|     3|     4|878542960|
|     1|     4|     3|876893119|
|     1|     5|     3|889751712|
|     1|     6|     5|887431973|
|     1|     7|     4|875071561|
|     1|     8|     1|875072484|
|     1|     9|     5|878543541|
|     1|    10|     3|875693118|
+------+------+------+---------+
only showing top 10 rows



In [6]:
# FYI: how many partitions is the data spread over?
# Large datasets can be partitioned across multiple CPUs,
# e.g. see: https://sparkbyexamples.com/spark/spark-partitioning-understanding/
num_partitions = ratings_spdf.rdd.getNumPartitions()
num_partitions

1

In [7]:
# The CSV was read as strings: cast rating to a float so it can serve as a numeric target.
ratings_spdf = ratings_spdf.withColumn("rating", ratings_spdf["rating"].cast("float"))
ratings_spdf.printSchema()
ratings_spdf.select(["userid", "itemid", "rating"]).show(5)

root
 |-- userid: string (nullable = true)
 |-- itemid: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- datetime: string (nullable = true)

+------+------+------+
|userid|itemid|rating|
+------+------+------+
|     1|     1|   5.0|
|     1|     2|   3.0|
|     1|     3|   4.0|
|     1|     4|   3.0|
|     1|     5|   3.0|
+------+------+------+
only showing top 5 rows



In [8]:
# Summary statistics (count/mean/stddev/min/max) for every column.
# userid, itemid and datetime are still strings here, so their min/max are
# lexicographic (e.g. max userid "99", max itemid "999"), not numeric.
summary_spdf = ratings_spdf.describe()
summary_spdf.show()

+-------+-----------------+------------------+------------------+-----------------+
|summary|           userid|            itemid|            rating|         datetime|
+-------+-----------------+------------------+------------------+-----------------+
|  count|           100000|            100000|            100000|           100000|
|   mean|        462.48475|         425.53013|           3.52986|8.8352885148862E8|
| stddev|266.6144201275064|330.79835632558417|1.1256735991443165|5343856.189502888|
|    min|                1|                 1|               1.0|        874724710|
|    max|               99|               999|               5.0|        893286638|
+-------+-----------------+------------------+------------------+-----------------+



In [9]:
# In general userid/itemid are alphanumeric strings, so map each distinct value to a
# contiguous integer index to facilitate matrix indexing (same as in workshop2).
# (In this file the ids are integers already, so casting them would also work.)
# Note: the ids are sorted as strings, i.e. lexicographically ("1", "10", "100", ...);
# that is fine here because only a one-to-one mapping is needed.
import numpy as np

user_rows = ratings_spdf.select("userid").distinct().collect()
userids = np.sort([row["userid"] for row in user_rows])
userid_encode = dict(zip(userids, range(len(userids))))

item_rows = ratings_spdf.select("itemid").distinct().collect()
itemids = np.sort([row["itemid"] for row in item_rows])
itemid_encode = dict(zip(itemids, range(len(itemids))))

print(len(userids), len(itemids))

943 1682


In [10]:
# Replace the raw ids with their integer indices; only (userid, itemid, rating)
# are kept, so the datetime column is dropped here.
rdd2 = ratings_spdf.rdd.map(
    lambda row: (userid_encode[row[0]], itemid_encode[row[1]], float(row[2]))
)
# name the three columns directly instead of renaming _1, _2, _3 afterwards
ratings_spdf = rdd2.toDF(newcolnames[:3])

# show the results
ratings_spdf.printSchema()
ratings_spdf.show(5)

root
 |-- userid: long (nullable = true)
 |-- itemid: long (nullable = true)
 |-- rating: double (nullable = true)

+------+------+------+
|userid|itemid|rating|
+------+------+------+
|     0|     0|   5.0|
|     0|   794|   3.0|
|     0|   905|   4.0|
|     0|  1016|   3.0|
|     0|  1127|   3.0|
+------+------+------+
only showing top 5 rows



In [11]:
# Split the data into training and test sets (both are Spark DataFrames).
# Without a seed, randomSplit draws a different split on every run, so the reported
# error would change each time; a fixed seed makes the results reproducible.
(training, test) = ratings_spdf.randomSplit([0.8, 0.2], seed=42)
print(type(training))
print("trainset=", training.count(), "test set=", test.count())

<class 'pyspark.sql.dataframe.DataFrame'>
trainset= 80008 test set= 19992


## Step2: Build the recommendation model using ALS on the training data

Parameters for the ml ALS algorithm:

*   numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10). In pyspark.ml it is set via `numUserBlocks` and `numItemBlocks`.
*   rank is the number of latent factors in the model (defaults to 10).
*   maxIter is the maximum number of iterations to run (defaults to 10).
*   regParam specifies the regularization parameter in ALS (defaults to 0.1 in pyspark.ml).
*   implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false, which means explicit feedback is used). When implicit is selected, the algorithm described in "Collaborative Filtering for Implicit Feedback Datasets" (Yifan Hu, Yehuda Koren, Chris Volinsky) is used.
*  alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
*   nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).

https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

In [12]:
# Note: we set the cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics.
# By default, Spark assigns NaN predictions during ALSModel.transform when a user and/or item factor is not present in the model.
# This is useful in a production system but undesirable during cross-validation, since NaN predicted values will result in NaN results for the evaluation metric.
# The number of latent features (rank) is set below at 15. Please experiment with different values to try to find the optimum.
# seed fixes ALS's random seed so the learned factors (and metrics) are reproducible between runs.
als = ALS(maxIter=20, rank=15, regParam=0.01, userCol="userid", itemCol="itemid", ratingCol="rating",
          coldStartStrategy="drop", implicitPrefs=False, seed=42)


In [13]:
# Learn the user and item latent factors from the training ratings.
model = als.fit(dataset=training)

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 49596)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/IPython/core/interactiveshell.py", line 3326, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-13-e97d9f887b1c>", line 2, in <module>
    model = als.fit(training)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/ml/base.py", line 205, in fit
    return self._fit(dataset)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/ml/wrapper.py", line 383, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/lib/python3.8/dist-packages/pyspark/ml/wrapper.py", line 380, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/lib/python3.8/dist-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.8/dist-pack

ConnectionRefusedError: ignored

In [None]:
# Score the held-out ratings with mean absolute error (set metricName="rmse" for RMSE).
# (Try comparing the performance of treating the data as explicit vs implicit ratings.)
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="mae")
predictions = model.transform(test)
error = evaluator.evaluate(dataset=predictions)
print("Mean Absolute error = ", error)

In [None]:
# Top 10 movie recommendations for each user: rows of (userid, recommendations),
# where recommendations is an array of (itemid, rating) structs.
userRecs = model.recommendForAllUsers(10)
# Top 10 *users* for each movie: rows of (itemid, recommendations), where
# recommendations is an array of (userid, rating) structs. This is NOT a list of
# similar movies -- the nested ids are user indices, not item indices.
movieRecs = model.recommendForAllItems(10)

# Note: both of the above outputs are Spark dataframes
userRecs.show(10)
movieRecs.show(10)

In [None]:
# The top 10 recommendations per user were already computed above as `userRecs`,
# so rather than recomputing them, show their nested layout: each row holds a user
# index plus an array of (item index, predicted rating) structs.
userRecs.printSchema()

In [None]:
# Load the MovieLens movie names so recommendations can be shown by title.
# The file also contains movie genre info, which we ignore. Every column is read as
# str, so the "movie id" keys match the string item ids used in the encoding above.
import pandas as pd
file = "/content/drive/My Drive/recsys/u_item.csv"
itemdata = pd.read_csv(file, dtype=str)
print(itemdata.head(5))  # a sample of the file
titlelookup = itemdata.set_index("movie id")["movie name"].to_dict()  # movie id -> movie name

In [None]:
# Show 'similar' movies for a few target movies, using the actual movie names.
# NOTE: model.recommendForAllItems returns the top *users* for each movie, so its
# nested ids are user indices and cannot be looked up as movies. For genuine
# movie->movie recommendations we compare the learned item latent-factor vectors
# (cosine similarity) instead. Similarity is based on user ratings, not item features.
import numpy as np

item_factor_rows = model.itemFactors.orderBy("id").collect()   # rows of (id, features)
factor_ids = np.array([row.id for row in item_factor_rows])
factors = np.array([row.features for row in item_factor_rows], dtype=float)
norms = np.linalg.norm(factors, axis=1, keepdims=True)
unit_factors = factors / np.where(norms == 0, 1.0, norms)      # guard against zero vectors

n_targets, n_similar = 5, 10
for target in range(min(n_targets, len(factor_ids))):
    sims = unit_factors @ unit_factors[target]
    sims[target] = -np.inf                                     # never recommend the movie itself
    most_similar = np.argsort(-sims)[:n_similar]
    print("target content=", titlelookup[itemids[factor_ids[target]]])
    print("recommended content:")
    for j in most_similar:
        print(titlelookup[itemids[factor_ids[j]]])
    print("\n")

Note: example code for using MLlib ALS is shown below

In [None]:
# FYI ASIDE: an example of using the older RDD-based MLlib ALS API.
# ALS.train and ALS.trainImplicit are classmethods of pyspark.mllib and operate on rdd data
# (an RDD of (user, product, rating) tuples -- DataFrame Rows work), not on a DataFrame.
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.recommendation.ALS.html
# Calling them on the class directly avoids overwriting the pyspark.ml `als` estimator.
import pyspark.mllib.recommendation as mllr

mll_model = mllr.ALS.train(training.rdd, rank=15, iterations=20, lambda_=0.01, seed=42)
# For implicit ratings use instead:
#   mll_model = mllr.ALS.trainImplicit(training.rdd, rank=15, iterations=20, lambda_=0.01, seed=42)

# Now we load the Deskdrop dataset, create an integer implicit rating and then compare the performance of implicit versus explicit ALS on the same dataset

In this section, we show how a variety of implicit signals can be assembled into an integer (non-binary) implicit ratings matrix. This is then factorised using the Spark implicit ALS algorithm and used to generate recommendations.

The data set used is the deskdrop dataset. Deskdrop is an internal communications platform that allows company employees to share relevant articles with their peers, and collaborate around them. 


**part1: In the first half of this section we use Pandas to preprocess the data**

In [None]:
import pandas as pd
# import pyspark.pandas as ps  # alternative method

In [None]:
# Load the Deskdrop interaction events (12 months' worth) for individual users.
# The type of interaction (view, like, bookmark, follow, comment) is stored in eventType.
file = "/content/drive/My Drive/recsys/deskdrop_users_interactions.csv"

# the user-agent / location columns are not needed for building ratings
interactions_df = (
    pd.read_csv(file)
    .drop(columns=['userAgent', 'userRegion', 'userCountry'])
)
interactions_df.head(5)

In [None]:
# Keep only the columns needed to build ratings, then view the frequency of each event type.
# .copy() makes ratings_df an independent frame, so adding columns to it later does not
# raise pandas' SettingWithCopyWarning.
ratings_df = interactions_df[['contentId', 'personId', 'eventType']].copy()
ratings_df['eventType'].value_counts()

In [None]:
# Create an implicit integer rating called eventStrength based on the type of interaction
# with the article, e.g. assume a bookmark indicates a higher interest than a like.
# To do this, a dictionary associates each eventType with a weight.
event_type_strength = {
   'VIEW': 1.0,
   'LIKE': 2.0,
   'BOOKMARK': 3.0,
   'FOLLOW': 4.0,
   'COMMENT CREATED': 5.0,
}
strengths = ratings_df['eventType'].map(event_type_strength)
# Fail loudly (as a plain dict lookup would) on event types that have no weight,
# instead of silently producing NaN strengths.
unknown_types = ratings_df.loc[strengths.isna(), 'eventType'].unique()
if len(unknown_types) > 0:
    raise KeyError(f"no eventStrength weight for event types: {list(unknown_types)}")
# assign() returns a new frame and overwrites an existing eventStrength column, so
# re-running this cell is safe; insert(..., allow_duplicates=True) would add a second
# eventStrength column on every re-run.
ratings_df = ratings_df.assign(eventStrength=strengths)
ratings_df.sample(5)  # show a sample

In [None]:
# Optional: drop exact-duplicate rows, i.e. repeat interactions by a user on a document
# with the same event type (e.g. multiple views of one document).
# This is debatable: repeat views may indicate more interest in a document so perhaps we
# should keep them, but do repeat bookmarks mean more interest or just forgetfulness?
# One variant is to ignore repeated likes, bookmarks and follows but keep repeated views.
# If you have time, try experimenting with your own mapping of user interactions to implicit user ratings.
shape_before = ratings_df.shape
ratings_df = ratings_df.drop_duplicates()
print(shape_before)      # size before
print(ratings_df.shape)  # size after

In [None]:
# If a user has multiple interactions on the same content, sum their event strengths
# (the summed eventStrength is then used as the implicit integer rating).
# Only eventStrength is aggregated: summing the whole frame would also aggregate the
# string eventType column (pandas >= 2.0 no longer drops it by default, it concatenates
# the strings), leaving an extra column that breaks the later column renaming.
ratings_df = (
    ratings_df
    .groupby(['personId', 'contentId'], as_index=False)['eventStrength']
    .sum()
)
print(ratings_df.shape)
ratings_df.sample(5)

In [None]:
# Plot the eventStrength of every user/content pair, sorted ascending, to see how
# skewed the implicit ratings are. The x-axis is simply each pair's rank.
# (Plotting personId against an independently sorted strength series would pair
# unrelated values, so the sorted strengths are plotted on their own.)
import matplotlib.pyplot as plt

sorted_strengths = ratings_df['eventStrength'].sort_values().to_numpy()
fig, ax = plt.subplots()
ax.plot(sorted_strengths)
ax.set(title="Sorted eventStrength per user/content pair",
       xlabel="user/content pair (ranked by eventStrength)",
       ylabel="eventStrength");

In [None]:
# Alternatively, view the distribution of eventStrengths as a histogram.
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.hist(ratings_df['eventStrength'], bins=50)
ax.set(title="Distribution of eventStrength",
       xlabel="eventStrength",
       ylabel="number of user/content pairs");

In [None]:
# Optional: cap (truncate) the strengths at MAX_EVENT_STRENGTH to limit the influence
# of the few very large summed strengths seen in the plots above.
MAX_EVENT_STRENGTH = 20
ratings_df = ratings_df.assign(
    eventStrength=ratings_df['eventStrength'].clip(upper=MAX_EVENT_STRENGTH)
)
ratings_df.sample(5)

In [None]:
# Convert the column names to the standard names used by the ALS code below.
# Rename by name (not by position) and keep exactly these three columns, so the result
# does not depend on the column order or on extra columns left by earlier cells.
ratings_df = ratings_df.rename(
    columns={'personId': 'userid', 'contentId': 'itemid', 'eventStrength': 'rating'}
)[['userid', 'itemid', 'rating']]
ratings_df.dtypes

In [None]:
# Make sure the user and item id columns are plain integers.
# (They are re-indexed to contiguous integer indices further below, before ALS.)
id_dtypes = dict.fromkeys(["userid", "itemid"], int)
ratings_df = ratings_df.astype(id_dtypes)
ratings_df.dtypes

**part2:  now we convert the generated ratings to a spark dataframe and then build and apply the ALS model as was done above with Movielens**

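Compare the results obtained using explicit ALS with those from implicit ALS: does treating the ratings as implicit give better results? Note that implicit ALS predicts preference scores rather than values on the eventStrength scale, so MAE/RMSE against the ratings is not a like-for-like comparison; ranking metrics (e.g. precision@k) are a fairer test.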

In [None]:
# Hand the pandas ratings over to Spark as a distributed DataFrame.
ratings_spdf = spark.createDataFrame(data=ratings_df)
ratings_spdf.printSchema()
ratings_spdf.show(n=10)

In [None]:
# Map every distinct userid / itemid to a contiguous integer index (its position in
# sorted order) to facilitate matrix indexing; the dictionaries hold the mappings.
import numpy as np

userids = np.sort([r["userid"] for r in ratings_spdf.select("userid").distinct().collect()])
itemids = np.sort([r["itemid"] for r in ratings_spdf.select("itemid").distinct().collect()])
userid_encode = {uid: idx for idx, uid in enumerate(userids)}
itemid_encode = {iid: idx for idx, iid in enumerate(itemids)}
print(len(userids), len(itemids))

In [None]:
# Swap the raw ids for their integer indices and name the columns in one step.
newcolnames = ['userid', 'itemid', 'rating']
rdd2 = ratings_spdf.rdd.map(
    lambda row: (userid_encode[row[0]], itemid_encode[row[1]], float(row[2]))
)
ratings_spdf = rdd2.toDF(newcolnames)
ratings_spdf.show(10)

In [None]:
# Split the Deskdrop ratings into training and test sets.
# A fixed seed makes the split (and the metrics computed below) reproducible across re-runs.
(training, test) = ratings_spdf.randomSplit([0.8, 0.2], seed=42)
print("trainset=", training.count(), "test set=", test.count())

In [None]:
# Perform the matrix factorisation of the Deskdrop ratings with ALS.
# Set USE_IMPLICIT_PREFS = False and re-run this cell and the ones below to treat the
# same ratings as explicit, so the two variants can be compared.
# seed makes the factorisation reproducible between runs.
USE_IMPLICIT_PREFS = True
als = ALS(maxIter=20, rank=15, regParam=0.01, userCol="userid", itemCol="itemid", ratingCol="rating",
          coldStartStrategy="drop", implicitPrefs=USE_IMPLICIT_PREFS, seed=42)
model = als.fit(training)

In [None]:
# Score the held-out ratings with mean absolute error (use metricName="rmse" for RMSE).
# Caveat: when the model was fit with implicitPrefs=True, ALS predicts preference scores
# rather than values on the eventStrength scale (see the Spark collaborative-filtering
# guide), so this MAE largely reflects that scale mismatch; ranking metrics are a fairer
# way to compare the explicit and implicit variants.
predictions = model.transform(test)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="mae")
error = evaluator.evaluate(dataset=predictions)
print("Mean Absolute error = ", error)

In [None]:
# Is the MAE above good or bad? Comparing it with the mean and spread (stdev)
# of the training ratings gives a rough yardstick.
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

rating_col = col('rating')
stats = training.agg(
    _mean(rating_col).alias('mean'),
    _stddev(rating_col).alias('std'),
).collect()

summary_row = stats[0]
mean, std = summary_row['mean'], summary_row['std']
print(mean, std)