
**Step 1:** We install PySpark, the Python API for Apache Spark.

In [None]:
!pip install pyspark    # install PySpark using pip



**Step 2:**
PySpark runs on the Java Virtual Machine, so Java must be installed before we can use it. We use OpenJDK (Open Java Development Kit), a free and open-source implementation of the Java Platform.

Extra info for understanding: the Java Development Kit (JDK) provides the environment to develop and run Java programs. It includes the Java Runtime Environment (JRE), which in turn contains the Java Virtual Machine (JVM).

Here, `8` refers to the Java version, and `headless` means the package leaves out graphical user interface components. Headless software works on a device without a graphical user interface; it receives input and provides output through other interfaces, such as a network or serial port, and is common on servers and embedded devices. The `-qq` flag keeps apt's output quiet.

In [None]:
!apt install openjdk-8-jdk-headless -qq    

**Step 3:** We need to set environment variables for the JDK.

**JAVA_HOME** is an environment variable that points to the installation directory of the JDK (Java Development Kit) or JRE (Java Runtime Environment).

Spark is written in Scala and runs on the Java Virtual Machine, so PySpark needs to know where the Java installation from Step 2 is located.

In [None]:
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64" 
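If Spark later fails to start, a wrong JAVA_HOME is a common cause. The cell below is a minimal sketch of a sanity check, assuming a Linux-style layout where the Java binary lives at `$JAVA_HOME/bin/java` (as with the Colab path above); the helper name `java_home_looks_valid` is hypothetical and not part of PySpark.

```python
import os
from pathlib import Path

def java_home_looks_valid(java_home=None):
    """Return True if java_home (default: the JAVA_HOME environment variable)
    contains an executable bin/java file (Linux-style layout assumed)."""
    java_home = java_home or os.environ.get("JAVA_HOME")
    if not java_home:
        return False
    java_bin = Path(java_home) / "bin" / "java"
    return java_bin.is_file() and os.access(java_bin, os.X_OK)

print(java_home_looks_valid())
```

In Colab, after Steps 2 and 3 this should print `True`; `False` suggests the path in `os.environ["JAVA_HOME"]` does not match where apt installed OpenJDK.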

**Step 4:** We install **PyDrive**, which simplifies many common Google Drive API tasks. It has the following features:

1. Simplifies OAuth 2.0 authentication to just a few lines, with flexible settings.

2. Wraps the Google Drive API into a class for each resource, making your program more object-oriented.

3. Helps with common operations beyond API calls, such as content fetching and pagination control.

In [None]:
!pip install -U -q PyDrive

**Google-auth** is the Google authentication library for Python. This library provides the ability to authenticate to Google APIs using various methods.

**PyDrive** is a wrapper library around google-api-python-client that simplifies many common Google Drive API tasks.

**Credentials** are used to obtain an access token from Google's authorization servers.

In [None]:

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

auth.authenticate_user()                                         # sign in to Google from the Colab notebook
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()  # reuse the Colab credentials for PyDrive
drive = GoogleDrive(gauth)                                       # authenticated Drive client used in Step 5

**Step 5:** Downloading the dataset

In [None]:
fileDownloaded = drive.CreateFile({'id':'1BAPG9YpWbubt3lbvXBjgbufTM3Bvz9GL'})  # file object for the dataset's Drive ID
fileDownloaded.GetContentFile('mini_spark_event_data.json')                      # download the file to local disk

**Step 6:** Importing necessary libraries

In [None]:
import numpy as np #used for working with arrays

import pandas as pd # provides high-performance data manipulation in Python

from pyspark.sql import SparkSession  # entry point used to create DataFrames, register them as tables, run SQL over tables, cache tables and read parquet files

from pyspark.sql.functions import udf #UDF is a User Defined Function that is used to create a reusable function in Spark

from pyspark.sql import Window #PySpark Window function performs statistical operations such as rank, row number, etc. on a group, frame, or collection of rows and returns results for each row individually.

from pyspark.sql import functions as F # import the functions module as F for built-ins such as F.lag, F.max and F.when

from pyspark.sql.functions import sum as Fsum # Spark's aggregate sum, aliased so it does not shadow Python's built-in sum

from pyspark.sql.types import ArrayType, BooleanType, LongType, FloatType, IntegerType #importing the required data types in order to perform certain operations

from pyspark.sql.functions import lit, udf, struct, countDistinct, collect_list, avg, count, col
# lit() adds a constant (literal) value as a new column of the DataFrame
# udf() wraps a Python function as a reusable User Defined Function
# struct() combines several columns into a single struct (nested) column
# countDistinct() counts the number of distinct values in a column
# collect_list() collects the values of a column, duplicates included, into a list
# avg() returns the average value of a column
# count() returns the number of non-null values in a column
# col() returns a Column based on the given column name

from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
# VectorAssembler is a transformer that combines a given list of columns into a single vector column.
# Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm
# StandardScaler rescales features to unit standard deviation (and also to zero mean if withMean=True)

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
# logistic regression, random forest and gradient-boosted tree classifiers from Spark MLlib

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator #used to evaluate the classifiers

from pyspark.ml import Pipeline #A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer

import matplotlib.pyplot as plt # collection of functions that make matplotlib work like MATLAB

from sklearn.metrics import roc_curve #The roc_curve function calculates all FPR and TPR coordinates

from sklearn.metrics import precision_recall_curve #Precision-Recall curves summarize the trade-off between the true positive rate and the positive predictive value for a predictive model using different probability thresholds

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# CrossValidator begins by splitting the dataset into a set of folds which are used as separate training and test datasets
# ParamGridBuilder builds and returns all combinations of parameters specified by the param grid

**Step 7:** Creating Spark Session

In [None]:
# Creating the Spark session
spark = SparkSession \
.builder \
.appName("Anisha_J_2211630_assignment4")\
.getOrCreate() # getOrCreate returns the existing SparkSession or creates a new one

**Step 8**: Loading the dataset

In [None]:
# Loading the dataset
path = "mini_spark_event_data.json"
df = spark.read.json(path)

**Step 9:** Cleaning the Dataset

Dropping some irrelevant columns 

Dropping rows with missing values in userId or sessionId

Filtering out invalid (empty) user IDs

In [None]:
# Cleaning the Dataset
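df = df.drop(*['artist','song','firstName', 'lastName', 'id_copy'])              # drop columns not needed for the analysis
df = df.dropna(how = 'any', subset = ['userId', 'sessionId'])                    # drop rows missing a user or session ID
df = df.filter(df.userId!='').orderBy(["userId", "ts"], ascending=[True, True])  # remove empty user IDs, sort events per user
df = df.withColumn("userId", df["userId"].cast(IntegerType()))                   # cast user IDs from string to integer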

**Step 10:** Defining Churn

The “churn” label is derived from the dataset by identifying users who confirmed their subscription cancellation (a `Cancellation Confirmation` event in the `page` column). Once churned users are identified, we can examine how churn relates to the other features and explore trends that may influence the churn rate.

In [None]:
# Defining churn
# Flag each 'Cancellation Confirmation' event with 1 and every other event with 0
cancelation = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, 
                  IntegerType())  
# Applying the flag to the dataframe
df = df.withColumn("churn", cancelation("page"))
# Window covering all rows of each user
window = Window.partitionBy("userId").rangeBetween(Window.unboundedPreceding, 
                                                   Window.unboundedFollowing)
# Summing the flags over the window gives every row of a user the same churn value
df = df.withColumn("churn", Fsum("churn").over(window))
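The two-stage labelling above (flag events, then sum the flags over all of a user's rows) can be sketched in plain Python on a few hypothetical events. This is only an illustration of the logic, not Spark code; note that the window sum equals 1 only if a user has at most one cancellation event.

```python
from collections import defaultdict

# Hypothetical (userId, page) events
events = [
    (1, "NextSong"), (1, "Cancellation Confirmation"), (1, "Home"),
    (2, "NextSong"), (2, "Thumbs Up"),
]

# 1) Flag each event: 1 for a cancellation confirmation, 0 otherwise (like the UDF)
flags = [(user, 1 if page == "Cancellation Confirmation" else 0) for user, page in events]

# 2) Sum the flags over all rows of each user (like Fsum over the unbounded window)
churn_by_user = defaultdict(int)
for user, flag in flags:
    churn_by_user[user] += flag

# 3) Attach the per-user total to every row of that user
labelled = [(user, page, churn_by_user[user]) for user, page in events]
print(labelled)
```

Every row of user 1, including the events before the cancellation, ends up with churn = 1, while user 2 keeps 0.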

**Step 11:** Distribution of Users by Churn Type

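Some new columns are created in this step to make data exploration easier; the feature engineering step comes later.

Making the level_shift column

This column flags records where the subscription level (free or paid) differs from the previous record, i.e. a switch in either direction. Because the rows are ordered across all users, the first record of each user is flagged as well, since the user ID changes there.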

In [None]:
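# Making the level_shift column
# Note: an empty partitionBy() moves all rows into a single partition (Spark logs a warning about this)
window1 = Window.partitionBy().orderBy(["userId", "ts"])
df = df.withColumn("level_shift", (df.level!=F.lag(df.level).over(window1)) | 
                   (df.userId!=F.lag(df.userId).over(window1)))  # True when the level or the user changes
df=df.fillna({'level_shift':0})                                    # the very first row has no previous row (null)
df= df.withColumn("level_shift", F.when(df["level_shift"]==False, 0).otherwise(1))  # convert the flag to 0/1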
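The lag-based flag can be mimicked in plain Python to see what `sum(level_shift)` per user ends up counting. This is a sketch on hypothetical records already sorted by user and time; treating the very first row as 0 is an assumption of the sketch.

```python
from collections import defaultdict

# Hypothetical records, already sorted by (userId, ts): (userId, level)
records = [
    (1, "free"), (1, "free"), (1, "paid"), (1, "free"),
    (2, "paid"), (2, "paid"),
]

def level_shift_flags(rows):
    """Return 1 where the level or the user differs from the previous row, else 0.

    The very first row has no previous row; this sketch treats it as 0 (an assumption).
    """
    flags = []
    prev = None
    for user, level in rows:
        if prev is None:
            flags.append(0)
        else:
            prev_user, prev_level = prev
            flags.append(1 if level != prev_level or user != prev_user else 0)
        prev = (user, level)
    return flags

flags = level_shift_flags(records)

# Summing per user, as the per-user aggregation later in the notebook does
totals = defaultdict(int)
for (user, _), flag in zip(records, flags):
    totals[user] += flag

print(flags)         # [0, 0, 1, 1, 1, 0]
print(dict(totals))  # {1: 2, 2: 1}
```

User 1 switches free to paid and back, giving 2; user 2 never changes level but still gets 1, which comes only from the user-ID change at their first record.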

Making the last_ts column

This column helps us select only the records from the last 2 weeks of each customer's activity

The idea is that customer behavior should differ shortly before churn happens

In [None]:
df= df.withColumn("ts", df.ts/1000)                        # convert UNIX time from milliseconds to seconds
df= df.withColumn("registration", df.registration/1000)    # same conversion for the registration time

In [None]:
window2 = Window.partitionBy("userId")
df= df.withColumn("last_ts", F.max('ts').over(window2))    # time of each user's last event
df=df.filter(df.last_ts - df.ts < 1300000)                 # keep roughly the last two weeks (1,300,000 s is about 15 days)
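Because `ts` is now in seconds, the filter threshold is a number of seconds. A quick worked check of what 1,300,000 seconds corresponds to:

```python
SECONDS_PER_DAY = 24 * 60 * 60      # 86,400 seconds in a day
TWO_WEEKS = 14 * SECONDS_PER_DAY    # exactly two weeks, in seconds
THRESHOLD = 1_300_000               # value used in the filter above

print(TWO_WEEKS)                              # 1209600
print(round(THRESHOLD / SECONDS_PER_DAY, 2))  # 15.05
```

So the filter keeps a little over two weeks (about 15 days) of activity per user; `14 * 86400` would be the exact two-week window.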

Making the columns pages_per_session and diff_time

pages_per_session is the number of pages per session, taken as the largest itemInSession value within the session

diff_time is the number of days between an event and the user's last event

In [None]:
window3 = Window.partitionBy(["userId", "sessionId"])
df= df.withColumn("pages_per_session", F.max('itemInSession').over(window3))   # largest item index within each session
df = df.withColumn("ts_time",F.to_timestamp(df.ts))                    #  unix to datetime
df = df.withColumn("last_ts_time",F.to_timestamp(df.last_ts))          # unix to datetime

In [None]:
df = df.withColumn("diff_time",F.datediff(df.last_ts_time, df.ts_time))  # how many days ago was the page visited
df=df.orderBy(["userId", "ts"], ascending=[True, True])                   
df.createOrReplaceTempView('data')                                        # create a temp view to be used for SQL queries
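`F.datediff(end, start)` counts calendar days between two dates, so the time of day is ignored. A minimal pure-Python sketch of that convention on hypothetical timestamps (Spark converts timestamps to dates in the session time zone, which this sketch does not model):

```python
from datetime import datetime

def datediff_days(end: datetime, start: datetime) -> int:
    """Whole calendar days from start to end; the time of day is ignored."""
    return (end.date() - start.date()).days

# Hypothetical timestamps
last_event = datetime(2018, 10, 15, 0, 30)
event = datetime(2018, 10, 14, 23, 45)
print(datediff_days(last_event, event))  # 1, although the events are 45 minutes apart
print(datediff_days(datetime(2018, 10, 15, 23, 0), datetime(2018, 10, 15, 0, 5)))  # 0
```

This is why `count(distinct diff_time)` in the SQL below can be read as the number of distinct active days in the window.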

In [None]:
df.limit(2).toPandas()
# Display only the first 2 rows: limit(2) keeps two rows and toPandas() returns them as a pandas DataFrame

The `page` column seems to be the most informative in the whole dataset

It shows which pages of the service each user visited, and a timestamp is provided for each visit

This column can be used to engineer useful features

In [None]:
df.groupBy('page').count().show()  # number of events per page

**Step 12:** Printing the Schema

In [None]:
df.printSchema()
# print the schema of the DataFrame as a tree, with each column's name and data type

In [None]:
# Investigating whether there are differences between churned and non-churned users
# label             - 0 if non-churned, 1 if churned
# cnt               - number of users in each group
# song_count        - avg number of songs played
# error             - avg number of errors occurring
# friends           - avg number of 'Add Friend' events
# playlist_count    - avg number of visits to the 'Add to Playlist' page
# thumbs_up         - avg number of 'Thumbs Up' clicks
# thumbs_down       - avg number of 'Thumbs Down' clicks
# downgrade         - avg number of visits to the 'Downgrade' page
# count_session_dist- avg number of sessions
# count_diff_time   - avg number of days (in the last two weeks) on which the user used the app
# pages_per_session - avg number of pages (or any activity) per session
# duration          - avg number of days between registration and the user's last event (86400 = seconds per day)
# level_shift       - avg number of level changes (free/paid) per user
# usage_time        - avg total listening time (sum of song lengths, in seconds)
stats = spark.sql(" WITH prep as( \
SELECT userId, \
max(churn)                                                          as label, \
count(case when page = 'NextSong' then userId else null end)        as song_count, \
count(case when page = 'Error' then userId else null end)           as error, \
count(case when page = 'Add Friend' then userId else null end)      as friends, \
count(case when page = 'Add to Playlist' then userId else null end) as playlist_count, \
count(case when page = 'Thumbs Up' then userId else null end)       as thumbs_up, \
count(case when page = 'Thumbs Down' then userId else null end)     as thumbs_down, \
count(case when page = 'Downgrade' then userId else null end)       as downgrade, \
count(distinct sessionId)                                           as count_session_dist, \
count(distinct diff_time)                                           as count_diff_time, \
avg(distinct pages_per_session)                                     as pages_per_session, \
(max(ts) - min(registration))/86400                                 as duration, \
sum(level_shift)                                                    as level_shift, \
sum(length)                                                         as usage_time \
FROM data \
GROUP BY userId) \
SELECT label, \
count(label)             as cnt, \
avg(song_count)          as song_count, \
avg(error)               as error, \
avg(friends)             as friends, \
avg(playlist_count)      as playlist_count, \
avg(thumbs_up)           as thumbs_up, \
avg(thumbs_down)         as thumbs_down, \
avg(downgrade)           as downgrade, \
avg(count_session_dist)  as count_session_dist, \
avg(count_diff_time)     as count_diff_time, \
avg(pages_per_session)   as pages_per_session, \
avg(duration)            as duration, \
avg (level_shift)        as level_shift, \
avg(usage_time)          as usage_time \
FROM prep \
GROUP BY label")
# We can see that for most dimensions/features there are differences between churned and non-churned users


In [None]:
stats.toPandas()
# convert the small aggregated result to a pandas DataFrame for display

**Step 13:** Feature Engineering

Based on the previous analysis, all the investigated features are included.

Here I create a temp table that holds all the features.

The temp table is used as the model input.

All data is aggregated per userId.

In [None]:
features = spark.sql("SELECT userId, \
max(churn)                                                          as label, \
count(case when page = 'NextSong' then userId else null end)        as song_count, \
count(case when page = 'Error' then userId else null end)           as error, \
count(case when page = 'Add Friend' then userId else null end)      as friends, \
count(case when page = 'Add to Playlist' then userId else null end) as playlist_count, \
count(case when page = 'Thumbs Up' then userId else null end)       as thumbs_up, \
count(case when page = 'Thumbs Down' then userId else null end)     as thumbs_down, \
count(case when page = 'Downgrade' then userId else null end)       as downgrade, \
count(distinct sessionId)                                           as count_session_dist, \
count(distinct diff_time)                                           as count_diff_time, \
round(avg(distinct pages_per_session),0)                            as pages_per_session, \
round((max(ts) - min(registration))/86400,0)                        as duration, \
round(sum(level_shift),0)                                           as level_shift, \
round(sum(length),0)                                                as usage_time \
FROM data \
GROUP BY userId")
features.createOrReplaceTempView('features')
features=features.na.drop()
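Each `count(case when page = ... then userId else null end)` counts one user's events on a given page, and `count(distinct sessionId)` counts their sessions. The same per-user aggregation, sketched in plain Python on hypothetical events for a subset of the features:

```python
from collections import Counter, defaultdict

# Hypothetical (userId, page, sessionId) events
events = [
    (1, "NextSong", 10), (1, "NextSong", 10), (1, "Thumbs Up", 11),
    (2, "NextSong", 20), (2, "Error", 20),
]

pages_by_user = defaultdict(Counter)
sessions_by_user = defaultdict(set)
for user, page, session in events:
    pages_by_user[user][page] += 1       # like count(case when page = ... then userId end)
    sessions_by_user[user].add(session)  # like count(distinct sessionId)

features = {
    user: {
        "song_count": pages_by_user[user]["NextSong"],
        "thumbs_up": pages_by_user[user]["Thumbs Up"],
        "error": pages_by_user[user]["Error"],  # Counter returns 0 for unseen pages
        "count_session_dist": len(sessions_by_user[user]),
    }
    for user in pages_by_user
}
print(features)
```

As in the SQL, a user with no events on a page gets 0 for that feature rather than a missing value.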

**Step 14:** Splitting the dataset into training and test sets

In [None]:
# Train/test split (80/20); the seed makes the split reproducible
training, test = features.randomSplit([0.8, 0.2], seed=42)

**Step 15:** Assembling and Normalizing the data

In [None]:
# Make VectorAssembler - this is a PySpark-specific step
# All input features must be in one vector column before being fed into the model
# userId is an identifier rather than a behavioral feature, so it is not used as a model input
assembler = VectorAssembler(inputCols=["song_count","error","friends","playlist_count", \
                                       "thumbs_up","thumbs_down","downgrade", "count_session_dist",\
                                       "count_diff_time","pages_per_session", "duration","level_shift",\
                                       "usage_time"], \
                            outputCol="inputFeatures")

In [None]:
# Normalizing the data: Normalizer rescales each row (each user's feature vector) to unit L2 norm
scaler = Normalizer(inputCol = "inputFeatures", outputCol = "features")
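`Normalizer` works row by row, while the imported (but unused) `StandardScaler` works column by column. A minimal pure-Python sketch contrasting the two, assuming Spark's defaults (p=2 for `Normalizer`; `withStd=True`, `withMean=False` and the sample standard deviation for `StandardScaler`); the helper names are hypothetical:

```python
import math

rows = [[3.0, 4.0], [1.0, 0.0]]

def l2_normalize(row):
    """Normalizer-style: divide a row by its Euclidean length."""
    norm = math.sqrt(sum(v * v for v in row))
    return [v / norm for v in row] if norm > 0 else row

def std_scale(rows):
    """StandardScaler-style (no centering): divide each column by its sample std.
    This sketch maps zero-variance columns to 0.0."""
    n = len(rows)
    stds = []
    for col in zip(*rows):
        mean = sum(col) / n
        stds.append(math.sqrt(sum((v - mean) ** 2 for v in col) / (n - 1)))
    return [[v / s if s > 0 else 0.0 for v, s in zip(r, stds)] for r in rows]

print([l2_normalize(r) for r in rows])  # [[0.6, 0.8], [1.0, 0.0]]
print(std_scale(rows))
```

With `Normalizer`, a heavy user and a light user with the same activity mix end up with the same vector, so absolute volume (for example `song_count`) is lost; column-wise scaling keeps it.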

**Step 16:** Building ML models

Fitting logistic regression, gradient-boosted tree and random forest classifiers with default parameters

In [None]:
lr = LogisticRegression()
gbt = GBTClassifier()
rf = RandomForestClassifier()

**Step 17:** Building Pipelines

In [None]:
#Building pipelines
pipeline1 = Pipeline(stages = [assembler,scaler, lr])
pipeline2 = Pipeline(stages = [assembler,scaler, gbt])
pipeline3 = Pipeline(stages = [assembler,scaler, rf])

In [None]:
# The metric chosen is F1: we want to catch true positives (churning customers),
# but we do not want to waste money on false positives (investing in retaining
# non-churning customers, who are loyal anyway).
# Recall might also be justified here (if the cost of false positives is low).

paramgrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.0, 0.1])\
.addGrid(lr.maxIter, [10])\
.build()
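A param grid is the cartesian product of the listed values. The sketch below uses a hypothetical helper, `build_param_grid`, with plain string keys instead of Spark `Param` objects, to show how many combinations the grid above yields and how many model fits 3-fold cross-validation then needs:

```python
from itertools import product

def build_param_grid(grid):
    """Return every combination of the given parameter values as a list of dicts."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]

param_maps = build_param_grid({"regParam": [0.0, 0.1], "maxIter": [10]})
print(param_maps)           # [{'regParam': 0.0, 'maxIter': 10}, {'regParam': 0.1, 'maxIter': 10}]
print(len(param_maps) * 3)  # 6 fits during 3-fold cross-validation
print(build_param_grid({})) # [{}]: an empty grid is a single run with default parameters
```

CrossValidator also refits the best combination once on the whole training set, so the total here is 6 + 1 fits. The empty grids used for the gradient-boosted and random forest models correspond to the `[{}]` case.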

**Step 18:** Evaluating and Cross Validating

In [None]:
evaluator = MulticlassClassificationEvaluator(metricName = "f1")
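For `MulticlassClassificationEvaluator`, `"f1"` is the *weighted* F1: the per-class F1 scores are averaged, weighted by each class's share of the true labels. With few churners, this can look better than the F1 of the churn class alone. A pure-Python sketch on hypothetical labels and predictions:

```python
from collections import Counter

def f1_for_class(labels, preds, c):
    """F1 score treating class c as the positive class."""
    tp = sum(1 for y, p in zip(labels, preds) if y == c and p == c)
    fp = sum(1 for y, p in zip(labels, preds) if y != c and p == c)
    fn = sum(1 for y, p in zip(labels, preds) if y == c and p != c)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return 2 * precision * recall / (precision + recall) if precision + recall else 0.0

def weighted_f1(labels, preds):
    """Per-class F1 weighted by each class's share of the true labels."""
    n = len(labels)
    return sum(count / n * f1_for_class(labels, preds, c) for c, count in Counter(labels).items())

# Hypothetical labels (1 = churn) and predictions
labels = [0, 0, 0, 0, 1, 1]
preds  = [0, 0, 0, 1, 1, 0]
print(round(weighted_f1(labels, preds), 4))  # 0.6667
print(f1_for_class(labels, preds, 1))        # 0.5
```

If only the churn class matters, Spark 3.x's evaluator also offers `metricName="fMeasureByLabel"` together with `metricLabel=1`.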

In [None]:
crossval= CrossValidator(estimator=pipeline1,  
                         estimatorParamMaps=paramgrid,
                         evaluator = evaluator , 
                         numFolds=3
                        )
# K-fold cross validation performs model selection by splitting the dataset into a set of non-overlapping randomly partitioned folds which are used as separate training and test datasets.
# e.g., with k=3 folds, K-fold cross validation will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. Each fold is used as the test set exactly once.
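The fold logic described in these comments can be sketched in plain Python: shuffle the row indices, deal them into k disjoint folds, and use each fold once as the test set. The helper `k_fold_indices` is hypothetical; Spark assigns rows to folds at random, so its folds are only approximately equal in size.

```python
import random

def k_fold_indices(n, k, seed=0):
    """Yield (train, test) index lists; each of the k folds is the test set once."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::k] for i in range(k)]
    for i in range(k):
        test = folds[i]
        train = [j for f, fold in enumerate(folds) if f != i for j in fold]
        yield train, test

splits = list(k_fold_indices(9, 3))
for train, test in splits:
    print(len(train), len(test))  # 6 3 for every fold
```

Every row is used for testing exactly once and for training k-1 times, which is why the 3-fold setting above trains on about 2/3 of the data in each round.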

In [None]:
cvModel1 = crossval.fit(training) #fitting the model to training data

In [None]:
evaluator.evaluate(cvModel1.transform(test)) #evaluating the model

**Step 19:** Gradient-Boosted Tree Classifier

In [None]:
# Specify multiple parameters in the paramgrid, in case you have
# enough processing power 
paramgrid1 =ParamGridBuilder()\
.build()

evaluator = MulticlassClassificationEvaluator(metricName ='f1')

In [None]:
crossval= CrossValidator(estimator=pipeline2,  
                         estimatorParamMaps=paramgrid1,
                         evaluator=evaluator, 
                         numFolds=3
                        )

In [None]:
cvModel2 = crossval.fit(training) #fitting the model to training data

In [None]:
evaluator.evaluate(cvModel2.transform(test)) #evaluating the model

**Step 20:** Random Forest Classifier

In [None]:
# Specify multiple parameters in the paramgrid if you have enough processing power
paramgrid2 = ParamGridBuilder().build()

In [None]:
evaluator = MulticlassClassificationEvaluator(metricName ='f1')


In [None]:
crossval= CrossValidator(estimator=pipeline3,  
                         estimatorParamMaps=paramgrid2,
                         evaluator=evaluator, 
                         numFolds=3
                        )

In [None]:
cvModel3 = crossval.fit(training) #fitting the model to training data

In [None]:
evaluator.evaluate(cvModel3.transform(test)) #evaluating the model on the test set

In [None]:
predictions = cvModel2.transform(test)  # making predictions on the test set with the gradient-boosted tree model
predictions.limit(2).toPandas()
# Display only the first 2 rows: limit(2) keeps two rows and toPandas() returns them as a pandas DataFrame

**Step 21:** ROC curve

An ROC curve is constructed by plotting the true positive rate (TPR) against the false positive rate (FPR) as the classification threshold varies.
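As a hedged illustration of what scikit-learn's `roc_curve` computes, this pure-Python sketch lowers the threshold through each distinct score, records (FPR, TPR) at each step, and integrates the curve with the trapezoid rule. The helper names and the toy labels and scores are hypothetical.

```python
def roc_points(labels, scores):
    """Return (fpr, tpr) lists, lowering the threshold through each distinct score."""
    pos = sum(labels)
    neg = len(labels) - pos
    fpr, tpr = [0.0], [0.0]
    for t in sorted(set(scores), reverse=True):
        tp = sum(1 for y, s in zip(labels, scores) if s >= t and y == 1)
        fp = sum(1 for y, s in zip(labels, scores) if s >= t and y == 0)
        fpr.append(fp / neg)
        tpr.append(tp / pos)
    return fpr, tpr

def auc_trapezoid(xs, ys):
    """Area under a piecewise-linear curve."""
    return sum((x1 - x0) * (y0 + y1) / 2 for x0, x1, y0, y1 in zip(xs, xs[1:], ys, ys[1:]))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
fpr, tpr = roc_points(labels, scores)
print(fpr)                       # [0.0, 0.0, 0.5, 0.5, 1.0]
print(tpr)                       # [0.0, 0.5, 0.5, 1.0, 1.0]
print(auc_trapezoid(fpr, tpr))   # 0.75
```

scikit-learn's `roc_curve` drops some collinear intermediate points by default (`drop_intermediate=True`), so its arrays can be shorter than this sketch's while describing the same curve.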

In [None]:
# Defining a function that plots the ROC curve
def roc(ax, predictions, title='ROC curve'):
    # collect labels and scores in one pass so the two stay row-aligned
    pdf = predictions.select('label', 'probability').toPandas()
    labels = pdf['label'].values
    pp = pdf['probability'].apply(lambda x: x[1]).values  # predicted probability of churn (class 1)
    fpr, tpr, _ = roc_curve(labels, pp)  # roc_curve returns (fpr, tpr, thresholds)
    ax.plot(fpr, tpr)  # false positive rate on the x-axis, true positive rate on the y-axis
    ax.set_facecolor('xkcd:wheat')
    ax.set_xlabel('False Positive Rate')  # setting up X and Y labels
    ax.set_ylabel('True Positive Rate')
    ax.set_title(title)  # setting the title

#plt.clf() # to be used for AWS EMR
fig = plt.figure()
ax = fig.add_subplot(111)  # adding a single subplot
roc(ax, predictions)
plt.show()