# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
import os

# os.environ

In [2]:
import findspark

findspark.init('/home/bruno/LIBS/spark')

In [67]:
from datetime import datetime

import numpy as np

from sklearn.metrics import confusion_matrix

from pyspark.sql import SparkSession

from pyspark.sql.functions import min as smin, max as smax, sum as ssum, round as sround
from pyspark.sql.functions import isnan, isnull, when, first, avg, last, count, countDistinct, col, lag, lead, coalesce, lit, split, trim

from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, date_format, from_unixtime, to_timestamp

from pyspark.sql.types import DateType, TimestampType, IntegerType
 
import jupyter_utils as j

from pyspark import SparkContext

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, StringIndexer, VectorAssembler
 
SparkContext.setSystemProperty('spark.logConf', 'True')
SparkContext.setSystemProperty('spark.default.parallelism', '16')
SparkContext.setSystemProperty('spark.executor.memory', '4g')
SparkContext.setSystemProperty('spark.driver.memory', '8g')
SparkContext.setSystemProperty('spark.reducer.maxSizeInFlight', '96m')
SparkContext.setSystemProperty('spark.shuffle.consolidateFiles', 'True') 
SparkContext.setSystemProperty('spark.shuffle.service.index.cache.size', '500m')

SparkContext.setSystemProperty('spark.driver.extraJavaOptions', '-server -Xmx8G')
# SparkContext.setSystemProperty('spark.executor.extraJavaOptions', '-server -Xmx8G -XX:+UseG1GC')

SparkContext.setSystemProperty('spark.executor.extraJavaOptions', '-server -XX:+UseG1GC')

In [4]:
j.reload(j)

In [5]:
# filepath = 'sparkify_full_csv_data.csv'
filepath = 'medium_sparkify_event_data.json'

In [6]:
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

spark.sparkContext.setLogLevel('INFO')

In [7]:
spark.sparkContext.getConf().getAll()

