# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [96]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, BooleanType, StringType

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

In [97]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Sparkify") \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [98]:
dataset_path = "data/mini_sparkify_event_data.json"

In [99]:
df = spark.read.json(dataset_path)
df.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [100]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Submit Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

Let's define the churn function that will find set 1 when row means churn and 0 otherwise. To find a churn we'll use the `page` when `Submit Downgrade` and `Cancellation Confirmation` event happened.

In [101]:
def isChurn(page):
    if page == "Submit Downgrade" or page == "Cancellation Confirmation":
        return True
    return False

In [102]:
def churn(value):
    if value == "Submit Downgrade" or value == "Cancellation Confirmation":
        return 1
    return 0

Now let's apply this function our dataframe.

In [103]:
setChurn = udf(churn, IntegerType())
df = df.withColumn("churn", setChurn(df.page))

So now let's take churned users id's and let's put them to separate collection. First let's collect id of users that left our service by reasons.

In [104]:
churned_ids = df.filter(df.churn == 1).select("userId").dropDuplicates().collect()
churned_ids = [i.userId for i in churned_ids]

Now let's define two functions that will find users that are churned and not churned.

In [105]:
isInChurn = udf(lambda x: x in churned_ids, BooleanType())
notInChurn = udf(lambda x: x not in churned_ids, BooleanType())

Now let's combine firstName and lastName column into one column for further processing.

In [106]:
def concatName(firstName, lastName):
    s = ''.join([firstName, lastName])
    print(s)
    return s

In [107]:
combineFirstAndLastName = udf(concatName, StringType())

In [108]:
df = df.withColumn("name", combineFirstAndLastName(df.firstName, df.lastName))
df = df.drop('firstName', 'lastName')

### Do we have null values across the dataset?

Now as we have simple churn defined, let's take a look more on how the dataset looks llike. First let's see which column has a null value and how much of these values are there

In [109]:
for column in df.columns:
    print("Column {} contains {} null values".format(column, df.where(col(column).isNull()).count()))

Column artist contains 58392 null values
Column auth contains 0 null values
Column gender contains 8346 null values
Column itemInSession contains 0 null values
Column length contains 58392 null values
Column level contains 0 null values
Column location contains 8346 null values
Column method contains 0 null values
Column page contains 0 null values
Column registration contains 8346 null values
Column sessionId contains 0 null values
Column song contains 58392 null values
Column status contains 0 null values
Column ts contains 0 null values
Column userAgent contains 8346 null values
Column userId contains 0 null values
Column churn contains 0 null values


Py4JJavaError: An error occurred while calling o3923.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 877.0 failed 1 times, most recent failure: Lost task 0.0 in stage 877.0 (TID 44589, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 352, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 142, in dump_stream
    for obj in iterator:
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 341, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 85, in <lambda>
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-106-155c162a332e>", line 2, in concatName
TypeError: sequence item 0: expected str instance, NoneType found

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2835)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
	at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 352, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 142, in dump_stream
    for obj in iterator:
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 341, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 85, in <lambda>
  File "D:\Programy\anaconda\envs\sparkify\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-106-155c162a332e>", line 2, in concatName
TypeError: sequence item 0: expected str instance, NoneType found

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


So it looks like some of the values are nulls. We can see that number of different values are the same across all columns, let's see what can be cause of this situation.

Let's check if the same number null values have correclation with columns.

In [None]:
df.select("page").where((col("artist").isNull()) | (col("length").isNull()) | (col("song").isNull())).dropDuplicates().collect()

So these pages are not connected with playing a song at all, that's then understandable why the 'artist', 'length' and 'song' columns contain null values - they are not existing is such situation.

Now let's see the columns for first and last name, gender and userAgent.

In [None]:
df.select("page").where((col("firstName").isNull()) | (col("gender").isNull()) | (col("lastName").isNull()) | (col("location").isNull()) | (col("registration").isNull()) | (col("userAgent").isNull())).dropDuplicates().collect()

So the reason why these columns contains null values is that, the events are connected to the operations done by the users out of the logged session.

### How many users churned from the service?

In [None]:
print("Number of churned users: {}".format(df.filter(isInChurn(df.userId)).select("userId").dropDuplicates().count()))
print("Number of staying users: {}".format(df.filter(notInChurn(df.userId)).select("userId").dropDuplicates().count()))

