# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import libraries
import os
import re
import datetime
from itertools import cycle, islice

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from pyspark.sql import SparkSession, Window
# note: min and max imported below shadow the Python built-ins of the same name
from pyspark.sql.functions import (
    isnan, count, when, col, concat, desc, explode, lit, min, max, split, udf, isnull,
)
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import (
    RegexTokenizer, CountVectorizer, IDF, StringIndexer, VectorAssembler, Normalizer, StandardScaler,
)

In [None]:
# create a Spark session
spark = (
    SparkSession.builder
    .master("local")
    .appName("Sparkify")
    .getOrCreate()
)

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 
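
As a rough illustration of the checks described above, the sketch below filters a few toy records in plain Python, so it runs without a Spark session. The records and the helper name `is_valid_event` are made up for illustration; on the real dataset the same idea would be expressed with `dropna` and `filter` on the DataFrame.

```python
# Toy event records (hypothetical values) imitating the shape of the event log.
events = [
    {"userId": "30", "sessionId": 29, "page": "NextSong"},
    {"userId": "", "sessionId": 8, "page": "Home"},          # empty userId
    {"userId": None, "sessionId": 12, "page": "Login"},      # missing userId
    {"userId": "9", "sessionId": None, "page": "NextSong"},  # missing sessionId
]


def is_valid_event(event):
    """Keep events that have a non-empty userId and a sessionId."""
    user_id = event.get("userId")
    return user_id not in (None, "") and event.get("sessionId") is not None


clean_events = [e for e in events if is_valid_event(e)]
print(len(events), "->", len(clean_events))
```

Note that an empty string is not a null, so a null check alone would let the second record through; that is why both conditions are tested.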

In [None]:
# load the mini dataset
df = spark.read.json('mini_sparkify_event_data.json')

# cache the DataFrame, since it is reused by many of the cells below
df.persist()

In [None]:
df.head()

In [None]:
print(df.take(5));

In [None]:
# let's look at the schema of the data
df.printSchema();

In [None]:
print((df.count(), len(df.columns)))

In [None]:
print("Number of rows in the dataframe: {}".format(df.count()))
print("Number of columns in the dataframe: {}".format(len(df.columns)))
print("Columns present in the dataframe: {}".format(df.columns))

In [None]:
# describe all the columns in the dataframe
df.describe().toPandas().transpose()

In [None]:
df.toPandas().isnull().sum()

In [None]:
df.toPandas().isnull().sum().sum()

In [None]:
# drop rows with a null userId or sessionId (the mini dataset has no nulls in these columns, so this is only a safeguard)
df = df.dropna(how = 'any', subset = ['userId', 'sessionId'])

In [None]:
# inspect 'userId' column
df.select('userId').dropDuplicates().sort('userId').show(10)

In [None]:

# drop rows whose userId is an empty string
df = df.filter(df['userId'] != '')

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.
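
A minimal sketch of this labelling idea, written in plain Python on made-up events so it runs without Spark. It flags each `Cancellation Confirmation` event, sums the flags per user, and attaches the result to every event of that user, which is roughly what a window sum partitioned by `userId` does. The toy data and variable names are assumptions for illustration only.

```python
from collections import defaultdict

# Toy (userId, page) events; values are hypothetical.
events = [
    ("1", "NextSong"),
    ("1", "Cancellation Confirmation"),
    ("2", "NextSong"),
    ("2", "Downgrade"),
    ("3", "Thumbs Up"),
]

# Steps 1 and 2: flag each cancellation event and sum the flags per user.
churn_by_user = defaultdict(int)
for user_id, page in events:
    churn_by_user[user_id] += 1 if page == "Cancellation Confirmation" else 0

# Step 3: attach the per-user label back to every event of that user.
labelled = [(user_id, page, churn_by_user[user_id]) for user_id, page in events]
for row in labelled:
    print(row)
```

Because the label is attached per user rather than per event, the earlier events of a user who eventually cancels are also marked as churn.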

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.
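
To make the "aggregate per group" idea concrete, here is a small plain-Python sketch on made-up rows. The function name `events_per_user` and the data are hypothetical; the calculation mirrors a `count(1) / count(DISTINCT userId)` aggregate filtered on one page type and grouped by churn.

```python
from collections import defaultdict

# Toy rows of (userId, churn, page); values are hypothetical.
rows = [
    ("1", 1, "Add Friend"), ("1", 1, "NextSong"),
    ("2", 0, "Add Friend"),
    ("3", 0, "Add Friend"), ("3", 0, "Add Friend"), ("3", 0, "Add Friend"),
]


def events_per_user(rows, page):
    """Average number of `page` events per user, split by churn label.

    Only users with at least one such event enter the denominator.
    """
    event_count = defaultdict(int)
    users = defaultdict(set)
    for user_id, churn, row_page in rows:
        if row_page == page:
            event_count[churn] += 1
            users[churn].add(user_id)
    return {churn: event_count[churn] / len(users[churn]) for churn in event_count}


print(events_per_user(rows, "Add Friend"))
```

One caveat worth keeping in mind: because the filter is applied first, users who never triggered the event are left out of the average, so the figure describes "events per user who used the feature" rather than "events per user overall".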

In [None]:
df.select('page','UserId').groupby('page').agg({'page':'count'}).select('page','count(page)').show()

In [None]:

# Churn labels users who cancelled their account.
# Flag each 'Cancellation Confirmation' event with 1 and every other event with 0.
flag_cancelation_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
df = df.withColumn("churn", flag_cancelation_event("page"))
# Window covering all of a user's events
windowval = Window.partitionBy("userId").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# Sum the flags over the window so every row of a user carries that user's churn label
df = df.withColumn("churn", Fsum("churn").over(windowval))

In [None]:
#creating a temp view from dataframe df,so that we can use sparkSQL for quick easy analysis
df.createOrReplaceTempView('data');

In [None]:
spark.sql("""
          SELECT
              churn,
              count(distinct userId)
            FROM
                data
            GROUP BY
                churn
            """).show()

In [None]:
spark.sql('SELECT COUNT(DISTINCT userId) As count_of_Uniq_users  FROM data').show()

In [None]:
user_with_downgrade = spark.sql("select distinct userId from data where page = 'Submit Downgrade'")
user_with_downgrade = user_with_downgrade.toPandas()['userId'].values

In [None]:
has_sub_downgrade = udf(lambda user: 1 if user in user_with_downgrade else 0, IntegerType())
spark.udf.register('has_sub_downgrade', has_sub_downgrade)


In [None]:
data_1 = spark.sql("""
    SELECT * ,
        has_sub_downgrade(userId) as hasSubDowngrade
    FROM
        data
    """)


In [None]:

data_1.createOrReplaceTempView('data_1')

In [None]:
feature_downgrade = spark.sql("""
    SELECT
        DISTINCT userId,
        hasSubDowngrade,
        churn
    FROM
        data_1
    """)

In [None]:
feature_downgrade.createOrReplaceTempView('feature_downgrade_tbl')

In [None]:
spark.sql("select churn, sum(hasSubDowngrade)/count(distinct userId) As difference_in_submitting_downgrade from feature_downgrade_tbl group by churn").show()

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the steps below.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.
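
The sketch below shows one possible shape for such a feature-extraction script, in plain Python on made-up events so it runs without Spark. It counts a few page types per user and then builds exactly one feature row per user, filling in 0 where a user never triggered an event. The page list, data, and names such as `count_page` are assumptions for illustration.

```python
from collections import Counter

# Toy (userId, page) events; values are hypothetical.
events = [
    ("1", "NextSong"), ("1", "Add Friend"), ("1", "Thumbs Up"),
    ("2", "NextSong"), ("2", "NextSong"),
    ("3", "Help"),
]


def count_page(events, page):
    """Per-user count of one page type; users without it are absent."""
    return Counter(user_id for user_id, p in events if p == page)


feature_pages = ["NextSong", "Add Friend", "Thumbs Up", "Help"]
per_page_counts = {page: count_page(events, page) for page in feature_pages}

# Start from the full user list so every user gets exactly one row, then look
# up each feature by userId, using 0 when the user has no such events.
all_users = sorted({user_id for user_id, _ in events})
feature_rows = {
    user_id: [per_page_counts[page].get(user_id, 0) for page in feature_pages]
    for user_id in all_users
}
for user_id, features in feature_rows.items():
    print(user_id, features)
```

The design point is that every feature is looked up against the same base list of users. In SQL terms this corresponds to left-joining each feature view on the base table's `userId`, so a user missing from one view still keeps the features found in the others.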

In [None]:

spark.sql("SELECT churn,count(1)/count(DISTINCT userId) AS Friends FROM data_1\
            WHERE page = 'Add Friend' GROUP BY churn").show()
friends_vs = spark.sql("SELECT churn,count(1)/count(DISTINCT userId) AS Friends FROM data_1\
            WHERE page = 'Add Friend' GROUP BY churn").toPandas()

# GROUP BY userId already yields one row per user, so DISTINCT is not needed
feature_friends = spark.sql("SELECT userId,count(1) AS Friends FROM data_1\
        WHERE page = 'Add Friend' GROUP BY userId")

feature_friends.createOrReplaceTempView('feature_friends')

In [None]:

plt.title("Distribution of feature_friends vs Target Churn")
sns.barplot(x="churn", y="Friends", data=friends_vs)
plt.show()

In [None]:
spark.sql("SELECT churn,count(1)/count(DISTINCT userId)  AS Playlist FROM data_1\
                        WHERE page ='Add to Playlist' GROUP BY churn").show();

Playlist_vs=spark.sql("SELECT churn,count(1)/count(DISTINCT userId)  AS Playlist FROM data_1\
                        WHERE page ='Add to Playlist' GROUP BY churn").toPandas();

feature_Playlist=spark.sql("SELECT DISTINCT(userId),count(1)/count(DISTINCT userId) AS Playlist FROM data_1\
            WHERE page ='Add to Playlist' GROUP BY userId");


feature_Playlist.createOrReplaceTempView('feature_add_Playlist');

In [None]:
plt.title("Distribution of feature_Playlist w.r.t Target Churn");
sns.barplot(x="churn", y="Playlist", data=Playlist_vs,palette="muted");
plt.show();

In [None]:

spark.sql("SELECT churn,\
          sum(length)/count(distinct userId)  \
          AS usageTime FROM data_1\
          WHERE page = 'NextSong' GROUP BY  churn").show();

usage_time_vs=spark.sql("SELECT churn,sum(length)/count(distinct userId) AS usageTime FROM data_1\
                        WHERE page = 'NextSong' GROUP BY  churn").toPandas();

feature_usage_time=spark.sql("SELECT DISTINCT(userId),sum(nvl(length, 0)) AS usageTime FROM data_1\
                              WHERE page = 'NextSong' GROUP BY userId");

feature_usage_time.createOrReplaceTempView('feature_usage_time');

In [None]:
plt.title("Distribution of feature_usage_time vs Target Churn");
sns.barplot(x="churn", y="usageTime", data=usage_time_vs,palette="muted");
plt.show();

In [None]:
spark.sql("SELECT churn,count(1)/count(DISTINCT userId) \
    AS Downgrade FROM data_1 \
    WHERE page ='Downgrade' GROUP BY churn").show();
Downgrade_vs=spark.sql("SELECT churn,count(1)/count(DISTINCT userId) \
    AS Downgrade FROM data_1 \
    WHERE page ='Downgrade' GROUP BY churn").toPandas();

feature_Downgrade=spark.sql("SELECT DISTINCT(userId),count(1)/count(DISTINCT userId) AS Downgrade FROM data_1\
                        WHERE page ='Downgrade' GROUP BY userId");

feature_Downgrade.createOrReplaceTempView('feature_Downgrade');

In [None]:
plt.title("Distribution of feature_Downgrade vs Target Churn");
sns.barplot(x="churn", y="Downgrade", data=Downgrade_vs,palette="muted");
plt.show();

In [None]:
spark.sql("SELECT churn,gender,count(1)/count(DISTINCT userId) gender_cnt FROM data_1 GROUP BY churn,gender").show();

gender_vs=spark.sql("SELECT churn,gender,count(1)/count(DISTINCT userId) gender_cnt FROM data_1 GROUP BY churn,gender").toPandas();

feature_gender = spark.sql("SELECT DISTINCT(userId),CASE when gender='M' then 1 \
                            when gender='F' then 0 else 2 END AS gender_dummy FROM data_1");
 
feature_gender.createOrReplaceTempView('feature_gender');
spark.sql("select gender_dummy,count(DISTINCT(userId)) from feature_gender group by gender_dummy").show()

In [2]:
plt.title("Distribution of feature_gender vs Target Churn");
sns.barplot(x="churn", y="gender_cnt", hue='gender',data=gender_vs);
plt.show();

In [None]:
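# Note: avg(DISTINCT sessionId) averages the session ID values themselves, so
# avg_no_sessions is not a session count; count(DISTINCT sessionId) would give one.
spark.sql("SELECT churn,avg(distinct sessionId) \
    AS avg_no_sessions FROM data_1 group by churn").show();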

avg_no_sessions_vs=spark.sql("SELECT churn,avg(distinct sessionId) AS avg_no_sessions \
                             FROM data_1 group by churn").toPandas();

feature_avg_no_sessions = spark.sql("SELECT DISTINCT(userId),avg(distinct sessionId) AS avg_no_sessions FROM data_1 group\
                            by userId");

feature_avg_no_sessions.createOrReplaceTempView('feature_avg_no_sessions');

In [None]:

plt.title("Distribution of feature_avg_no_sessions w.r.t Target [Churn]");
sns.barplot(x="churn", y="avg_no_sessions", data=avg_no_sessions_vs);
plt.show();

In [None]:
spark.sql("SELECT churn,count(1)/count(DISTINCT userId) AS level_paid \
    FROM data_1 WHERE level = 'paid' \
    GROUP BY churn").show();

level_paid_vs=spark.sql("SELECT churn,count(1)/count(DISTINCT userId) AS level_paid \
    FROM data_1 WHERE level = 'paid' GROUP BY churn").toPandas();

feature_level_paid = spark.sql("SELECT DISTINCT(userId),count(1)/count(DISTINCT userId) AS level_paid FROM data_1\
                    WHERE level = 'paid' GROUP BY userId");

feature_level_paid.createOrReplaceTempView('feature_level_paid');

In [None]:

plt.title("Distribution of feature_level_paid w.r.t Target [Churn]");
sns.barplot(x="churn", y="level_paid", data=level_paid_vs,palette="muted");
plt.show();

In [None]:
spark.sql("SELECT churn,count(1)/count(DISTINCT userId) AS level_free FROM data_1 WHERE level = 'free' GROUP BY churn").show();

level_free_vs=spark.sql("SELECT churn,count(1)/count(DISTINCT userId) AS level_free FROM data_1 WHERE level = 'free' GROUP BY churn").toPandas();

feature_level_free = spark.sql("SELECT DISTINCT(userId),count(1)/count(DISTINCT userId) AS level_free FROM data_1\
                    WHERE level = 'free' GROUP BY userId");

feature_level_free.createOrReplaceTempView('feature_level_free');

In [None]:
plt.title("Distribution of feature_level_free w.r.t Target [Churn]");
sns.barplot(x="churn", y="level_free", data=level_free_vs);
plt.show();

In [None]:
spark.sql("SELECT churn,count(1)/count(DISTINCT userId) as n_ThumpsUp \
    FROM data_1 WHERE page = 'Thumbs Up' GROUP BY churn").show()

feature_thumps_up_vs= spark.sql("SELECT churn, count(1)/count(DISTINCT userId) as n_ThumpsUp \
    FROM data_1 WHERE page = 'Thumbs Up' GROUP BY churn").toPandas()

feature_thumps_up = spark.sql("SELECT DISTINCT(userId),count(1)/count(DISTINCT userId) AS n_Thumpsup \
    FROM data_1 WHERE page = 'Thumbs Up' GROUP BY userId");



feature_thumps_up.createOrReplaceTempView('feature_thumps_up');

In [None]:

plt.title("Distribution of feature Thumbs Up vs Target [Churn]");
sns.barplot(x="churn", y="n_ThumpsUp", data=feature_thumps_up_vs);
plt.show();

In [None]:
spark.sql("SELECT churn,count(1)/count(DISTINCT userId) As help FROM data_1 WHERE page ='Help' GROUP BY churn").show();
help_vs=spark.sql("SELECT churn,count(1)/count(DISTINCT userId) As help FROM data_1 WHERE page ='Help' GROUP BY churn").toPandas();
feature_help=spark.sql("SELECT DISTINCT(userId),count(1)/count(DISTINCT userId) AS help FROM data_1\
                        WHERE page ='Help' GROUP BY userId");

feature_help.createOrReplaceTempView('feature_help');

In [None]:
plt.title("Distribution of feature_help vs Target [Churn]");
sns.barplot(x="churn", y="help", data=help_vs);
plt.show();

In [None]:
# Build one row per user: start from the distinct (userId, churn) pairs and
# join every feature view on a.userId, so a user missing from one view still
# picks up the features from the others.
feature_table = spark.sql("""
    SELECT a.userId, b.Playlist, c.usageTime, d.Downgrade, e.help, f.friends,
           g.level_paid, k.n_Thumpsup, h.level_free, i.avg_no_sessions,
           j.gender_dummy, a.churn AS label
    FROM (SELECT DISTINCT userId, churn FROM data_1) AS a
    LEFT OUTER JOIN feature_add_Playlist AS b ON a.userId = b.userId
    LEFT OUTER JOIN feature_usage_time AS c ON a.userId = c.userId
    LEFT OUTER JOIN feature_Downgrade AS d ON a.userId = d.userId
    LEFT OUTER JOIN feature_help AS e ON a.userId = e.userId
    LEFT OUTER JOIN feature_friends AS f ON a.userId = f.userId
    LEFT OUTER JOIN feature_level_paid AS g ON a.userId = g.userId
    LEFT OUTER JOIN feature_level_free AS h ON a.userId = h.userId
    LEFT OUTER JOIN feature_avg_no_sessions AS i ON a.userId = i.userId
    LEFT OUTER JOIN feature_gender AS j ON a.userId = j.userId
    LEFT OUTER JOIN feature_thumps_up AS k ON a.userId = k.userId
""")

feature_table.createOrReplaceTempView('feature_table')

# check whether the class distribution looks reasonable
spark.sql("select label,count(distinct userId) as Class_dist from feature_table group by label").show()

In [None]:

feature_table.toPandas().isnull().sum()

In [None]:
# Check for null values and impute them.
# Use 0 for every feature except gender: 0 already means 'F' there, so nulls get the separate value 2 and the model can treat them as their own category.

selected_feature_table = spark.sql("SELECT nvl(userId,0) as userId,nvl(Playlist,0) as Playlist,nvl(usageTime,0) as usageTime,\
        nvl(Downgrade,0) as Downgrade,nvl(help,0) as help,nvl(friends,0) as friends,nvl(level_paid,0) as level_paid,\
        nvl(level_free,0) as level_free,nvl(avg_no_sessions,0) as avg_no_sessions,nvl(gender_dummy,2) as gender, nvl(n_Thumpsup,0) as n_Thumpsup, nvl(label,0) as label  FROM\
        feature_table");

# create the master feature table
selected_feature_table.createOrReplaceTempView('selected_feature_table');

In [None]:
selected_feature_table.toPandas().isnull().sum()

In [None]:
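plt.figure(figsize=(15,12))
plt.title('Correlation between all the selected columns', y=1.05, size=15)
# userId is an identifier rather than a feature, so exclude it before computing correlations
corr_matrix = selected_feature_table.toPandas().drop(columns=['userId']).corr()
sns.heatmap(corr_matrix, linewidths=0.1, vmax=1.0,
            square=True, cmap=plt.cm.inferno, linecolor='white', annot=True)
plt.show()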

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
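
To see why F1 is suggested over accuracy here, consider a small worked example in plain Python. The labels below are made up: 2 churned users out of 10, and a model that predicts "stayed" for everyone. The helper `precision_recall_f1` is a from-scratch sketch, not a Spark API.

```python
def precision_recall_f1(y_true, y_pred, positive=1):
    """Precision, recall and F1 for one class, computed from scratch."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


# Hypothetical labels: 2 churned users out of 10, and a model that always
# predicts "stayed" (0).
y_true = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
y_pred = [0] * 10

accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
_, _, churn_f1 = precision_recall_f1(y_true, y_pred)
print("accuracy:", accuracy, "F1 for the churn class:", churn_f1)
```

The always-"stayed" model looks decent on accuracy while being useless for finding churners, which the churn-class F1 exposes. As far as I know, the `f1` metric of `MulticlassClassificationEvaluator` is a weighted average of the per-class F1 scores, so it will not drop all the way to zero in a case like this; it is worth inspecting the churn class separately.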

In [None]:
#create VectorAssembler to push data to ML models
assembler = VectorAssembler(inputCols=["Playlist","usageTime","Downgrade","help",\
                                       "friends","level_paid","level_free","avg_no_sessions","gender","n_Thumpsup"],\
                            outputCol="inputFeatures")

# Normalizer rescales each row (feature vector) to unit norm (L2 by default); it does not standardize columns
scaler = Normalizer(inputCol="inputFeatures", outputCol="ScaledFeatures");
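
A small plain-Python sketch of what row-wise L2 normalization does, using two made-up feature vectors. The helper `l2_normalize` and the numbers are assumptions for illustration; they are meant to mimic the default behavior of `Normalizer`, not to reproduce its implementation.

```python
import math


def l2_normalize(row):
    """Scale one feature vector to unit Euclidean length (row-wise)."""
    norm = math.sqrt(sum(x * x for x in row))
    return [x / norm for x in row] if norm else list(row)


# Hypothetical feature vectors in the form [usageTime, friends].
rows = [[3000.0, 4.0], [30.0, 4.0]]
normalized = [l2_normalize(r) for r in rows]
for original, scaled in zip(rows, normalized):
    print(original, "->", [round(x, 4) for x in scaled])
```

Both users have the same `friends` value, yet after normalization it differs between them, because each row is scaled by its own length and the large `usageTime` dominates. If the goal is to put columns on a comparable scale, a per-column scaler such as the already imported `StandardScaler` may be worth comparing against.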

In [None]:
#elasticNetParam corresponds to α and regParam corresponds to λ.
model_lr = LogisticRegression(featuresCol="ScaledFeatures", labelCol="label", maxIter=10, regParam=0.01)
model_rf = RandomForestClassifier(featuresCol="ScaledFeatures", labelCol="label")
model_gbt = GBTClassifier(featuresCol="ScaledFeatures", labelCol="label")


In [None]:
#Create Pipeline
pipeline_lr = Pipeline(stages=[assembler, scaler, model_lr]);
pipeline_rf = Pipeline(stages=[assembler, scaler, model_rf]);
pipeline_gbt = Pipeline(stages=[assembler, scaler, model_gbt])

In [None]:
# split data into training and test sets
training, test = selected_feature_table.randomSplit([0.8, 0.2]);
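
One thing to watch when splitting: if a feature table holds more than one row per user, a random row-level split can place the same user in both the training and the test set. A hedged alternative is to split the list of user IDs instead, with a fixed seed for reproducibility. The helper `split_users` below is a hypothetical plain-Python sketch of that idea.

```python
import random


def split_users(user_ids, test_fraction=0.2, seed=42):
    """Shuffle the unique user IDs with a fixed seed and split them in two."""
    unique_ids = sorted(set(user_ids))
    rng = random.Random(seed)
    rng.shuffle(unique_ids)
    n_test = max(1, round(len(unique_ids) * test_fraction))
    return unique_ids[n_test:], unique_ids[:n_test]


# Ten made-up user IDs, with duplicates to imitate several rows per user.
user_ids = [str(i) for i in range(10)] * 3
train_users, test_users = split_users(user_ids)
print(len(train_users), len(test_users))
```

The rows belonging to each group of users could then be selected by filtering on `userId`. On the Spark side, `randomSplit` also accepts a `seed` argument, which makes a row-level split repeatable between runs.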

In [None]:
training.toPandas().shape

In [None]:
#Fit the model
import time

start_time= time.time()

model_lr_fitted = pipeline_lr.fit(training);
model_rf_fitted = pipeline_rf.fit(training);
model_gbt_fitted = pipeline_gbt.fit(training)

end_time= time.time()

print("Total execution time: {}".format(end_time-start_time))

In [None]:
# Function which calculates the accuracy score:
def model_performance_accuracy(model, test_data, metric = 'accuracy'):
    """ Calculate a model score (accuracy by default)
        Input: 
            model - trained model or pipeline object
            test_data - data on which performance should be measured
            metric - the metric used to measure performance
        Output:
            score
    """
    evaluator = MulticlassClassificationEvaluator(metricName = metric)
    prediction_result = model.transform(test_data)
    # evaluate the predictions with the chosen metric
    score = evaluator.evaluate(prediction_result)
    return score

In [None]:
# print the test accuracy of each model
print("Logistic Regression Classifier Accuracy:{}".format(model_performance_accuracy(model_lr_fitted, test)));
print("Random Forest  Classifier Accuracy:{}".format(model_performance_accuracy(model_rf_fitted, test)));
print("GBTClassifier  Classifier Accuracy:{}".format(model_performance_accuracy(model_gbt_fitted, test)));

In [None]:
# Function which calculates the F1 score:
def model_performance(model, test_data, metric = 'f1'):
    """ Calculate a model score (F1 by default)
        Input: 
            model - trained model or pipeline object
            test_data - data on which performance should be measured
            metric - the metric used to measure performance
        Output:
            score
    """
    evaluator = MulticlassClassificationEvaluator(metricName = metric)
    prediction_result = model.transform(test_data)
    # evaluate the predictions with the chosen metric
    score = evaluator.evaluate(prediction_result)
    return score

In [None]:

# print the test F1 score of each model
print("Logistic Regression Classifier F1-Score:{}".format(model_performance(model_lr_fitted, test)));
print("Random Forest  Classifier F1-Score:{}".format(model_performance(model_rf_fitted, test)));
print("GBTClassifier  Classifier F1-Score:{}".format(model_performance(model_gbt_fitted, test)));

# Conclusion

A model was implemented to predict customer churn. Rows with no userId were removed, and gender was encoded numerically (M = 1, F = 0). Ten features were built for the model. Three classification algorithms were tried: logistic regression, Random Forest, and GBTClassifier. GBTClassifier was selected as the final model because its results were better than those of the other models. VectorAssembler, which combines a given list of columns into a single vector column, was used to prepare the data and pass it to the model. F1 score was used as the metric to optimize.


<h3>Improvement</h3>
The features could be improved considerably by considering more factors and drawing on more domain knowledge and expertise. Although the volume of data may require tools such as Spark to analyze, using more data as the user base grows should give better results.

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.