# Load data into Spark DataFrame

In [1]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = 'pyspark-shell'

In [2]:
import findspark

In [3]:
#findspark.init('/path_to_spark/spark-x.x.x-bin-hadoopx.x')
findspark.init('/Users/xuejiwang/spark/spark-2.3.2-bin-hadoop2.7')

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F

In [5]:
spark = SparkSession.builder.appName("basics").getOrCreate()

In [6]:
# We use matplotlib for plotting
import matplotlib.pyplot as plt

# This statement allow to display plot without asking to
%matplotlib inline

In [7]:
df = spark.read.csv('/Users/xuejiwang/Documents/dev/MusicBox/data/event_ds.csv',header=True).cache()
df

DataFrame[uid: string, event: string, song_id: string, date: string]

In [8]:
# create new or overwrite original field with withColumn
df = df.withColumn('date',F.col('date').cast('date'))
df

DataFrame[uid: string, event: string, song_id: string, date: date]

In [None]:
df.groupby('event').count().show()

In [12]:
df.show(10)

+---------+-----+--------+----------+
|      uid|event| song_id|      date|
+---------+-----+--------+----------+
|168551042|    P|  505355|2017-03-30|
|168551430|    P| 1179220|2017-03-30|
|168550728|    P|  285233|2017-03-30|
|168551408|    P|15750838|2017-03-30|
|168551179|    P|23485496|2017-03-30|
|168551546|    P| 1474915|2017-03-30|
|168551626|    P| 8131487|2017-03-30|
|168542680|    P|       0|2017-03-30|
|168551166|    P| 1244660|2017-03-30|
|168551166|    P|  945320|2017-03-30|
+---------+-----+--------+----------+
only showing top 10 rows



In [40]:
df_u_s_freq = df.groupby('uid','song_id').agg(F.count('*').alias('freq'))

In [41]:
df_u_s_freq.show()

+---------+--------+----+
|      uid| song_id|freq|
+---------+--------+----+
|168551357|23495106|   6|
|168551655|23497506|   4|
|168551768|   54279|   1|
|168551872|23491659|   2|
|168551841|23250905|   1|
|168547616| 3390277|   1|
|168534627| 3040697|   1|
|168538799| 5842834|  13|
|168551833| 5838906|   1|
|168551768|  500954|   2|
|168552239| 7091565|   1|
|168546836| 7077410|  17|
|114611921| 6819218|   6|
|168552364|14986792|  12|
|162578247|  555734|   2|
|168551752|  214760|   1|
|168553226| 4845651|   1|
|168551752|  220599|   1|
|168554165|20674107|   1|
|168554317| 3328156|   1|
+---------+--------+----+
only showing top 20 rows



## Play behavior

In [27]:
df_play = spark.read.csv('/Users/xuejiwang/Documents/dev/MusicBox/data/play/play_ds.csv',header=True)
df_play.show(5)

+---------+------+--------+----------+--------------------+-----------+
|      uid|device| song_id|      date|           play_time|song_length|
+---------+------+--------+----------+--------------------+-----------+
|168551042|    ar|  505355|2017-03-30|                 106|        277|
|168551430|    ar| 1179220|2017-03-30|                 231|        231|
|168550728|    ar|  285233|2017-03-30|                 282|        282|
|168551408|    ar|15750838|2017-03-30|7>(123.138.230.80)TM|          0|
|168551179|    ar|23485496|2017-03-30|                  26|        212|
+---------+------+--------+----------+--------------------+-----------+
only showing top 5 rows



In [32]:
df_play_filtered = df_play.filter((F.col('play_time')>=0) & (F.col('song_length') > 0))

In [33]:
df_play_per = df_play_filtered\
            .filter((F.col('date')>= snapshot_date-datetime.timedelta(time_window - 1)) & (F.col('date')<=snapshot_date))\
            .withColumn('finihsed_per',F.col('play_time')/F.col('song_length'))


In [35]:
df_play_per.columns

['uid',
 'device',
 'song_id',
 'date',
 'play_time',
 'song_length',
 'finihsed_per']

In [45]:
df_rec = df_u_s_freq.join(df_play_per,on=['uid','song_id'])

In [46]:
df_rec.columns

['uid',
 'song_id',
 'freq',
 'device',
 'date',
 'play_time',
 'song_length',
 'finihsed_per']

## Frequency weighted by percentage

In [160]:
df_rec_model = df_rec.withColumn('weighted_freq',F.col('freq')*F.col('finished_per')).collect()

In [197]:
df_rec_model.columns

