## Recommendations with Spark ALS

In [1]:
import pyspark

In [2]:
sc = pyspark.SparkContext("local[*]")

In [3]:
sc.version

'2.2.0'

In [4]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import Rating

In [5]:
def expand_user(a, user):
    return [Rating(user, item, ranking) for item, ranking in enumerate(a) if ranking != 0]

In [6]:
def expand_all(a):
    return [expand_user(items, user) for user, items in enumerate(a)]

### Here we have ratings from eight users for six different movies: Titanic, Dirty Dancing, Die Hard, Terminator 2, Wayne's World, and Zoolander. Or in other words, two romantic films, two action films, and two comedies. Each row is a user, each column is a movie.

### The ratings are constructed so that if a user has seen both movies in one of these pairs, their ratings for the two movies are similar.

### There is no evidence in this data that anyone likes all three film genres.

In [7]:
rawdata_old = [
    [5,5,0,0,0,0],
    [0,0,5,5,0,0],
    [0,0,0,0,5,5],
    [0,1,5,5,5,0],
    [1,1,5,0,5,5],
    [5,5,0,5,1,1],
    [5,0,0,5,0,1],
    [5,5,5,0,1,0]
    ]

rawdata = [
    [5,5,0,0,0,5],
    [0,0,5,5,0,5],
    [0,0,0,0,5,5],
    [0,1,5,5,5,5],
    [1,1,5,0,5,5],
    [5,5,0,5,1,5],
    [5,0,0,5,0,5],
    [5,5,5,0,1,5]
    ]
list_of_ratings = expand_all(rawdata)

In [8]:
# construct an RDD of Ratings for every non-zero rating
ratings = [val for sublist in list_of_ratings for val in sublist]
ratingsRDD = sc.parallelize(ratings)
ratingsRDD.take(5)

[Rating(user=0, product=0, rating=5.0),
 Rating(user=0, product=1, rating=5.0),
 Rating(user=0, product=5, rating=5.0),
 Rating(user=1, product=2, rating=5.0),
 Rating(user=1, product=3, rating=5.0)]

In [9]:
ratingsRDD.take(100)