[('spark.executor.extraJavaOptions', '-server -XX:+UseG1GC'),
 ('spark.executor.memory', '4g'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', '192.168.0.102'),
 ('spark.shuffle.service.index.cache.size', '500m'),
 ('spark.driver.port', '36921'),
 ('spark.reducer.maxSizeInFlight', '96m'),
 ('spark.shuffle.consolidateFiles', 'True'),
 ('spark.driver.extraJavaOptions', '-server -Xmx8G'),
 ('spark.default.parallelism', '16'),
 ('spark.logConf', 'True'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1585179570049'),
 ('spark.driver.memory', '8g'),
 ('spark.app.name', 'Sparkify'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [8]:
# df = spark.read.option("inferSchema", "true").option("header", "true").option("encoding", "utf-8").csv(filepath)
df = spark.read.option("inferSchema", "true").option("header", "true").option("encoding", "utf-8").json(filepath)

In [None]:
from pyspark import StorageLevel

print(df.storageLevel)

df.persist(StorageLevel.MEMORY_ONLY)

print(df.storageLevel)

In [106]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [None]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

In [None]:
df.groupby('auth').agg(count(col('auth'))).show(50)

In [None]:
df.where(~df.auth.isin(['Logged In', 'Cancelled'])).show(1)

In [None]:
df.where((df.auth == 'Logged In') & (df.page == 'Home')).show(10)

In [9]:
log4jLogger = spark.sparkContext._jvm.org.apache.log4j

LOGGER = log4jLogger.LogManager.getLogger('driver_logger')

def info(message, print_on_notebook = True):
    LOGGER.info(message)
    
    if print_on_notebook:
        print(message)
    
info('Logger instance created')

Logger instance created


In [10]:
CHURN_CANCELLATION_PAGE = 'Cancellation Confirmation'
REGISTRATION_PAGE = 'Submit Registration'
milliseconds_to_hours = 3600 * 1000
minutes_to_hours = 60 * 60
TRUE = 1
FALSE = 0

def clean_dataframe(df):
    
    info('Starting data cleaning...')
    
    total_before = df.count()
    
    # Keep only logged records
    df = df.where(df.auth.isin(['Logged In', 'Cancelled']))
    
    # Records without userId
    df = df.where(col('userId').isNotNull())
    
    # Create a date column for the event
    df = df.withColumn('date', from_unixtime(col('ts') / 1000).cast(DateType()))
    
    # Location
    # df = df.withColumn('state', trim(split((split('location', ',').getItem(1)), '-').getItem(0)))
    
    # Relevant windows
    w_session = Window.partitionBy('sessionId').orderBy('ts')
    w_user_session = Window.partitionBy('sessionId', 'userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    w_user = Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    # Create features
    df = df.withColumn('previous_page', lag(df.page).over(w_session))
    df = df.withColumn('last_event_ts', last(col('ts')).over(w_user))
    df = df.withColumn('last_page', last(col('page')).over(w_user))
    df = df.withColumn('register_page', first(col('previous_page')).over(w_user))
    df = df.withColumn('first_ts', first(col('ts')).over(w_user))
    df = df.withColumn('ts_elapsed', last(df.ts).over(w_session) - first(df.ts).over(w_user_session))
    df = df.withColumn('session_duration', smax(df.ts_elapsed).over(w_user_session))
     
    info('Finished data cleaning...')
    info(f'Number of removed rows: {total_before - df.count()}')
    
    return df

In [88]:
df.show()

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+-----------+-------------+---------+--------------------+------+-------------+--------------------+------+----------+-------------+-------------+---------+-------------+-------------+----------+----------------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|       page| registration|sessionId|                song|status|           ts|           userAgent|userId|      date|previous_page|last_event_ts|last_page|register_page|     first_ts|ts_elapsed|session_duration|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+-----------+-------------+---------+--------------------+------+-------------+--------------------+------+----------+-------------+-------------+---------+-------------+-------------+----------+----------------+
|               

In [11]:
df = clean_dataframe(df)

Starting data cleaning...
Finished data cleaning...
Number of removed rows: 15700


# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [None]:
df.groupBy('page').count().orderBy('count', ascending = False).show(50)

Some questions about the data:

- Are errors related to downgrading canceling the service?
- Having a certain number of friends or a sense of community can decrease the churn?
- Thumbs down are related to churn? (could the quality of the songs catalog affect the churn)
- The advertising is not annoying the users?
- Users with stay connected for more time have less change to churn?
- Is the home page relevant?
- Users, who access the downgrade page are how much more willing to churn?

In [None]:
df.groupBy('status').count().orderBy('count', ascending = False).show(20)

In [None]:
df.filter('userId = 92').groupBy('page').count().orderBy('count', ascending = False).show(50)

In [None]:
df.filter('userId = 92').groupBy('page').count().orderBy('count', ascending = False).show(50)

In [None]:
df.filter('userId = 92').groupBy('userAgent').count().orderBy('count', ascending = False).show(50, False)

In [None]:
df.filter('userId = 92 and song != \'null\' ').groupBy('song').count().orderBy('count', ascending = False).show(50, False)

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [12]:
def create_session_dimension(df):
    
    # sessions from the user
    df_sessions = df.orderBy(df.sessionId).groupBy('sessionId', 'userId').agg(
        smax(df.ts).alias('max_event_ts'),
        smin(df.ts).alias('min_event_ts'),
        ssum(df.length).alias('session_n_total_playback'), # Based on songs length
        count(when(df.page == 'Thumbs Up', True)).alias("session_n_likes"),
        count(when(df.page == 'Thumbs Down', True)).alias("session_n_dislikes"),
        count(when(df.page == 'NextSong', True)).alias("session_n_songs"),
        count(when(df.page == 'Add Friend', True)).alias("session_n_friends"),
        count(when(df.page == 'Add to Playlist', True)).alias("session_n_add_playlist"),
        count(when(df.page == 'Home', True)).alias("session_n_home"),
        count(when(df.page == 'Roll Advert', True)).alias("session_n_ads"),
        count(when(df.page == 'Help', True)).alias("session_n_help"),
        count(when(df.page == 'Error', True)).alias("session_n_error"),
        count(when(df.page == 'Settings', True)).alias("session_n_sets"),
        count(col('page')).alias('session_n_actions'),
        first(col('session_duration')).alias('session_duration')
    ) 
    
    # Calculate the interval until the next session
    w_user_sessions_interval = Window.partitionBy('userId').orderBy('min_event_ts')
    df_sessions = df_sessions.withColumn('interval_to_session', col('min_event_ts') - lag(col('max_event_ts')).over(w_user_sessions_interval))
    
    # Calculate average time in hours for each session
    df_session_time = df_sessions.groupBy('userId').agg(
       (avg(df_sessions.session_duration) / milliseconds_to_hours).alias('session_hours')
    )
    df_sessions = df_sessions.join(df_session_time, on = 'userId')
    
    # We should remove the null lines before count/group to not account 2 times the mean interval
    df_sessions = df_sessions.groupBy('userId').agg(  
        (avg(df_sessions.interval_to_session) / milliseconds_to_hours).alias('session_avg_time_away'),
        ((avg(df_sessions.session_n_total_playback) / minutes_to_hours) / first(col('session_hours'))).alias('session_avg_playback'), 
        (avg(df_sessions.session_n_likes) / first(col('session_hours'))).alias('session_avg_likes'),
        (avg(df_sessions.session_n_dislikes) / first(col('session_hours'))).alias('session_avg_dislikes'),
        (avg(df_sessions.session_n_songs) / first(col('session_hours'))).alias('session_avg_songs'),
        (avg(df_sessions.session_n_friends) / first(col('session_hours'))).alias('session_avg_friends'),
        (avg(df_sessions.session_n_add_playlist) / first(col('session_hours'))).alias('session_avg_added_playlist'),
        (avg(df_sessions.session_n_home) / first(col('session_hours'))).alias('session_avg_home'),
        (avg(df_sessions.session_n_ads) / first(col('session_hours'))).alias('session_avg_ads'),
        (avg(df_sessions.session_n_help) / first(col('session_hours'))).alias('session_avg_help'),
        (avg(df_sessions.session_n_error) / first(col('session_hours'))).alias('session_avg_errors'),
        (avg(df_sessions.session_n_sets) / first(col('session_hours'))).alias('session_avg_settings'),
        (avg(df_sessions.session_n_actions) / first(col('session_hours'))).alias('session_avg_actions')
    )
    
    return df_sessions

def create_user_dimension(df):
    
    df_user_profile = df.groupby('userId')\
        .agg( 

            # first(col('state')).alias('state'),
            first(when(col('gender') == 'M', TRUE).otherwise(FALSE)).alias('male'),

            smin(col('first_ts')).alias('ts_start'),
            smax(col('last_event_ts')).alias('ts_end'),        
        
            ((smax(col('last_event_ts')) - smin(col('first_ts'))) / milliseconds_to_hours).alias('time_window'),
        
            # Subscription
            count(when(col('page') == 'Submit Downgrade', True)).alias('n_downgrades'),
            count(when(col('page') == 'Submit Upgrade', True)).alias('n_upgrades'),
            last(when(col('level') == 'paid', TRUE).otherwise(FALSE)).alias('paid'),
            first(when(col('last_page') == CHURN_CANCELLATION_PAGE, TRUE).otherwise(FALSE)).alias('canceled'),

            # Streaming
            count(when(col('page') == 'NextSong', True)).alias('n_songs'),
            count(when(col('page') == 'Thumbs Up', True)).alias('n_likes'),
            count(when(col('page') == 'Thumbs Down', True)).alias('n_dislikes'),
            countDistinct(col('sessionId')).alias('n_sess'),
            (avg(col('session_duration')) / milliseconds_to_hours).alias('avg_session_duration'),

            # Community
            count(when(col('page') == 'Add Friend', True)).alias('n_friends'),
            count(when(col('page') == 'Add to Playlist', True)).alias('n_added_to_playlist'),

            # Other
            count(when(col('page') == 'Home', True)).alias('n_home'),
            count(when(col('page') == 'Roll Advert', True)).alias('n_ads'),
            count(when(col('page') == 'Help', True)).alias('n_help'),
            count(when(col('page') == 'Error', True)).alias('n_errors'),
            count(when(col('page') == 'Settings', True)).alias('n_settings'),
            count(col('page')).alias('n_actions')
        )
    
    
    # Location
    # states = list(map(lambda c: c[0].strip(), df.select(['state']).distinct().rdd.collect()))
    # for state in states:
    #    df_user_profile = df_user_profile.withColumn(state.lower(), when(df_user_profile.state == state, 1).otherwise(0))
    
    return df_user_profile

def create_days_dimension(df):
    
    df_unique_days = df.groupby('userId').agg(countDistinct('date').alias('n_days'))
    
    df_daily_actions = df.groupby('userId', 'date').agg(count('page').alias('total'))
    df_daily_actions = df_daily_actions.groupby('userId').agg(avg('total').alias('avg_daily_actions')) 

    df_days = df_unique_days.join(df_daily_actions, df_unique_days.userId == df_daily_actions.userId)
    
    # Remove duplicated column after join
    df_days = df_days.drop(df_daily_actions.userId)
    
    return df_days

def sort_features(df, columns_order):
    _columns = df.columns
    _columns.sort()
    
    for _idx, _val in list(enumerate(columns_order)):
        _columns.pop(_columns.index(_val))
        _columns.insert(_idx, _val)
        
    assert len(_columns) == len(df.columns)

    return _columns

In [68]:
binary_features = [ 'paid', 'male' ]

numeric_features = [
    'avg_daily_actions',
    'avg_session_duration', 
    'n_actions',
    'n_added_to_playlist',
    'n_ads',
    'n_days',
    'n_dislikes',
    'n_downgrades',
    'n_errors',
    'n_friends',
    'n_help',
    'n_home',
    'n_likes',
    'n_sess',
    'n_settings',
    'n_songs',
    'n_upgrades', 
    'session_avg_actions',
    'session_avg_added_playlist',
    'session_avg_ads',
    'session_avg_dislikes',
    'session_avg_errors',
    'session_avg_friends',
    'session_avg_help',
    'session_avg_home',
    'session_avg_likes',
    'session_avg_playback',
    'session_avg_settings',
    'session_avg_songs',
    'session_avg_time_away',
    'time_window'
]

columns_all = [
    'canceled',
    'male',
    'paid',
    'avg_daily_actions',
    'avg_session_duration', 
    'n_actions',
    'n_added_to_playlist',
    'n_ads',
    'n_days',
    'n_dislikes',
    'n_downgrades',
    'n_errors',
    'n_friends',
    'n_help',
    'n_home',
    'n_likes',
    'n_sess',
    'n_settings',
    'n_songs',
    'n_upgrades', 
    'session_avg_actions',
    'session_avg_added_playlist',
    'session_avg_ads',
    'session_avg_dislikes',
    'session_avg_errors',
    'session_avg_friends',
    'session_avg_help',
    'session_avg_home',
    'session_avg_likes',
    'session_avg_playback',
    'session_avg_settings',
    'session_avg_songs',
    'session_avg_time_away',
    'time_window'
]

columns_to_train = [
    'male',
    'paid',
    'avg_daily_actions',
    'avg_session_duration', 
    'n_actions',
    'n_added_to_playlist',
    'n_ads',
    'n_days',
    'n_dislikes',
    'n_downgrades',
    'n_errors',
    'n_friends',
    'n_help',
    'n_home',
    'n_likes',
    'n_sess',
    'n_settings',
    'n_songs',
    'n_upgrades', 
    'session_avg_actions',
    'session_avg_added_playlist',
    'session_avg_ads',
    'session_avg_dislikes',
    'session_avg_errors',
    'session_avg_friends',
    'session_avg_help',
    'session_avg_home',
    'session_avg_likes',
    'session_avg_playback',
    'session_avg_settings',
    'session_avg_songs',
    'session_avg_time_away',
    'time_window'
]

In [64]:
df_sessions = create_session_dimension(df)
df_days = create_days_dimension(df)

df_users = create_user_dimension(df)
df_users = df_users.orderBy(df_users.userId).join(df_days, on = 'userId')

_columns = sort_features(df_users, [ 'userId', 'male', 'paid', 'canceled'])
_columns = list(set(df_users.schema.names + df_sessions.schema.names) - set(['ts_start', 'ts_end', 'state']))

df_users = df_users.orderBy(df_users.userId).join(df_sessions, on = 'userId').select(_columns) 

In [65]:
# Create the new dataframe
df_users = df_users.select(columns_all).fillna(0)

In [None]:
### WARN: Only round to display
# Enforces the order for some columns
df_users.select([sround(c, 0).cast(dataType = IntegerType()).alias(c) for c in _columns]).fillna(0).show(2, True, vertical = True)

In [None]:
df_users.select(_columns).fillna(0).toPandas().to_csv('sparkify_data_final.csv', index = False)

In [None]:
df.agg(countDistinct(df.userId).alias('unique_users')).show()

In [None]:
df_users.orderBy(df_users.userId).join(df_sessions, on = 'userId').select(_columns).count()

In [None]:
df_users.orderBy(df_users.userId).join(df_sessions, on = 'userId').select(_columns).groupBy('canceled').agg(count(df_users.canceled).alias('total')).show()

- Advertises number (per session and all)
    - The user **100010** returned after some idle time and received a considerable amount of advertises;
    - Also, after thumbs down, I received two advertisements on four sounds. Then canceled the service.
- Number of sessions
- Paid subscription time
- Avg songs before an ad
- Number of skipped songs

In [None]:
df.schema.names

In [None]:
to_date(df.ts.cast(dataType=TimestampType()))

In [None]:
df.where(df.userId == user_id).select(['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level', 
 'page',
 'sessionId',
 'song', 
 'ts', 
 'userId']).orderBy('sessionId', 'itemInSession').withColumn('datetime', date_format((df.ts/1000).cast(dataType=TimestampType()), 'HH:mm:ss dd-MM-YYYY')).show(350, True)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [None]:
columns_to_exclude = set(['userId'])

columns_to_use = list(set(df_users.columns) - columns_to_exclude)

columns_to_train = list(set(columns_to_use) - set(['canc']))

columns_to_use.sort()
columns_to_train.sort()

print(f'Columns: {columns_to_use}\n')
print(f'Columns to train: {columns_to_train}')

In [22]:
CHURN_LABEL = 'canceled'
TRAIN_SPLIT_RATIO = .7
TEST_SPLIT_RATIO = .3

SPLIT_RATIO = [TRAIN_SPLIT_RATIO, TEST_SPLIT_RATIO]

In [24]:
def plot_confusion_matrix(y_test, y_predictions):
    
    # auc = roc_auc_score(y_test, y_predictions)
    cm = confusion_matrix(y_test, y_predictions, labels = [1, 0])
    
    tn = cm[0, 0]
    tp = cm[1, 1]
    fn = cm[1, 0]
    fp = cm[0, 1]
    
    total = np.sum(cm) # tn + tp + fn + fp
    accuracy = (tp + tn) / total
    precision = (tp) / (tp + fp)
    recall = (tp) / (tp + fn) 
    
    print(cm)

def evaluate_multiclass_classifier(predictions, columns):
    metrics_to_evaluate = [ 'accuracy', 'f1', 'weightedPrecision', 'weightedRecall' ]
    
    result = {}
    for metric in metrics_to_evaluate:
        evaluator = MulticlassClassificationEvaluator(labelCol = columns[0], predictionCol = columns[1], metricName = metric)
        value = evaluator.evaluate(predictions)
        result[metric] = value
        print(f'{metric}: {value}') 
    
    return result

def train_random_forest_classifier(data, columns, train_cloumns):
    
    # Split train/test
    (train_df, test_df) = data.randomSplit(SPLIT_RATIO, seed = 42)
    
    # Create the indexer for labels
    l_indexer = StringIndexer(inputCol = CHURN_LABEL, outputCol = 'idx_labels')
    f_binaries = VectorAssembler(inputCols = binary_features, outputCol = 'bin_features')
    f_numeric = VectorAssembler(inputCols = numeric_features, outputCol = 'num_features')
    
    f_scaler = StandardScaler(inputCol = 'num_features', outputCol = 'num_features_escaled', withStd = True, withMean = True)
    
    f_all = VectorAssembler(inputCols = [ 'bin_features' , 'num_features_escaled' ], outputCol = 'features')
    
    l_translator = IndexToString(inputCol = 'prediction', outputCol = 'predictedLabel', labels = [ 'Not churn', 'Churn' ])
    
    rf_classifier = RandomForestClassifier(labelCol = 'idx_labels', featuresCol = 'features', numTrees = 10, maxBins = 5, impurity = 'entropy', minInstancesPerNode = 3, seed = 42)
    
    pipeline = Pipeline(stages = [ l_indexer, f_binaries, f_numeric, f_scaler, f_all, rf_classifier, l_translator ])
    
    # Train the model
    model = pipeline.fit(train_df)

    # Test the model
    predictions = model.transform(test_df)

    return model.stages[2], predictions
    

def create_pipeline(model):
    
    l_indexer = StringIndexer(inputCol = CHURN_LABEL, outputCol = 'idx_labels')
    f_binaries = VectorAssembler(inputCols = binary_features, outputCol = 'bin_features')
    f_numeric = VectorAssembler(inputCols = numeric_features, outputCol = 'num_features')
    f_scaler = StandardScaler(inputCol = 'num_features', outputCol = 'num_features_escaled', withStd = True, withMean = True)
    f_all = VectorAssembler(inputCols = [ 'bin_features' , 'num_features_escaled' ], outputCol = 'features')
    pipeline = Pipeline(stages = [ l_indexer, f_binaries, f_numeric, f_scaler, f_all, model ])
    
    return pipeline

def create_random_forest_pipeline():
    rf_classifier = RandomForestClassifier(labelCol = 'canceled', featuresCol = 'features', seed = 42)
    return create_pipeline(rf_classifier)

def create_gradient_boost_pipeline():
    gbt_classifier = GBTClassifier(labelCol = 'canceled', maxDepth = 5, maxIter = 100, seed = 42)
    return create_pipeline(gbt_classifier)

In [None]:
df_users.show(1, False)

In [None]:
model, predictions = train_random_forest_classifier(df_users, columns_all, columns_to_train)

In [None]:
evaluate_multiclass_classifier(predictions, ('canceled', 'prediction'))

In [None]:
df_results = predictions.select(['canceled', 'prediction', 'predictedLabel']).toPandas()
df_results['prediction'] = df_results.prediction.apply(int)
plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

In [None]:
plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

In [None]:
plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

In [None]:
model, predictions = train_random_forest_classifier(df_users, columns_to_use, columns_to_train)

In [None]:
evaluate_multiclass_classifier(predictions, ('canc', 'prediction'))

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol = 'canc', metricName = 'areaUnderROC')

evaluator.evaluate(predictions)

In [None]:
maxBins = 5, impurity = 'entropy', minInstancesPerNode = 3, seed = 42

In [None]:
list(range(5, 45, 5))

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, Evaluator
from pyspark import since, keyword_only


class F1score(Evaluator):

    @keyword_only
    def __init__(self, predictionCol = 'prediction', labelCol = 'label'):
        self.predictionCol = predictionCol
        self.labelCol = labelCol
        self.uid = 'f'

    def evaluate(self, dataset):
        
        # Calculate F1 score 
        tp = dataset.where((dataset[self.labelCol] == 1) & (dataset[self.predictionCol] == 1)).count()
        fp = dataset.where((dataset[self.labelCol] == 0) & (dataset[self.predictionCol] == 1)).count()
        tn = dataset.where((dataset[self.labelCol] == 0) & (dataset[self.predictionCol] == 0)).count()
        fn = dataset.where((dataset[self.labelCol] == 1) & (dataset[self.predictionCol] == 0)).count()
        
        # Add epsilon to prevent division by zero
        precision = tp / (tp + fp + 0.00001)
        recall = tp / (tp + fn + 0.00001)
        
        f1 = 2 * precision * recall / (precision + recall + 0.00001)
        
        return f1

    def isLargerBetter(self):
        return True

In [28]:
import uuid 

class FNR(Evaluator):

    @keyword_only
    def __init__(self, predictionCol = 'prediction', labelCol = 'label'):
        self.predictionCol = predictionCol
        self.labelCol = labelCol
        self.uid = str(uuid.uuid4())

    def evaluate(self, dataset):
        
        tp = dataset.where((dataset[self.labelCol] == 1) & (dataset[self.predictionCol] == 1)).count()
        fp = dataset.where((dataset[self.labelCol] == 0) & (dataset[self.predictionCol] == 1)).count()
        tn = dataset.where((dataset[self.labelCol] == 0) & (dataset[self.predictionCol] == 0)).count()
        fn = dataset.where((dataset[self.labelCol] == 1) & (dataset[self.predictionCol] == 0)).count()
        
        
        fnr = fn / (tp + fn)
        
        return fnr

    def isLargerBetter(self):
        return False

In [29]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
def create_grid_search(pipeline, param_grid, evaluator = FNR(labelCol = 'canceled')):
    
    return CrossValidator(estimator = pipeline, estimatorParamMaps = param_grid, evaluator = evaluator, numFolds = 3, parallelism = 16, seed = 42)

def random_forest_grid_search(pipeline):
    
    model = pipeline.getStages()[-1]

    grid_rf = ParamGridBuilder().addGrid(model.maxDepth, [5, 10, 15, 20, 25]) 
    grid_rf = grid_rf.addGrid(model.impurity, ['gini']) 
    grid_rf = grid_rf.addGrid(model.maxBins, [5, 10, 15, 20, 25, 30, 35, 40])
    grid_rf = grid_rf.addGrid(model.numTrees, [10, 20, 40, 60, 70])
    grid_rf = grid_rf.build()
    
    print(f'Number of models to train: {len(grid_rf)}')
        
    return create_grid_search(pipeline, grid_rf)

def gradient_boost_grid_search(pipeline):
    
    model = pipeline.getStages()[-1]

    grid_gbt = ParamGridBuilder().addGrid(model.maxDepth, [2, 4, 6, 8, 10])
    grid_gbt = grid_gbt.addGrid(model.maxIter, [20, 25, 40, 50, 100])
    grid_gbt = grid_gbt.addGrid(model.maxBins, [2])
    grid_gbt = grid_gbt.addGrid(model.subsamplingRate, [.5, .8, 1])
    grid_gbt = grid_gbt.build()
    
    print(f'Number of models to train: {len(grid_gbt)}')
   
    return create_grid_search(pipeline, grid_gbt)

In [36]:
# Create the new dataframe
# data = df_users.select(columns_to_use).fillna(0)

# Split train/test
(train_df, test_df) = df_users.randomSplit(SPLIT_RATIO, seed = 42)

In [None]:
pipeline = create_random_forest_pipeline()
cv_rf = random_forest_grid_search(pipeline)
cv_rf_results = cv_rf.fit(train_df)

In [None]:
pipeline = create_random_forest_pipeline()
cv_rf = gradient_boost_grid_search(pipeline)
cv_rf_results = cv_rf.fit(train_df)

In [None]:
min(cv_rf_results.avgMetrics)

In [None]:
predictions = cv_rf_results.bestModel.transform(test_df)

df_results = predictions.select(['canceled', 'prediction']).toPandas()
df_results['prediction'] = df_results.prediction.apply(int)

plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

In [None]:
pipeline = create_gradient_boost_pipeline()
cv_gbt = gradient_boost_grid_search(pipeline)
_results = cv_gbt.fit(train_df)

In [37]:
pipeline = create_gradient_boost_pipeline()
cv_gbt = gradient_boost_grid_search(pipeline)
_results = cv_gbt.fit(train_df)

Number of models to train: 75


Py4JJavaError: An error occurred while calling o169025.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 190287.0 failed 1 times, most recent failure: Lost task 11.0 in stage 190287.0 (TID 18541775, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3898)
	at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3705)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2115)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2835)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
	at sun.reflect.GeneratedMethodAccessor272.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3898)
	at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3705)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2115)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)


