In [1]:
# import relevant libraries
!pip install pyspark
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
from time import time
from functools import reduce

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import MinMaxScaler
from pyspark.mllib.evaluation import MulticlassMetrics

%matplotlib inline





In [2]:
# create a Spark session
spark = SparkSession.builder.appName('SparkifyProj').getOrCreate()

In [3]:
# load gthe dataset
data = spark.read.json("D:\OneDrive - NITT\Custom_Download\mini_sparkify_event_data.json")

In [4]:
data.columns # list the column name

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

In [5]:
data.show(5) # display top 5 rows

+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|          artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|  Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|NextSong|1538173362000|       29|           Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
|Five Iron Frenzy|Logged In|    Micah|     M|           79|    Long|236.09424| free|Boston-Cambridge-...|   PUT|NextSong|1538331630000| 

In [6]:
data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
data.count() # rows


286500

In [8]:
len(data.columns) # no of columns

18

In [9]:
# column wise NaN Values
print(f"{'Column Name':<20} {'Number of NaN':<20}")
print('--------------------------------------------')
for col in data.columns:
    n = data[(data[col].isNull()) | (data[col]=="")].count()
    
    print(f"{col:<20} {n:<20}")

Column Name          Number of NaN       
--------------------------------------------
artist               58392               
auth                 0                   
firstName            8346                
gender               8346                
itemInSession        0                   
lastName             8346                
length               58392               
level                0                   
location             8346                
method               0                   
page                 0                   
registration         8346                
sessionId            0                   
song                 58392               
status               0                   
ts                   0                   
userAgent            8346                
userId               8346                


In [10]:
# drop rows where user id is NaN
df_filtered = data.dropna(subset = ['userId'])

# filter the data to consider rows where userid is not empty
df_filtered = df_filtered.filter(df_filtered['userId'] != '')

In [11]:
# transform ts and registation time format
get_date_fn = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))
df_filtered = df_filtered.withColumn('event_time', get_date_fn('ts'))
df_filtered = df_filtered.withColumn('registration_time', get_date_fn('registration'))

In [12]:
# get day number
get_day_fn = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).day)
df_filtered = df_filtered.withColumn('day_of_month', get_day_fn('ts'))

In [13]:
# extract Operating System
dict_map = {'Compatible': 'Windows', 'Ipad': 'iPad', 'Iphone': 'iPhone', 'Macintosh': 'Mac',  
       'Windows nt 5.1': 'Windows', 'Windows nt 6.0': 'Windows', 'Windows nt 6.1': 'Windows', 
       'Windows nt 6.2': 'Windows',  'Windows nt 6.3': 'Windows', 'X11': 'Linux'}

get_operating_sys = udf(lambda x: dict_map[re.findall('\(([^\)]*)\)', x)[0].split(';')[0].capitalize()])
df_filtered = df_filtered.withColumn('operating_system', get_operating_sys(df_filtered.userAgent))

In [14]:
# extract state from location
get_location_fn = udf(lambda x:x[-2:len(x)])
df_filtered = df_filtered.withColumn('location_state', get_location_fn(df_filtered.location))
df_filtered.select('location_state').distinct().show(5)

