# Sparkify Project Workspace
This workspace contains a tiny subset (128 MB) of the full dataset (12 GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [23]:
# import libraries
from pyspark.sql import SparkSession

import pyspark.sql.functions as psqf
from pyspark import SparkContext, SparkConf
# note: min, max and sum below shadow the Python built-ins of the same name
from pyspark.sql.functions import avg, col, min, max, sum, count, udf

from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import IntegerType, FloatType
from pyspark.mllib.evaluation import MulticlassMetrics

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns

In [24]:
# create a Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("Sparkify_test") \
    .getOrCreate()

# Load and Clean Dataset

In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userIds or sessionIds.

In [25]:
sparkify_data = 'mini_sparkify_event_data.json'
df = spark.read.json(sparkify_data)

In [26]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [27]:
df.show(1)

+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
|        artist|     auth|firstName|gender|itemInSession|lastName|   length|level|       location|method|    page| registration|sessionId|     song|status|           ts|           userAgent|userId|
+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
|Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|Bakersfield, CA|   PUT|NextSong|1538173362000|       29|Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
only showing top 1 row

In [28]:
df.take(5)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30'),
 Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9'),
 Row(artist='Adam Lambert', auth='Logged In', firstName='Colin', gender='M', itemInSession=51, lastName='Freeman', length=282.8273, level='paid', location='

In [29]:
df.count(), len(df.columns)

(286500, 18)

In [30]:
#check for number of null values in each column:

df.select([psqf.count(psqf.when(psqf.isnull(c), c)).alias(c) for c in df.columns]).show()

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId| song|status| ts|userAgent|userId|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
| 58392|   0|     8346|  8346|            0|    8346| 58392|    0|    8346|     0|   0|        8346|        0|58392|     0|  0|     8346|     0|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+



In [31]:
# check for invalid (empty-string) user IDs
df.select("userId").dropDuplicates().sort("userId").show()

+------+
|userId|
+------+
|      |
|    10|
|   100|
|100001|
|100002|
|100003|
|100004|
|100005|
|100006|
|100007|
|100008|
|100009|
|100010|
|100011|
|100012|
|100013|
|100014|
|100015|
|100016|
|100017|
+------+
only showing top 20 rows



In [32]:
# drop empty userIds
users_not_empty = df.filter(df["userId"] != "")

In [33]:
# the difference matches the number of null values (e.g. in firstName), as it should:
df.count() - users_not_empty.count()

8346

### preliminary analysis

In [34]:
#check for number of users in the dataset:
num_users = users_not_empty.select("userId").distinct().count()
print("Number of users in dataset: {}".format(num_users))

Number of users in dataset: 225


In [35]:

users_not_empty.describe("sessionId").show()

+-------+------------------+
|summary|         sessionId|
+-------+------------------+
|  count|            278154|
|   mean|1042.5616241362698|
| stddev| 726.5010362219813|
|    min|                 1|
|    max|              2474|
+-------+------------------+



In [36]:
# check values of level (might be useful for churn definition):
users_not_empty.select("level").distinct().show()

+-----+
|level|
+-----+
| free|
| paid|
+-----+



In [37]:
# check how often each page is visited:
users_not_empty.groupby('page').agg({'page':'count'}).sort("count(page)").show()

+--------------------+-----------+
|                page|count(page)|
+--------------------+-----------+
|              Cancel|         52|
|Cancellation Conf...|         52|
|    Submit Downgrade|         63|
|      Submit Upgrade|        159|
|               Error|        252|
|       Save Settings|        310|
|               About|        495|
|             Upgrade|        499|
|                Help|       1454|
|            Settings|       1514|
|           Downgrade|       2055|
|         Thumbs Down|       2546|
|              Logout|       3226|
|         Roll Advert|       3933|
|          Add Friend|       4277|
|     Add to Playlist|       6526|
|                Home|      10082|
|           Thumbs Up|      12551|
|            NextSong|     228108|
+--------------------+-----------+
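These page counts can be cross-checked against the null-count table with plain arithmetic. Assuming that only NextSong events carry a song, and that all 8346 empty-userId events are logged-out page views without a song, the 58392 null `song` values should split into those 8346 events plus every non-NextSong event of a logged-in user. The numbers below are copied from the outputs above.

```python
# Counts copied from df.count(), the null-count table and the page counts above.
total_events = 286500
logged_out_events = 8346      # rows with an empty userId (and null firstName, gender, ...)
null_song_events = 58392      # rows with a null artist / length / song
next_song_events = 228108     # "NextSong" page count after dropping empty userIds

logged_in_events = total_events - logged_out_events
non_song_logged_in = logged_in_events - next_song_events

print(logged_in_events)                      # 278154, the describe("sessionId") count
print(non_song_logged_in)                    # 50046
print(null_song_events - logged_out_events)  # 50046, so the two views agree
```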



In [38]:
# There are only a few cancellations in the dataset (52 Cancellation Confirmation events) and 63 Submit Downgrade events.

In [39]:
# summary statistics of the number of log events per user:
users_not_empty.groupby("userId").count().describe("count").show()

+-------+-----------------+
|summary|            count|
+-------+-----------------+
|  count|              225|
|   mean|          1236.24|
| stddev|1329.531716432519|
|    min|                6|
|    max|             9632|
+-------+-----------------+




## Define Churn

The value "Cancellation Confirmation" in column "page" appears to identify customers who churned. We add a column that flags these events using a UDF:

In [41]:
churn_event_def = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())

In [42]:
users_not_empty = users_not_empty.withColumn("churned_users", churn_event_def("page"))

In [43]:
users_not_empty.head()

Py4JJavaError: An error occurred while calling o365.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 71.0 failed 1 times, most recent failure: Lost task 0.0 in stage 71.0 (TID 2045) (192.168.178.24 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	...
Caused by: java.net.SocketTimeoutException: Accept timed out
	...


## Explore Data

#### Users churning %

In [28]:
# check which users have churned
user_churned = users_not_empty.groupBy('userId').agg(max('churned_users').alias('user_churned'))
user_churned.show()

Py4JJavaError: An error occurred while calling o248.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 1027) (192.168.178.24 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	...
Caused by: java.net.SocketTimeoutException: Accept timed out
	...


In [None]:
# Percentage of users by Subscription status
churn_count_pd = user_churned.groupby('user_churned').count().toPandas().set_index('user_churned').sort_index()
plt.figure()
(churn_count_pd/churn_count_pd.sum()).plot.bar(legend=None)
churn_labels = ['Active', 'Churned']
x_pos = np.arange(len(churn_labels))
plt.xticks(x_pos,churn_labels)
plt.title('Percentage of users by Subscription status')
plt.ylabel('Percentage of users')
plt.xlabel('Subscription status')
plt.show()

Around a quarter of the users have churned, so the two classes are imbalanced.
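The "around a quarter" figure can be checked from counts already printed above, assuming each of the 52 Cancellation Confirmation events belongs to a different user (an account can only be cancelled once):

```python
cancellation_confirmations = 52  # from the page counts above
num_users = 225                  # distinct non-empty userIds

# Assumption: one confirmation event per churned user.
churned_users = cancellation_confirmations
active_users = num_users - churned_users
churn_rate = churned_users / num_users

print(active_users)         # 173
print(f"{churn_rate:.1%}")  # 23.1%
```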

#### merging user churned info to original df

In [None]:
# join subscription status to the original data frame to analyze potential features
users_not_empty = users_not_empty.join(user_churned, on=['userId'], how='left')

In [None]:
users_not_empty.count()

#### churned/active by events

In [None]:
users_not_empty.groupby(['page']).count().orderBy('count').show()

In [None]:
relevant_pages_list = ['Add Friend', 'Add to Playlist', 'Roll Advert', 'Thumbs Down', 'Thumbs Up']
churn_page_count_pd = users_not_empty.groupby(['page','user_churned']).count().toPandas()
# NextSong is by far the largest value, so we drop it before calculating the totals
churn_page_count_pd = churn_page_count_pd[churn_page_count_pd.page != 'NextSong'].copy()
# total number of non-NextSong events per subscription status, broadcast to every row
total_pages = churn_page_count_pd.groupby('user_churned')['count'].transform('sum')
# share of each page among all non-NextSong events of the same subscription status
churn_page_count_pd['count'] = churn_page_count_pd['count'] / total_pages
# filter for pages I want to use as potential features
churn_page_count_pd = churn_page_count_pd[churn_page_count_pd.page.isin(relevant_pages_list)].copy()
churn_page_count_pd['user_churned'] = churn_page_count_pd['user_churned'].replace({0: 'No', 1: 'Yes'})

In [None]:
plt.figure(figsize=(6,6))
sns.barplot(y = 'page', x = 'count', data = churn_page_count_pd, hue = 'user_churned')
plt.title('Percentage of events by Subscription status')
plt.ylabel('Events')
plt.xlabel('Percentage of Events')

Relative to their other (non-NextSong) activity, users who did not churn appear to engage more positively with Sparkify:

- they give more thumbs up
- they add more friends
- they add more songs to playlists

Users who churned, in contrast:

- give more thumbs down
- see more adverts (more Roll Advert events)
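The comparison above uses shares of events within each group rather than raw counts, because the two groups differ in size and activity. A toy pandas sketch of that normalization, where `groupby(...).transform('sum')` divides each count by its own group's total; the numbers are made up for illustration:

```python
import pandas as pd

# Hypothetical toy counts in the same long format as churn_page_count_pd (page, user_churned, count).
counts = pd.DataFrame({
    "page": ["Thumbs Up", "Thumbs Down", "Home", "Thumbs Up", "Thumbs Down", "Home"],
    "user_churned": [0, 0, 0, 1, 1, 1],
    "count": [60, 10, 30, 10, 10, 20],
})

# Each count divided by the total of its own group, so shares within each group sum to 1.
counts["share"] = counts["count"] / counts.groupby("user_churned")["count"].transform("sum")
print(counts)
```

Here the active group has the higher raw Thumbs Down count's share of 0.1 versus 0.25 for the churned group, even though both raw counts are 10.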

#### songs listened per user

In [None]:
songs_per_user = users_not_empty.filter(users_not_empty['song'].isNotNull()).select('userId', 'song').groupby('userId').count()
songs_per_user = songs_per_user.join(user_churned, on=['userId'], how='left')
songs_per_status = songs_per_user.select('count', 'user_churned').groupby('user_churned').agg(avg('count').alias('songs_per_status'))
songs_per_status = songs_per_status.toPandas().set_index('user_churned').sort_index()
songs_per_status.plot.bar()
churn_labels = ['Active', 'Churned']
x_pos = np.arange(len(churn_labels))
plt.xticks(x_pos,churn_labels)
plt.title('Songs listened per user by Subscription status')
plt.ylabel('Songs per user')
plt.xlabel('Subscription status')
plt.show()

On average, active users have played more songs than churned users.

#### time spent per user

In [None]:
length_per_user = users_not_empty.filter(users_not_empty['length'].isNotNull()).select('userId', 'length').groupby('userId').agg(sum('length').alias('length_per_user'))
length_per_user = length_per_user.join(user_churned, on=['userId'], how='left')
length_per_status = length_per_user.select('length_per_user', 'user_churned').groupby('user_churned').agg(avg('length_per_user').alias('length_per_status'))
length_per_status = length_per_status.toPandas().set_index('user_churned').sort_index()
length_per_status.plot.bar()
churn_labels = ['Active', 'Churned']
x_pos = np.arange(len(churn_labels))
plt.xticks(x_pos,churn_labels)
plt.title('Listening time per user by Subscription status')
plt.ylabel('Listening time per user (s)')
plt.xlabel('Subscription status')
plt.show()

On average, active users have spent more time listening to songs than churned users.

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can take the following steps:
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

#### Engineer Features

In [None]:
# Number of thumbs up
feature_1 = users_not_empty.select('userID','page').where(users_not_empty.page == 'Thumbs Up').groupBy('userID').agg(count('page').alias('num_thumb_up'))

# Number of thumbs down
feature_2 = users_not_empty.select('userID','page').where(users_not_empty.page == 'Thumbs Down').groupBy('userID').agg(count('page').alias('num_thumb_down'))

# Number of songs added to playlists
feature_3 = users_not_empty.select('userID','page').where(users_not_empty.page == 'Add to Playlist').groupBy('userID').agg(count('page').alias('num_add_paylist'))

# Number of friends added
feature_4 = users_not_empty.select('userID','page').where(users_not_empty.page == 'Add Friend').groupBy('userID').agg(count('page').alias('num_add_friends'))

# Number of roll adverts
feature_5 = users_not_empty.select('userID','page').where(users_not_empty.page == 'Roll Advert').groupBy('userID').agg(count('page').alias('num_roll_adverts'))

# Number of songs played
feature_6 = users_not_empty.filter(users_not_empty['song'].isNotNull()).select('userId', 'song').groupBy('userID').agg(count('song').alias('songs_played'))

# Total listening time (sum of song lengths)
feature_7 = users_not_empty.filter(users_not_empty['length'].isNotNull()).select('userId', 'length').groupBy('userID').agg(sum('length').alias('time_listened'))

# Number of distinct artists listened to
feature_8 = users_not_empty.filter(users_not_empty.page == 'NextSong').select('userId', 'artist').dropDuplicates().groupBy('userID').agg(count('artist').alias('artists_listened'))
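The five page-count features above repeat the same filter/groupBy/count pattern. As a minimal sketch of a more compact alternative, all page types can be counted in a single pass. It is written with pandas so it runs without a cluster; `PAGE_FEATURES` and `page_counts_per_user` are hypothetical names. In Spark, something like `users_not_empty.groupBy('userId').pivot('page', pages).count().fillna(0)` (with `pages` a list of page values, and the resulting columns renamed afterwards) should play the same role, but treat that as a suggestion to verify rather than a drop-in replacement.

```python
import pandas as pd

# Hypothetical mapping from feature column name to the Sparkify page value it counts
PAGE_FEATURES = {
    "num_thumb_up": "Thumbs Up",
    "num_thumb_down": "Thumbs Down",
    "num_add_paylist": "Add to Playlist",
    "num_add_friends": "Add Friend",
    "num_roll_adverts": "Roll Advert",
}


def page_counts_per_user(events: pd.DataFrame) -> pd.DataFrame:
    """Count the selected page events per user in a single pass.

    Expects 'userId' and 'page' columns, like the Sparkify event log.
    Users with no event of a given type get 0 instead of a missing value.
    """
    page_to_feature = {page: name for name, page in PAGE_FEATURES.items()}
    # Keep only the page types that become features
    relevant = events[events["page"].isin(list(page_to_feature))]
    counts = pd.crosstab(relevant["userId"], relevant["page"].map(page_to_feature))
    # Every user and every feature column appears, with 0 where nothing was counted
    all_users = pd.Index(events["userId"].unique(), name="userId")
    return counts.reindex(index=all_users, columns=list(PAGE_FEATURES), fill_value=0)


# Tiny illustrative event log (not Sparkify data)
events = pd.DataFrame({
    "userId": ["a", "a", "b", "b", "c"],
    "page": ["Thumbs Up", "Thumbs Up", "Roll Advert", "NextSong", "NextSong"],
})
print(page_counts_per_user(events))
```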

#### Engineer target label

In [None]:
target = users_not_empty.select('userId', col('user_churned').alias('label')).dropDuplicates()

#### Join everything together

In [None]:
df_final = feature_1.join(feature_2, on=['userID'], how='outer') \
    .join(feature_3, on=['userID'], how='outer') \
    .join(feature_4, on=['userID'], how='outer') \
    .join(feature_5, on=['userID'], how='outer') \
    .join(feature_6, on=['userID'], how='outer') \
    .join(feature_7, on=['userID'], how='outer') \
    .join(feature_8, on=['userID'], how='outer') \
    .join(target, on=['userID'], how='outer') \
    .drop('userID')

In [None]:
df_final = df_final.fillna(0)
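The outer joins keep every user that appears in any feature table, but a user who never triggered an event (for example, never gave a thumbs down) gets a null in that column. Since a missing count simply means zero events, `fillna(0)` is the right repair here; in Spark, `fillna` with a numeric value only touches numeric columns. A small pandas sketch with illustrative data shows the same effect:

```python
import pandas as pd

# Two illustrative per-user feature tables; not every user appears in both
thumbs_up = pd.DataFrame({"userId": ["a", "b"], "num_thumb_up": [3, 1]})
thumbs_down = pd.DataFrame({"userId": ["b", "c"], "num_thumb_down": [2, 4]})

# An outer join keeps every user from either side; missing counts become NaN
joined = thumbs_up.merge(thumbs_down, on="userId", how="outer")

# A user absent from a feature table had zero such events, so fill with 0
filled = joined.fillna(0)
print(filled.sort_values("userId").to_string(index=False))
```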

In [None]:
df_final.head()

#### Vector assembler and scaler

In [None]:
feature_cols = ['num_thumb_up', 'num_thumb_down', 'num_add_paylist', 'num_add_friends', 'num_roll_adverts', 'songs_played',
               'time_listened', 'artists_listened']

In [None]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="NumFeatures")
df_final = assembler.transform(df_final)

In [None]:
df_final.head()

In [None]:
scaler = StandardScaler(inputCol="NumFeatures", outputCol="features", withStd=True)
scalerModel = scaler.fit(df_final)
df_final = scalerModel.transform(df_final)
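With `withStd=True` and Spark's default `withMean=False`, `StandardScaler` divides each feature by its standard deviation without subtracting the mean; Spark documents this as the corrected (n - 1) sample standard deviation. The numpy sketch below (hypothetical helper `scale_to_unit_std`) reproduces that arithmetic. Mapping zero-variance columns to 0 matches our understanding of Spark's behaviour, but treat that detail as an assumption.

```python
import numpy as np


def scale_to_unit_std(features: np.ndarray) -> np.ndarray:
    """Divide each column by its corrected (n - 1) sample standard deviation.

    Sketch of StandardScaler(withStd=True) with the default withMean=False:
    values are rescaled but not centred. Constant columns are mapped to 0.
    """
    features = np.asarray(features, dtype=float)
    std = features.std(axis=0, ddof=1)
    scaled = np.zeros_like(features)
    nonzero = std > 0
    scaled[:, nonzero] = features[:, nonzero] / std[nonzero]
    return scaled


X = np.array([[1.0, 5.0], [3.0, 5.0], [8.0, 5.0]])
print(scale_to_unit_std(X))
```

Note that the scaler above is fit on `df_final` before the train/test split, so test rows contribute to the standard deviations. Fitting the assembler and scaler inside a `pyspark.ml.Pipeline` on the training split only would avoid that small leak.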

In [None]:
df_final.head()

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [None]:
train, test = df_final.select('label', 'features').randomSplit([0.8, 0.2], seed=1)
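`randomSplit` produces only approximately 80/20 partitions and does not stratify by label, so on a small dataset the churn share in the test set can drift away from the overall ratio. A minimal stratified split sketch in numpy (hypothetical helper `stratified_split`) keeps each label's share in both parts; in Spark, `DataFrame.sampleBy('label', fractions={0: 0.2, 1: 0.2}, seed=1)` draws a stratified sample, and the remaining rows could then be recovered with a left anti join on a user key (which would require keeping `userId` in `df_final`).

```python
import numpy as np


def stratified_split(labels, test_fraction=0.2, seed=1):
    """Return sorted (train_idx, test_idx) arrays that keep each label's share in both parts."""
    rng = np.random.default_rng(seed)
    labels = np.asarray(labels)
    train_idx, test_idx = [], []
    for value in np.unique(labels):
        idx = np.flatnonzero(labels == value)
        rng.shuffle(idx)
        n_test = int(round(len(idx) * test_fraction))
        test_idx.extend(idx[:n_test].tolist())
        train_idx.extend(idx[n_test:].tolist())
    return np.array(sorted(train_idx)), np.array(sorted(test_idx))


labels = [0] * 75 + [1] * 25  # illustrative 75%/25% active/churned split
train_idx, test_idx = stratified_split(labels)
print(len(train_idx), len(test_idx), int(np.sum(np.asarray(labels)[test_idx])))  # 80 20 5
```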

In [None]:
# As seen above, the ratio of active to churned users is roughly 75%/25%, which I consider imbalanced,
# so I use the F1 score as the optimization metric.

eval_f1 = MulticlassClassificationEvaluator(metricName='f1')
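`MulticlassClassificationEvaluator(metricName='f1')` reports the F1 score averaged over both classes, weighted by how often each label occurs, not the F1 of the churn class alone. The pure-Python sketch below (hypothetical helper `f1_scores`) shows why that matters on a 75/25 split: a model that predicts "active" for everyone still scores about 0.64 weighted F1 while its churn-class F1 is 0. In Spark 3.0 and later, `metricName='fMeasureByLabel'` together with `metricLabel=1.0` should target the churn class directly.

```python
def f1_scores(y_true, y_pred):
    """Return (per-class F1 dict, label-frequency-weighted F1) for binary 0/1 labels."""
    per_class = {}
    for cls in (0, 1):
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != cls and p == cls)
        fn = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p != cls)
        # Precision/recall default to 0 when their denominator is 0
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        per_class[cls] = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    n = len(y_true)
    # Weight each class's F1 by its share of the true labels
    weighted = sum(per_class[c] * sum(1 for t in y_true if t == c) / n for c in (0, 1))
    return per_class, weighted


# 75 active (0) and 25 churned (1) users, and a model that predicts "active" for everyone
y_true = [0] * 75 + [1] * 25
y_pred = [0] * 100
per_class, weighted = f1_scores(y_true, y_pred)
print(per_class)           # churn-class F1 is 0.0
print(round(weighted, 3))  # 0.643
```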

#### logistic regression

In [None]:
lr = LogisticRegression(maxIter=10)

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
    .build()
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=eval_f1,
                    numFolds=3)
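For reference, Spark's logistic regression adds an elastic-net penalty to the average log loss, with $\lambda$ = `regParam` and $\alpha$ = `elasticNetParam` (schematically, following the Spark ML documentation):

```latex
\min_{w,\,b}\; \frac{1}{n}\sum_{i=1}^{n} \ell\!\left(y_i,\; w^\top x_i + b\right)
\;+\; \lambda \left( \alpha \,\lVert w \rVert_1 + \frac{1-\alpha}{2}\,\lVert w \rVert_2^2 \right)
```

When $\lambda = 0$ the whole penalty vanishes regardless of $\alpha$, so the grid points (`regParam=0.0`, `elasticNetParam=0.0`) and (`regParam=0.0`, `elasticNetParam=0.5`) describe the same model; the four-point grid effectively contains three distinct configurations.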

In [None]:
cvModel_lr = cv.fit(train)

In [None]:
cvModel_lr.avgMetrics
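`avgMetrics` is a plain list of mean cross-validation scores in the same order as the parameter grid, which makes it hard to read on its own. A small sketch (hypothetical helper `rank_param_maps`) pairs each combination with its score and sorts best first; since F1 is a larger-is-better metric, the default ordering is descending.

```python
def rank_param_maps(param_maps, avg_metrics, larger_is_better=True):
    """Pair each parameter combination with its mean CV metric, best first."""
    if len(param_maps) != len(avg_metrics):
        raise ValueError("expected one metric per parameter combination")
    pairs = list(zip(param_maps, avg_metrics))
    return sorted(pairs, key=lambda pair: pair[1], reverse=larger_is_better)


# With the objects above this could be called as (not executed here):
# readable = [{param.name: value for param, value in pm.items()} for pm in paramGrid]
# for params, f1 in rank_param_maps(readable, cvModel_lr.avgMetrics):
#     print(params, round(f1, 3))
```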

#### gradient-boosted trees

In [None]:
gbt = GBTClassifier(maxIter = 10, seed=1)

In [None]:
paramGrid_gb = ParamGridBuilder() \
    .addGrid(gbt.maxDepth,[5, 10]) \
    .build()
cv_gb = CrossValidator(estimator=gbt,
                       estimatorParamMaps=paramGrid_gb,
                       evaluator=eval_f1,
                       numFolds=3)
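`CrossValidator` trains every parameter combination once per fold and then refits the best combination on the full training set, so the cost grows with grid size times `numFolds`. The arithmetic for the two searches in this notebook, as a small sketch (hypothetical helper `cv_fit_count`):

```python
import math


def cv_fit_count(grid: dict, num_folds: int) -> int:
    """Models trained by k-fold grid search: one per (combination, fold) plus the final refit."""
    n_combinations = math.prod(len(values) for values in grid.values())
    return n_combinations * num_folds + 1


lr_grid = {"regParam": [0.0, 0.1], "elasticNetParam": [0.0, 0.5]}
gbt_grid = {"maxDepth": [5, 10]}
print(cv_fit_count(lr_grid, num_folds=3))   # 13
print(cv_fit_count(gbt_grid, num_folds=3))  # 7
```

Because each GBT fit with `maxIter=10` builds 10 trees, the GBT search trains roughly 70 trees in total, which is worth keeping in mind before enlarging the grid on the full 12GB dataset.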

In [None]:
cvModel_gb = cv_gb.fit(train)

In [None]:
cvModel_gb.avgMetrics

#### evaluate

In [None]:
predictions_lr = cvModel_lr.transform(test)
predictions_gb = cvModel_gb.transform(test)

In [None]:
# from: https://stackoverflow.com/questions/58404845/confusion-matrix-to-get-precsion-recall-f1score

# select only the prediction and label columns
preds_and_labels_lr = predictions_lr.select(['prediction','label'])

# important: MulticlassMetrics expects (prediction, label) pairs of floats, so cast the label column;
# the orderBy follows the referenced answer and does not change the computed metrics
preds_and_labels_lr = preds_and_labels_lr.withColumn('label', col('label').cast(FloatType())).orderBy('prediction')

metrics_lr = MulticlassMetrics(preds_and_labels_lr.rdd.map(tuple))

print(metrics_lr.confusionMatrix().toArray())

In [None]:
metrics_lr.accuracy
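Accuracy alone hides what happens to the churn class. `MulticlassMetrics.confusionMatrix()` puts actual labels in rows and predicted labels in columns, both in ascending label order, so for 0 = active and 1 = churned the bottom row holds the churned users. A sketch (hypothetical helper `churn_metrics`) that derives churn-class precision, recall and F1 from that 2x2 layout; `MulticlassMetrics` also exposes `precision(1.0)`, `recall(1.0)` and `fMeasure(1.0)` directly.

```python
import numpy as np


def churn_metrics(confusion: np.ndarray) -> dict:
    """Precision, recall and F1 for the churn class (label 1) from a 2x2 confusion matrix.

    Assumes rows are actual labels and columns are predicted labels, ascending (0, 1).
    """
    tn, fp = confusion[0]
    fn, tp = confusion[1]
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# Usage with the objects above (not executed here):
# churn_metrics(metrics_lr.confusionMatrix().toArray())
```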

In [None]:
# from: https://stackoverflow.com/questions/58404845/confusion-matrix-to-get-precsion-recall-f1score

# select only the prediction and label columns
preds_and_labels_gb = predictions_gb.select(['prediction','label'])

# important: MulticlassMetrics expects (prediction, label) pairs of floats, so cast the label column;
# the orderBy follows the referenced answer and does not change the computed metrics
preds_and_labels_gb = preds_and_labels_gb.withColumn('label', col('label').cast(FloatType())).orderBy('prediction')

metrics_gb = MulticlassMetrics(preds_and_labels_gb.rdd.map(tuple))

print(metrics_gb.confusionMatrix().toArray())

In [None]:
metrics_gb.accuracy

Results:
- The accuracy of both models is identical.
- Logistic regression has the better F1 score.
- Looking at the confusion matrices, however, logistic regression only ever predicts active customers, which is quite bad.
- This is especially bad because false negatives (churned users predicted as active) hurt us more than false positives.
- Therefore I will choose the GBT model and analyze it further.
- More information and additional explanation can be found in my Medium post, which is linked in the GitHub README.


#### Looking at feature importance of GBT Model

In [None]:
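# toArray() returns one value per input feature; .values on a SparseVector would drop zero importances
# and misalign the values with feature_cols
feature_imp = cvModel_gb.bestModel.featureImportances.toArray()
feature_imp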

In [None]:
y_pos = np.arange(len(feature_imp))

plt.barh(y_pos, feature_imp, align='center')
plt.yticks(y_pos, feature_cols)
plt.xlabel('Feature Importance')
plt.title('GBT Feature Importances')
plt.tight_layout()
plt.show()
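Spark normalizes tree-ensemble feature importances to sum to 1, so the bars are directly comparable, but the plot is easier to read when the features are sorted. A small sketch (hypothetical helper `ranked_importances`) that pairs names with a dense importance array and orders them from most to least important:

```python
import numpy as np


def ranked_importances(names, importances):
    """Return (name, importance) pairs sorted from most to least important.

    `importances` must be dense and aligned with `names`, e.g. featureImportances.toArray().
    """
    importances = np.asarray(importances, dtype=float)
    if len(names) != len(importances):
        raise ValueError("expected one importance value per feature name")
    order = np.argsort(importances)[::-1]
    return [(names[i], float(importances[i])) for i in order]


# Usage with the objects above (not executed here):
# names, values = zip(*ranked_importances(feature_cols, feature_imp))
# plt.barh(np.arange(len(values)), values)
# plt.yticks(np.arange(len(values)), names)
```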

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.