In [152]:
predictions = _results.bestModel.transform(test_df)

df_results = predictions.select(['canceled', 'prediction']).toPandas()
df_results['prediction'] = df_results.prediction.apply(int)

plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

[[19 12]
 [ 5 96]]


In [157]:
predictions = _results.bestModel.transform(test_df)

df_results = predictions.select(['canceled', 'prediction']).toPandas()
df_results['prediction'] = df_results.prediction.apply(int)

plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

[[19 12]
 [ 5 96]]


In [None]:
pipeline = create_gradient_boost_pipeline()
cv_gbt = gradient_boost_grid_search(pipeline)
_results = cv_gbt.fit(train_df)

In [None]:
predictions = _results.bestModel.transform(test_df)

df_results = predictions.select(['canceled', 'prediction']).toPandas()
df_results['prediction'] = df_results.prediction.apply(int)

plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

In [None]:
_results

In [125]:
pipeline = create_gradient_boost_pipeline()
cv_gbt = gradient_boost_grid_search(pipeline)
_results = cv_gbt.fit(train_df)

Number of models to train: 25


In [126]:
predictions = _results.bestModel.transform(test_df)

df_results = predictions.select(['canceled', 'prediction']).toPandas()
df_results['prediction'] = df_results.prediction.apply(int)

plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

[[21 10]
 [12 89]]


In [144]:
_results.bestModel.stages[-1].extractParamMap()

{Param(parent='GBTClassifier_ba2654d5e488', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='GBTClassifier_ba2654d5e488', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='GBTClassifier_ba2654d5e488', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'all',
 Param(parent='GBTClassifier_ba2654d5e488', name='featuresCol', doc='features column name'): 'features',
 Param(parent='GBTClassifier_ba2654d5e488', name='labelCol', doc='label column name'): 'ca

In [137]:
list(list(zip(_results.avgMetrics, _results.getEstimatorParamMaps()))[0][1])[0]

Param(parent='GBTClassifier_ba2654d5e488', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.')

In [None]:
_results

In [None]:
pipeline = create_random_forest_pipeline()
cv_rf = random_forest_grid_search(pipeline)
cv_rf_results = cv_rf.fit(train_df)

In [None]:
predictions = cv_rf_results.bestModel.transform(test_df)

df_results = predictions.select(['canceled', 'prediction']).toPandas()
df_results['prediction'] = df_results.prediction.apply(int)

plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

In [None]:
predictions

In [None]:
FNR(predictionCol = "prediction", labelCol="idx_labels").evaluate(predictions)

In [None]:
cv_rf_results.bestModel.stages[-1].numFeatures

In [None]:
rf = RandomForestClassifier(labelCol = 'idx_labels', featuresCol = 'features', numTrees = 10)
dir(rf)

In [None]:
cv_rf_results.bestModel

In [None]:
df_results = predictions.select(['canceled', 'prediction']).toPandas()
df_results['prediction'] = df_results.prediction.apply(int)
plot_confusion_matrix(df_results['canceled'], df_results['prediction'])

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol = 'canc', metricName = 'areaUnderROC')
 
best_model_results = cv_gbt_results.bestModel.transform(test_df)
    
evaluator.evaluate(best_model_results)

In [None]:
pipeline = create_gradient_boost_pipeline()
cv_gbt = gradient_boost_grid_search(pipeline)

In [None]:
cv_gbt_results = cv_gbt.fit(train_df)

In [None]:
cv_gbt_results.bestModel

In [None]:
import pandas as pd

scores = cv_gbt_results.avgMetrics
params = [{p.name: v for p, v in m.items()} for m in cv_gbt.getEstimatorParamMaps()]
params_pd = pd.DataFrame(params)
params_pd['score'] = scores
params_pd

In [None]:
evaluator = BinaryClassificationEvaluator(labelCol = 'canc', metricName = 'areaUnderROC')
 
best_model_results = cv_gbt_results.bestModel.transform(test_df)
    
evaluator.evaluate(best_model_results)

In [None]:
# evaluator = BinaryClassificationEvaluator(labelCol = 'canceled', metricName = 'f1Measure')

metrics_to_evaluate = [ 'accuracy', 'f1', 'weightedPrecision', 'weightedRecall' ]

evaluator = MulticlassClassificationEvaluator(labelCol = 'canceled', metricName = 'f1')
 
best_model_results = cv_rf_results.bestModel.transform(test_df)
    
evaluator.evaluate(best_model_results)

In [None]:
evaluate_multiclass_classifier(best_model_results, ('canc', 'prediction'))

In [None]:
evaluate_multiclass_classifier(best_model_results, ('canc', 'prediction'))

In [None]:
best_model_results.select(['features', 'prediction', 'canc']).show(10, False)

In [None]:
best_model_results.select(['rawPrediction', 'prediction', 'canc']).show(10, False)

In [None]:
test_df.filter('canc = 1').count()

In [None]:
train_df.filter('canc = 1').count()

In [None]:
best_model_results.select("prediction", "canc", "features").filter('canc = 1').groupby(['canc', 'prediction']).agg({'canc':'count'}).show(50)

In [None]:
cv_rf_results.bestModel.stages[-1]

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.