['uid',
 'song_id',
 'event',
 'date',
 'freq_P_last_44',
 'freq_D_last_44',
 'freq_S_last_44',
 'device',
 'date',
 'play_time',
 'song_length',
 'finihsed_per',
 'weighted_freq']

In [187]:
df_down = spark.read.csv('/Users/xuejiwang/Documents/dev/MusicBox/data/down/down_ds.csv',header=True)
df_down.show(5)

+---------+------+-------+----------+
|      uid|device|song_id|      date|
+---------+------+-------+----------+
|168019810|    ar| 442554|2017-03-30|
|168019810|    ar|6334611|2017-03-30|
|168019810|    ar|9867382|2017-03-30|
|168019810|    ar|6660691|2017-03-30|
|168019810|    ar| 157606|2017-03-30|
+---------+------+-------+----------+
only showing top 5 rows



# Recommender
## Method1: Try to use frequency as rating

In [127]:
song_ratings = df_u_s_freq.fillna(0)

In [128]:
from pyspark.sql.types import IntegerType
for col_name in ['uid','song_id','freq']:
    song_ratings = song_ratings.withColumn(col_name, F.col(col_name).cast(IntegerType()))

In [129]:

song_ratings = song_ratings.withColumnRenamed('freq','rating')

In [130]:
song_ratings.show()

+---------+--------+------+
|      uid| song_id|rating|
+---------+--------+------+
|168551357|23495106|     6|
|168551655|23497506|     4|
|168551768|   54279|     1|
|168551872|23491659|     2|
|168551841|23250905|     1|
|168547616| 3390277|     1|
|168534627| 3040697|     1|
|168538799| 5842834|    13|
|168551833| 5838906|     1|
|168551768|  500954|     2|
|168552239| 7091565|     1|
|168546836| 7077410|    17|
|114611921| 6819218|     6|
|168552364|14986792|    12|
|162578247|  555734|     2|
|168551752|  214760|     1|
|168553226| 4845651|     1|
|168551752|  220599|     1|
|168554165|20674107|     1|
|168554317| 3328156|     1|
+---------+--------+------+
only showing top 20 rows



In [None]:
#song_ratings.withColumnRenamed('freq','rating').collect()

In [140]:
from pyspark.ml.feature import OneHotEncoder,StringIndexer

stringIndexer=StringIndexer(inputCol='song_id',outputCol='song_id_ind')
indexer = stringIndexer.fit(song_ratings)

In [None]:
song_ratings = indexer.transform(song_ratings)

In [144]:
song_ratings.show(10)

+---------+--------+------+-----------+
|      uid| song_id|rating|song_id_ind|
+---------+--------+------+-----------+
|168551357|23495106|     6|       38.0|
|168551655|23497506|     4|       78.0|
|168551768|   54279|     1|     2988.0|
|168551872|23491659|     2|      164.0|
|168551841|23250905|     1|      254.0|
|168547616| 3390277|     1|    53954.0|
|168534627| 3040697|     1|     6871.0|
|168538799| 5842834|    13|     6421.0|
|168551833| 5838906|     1|       58.0|
|168551768|  500954|     2|    27661.0|
+---------+--------+------+-----------+
only showing top 10 rows



## Build ALS recommendation model

In [131]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder


In [132]:
# Split train and test data
(training, test) = song_ratings.randomSplit([0.8,0.2])

In [151]:
# set cold start to drop
als = ALS(userCol = 'uid', itemCol = 'song_id_ind', ratingCol = 'rating', coldStartStrategy = 'drop')

In [152]:
# Tune model using ParamGridBuilder
param_grid = ParamGridBuilder().addGrid(als.rank, [12,13,14] )\
                                .addGrid(als.maxIter,[18,19,20])\
                                .addGrid(als.regParam,[.17,.18,.19])\
                                .build()

In [153]:
# Define evaluator at RMSE
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating',\
                                predictionCol = 'prediction')





In [154]:
# Build cross validatoin using TrainValidationSplit
tvs = TrainValidationSplit(
        estimator = als,
        estimatorParamMaps = param_grid,
        evaluator = evaluator)

In [156]:
# Fit ALS model to training data
model = tvs.fit(training)

IllegalArgumentException: 'Field "song_id_ind" does not exist.\nAvailable fields: uid, song_id, rating, TrainValidationSplit_487990d39e74aafe9a0f_rand'

In [157]:
# Extract best model from the tuning exercise using ParamGridBuilder
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

NameError: name 'best_model' is not defined