Py4JJavaError: An error occurred while calling o221.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 59.0 failed 1 times, most recent failure: Lost task 0.0 in stage 59.0 (TID 180) (DESKTOP-JID9S04 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$Lambda$3906/873611897.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.RDD$$Lambda$2494/1887070209.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2305/16270310.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$4125/834079987.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$4123/718461260.apply(Unknown Source)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$Lambda$3906/873611897.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.RDD$$Lambda$2494/1887070209.apply(Unknown Source)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2305/16270310.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 29 more


In [None]:
# create feature "downgrade"
downgrade_event_fn = udf(lambda x: 0 if x != 'Submit Downgrade' else 1, IntegerType())
df_filtered = df_filtered.withColumn('downgrade_event', downgrade_event_fn('page'))
df_filtered = df_filtered.withColumn('downgrade', max('downgrade_event').over(Window.partitionBy('userId')))

In [None]:
# create feature "churn"
churn_event = udf(lambda x: 0 if x != 'Cancellation Confirmation' else 1, IntegerType())
df_filtered = df_filtered.withColumn('churn_event', churn_event('page'))
df_filtered = df_filtered.withColumn('churn', max('churn_event').over(Window.partitionBy('userId')))

# EDA

In [None]:
df_pd = df_filtered.toPandas()

In [None]:
sns.countplot(df_pd['churn'])
plt.show()

In [None]:
df_pd.head()

In [None]:
def statPlot(colname):
    
    ############# crarte df ################
    df1 = df_pd[[colname,'churn']]

    df1  = df1.groupby([colname, 
                              'churn']).size().reset_index().pivot(columns='churn', 
                                                                        index=colname, 
                                                                    values=0)
#     return df1
    df1['percentage_of_churn']= df1[1]*100/(df1[0]+ df1[1])
    # df_job['percentage_of_unsubscribers']= df_job['no']/(df_job['no']+ df_job['yes'])
    df1.dropna(inplace=True)
    df1= df1.sort_values('percentage_of_churn').reset_index()
    df1[colname] = df1[colname].astype('str')
    
    ##### PLOT ####################
    plt.figure(figsize=(15,5))
    ax = sns.barplot(df1[colname],df1['percentage_of_churn'],
                     palette="vlag")
    for bars in ax.containers:
        ax.bar_label(bars, fmt="%.2f%%")
    plt.show()

In [None]:
statPlot('gender')

In [None]:
statPlot('level')

In [None]:
statPlot('operating_system')

In [None]:
statPlot('location_state')

In [None]:
model_features = [] # store model features

In [None]:
# gender of the user
temp = df_filtered.select(['userId', 'gender']).dropDuplicates(['userId'])
temp = temp.replace(['F', 'M'], ['0', '1'], 'gender')
df_gender = temp.withColumn('gender', temp.gender.cast('int'))
model_features.append(df_gender)
df_gender.show(5)

In [None]:
# level of the user
temp = df_filtered.select(['userId', 'level']).dropDuplicates(['userId'])
temp = temp.replace(['paid', 'free'], ['0', '1'], 'level')
df_payment = temp.withColumn('level', temp.level.cast('int'))
model_features.append(df_payment)
df_payment.show(5)

In [None]:
# did the user downgrade
temp = df_filtered.select(['userId','downgrade']).dropDuplicates(['userId']) 
df_downgrade = temp.withColumn('downgrade', temp.downgrade.cast('int'))
model_features.append(df_downgrade)
df_downgrade.show(5)

In [None]:
# did the user churn
temp = df_filtered.select(['userId','churn']).dropDuplicates(['userId'])
df_churn = temp.withColumn('churn', temp.churn.cast('int'))
df_churn.show(5) 

In [None]:
# number of songs the user listened to in total
num_songs = df_filtered.select('userID','song').groupBy('userID').count()
model_features.append(num_songs)

In [None]:
# number of Thumbs-Up/Down
num_thumbs_up = df_filtered.select('userID','page').where(df_filtered.page == 'Thumbs Up').groupBy('userID').count().withColumnRenamed('count', 'num_thumbs_up') 
print(num_thumbs_up.show(5))
model_features.append(num_thumbs_up)
num_thumbs_down = df_filtered.select('userID','page').where(df_filtered.page == 'Thumbs Down').groupBy('userID').count().withColumnRenamed('count', 'num_thumbs_down') 
model_features.append(num_thumbs_down)
print(num_thumbs_down.show(5)) 

In [None]:
# number of songs added to playlist
num_playlist = df_filtered.select('userID','page').where(df_filtered.page == 'Add to Playlist').groupBy('userID').count().withColumnRenamed('count', 'num_playlist')
model_features.append(num_playlist)
num_playlist.show(5) 

In [None]:
# number of friends added
num_friends = df_filtered.select('userID','page').where(df_filtered.page == 'Add Friend').groupBy('userID').count().withColumnRenamed('count', 'num_friend')
model_features.append(num_friends)
num_friends.show(5)

In [None]:
# total length of listening
sum_listened = df_filtered.select('userID','length').groupBy('userID').sum().withColumnRenamed('sum(length)', 'sum_listened')
model_features.append(sum_listened)
sum_listened.show(5) 

In [None]:
# Number of songs listened per session
av_song_session = df_filtered.where('page == "NextSong"').groupby(['userId', 'sessionId']).count().groupby(['userId']).agg({'count':'avg'}).withColumnRenamed('avg(count)', 'av_song_session')
model_features.append(av_song_session)
av_song_session.show(5)

In [None]:
# number of artists listened to
num_artists = df_filtered.filter(df_filtered.page=="NextSong").select(['userId', 'artist']).dropDuplicates().groupby('userId').count().withColumnRenamed('count', 'num_artists') 
model_features.append(num_artists)
num_artists.show(5) 

In [None]:
# time since registration in days
days_member = df_filtered.select('userId','ts','registration').withColumn(
    'days_member',((df_filtered.ts - df_filtered.registration)/1000/3600/24)).groupBy('userId').agg(
    {'days_member':'max'}).withColumnRenamed('max(days_member)','days_member') 

model_features.append(days_member)
days_member.show(5) 

In [None]:
# session count per user
num_session = df_filtered.select('userId', 'sessionId').dropDuplicates().groupby('userId').count().withColumnRenamed('count', 'num_sessions') 
model_features.append(num_session)
num_session.show(5) 

In [None]:
# duration of the session
sessionStart = df_filtered.groupBy('userId', 'sessionId').min('ts').withColumnRenamed('min(ts)', 'start')
sessionEnd = df_filtered.groupBy('userId', 'sessionId').max('ts').withColumnRenamed('max(ts)', 'end')
durSession = sessionStart.join(sessionEnd, ['userId', 'sessionId'])
durSession = durSession.select('userId', 'sessionId', ((durSession.end-durSession.start)/(1000*60*60)).alias('dur_session'))
model_features.append(durSession)
durSession.show(5)

In [None]:
for i, feature_to_join in enumerate(model_features):
    df_churn = df_churn.join(feature_to_join,'userID','outer')

In [None]:
df_churn = df_churn.drop('userID') 
df_final = df_churn.na.fill(0)

In [None]:
print((df_final.count(), len(df_final.columns)))

In [None]:
# rename
df_final = df_final.withColumnRenamed("churn","label")

In [None]:
# vector assembler
assembler = VectorAssembler(inputCols=df_final.columns[1:], outputCol="features")
data = assembler.transform(df_final)
data

In [None]:
# standard scaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True)
scalerModel = scaler.fit(data)
data = scalerModel.transform(data)

In [None]:
# drop column "features" - as no longer needed
data = data.drop("features")

In [None]:
# split the data into train and test 
train, test = data.randomSplit([0.8, 0.2], seed=42)

In [None]:
from sklearn.metrics import classification_report,accuracy_score
def get_report(model):
    p_df = model.toPandas()
    y_true = p_df['label']
    y_pred = p_df['prediction']
    print(classification_report(y_true,y_pred))
    acc = accuracy_score(y_true,y_pred)
    return acc

# LogisticRegression

In [None]:
lr =  LogisticRegression(featuresCol='scaled_features', labelCol='label')
lr_clf = lr.fit(train)
lr_acc = get_report(lr_clf.transform(test))

# DecisionTreeClassifier

In [None]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="scaled_features")
dt_clf = dt.fit(train)
dt_acc = get_report(dt_clf.transform(test))

# RandomForestClassifier

In [None]:
rf = RandomForestClassifier(labelCol="label", featuresCol="scaled_features")
rf_clf = rf.fit(train)
rf_acc = get_report(rf_clf.transform(test))

# GBTClassifier

In [None]:
gbt = GBTClassifier(labelCol="label", featuresCol="scaled_features")
gbt_model = gbt.fit(train)
results_gbt = gbt_model.transform(test)

gbt_acc = get_report(results_gbt)

# Comparison

In [None]:
results = pd.DataFrame({
    'Model': ['Logistic Regression','Decision Tree','Random Forest', 'Gradient Boost Tree'],
    'Score': [lr_acc,
                dt_acc,
                rf_acc,
                gbt_acc]})

result_df = results.sort_values(by='Score', ascending=False)
result_df = result_df.set_index('Model')
result_df