## Music Recommendation using Alternating least square algorithm

In [1]:
# Building the RDD
users_artists = sc.textFile("data/user_artist_data.txt")

In [2]:
# Quick peek
users_artists.first()

'1000002 1 55'

the RDD is composed of 3 columns: userid artistid playcount

Limitation of spark and ALS algorithm is that the features should be numerical, non negative 32 bits integers.
Meaning they shouldn't be over  2147483647

In [3]:
# Since we use the data set for some computation we persist the RDD
# We persist partially on memory and disk
users_artists.persist(StorageLevel.MEMORY_AND_DISK)

data/user_artist_data.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [4]:
users_artists.map(lambda line : int(line.split(' ')[0]) ).stats()

(count: 24296858, mean: 1947573.2653533542, stdev: 496000.544975, max: 2443548.0, min: 90.0)

In [5]:
users_artists.map(lambda line : int(line.split(' ')[1]) ).stats()

(count: 24296858, mean: 1718704.0937569223, stdev: 2539389.04017, max: 10794401.0, min: 1.0)

the max values seem to be under the treshold, no transformation is needed.
The RDD contains artists IDs, whose names are in another text file. Let's get those:

In [6]:
artists = sc.textFile("data/artist_data.txt") 

In [7]:
# Quick peek at the data
artists.take(10)

['1134999\t06Crazy Life',
 '6821360\tPang Nakarin',
 '10113088\tTerfel, Bartoli- Mozart: Don',
 '10151459\tThe Flaming Sidebur',
 '6826647\tBodenstandig 3000',
 '10186265\tJota Quest e Ivete Sangalo',
 '6828986\tToto_XX (1977',
 '10236364\tU.S Bombs -',
 '1135000\tartist formaly know as Mat',
 '10299728\tKassierer - Musik für beide Ohren']

In [8]:
def convert(terms):
    try:
        return int(terms[0]), str(terms[1]).strip()
    except:
        return (None,None)

artists_by_id = artists.map(lambda x : convert(x.split("\t")))

In [9]:
artists_by_id.cache()

PythonRDD[8] at RDD at PythonRDD.scala:48

In [10]:
# lookup of an artist ID
artists_by_id.lookup(10151459)

['The Flaming Sidebur']

The format of the RDD is a tuple (ID, name) separated by TAB, but few lines escape this rule, which means the parsing might fail for some of those lines, the possible solution is to use flatmap that could return 0 or more results unlike map function that should always get one result.

Now we move the third and last file, which maps the artists IDs that may be misspelled

In [11]:
artists_alias = sc.textFile("data/artist_alias.txt") 

In [12]:
# Quick peek
artists_alias.take(5)

['1092764\t1000311',
 '1095122\t1000557',
 '6708070\t1007267',
 '10088054\t1042317',
 '1195917\t1042317']

Same as above, we will transform the lines. However in this case we don't need an RDD bt instead a map

In [13]:
def convert_ids(terms):
    try:
        return int(terms[0]), int(terms[1])
    except:
        return (None,None)
artists_map = artists_alias.map(lambda x : convert_ids(x)).collectAsMap()

Again some peek at two mapped IDs : 1195917 and 1042317 for example, we an see below that they refer to the same artist.

In [14]:
artists_by_id.lookup(1195917)

['Deluxe Folk Implosion']

In [15]:
artists_by_id.lookup(1042317)

['Deluxx Folk Implosion']

### Building a first model

In [16]:
from pyspark.mllib.recommendation import *

Broadcast the artist alias, since it is to be used by all tasks, it lets spark to broadcast the data in an efficient manner while reducing the communication cost

In [17]:
from pyspark.sql import Row

b_artist_alias = sc.broadcast(artists_map)

# some data processing before the dataset is ready
def process(line):
    user_id, artist_id, count = [int(item) for item in line.split(' ')]
    # lookup in the map for the good id, if it doesn't exist stick with the one we have
    final_artist_id = b_artist_alias.value.get(artist_id,artist_id)
    return Row(userId=user_id, artistId=final_artist_id, count=count)
    
train_data = users_artists.map(lambda line : process(line)).cache()

The purpose of caching is to temporarily store the results after the computation, the reason behind is the fact that the ALS algorithm is iterative, therfore the train data will be used many times, so we have to avoid to redo the same computation everytime

In [18]:
# Quick peek 
train_data.take(2)