In [51]:
# Print evaluation metrics and model parameters
print('RMSE = ' + str(rmse))
print('**Best Model**')
print('  Rank:'), best_model.rank
print('  MaxIter:'), best_model._java_ojb.parent().getMaxIter()
print('  RegParam:'), best_model._java_obj.parent().getRegParam()

NameError: name 'rmse' is not defined

## Recommendation

In [159]:
# Recommendation
user_recs = best_model.recommendForAllUsers(10)

In [None]:
# Write a function to recommend
def get_recommend_for_user(recs):
    recs = recs.select('recommendations.song_id', 'recommendations.rating')
    songs = recs.select('song_id').toPandas().iloc[0,0]
    ratings = recs.select('rating').toPandas().iloc[0,0]
    rating_matrix = pd.DataFrame(songs, columns = ['song_id'])
    rating_matrix['ratings'] = ratings
    ratings_matrix_ps = sqlContext.createDateFrame(rating_matrix)
    return rating_matrix_ps

# Method2: Use finished percentage as rating

In [None]:
song_ratings = df_play_per

In [166]:
song_ratings.show()

+---------+------+--------+----------+---------+-----------+--------------------+
|      uid|device| song_id|      date|play_time|song_length|        finihsed_per|
+---------+------+--------+----------+---------+-----------+--------------------+
|168551042|    ar|  505355|2017-03-30|      106|        277| 0.38267148014440433|
|168551430|    ar| 1179220|2017-03-30|      231|        231|                 1.0|
|168550728|    ar|  285233|2017-03-30|      282|        282|                 1.0|
|168551179|    ar|23485496|2017-03-30|       26|        212| 0.12264150943396226|
|168551546|    ar| 1474915|2017-03-30|        0|        243|                 0.0|
|168551626|    ar| 8131487|2017-03-30|      247|        271|  0.9114391143911439|
|168542680|    ar|       0|2017-03-30|      284|        285|  0.9964912280701754|
|168551166|    ar| 1244660|2017-03-30|      235|        235|                 1.0|
|168551166|    ar|  945320|2017-03-30|      205|        205|                 1.0|
|168551495|    a

In [188]:
for col_name in ['uid','song_id']:
    song_ratings = song_ratings.withColumn(col_name, F.col(col_name).cast(IntegerType()))


In [189]:
song_ratings

DataFrame[uid: int, device: string, song_id: int, date: string, play_time: string, song_length: string, finihsed_per: double]

In [180]:
from pyspark.ml.feature import OneHotEncoder,StringIndexer

stringIndexer=StringIndexer(inputCol='song_id',outputCol='song_id_ind')# some big value in song_id cannot be handled by ALS
indexed=stringIndexer.fit(song_ratings)

In [191]:
df_trans=indexed.transform(song_ratings)
df_trans.show()

+---------+------+--------+----------+---------+-----------+--------------------+-----------+
|      uid|device| song_id|      date|play_time|song_length|        finihsed_per|song_id_ind|
+---------+------+--------+----------+---------+-----------+--------------------+-----------+
|168551042|    ar|  505355|2017-03-30|      106|        277| 0.38267148014440433|    25082.0|
|168551430|    ar| 1179220|2017-03-30|      231|        231|                 1.0|      981.0|
|168550728|    ar|  285233|2017-03-30|      282|        282|                 1.0|     4832.0|
|168551179|    ar|23485496|2017-03-30|       26|        212| 0.12264150943396226|      854.0|
|168551546|    ar| 1474915|2017-03-30|        0|        243|                 0.0|     2284.0|
|168551626|    ar| 8131487|2017-03-30|      247|        271|  0.9114391143911439|      336.0|
|168542680|    ar|       0|2017-03-30|      284|        285|  0.9964912280701754|        0.0|
|168551166|    ar| 1244660|2017-03-30|      235|        235|

In [183]:
(training, test) = df_trans.randomSplit([0.8, 0.2]) 

In [194]:
als = ALS(maxIter=5, regParam=0.01, userCol='uid', itemCol='song_id_ind', ratingCol='finihsed_per',coldStartStrategy="drop") 

In [195]:
rec_model = als.fit(training)

Py4JJavaError: An error occurred while calling o22750.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 241.0 failed 1 times, most recent failure: Lost task 0.0 in stage 241.0 (TID 1742, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$9: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1364)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1364)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:251)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:246)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1472)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1471)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:918)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674)
	at sun.reflect.GeneratedMethodAccessor186.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$9: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1364)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1364)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:251)
	at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:246)
	... 29 more
