In [1]:
# usual imports
import os
import sys
import time
import glob
import datetime
import sqlite3
import pandas as pd
import numpy as np # get it at: http://numpy.scipy.org/
# path to the Million Song Dataset subset (uncompressed)
# CHANGE IT TO YOUR LOCAL CONFIGURATION
msd_subset_path='./data/MSD/MillionSongSubset'
msd_subset_data_path=os.path.join(msd_subset_path,'data')
msd_subset_addf_path=os.path.join(msd_subset_path,'AdditionalFiles')
assert os.path.isdir(msd_subset_path),'wrong path' # sanity check
# path to the Million Song Dataset code
# CHANGE IT TO YOUR LOCAL CONFIGURATION
msd_code_path='./data/MSD/MSongsDB'
assert os.path.isdir(msd_code_path),'wrong path' # sanity check
# we add some paths to python so we can import MSD code
# Ubuntu: you can change the environment variable PYTHONPATH
# in your .bashrc file so you do not have to type these lines
sys.path.append(os.path.join(msd_code_path,'PythonSrc') )

taste_profile_data_path="./data/TeenyTinyEchoNestTasteProfileSubset.csv"
assert os.path.isfile(taste_profile_data_path)

# imports specific to the MSD
import hdf5_getters as GETTERS

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [2]:
def reduce_taste_subset(path='./data/FullEchoNestTasteProfileSubset.txt', 
                        to_path='./data/TeenyTinyEchoNestTasteProfileSubset.csv', downsample=0.001):
    data = pd.read_csv(path, sep="\t", header=None)
    data.columns = ['user', 'song', 'play_count']
    data.astype({'user': np.str, 'song': np.str, 'play_count': np.int32})
    data.sample(frac=downsample).to_csv(to_path, index=False)
# the following function simply gives us a nice string for
# a time lag in seconds
def strtimedelta(starttime,stoptime):
    return str(datetime.timedelta(seconds=stoptime-starttime))

In [3]:
data = pd.read_csv(taste_profile_data_path)

In [4]:
data.head()

Unnamed: 0,user,song,play_count
0,e45503e1d7c3c88fcea0da0947292afdbb26d27c,SOISQID12A6D4FD97C,1
1,ec04a29f15f25f1413256035c76be47675861682,SODSESK12A81C2178C,1
2,144889203bd3fbd4dcb1a64dca945c7aece4c14b,SOLNDSO12AF72ABA6C,1
3,d4ed4f49f3020cc18dd413acc8824ee02d72c48b,SOBOFXF12A6D4F83F0,2
4,67ab466c740e28e8afc3762b75571f5162ae9411,SOIGGWG12A6D4F85D4,1


In [5]:
song_id = data.iloc[1]['song']
print(data.shape)

(48374, 3)


In [6]:
a = "e45503e1d7c3c88fcea0da0947292afdbb26d27c"


Running the cell to get all valid_zone_ids showed that all songs in data are valid songs in the MSD

In [6]:
# Get the song ids which are also in the database. 
valid_song_ids = []

for song_id in data['song'].values:
    # let's redo all this work in SQLite in a few seconds
    t1 = time.time()
    # connect to database to get the metadata from MSD
    conn = sqlite3.connect(os.path.join(msd_subset_addf_path,
                                        'subset_track_metadata.db'))

    q = "SELECT DISTINCT song_id,artist_id,artist_name,Count(track_id) FROM songs"
    q += " WHERE song_id='" + song_id + "'"
    res = conn.execute(q)
    res = res.fetchall()
    conn.close()
    t2 = time.time()
    if res[0] is not None:
        valid_song_ids.append(song_id)


In [7]:
len(valid_song_ids)

48374

In [15]:


def tune_ALS(train_data, validation_data, maxIter, regParams, ranks):
    """
    grid search function to select the best model based on RMSE of
    validation data
    Parameters
    ----------
    train_data: spark DF with columns ['user', 'song', 'play_count']
    
    validation_data: spark DF with columns ['user', 'song', 'play_count']
    
    maxIter: int, max number of learning iterations
    
    regParams: list of float, one dimension of hyper-param tuning grid
    
    ranks: list of float, one dimension of hyper-param tuning grid
    
    Return
    ------
    The best fitted ALS model with lowest RMSE score on validation data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in regParams:
            # get ALS model
            # Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
            als = ALS(userCol="user", itemCol="song", ratingCol="play_count", implicitPrefs=True, alpha=40).setMaxIter(maxIter).setRank(rank).setRegParam(reg).setColdStartStrategy("drop")
            # train ALS model
            model = als.fit(train_data)
            # evaluate the model by computing the RMSE on the validation data
            predictions = model.transform(validation_data)  # give Nan to data that hasn't been seen before. cold_start drops those
            evaluator = RegressionEvaluator(metricName="rmse",
                                            labelCol="play_count",
                                            predictionCol="prediction")
            rmse = evaluator.evaluate(predictions)
            print('{} latent factors and regularization = {}: '
                  'validation RMSE is {}'.format(rank, reg, rmse))
            if rmse < min_error:
                min_error = rmse
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and '
          'regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [16]:
user_mapping = {}
last_user = 0
song_mapping = {}
last_song = 0
for idx, row in data.iterrows():
    if row["user"] not in user_mapping:
        user_mapping[row["user"]] = last_user
        last_user += 1
    if row["song"] not in song_mapping:
        song_mapping[row["song"]] = last_song
        last_song += 1


In [17]:
prep_data = data.copy()
prep_data["user"] = prep_data["user"].map(lambda x: user_mapping[x])
prep_data["song"] = prep_data["song"].map(lambda x: song_mapping[x])

In [18]:
split_frac = 0.7
train_data = prep_data.iloc[:int(prep_data.shape[0] * split_frac)]
val_data = prep_data.iloc[int(prep_data.shape[0] * split_frac):]

In [19]:
spark_data = spark.createDataFrame(prep_data)
(training, val) = spark_data.randomSplit([0.8, 0.2])


In [20]:
train_data.head()

Unnamed: 0,user,song,play_count
0,0,0,1
1,1,1,1
2,2,2,1
3,3,3,2
4,4,4,1


In [21]:
model = tune_ALS(training, val, 30, [1e-3, 1e-2, 1e-1, 1, 2], [20, 100, 1000])

Py4JJavaError: An error occurred while calling o55.fit.
: org.apache.spark.SparkException: Job 58 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:932)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:930)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:930)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2128)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2041)
	at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
	at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:575)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1124)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1117)
	at org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1712)
	at org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1653)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:983)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:970)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:970)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$fit$1.apply(ALS.scala:676)
	at org.apache.spark.ml.recommendation.ALS$$anonfun$fit$1.apply(ALS.scala:658)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:658)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 56568)
----------------------------------------


Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/SocketServer.py", line 295, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/SocketServer.py", line 655, in __init__
    self.handle()
  File "/usr/local/opt/spark/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/usr/local/opt/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/local/opt/spark/python/pyspark/accumulators.py", line 245, in accum_up

In [8]:
model

NameError: name 'model' is not defined