[Row(artistId=0, count=55, userId=1000002),
 Row(artistId=1000006, count=33, userId=1000002)]

In [19]:
# now that we have the data set we unpersist the users_artists
users_artists.unpersist()

data/user_artist_data.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [32]:
# Build the recommendation model using ALS on the training data
DataRDD = spark.createDataFrame(train_data)
# Split DATA to three parts : train, cross validation and test
# I take only a small subset to train because I am not running Spark on a cluster, but on standalone mode
train, cv, test = DataRDD.randomSplit([0.5,0.2,0.3])

In [33]:
# Now we are ready for training
model = ALS.train(train, rank=5)

After having trained the model, we will check it against a specific user, let's take the same example as the book : 
User with ID 2093760

In [22]:
example = train.filter(train.userId == 2093760)

In [23]:
example.show()

+--------+-----+-------+
|artistId|count| userId|
+--------+-----+-------+
|     378|    1|2093760|
|     813|    2|2093760|
|    1180|    1|2093760|
| 1255340|    3|2093760|
+--------+-----+-------+



In [24]:
example.cache()

DataFrame[artistId: bigint, count: bigint, userId: bigint]

In [25]:
# get the names of th artists
for row in example.collect():
    print(artists_by_id.lookup(row[0]))

['Blackalicious']
['Jurassic 5']
['David Gray']
['The Saw Doctors']


In [43]:
# predict using the model
recommendations = model.recommendProducts(2093760,4)

In [45]:
# From what we can see we have some recommandations for this customer
for rating in recommendations:
    print(rating)

Rating(user=2093760, product=1601, rating=8305166.521485837)
Rating(user=2093760, product=5841, rating=7852986.318502047)
Rating(user=2093760, product=1609, rating=7832512.200677894)
Rating(user=2093760, product=1515, rating=7457170.596640263)


In [46]:
# let's get the artist names
for rating in recommendations:
    print(artists_by_id.lookup(rating.product))

['The Railway Raver']
['Atfc']
['Lagowski']
['Chaos A.D.']


### Model evaluation
Now I will use the cross validation data set in order to evaluate the accuracy of my recommendations
The metric I will use is AUC, which is the probability that a good recommendation will rank above a bad one.
In the context of recommenders we use mean AUC, since AUC will be computed for each user in cv set.

In [55]:
# Get predicted counts on all existing user-artist pairs

cvData = cv.rdd.map(lambda p: (p.userId, p.artistId))

# Here we build predictions and structure them in tuples
predictions = model.predictAll(cvData).map(lambda r: ((r.user, r.product), r.rating))

In [64]:

countsTuple = cv.rdd.map(lambda r: ((r.userId, r.artistId), r[1]))

scoreAndLabels = predictions.join(countsTuple).map(lambda tup: tup[1])

In [70]:
predictions.first()

((2062512, 2672), 1387114.0771991187)

In [67]:
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics

# Instantiate regression metrics to compare predicted and actual ratings
metrics = RegressionMetrics(scoreAndLabels)

# Root mean sqaured error
print("RMSE = %s" % metrics.rootMeanSquaredError)

# R-squared
print("R-squared = %s" % metrics.r2)

Py4JJavaError: An error occurred while calling o957.rootMeanSquaredError.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 478.0 failed 1 times, most recent failure: Lost task 5.0 in stage 478.0 (TID 600, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ec2-user/spark/python/pyspark/sql/session.py", line 507, in prepare
    verify_func(obj, schema)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1360, in _verify_type
    _verify_type(v, f.dataType, f.nullable)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1324, in _verify_type
    raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, type(obj)))
TypeError: DoubleType can not accept object 4 in type <class 'int'>

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1091)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1084)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary$lzycompute(RegressionMetrics.scala:57)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary(RegressionMetrics.scala:54)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr$lzycompute(RegressionMetrics.scala:65)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr(RegressionMetrics.scala:65)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.meanSquaredError(RegressionMetrics.scala:99)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.rootMeanSquaredError(RegressionMetrics.scala:108)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ec2-user/spark/python/pyspark/sql/session.py", line 507, in prepare
    verify_func(obj, schema)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1360, in _verify_type
    _verify_type(v, f.dataType, f.nullable)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1324, in _verify_type
    raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, type(obj)))
TypeError: DoubleType can not accept object 4 in type <class 'int'>

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more