So it looks like for the sparkify mini event data less users had events `Submit Downgrade` or `Cancellation Confirmation` from users which didn't take that options, however there can be users that returned to the service after leaving it e.g. downgraded from premium and still were using it for free - this situation allow us only to get money from advertisements during the free sessions.

### How churn rate distribute over genders?

Let's see which users of which gender are more likely to churn. First let's get the dataframe of churn users.

In [None]:
users_gender=df.filter(isInChurn(df.userId)).select("userId", "gender", "churn").dropDuplicates(["userId"]).toPandas()
users_gender.head()

But what is it? The `churn` column contains 0? Indeed it's because we filter out the users with Id's that performed downgrade of the account and due to `dropDuplicates()` the information about churn for downgrade was missed. Let's fix it for the sake of answering the question.

In [None]:
users_gender["churn"] = 1
users_gender.head()

In [None]:
users_gender = pd.concat([users_gender,\
                                   df.filter(notInChurn(df.userId)).select("userId", "gender", "location", "churn").dropDuplicates(["userId"]).toPandas()], axis=0)

In [None]:
plt.clf()
ax = sns.countplot(x="gender", hue="churn", data=users_gender)
plt.show()

So we can see that there are slightly more male users that churn from the service. Now let's clear this pandas dataframe.

In [None]:
users_gender.drop(users_gender.index, inplace=True)

### How many users were still using the service after churn?

To answer for this question let's collect the users with churned by history of their events with ascending timestamp, to know how everything grew.

In [None]:
churned_users = [df.select("*").where(col("userId") == row).sort('ts').collect() for row in churned_ids]

In [None]:
staying_ids = df.filter(notInChurn(df.userId)).select("userId").dropDuplicates().collect()
staying_ids = [i.userId for i in staying_ids]

staying_users = [df.select("*").where(col("userId") == row).sort('ts').collect() for row in staying_ids]

Now let's see is some users still were using the service after `Downgrading`. After cancellation the user has removed account and has different id.

In [None]:
back_ids = []
for user in churned_users:
    churned_row = False
    r = None
    for row in user:
        if row.page == "Submit Downgrade":
            churned_row = True
            r = row
            continue
        if row.churn == 0 and churned_row == True and row.song != None:
            back_ids.append(row.userId)
            break
            
print("Number of users that downgraded churn: {}, so the downgrading users are {}% of churned users.".format(len(back_ids), (len(back_ids)/len(churned_users))*100))

### How long users were using the service before downgrade?

In [None]:
downgrade_number_of_days = []

for user in churned_users:
    first_user = True
    start = None
    for row in user:
        if first_user == True:
            start = datetime.fromtimestamp(row.ts/1000.0)
            first_user = False
            continue
        if row.page == "Submit Downgrade":
            stopTimestamp = row.ts
            downgrade_number_of_days.append((datetime.fromtimestamp(row.ts/1000.0) - start).days)

plt.clf()
plt.figure(figsize=(15,10))
# ax = sns.distplot(downgrade_number_of_days, kde=False, bins=30)
plt.hist(downgrade_number_of_days, bins = 40)
# plt.xticks(range(1,41,2))
# plt.xticks(range(0,51))
plt.show()

### Does some users re-created account after cancellation?

In [None]:
churned_names = df.filter(isInChurn(df.userId)).select("userId","name").dropDuplicates(["userId"])

### How much songs churned and not churned users listened to and how much time on listening they spent?

So now let's count songs for churned and non churned users. First let's get the users that churn.

In [None]:
churn_songs = []
churn_song_length = []
for churn_user in churned_users:
    song_counter = 0
    song_length = 0
    for row in churn_user:
        if isChurn(row.page):
            break
        if row.song != None:
            song_counter += 1
            song_length += row.length
    churn_songs.append(song_counter)
    churn_song_length.append(song_length)

In [None]:
plt.clf()
plt.figure(figsize=(15,10))
plt.hist(churn_songs)

Now let's take a look how does it look like for staying users.

In [None]:
stay_songs = []
stay_song_length = []
for stay_user in staying_users:
    song_counter = 0
    song_length = 0
    for row in stay_user:
        if row.song != None:
            song_counter += 1
            song_length += row.length
    stay_songs.append(song_counter)
    stay_song_length.append(song_length)

In [None]:
plt.clf()
plt.figure(figsize=(15,10))
plt.hist(stay_songs)

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.