# WSDM - Collaborative Filtering with SVD

In [27]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

## Load the preprocessed data
- data: training dataset

In [28]:
# Load the zip csv file
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Load data and have a look
df = sqlContext.read \
        .format('com.databricks.spark.csv') \
        .options(header='true', inferschema='true') \
        .load('./process/train.csv.gz')
df.show(5)

+-----+-------+-----------------+-------------------+---------------+------+
| msno|song_id|source_system_tab| source_screen_name|    source_type|target|
+-----+-------+-----------------+-------------------+---------------+------+
| 9176| 474849|          explore|            Explore|online-playlist|     1|
|19273|1425656|       my library|Local playlist more| local-playlist|     1|
|19273| 768950|       my library|Local playlist more| local-playlist|     1|
|19273| 150624|       my library|Local playlist more| local-playlist|     1|
| 9176| 210388|          explore|            Explore|online-playlist|     1|
+-----+-------+-----------------+-------------------+---------------+------+
only showing top 5 rows



### We will only choose three columns for the recommendations
- msno: user_id
- song_id
- target: score

In [29]:
# Keep only 3 columns
df = df.select(['msno', 'song_id', 'target'])
df.show(5)

+-----+-------+------+
| msno|song_id|target|
+-----+-------+------+
| 9176| 474849|     1|
|19273|1425656|     1|
|19273| 768950|     1|
|19273| 150624|     1|
| 9176| 210388|     1|
+-----+-------+------+
only showing top 5 rows



### Let's firstly check if the dataset if balanced
- From the result, we find that the positive and negative occupy almost the same
- So we don't need to rebalance the data

In [30]:
total    = df.count()
positive = df.filter(df['target']==1).count()
negative = df.filter(df['target']==0).count()

print("Positive: {} \nNegative: {}".format(float(positive)/total, float(negative)/total))

Positive: 0.5035170841614234 
Negative: 0.49648291583857657


### Random split the data into train and eval
- Train: 0.8
- Eval : 0.2

In [31]:
trainDF, evalDF = df.randomSplit([0.8, 0.2])

# Build the Recommendation Model

In [32]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

## Build and train the model

In [33]:
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=15, regParam=0.01, userCol="msno", itemCol="song_id", ratingCol="target",
          coldStartStrategy="drop")
model = als.fit(trainDF)

## Evaluate the model by computing the RMSE on the eval data

In [34]:
# Predict and evaluate on test dataset
predictions = model.transform(evalDF)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="target",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.4727457148038022


# Do the prediction on test data

## Load test data

In [35]:
# Load data and have a look
testDF = sqlContext.read \
        .format('com.databricks.spark.csv') \
        .options(header='true', inferschema='true') \
        .load('./process/test.csv.gz')
testDF.show(5)

+---+-----+-------+-----------------+-------------------+-------------------+
| id| msno|song_id|source_system_tab| source_screen_name|        source_type|
+---+-----+-------+-----------------+-------------------+-------------------+
|  0|17724|1248964|       my library|Local playlist more|      local-library|
|  1|17724|2226341|       my library|Local playlist more|      local-library|
|  2|  977| 382937|         discover|               null|song-based-playlist|
|  3| 1878|2292987|            radio|              Radio|              radio|
|  4| 1878| 874824|            radio|              Radio|              radio|
+---+-----+-------+-----------------+-------------------+-------------------+
only showing top 5 rows



### Keep only id, user_id and item_id columns

In [36]:
testDF = testDF.select(['id', 'msno', 'song_id'])
testDF.show(5)

+---+-----+-------+
| id| msno|song_id|
+---+-----+-------+
|  0|17724|1248964|
|  1|17724|2226341|
|  2|  977| 382937|
|  3| 1878|2292987|
|  4| 1878| 874824|
+---+-----+-------+
only showing top 5 rows



### Do the prediction

In [37]:
# Predict and evaluate on test dataset
submissionDF = model.transform(testDF)
submissionDF.show(50)

+-------+-----+-------+------------+
|     id| msno|song_id|  prediction|
+-------+-----+-------+------------+
|2290156|27042|   1591|         0.0|
|1882947| 9207|   1591|         0.0|
|2096729|22193|   1591|         0.0|
|2036051|23051|   1591|         0.0|
|2072862|14565|   1591|         0.0|
| 314769|30147|   1591|         0.0|
|2195521|25496|   1591|         0.0|
| 555135| 1477|   1591|         0.0|
|2417507|27984|   1591|         0.0|
|2088386|11900|   1591|         0.0|
|2359112|26842|   1591|         0.0|
|1994837| 4399|   1591|         0.0|
|1998933|32303|   1591|         0.0|
|2228743|21117|   1591|         0.0|
|1970000|15534|   1591|         0.0|
|2235383|30187|   1591|         0.0|
|2267287| 5137|   1591|         0.0|
|2373415| 4577|   1591|         0.0|
|2534365|30409|   1591|         0.0|
| 314846|12153|   1591|         0.0|
|1936320|11836|   1591|         0.0|
|2124460|22997|   1591|         0.0|
|2022643| 1914|   1591|         0.0|
|2528210|17891|   1591|         0.0|
|

### Transform the prediction column
- Make the range 0.0 -> 1.0

In [46]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import FloatType

def parseRange(x):
    if x < 0:
        return 0.0
    elif x > 1:
        return 1.0
    else:
        return x

udf = UserDefinedFunction(parseRange, FloatType())
submissionDF = submissionDF.withColumn("transformed_prediction", udf("prediction"))

In [53]:
submissionDF = submissionDF.select(["id", "transformed_prediction"])
submissionDF.coalesce(1).write.option("header", "true").csv("sample_file.csv")

Py4JJavaError: An error occurred while calling o357.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:598)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 567.0 failed 1 times, most recent failure: Lost task 0.0 in stage 567.0 (TID 4465, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 489566 ms
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
	... 45 more