[Rating(user=0, product=0, rating=5.0),
 Rating(user=0, product=1, rating=5.0),
 Rating(user=0, product=5, rating=5.0),
 Rating(user=1, product=2, rating=5.0),
 Rating(user=1, product=3, rating=5.0),
 Rating(user=1, product=5, rating=5.0),
 Rating(user=2, product=4, rating=5.0),
 Rating(user=2, product=5, rating=5.0),
 Rating(user=3, product=1, rating=1.0),
 Rating(user=3, product=2, rating=5.0),
 Rating(user=3, product=3, rating=5.0),
 Rating(user=3, product=4, rating=5.0),
 Rating(user=3, product=5, rating=5.0),
 Rating(user=4, product=0, rating=1.0),
 Rating(user=4, product=1, rating=1.0),
 Rating(user=4, product=2, rating=5.0),
 Rating(user=4, product=4, rating=5.0),
 Rating(user=4, product=5, rating=5.0),
 Rating(user=5, product=0, rating=5.0),
 Rating(user=5, product=1, rating=5.0),
 Rating(user=5, product=3, rating=5.0),
 Rating(user=5, product=4, rating=1.0),
 Rating(user=5, product=5, rating=5.0),
 Rating(user=6, product=0, rating=5.0),
 Rating(user=6, product=3, rating=5.0),


In [10]:
ratings

[Rating(user=0, product=0, rating=5),
 Rating(user=0, product=1, rating=5),
 Rating(user=0, product=5, rating=5),
 Rating(user=1, product=2, rating=5),
 Rating(user=1, product=3, rating=5),
 Rating(user=1, product=5, rating=5),
 Rating(user=2, product=4, rating=5),
 Rating(user=2, product=5, rating=5),
 Rating(user=3, product=1, rating=1),
 Rating(user=3, product=2, rating=5),
 Rating(user=3, product=3, rating=5),
 Rating(user=3, product=4, rating=5),
 Rating(user=3, product=5, rating=5),
 Rating(user=4, product=0, rating=1),
 Rating(user=4, product=1, rating=1),
 Rating(user=4, product=2, rating=5),
 Rating(user=4, product=4, rating=5),
 Rating(user=4, product=5, rating=5),
 Rating(user=5, product=0, rating=5),
 Rating(user=5, product=1, rating=5),
 Rating(user=5, product=3, rating=5),
 Rating(user=5, product=4, rating=1),
 Rating(user=5, product=5, rating=5),
 Rating(user=6, product=0, rating=5),
 Rating(user=6, product=3, rating=5),
 Rating(user=6, product=5, rating=5),
 Rating(user

In [11]:
rank = 4
numIterations = 5
als_lambda = 0.01
model = ALS.train(ratingsRDD, rank, numIterations, als_lambda, seed=5151, nonnegative=True)
# there is also a trainImplicit method that one uses when
# working with implicit ratings (it uses a different cost function)

In [12]:
# here we see the model's vector of features for each user
users = model.userFeatures().collect()
sorted(users, key=lambda x: x[0])

[(0, array('d', [0.6863377690315247, 0.0, 1.7894190549850464, 0.0])),
 (1, array('d', [1.4597450494766235, 0.0, 0.4064513146877289, 0.0])),
 (2, array('d', [1.724142074584961, 0.0, 0.0, 0.0])),
 (3, array('d', [1.7067937850952148, 0.0, 0.0, 0.0])),
 (4, array('d', [1.7011171579360962, 0.0, 0.0, 0.0])),
 (5, array('d', [0.4811097979545593, 0.0, 1.8936035633087158, 0.0])),
 (6, array('d', [0.641482949256897, 0.0, 1.75159752368927, 0.0])),
 (7, array('d', [0.5459887981414795, 0.0, 1.900052547454834, 0.0]))]

In [13]:
# and the features for the "products"
products = model.productFeatures().collect()
sorted(products, key=lambda x: x[0])

[(0, array('d', [0.6984666585922241, 0.0, 2.5750622749328613, 0.0])),
 (1, array('d', [0.6036437153816223, 0.0, 2.502629280090332, 0.0])),
 (2, array('d', [2.9763879776000977, 0.0, 1.5664623975753784, 0.0])),
 (3, array('d', [2.9261770248413086, 0.0, 1.8994628190994263, 0.0])),
 (4, array('d', [2.8576128482818604, 0.0, 0.0, 0.0])),
 (5, array('d', [2.934448003768921, 0.0, 1.6655101776123047, 0.0]))]

In [14]:
# recommend 3 items for user 2
model.recommendProducts(2, 3)

[Rating(user=2, product=2, rating=5.131715742469169),
 Rating(user=2, product=5, rating=5.0594052689798445),
 Rating(user=2, product=3, rating=5.045144926212743)]

In [15]:
model


<pyspark.mllib.recommendation.MatrixFactorizationModel at 0x7f3c1c1dbc88>

model

### Display the original matrix side-by-side with the reconstructed matrix. The values that were originally non-zero should be closely approximated, and the values that were zero (empty) now have predictions.

In [16]:
import sys
print(" original      reconstructed")
for user in range(0, len(rawdata)):
    for product in range (0, len(rawdata[0])):
        sys.stdout.write("%d " % rawdata[user][product])
    sys.stdout.write("    ")
    for product in range (0, len(rawdata[0])):
        sys.stdout.write("%0.0f " % model.predict(user, product))
    print(" ")

 original      reconstructed
5 5 0 0 0 5     5 5 5 5 2 5  
0 0 5 5 0 5     2 2 5 5 4 5  
0 0 0 0 5 5     1 1 5 5 5 5  
0 1 5 5 5 5     1 1 5 5 5 5  
1 1 5 0 5 5     1 1 5 5 5 5  
5 5 0 5 1 5     5 5 4 5 1 5  
5 0 0 5 0 5     5 5 5 5 2 5  
5 5 5 0 1 5     5 5 5 5 2 5  


In [17]:
print(" original         errors        predictions")
for user in range(0, len(rawdata)):
    for product in range (0, len(rawdata[0])):
        sys.stdout.write("%d " % rawdata[user][product])
    sys.stdout.write("    ")
    for product in range (0, len(rawdata[0])):
        if rawdata[user][product] != 0:
            prediction = model.predict(user, product)
            if rawdata[user][product] != round(prediction, 0):
                sys.stdout.write("%0.0f " % prediction)
            else:
                sys.stdout.write("- ")
        else:
            sys.stdout.write("- ")
    sys.stdout.write("    ")
    for product in range (0, len(rawdata[0])):
        if rawdata[user][product] == 0:
            prediction = model.predict(user, product)
            sys.stdout.write("%0.0f " % prediction)
        else:
            sys.stdout.write("- ")
    print(" ")

 original         errors        predictions
5 5 0 0 0 5     - - - - - -     - - 5 5 2 -  
0 0 5 5 0 5     - - - - - -     2 2 - - 4 -  
0 0 0 0 5 5     - - - - - -     1 1 5 5 - -  
0 1 5 5 5 5     - - - - - -     1 - - - - -  
1 1 5 0 5 5     - - - - - -     - - - 5 - -  
5 5 0 5 1 5     - - - - - -     - - 4 - - -  
5 0 0 5 0 5     - - - - - -     - 5 5 - 2 -  
5 5 5 0 1 5     - - - - 2 -     - - - 5 - -  


### Compute the mean squared error of the reconstructed matrix. This can be used to decide if the rank is sufficiently large.

In [18]:
evalRDD = ratingsRDD.map(lambda p: (p[0], p[1]))
evalRDD.take(5)

[(0, 0), (0, 1), (0, 5), (1, 2), (1, 3)]

In [19]:
evalRDD.take(100)

[(0, 0),
 (0, 1),
 (0, 5),
 (1, 2),
 (1, 3),
 (1, 5),
 (2, 4),
 (2, 5),
 (3, 1),
 (3, 2),
 (3, 3),
 (3, 4),
 (3, 5),
 (4, 0),
 (4, 1),
 (4, 2),
 (4, 4),
 (4, 5),
 (5, 0),
 (5, 1),
 (5, 3),
 (5, 4),
 (5, 5),
 (6, 0),
 (6, 3),
 (6, 5),
 (7, 0),
 (7, 1),
 (7, 2),
 (7, 4),
 (7, 5)]

In [20]:
evalRDD?

In [21]:
ratingsRDD?

In [22]:
evalRDD.map(lambda r: ((r[0], r[1]), r[2]))

PythonRDD[142] at RDD at PythonRDD.scala:48

In [23]:
predictions = model.predictAll(evalRDD).map(lambda r: ((r[0], r[1]), r[2]))
predictions.take(5)

[((0, 0), 5.087249550739095),
 ((0, 1), 4.892556002262072),
 ((0, 5), 4.994318144336759),
 ((1, 2), 4.981458316526886),
 ((1, 3), 5.043511585927757)]

In [24]:
predictions

PythonRDD[159] at RDD at PythonRDD.scala:48

In [25]:
ratingsAndPreds = ratingsRDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
ratingsAndPreds.take(5)

[((4, 2), (5.0, 5.063184657370243)),
 ((5, 1), (5.0, 5.029406628363571)),
 ((4, 5), (5.0, 4.991839848282638)),
 ((3, 1), (1.0, 1.0302953418251377)),
 ((4, 4), (5.0, 4.861134246950911))]

In [26]:
ratingsAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()

0.03812473925449567

With a larger dataset we would separate the rating data into training and test sets, and see how well our predicted ratings match the actual data.

### Questions