## Churn Prediction with PySpark using MLlib and ML Packages

In [None]:
# Check version of python
# (printed for provenance, so readers know which interpreter the kernel runs)

import sys
print(sys.version)

In [None]:
# Source: https://www.mapr.com/blog/churn-prediction-pyspark-using-mllib-and-ml-packages

In [None]:
# Disable warnings
# NOTE(review): with no category argument this silences every warning for the
# whole session, including deprecation warnings.
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Set Matplotlib inline plotting and load the Pandas package
%matplotlib inline
import pandas as pd
# NOTE(review): the `display.mpl_style` option presumably only exists in older
# pandas releases — confirm the installed version still supports it.
pd.options.display.mpl_style = 'default'

In [None]:
from pyspark import SparkContext, SparkConf, HiveContext, Row

# Wrap the SparkContext in a HiveContext so Hive tables can be queried with SQL.
# `sc` is not defined in this notebook; presumably it is injected by the kernel.
hive_ctx = HiveContext(sc)

# Dynamic-partition settings, applied in the same order as listed.
for conf_key, conf_value in (
        ("hive.exec.dynamic.partition", "true"),
        ("hive.exec.dynamic.partition.mode", "nonstrict"),
        ("hive.exec.max.dynamic.partitions", "10000")):
    hive_ctx.setConf(conf_key, conf_value)

hive_ctx.sql("SET spark.sql.parquet.compression.codec=snappy")

# Select the database that the queries below read from.
hive_ctx.sql("use prod_dxbreach_metrics")

In [None]:
# Load data: every column of the churn analysis table, queried through the
# Hive context and kept as a Spark DataFrame

Churn_data = hive_ctx.sql("Select * from bi_churnanalysistable")

In [None]:
# Look at the first 5 results
# (the 5 rows are collected into a small pandas frame and transposed, so each
# column of the table is shown as one row)

pd.DataFrame(Churn_data.take(5), columns=Churn_data.columns).transpose() 

# Same thing but without transpose (less easy to read)

 # pd.DataFrame(Churn_data.take(5), columns=Churn_data.columns)

### Split between training and test

In [None]:
# Obtain the number of rows: the Spark summary statistics are collected into
# pandas and transposed so each column of the table is shown as one row
Churn_data.describe().toPandas().transpose()

In [None]:
# Create pandas df
# toPandas() collects every row of the Spark DataFrame. The previous
# take(<hard-coded row count>) silently truncated the data if the table grew,
# and the count had to be updated by hand whenever the table changed.
Churn_data_Pandas = Churn_data.toPandas()

# Split the data: 80% of the rows for training, the remaining 20% for testing.
# A fixed random_state keeps the split identical across kernel restarts.
Churn_train = Churn_data_Pandas.sample(frac=0.8, random_state=42)
Churn_test = Churn_data_Pandas.drop(Churn_train.index)

## Summary Statistics

In [None]:
# Summary statistics of the training split, transposed so each described column is one row
Churn_train.describe().transpose()

## Correlations and Data Preparation

In [None]:
# Convert the churn flag column to a numeric dtype

Churn_train['ischurned'] = pd.to_numeric(Churn_train['ischurned'])

In [None]:
# Take a look at the dtype of each column in the training split (Churn_train)

Churn_train.dtypes

In [None]:
# Keep only int64 features
# NOTE(review): the positions below are assumed to be the int64 columns —
# confirm against Churn_train.dtypes.
# The names are read straight from the column index. The previous version
# arithmetically added three DataFrame slices (via the deprecated .ix indexer)
# only to read the column names of the result. The names are now kept in their
# original table order.
all_columns = Churn_train.columns
cols = list(all_columns[3:8]) + list(all_columns[9:17]) + list(all_columns[18:31])

# Select a 10% sample

Churn_train[cols].sample(frac=0.1).transpose()

In [None]:
# Keep only the numeric features

# numeric_features = [t[0] for t in Churn_train.dtypes if t[1] == 'int64' or t[1] == 'float64'] 

# sampled_data = Churn_train.select(numeric_features).sample(False, 0.10).toPandas()


In [None]:
# Scatter-plot matrix of a 10% sample of the selected features

sampled_data = Churn_train[cols].sample(frac=0.1)

axs = pd.scatter_matrix(sampled_data, figsize=(12, 12));

# Tidy the grid of axes:
#  - first column: horizontal, right-aligned y labels and no y ticks
#  - last row: vertical x labels and no x ticks
n = len(sampled_data.columns)
first_column = [axs[i, 0] for i in range(n)]
bottom_row = [axs[n-1, i] for i in range(n)]

for v in first_column:
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())

for h in bottom_row:
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

### Alternative 1: Export to CSV and use R to determine which fields to drop (glm method)

In [None]:
# If on my computer: 
# pd.DataFrame.to_csv()
# import os
# path = 'C:\Users\mdeleseleuc\Documents'
# output_filename = 'Churn_train.csv'
# Churn_train.to_csv(os.path.join(path,output_filename))

# In the cluster:
# NOTE(review): `sqlContext` is not defined in the visible cells (only
# `hive_ctx` is); presumably it is provided by the kernel — confirm.

# Convert the pandas DataFrame to a Spark DataFrame
Churn_train_Spark = sqlContext.createDataFrame(Churn_train)

# Register it as a temporary table so it can be queried with SQL
Churn_train_Spark.registerTempTable("mytempTableTrain")

# sqlContext.sql("drop table mytable")
# Create the Hive table from the temporary table registered just above.
# The SELECT must name "mytempTableTrain"; the previous query read from
# "mytempTable", which is not registered anywhere in the visible cells.
sqlContext.sql("create table if not exists mytableTrain as select * from mytempTableTrain")

### Alternative 2: Drop useless fields by hand (don't do that...)

In [None]:
# Let's get rid of highly correlated variables
# Churn_train is a pandas DataFrame here, so columns must be dropped with
# axis=1. The previous Spark-style chain of .drop('name') calls targeted row
# labels, and its trailing .drop() had no arguments, so the cell raised.
# errors='ignore' keeps the cell re-runnable after the columns are gone.
Churn_train = Churn_train.drop(
    ['lifespan', 'daysincelastseen', 'uniquemapcompleted'],
    axis=1, errors='ignore')

Churn_train.describe().transpose()

### Alternative 3: Feature Selection using tree Classifier

In [None]:
# Source 1: http://scikit-learn.org/stable/modules/feature_selection.html
"""Source 2: https://kaggle2.blob.core.windows.net/forum-message-attachments/44681/1286/
kaggle_forest.py?sv=2012-02-12&se=2016-11-14T22%3A52%3A58Z&sr=b&sp=r&sig=vizTB90DrEljcgvjIGgVVkPowHxAs%2BKO%2BZmqCzf8lms%3D"""

from sklearn.ensemble import ExtraTreesClassifier
from sklearn.feature_selection import SelectFromModel
from sklearn import ensemble

# feature_cols = [col for col in Churn_train.columns if col in Churn_train[cols]]

# Every column is a feature except the explicitly excluded ones below
# (which include the target, `ischurned`).
excluded_cols = ['s__uid', 'cohort', 'monthcohort', 'avgattemptspermap',
                 'successesfailsratio', 'ischurned']
feature_cols = [name for name in Churn_train.columns if name not in excluded_cols]

# Feature matrix and target vector for the scikit-learn models
X_train = Churn_train[feature_cols]
y = Churn_train['ischurned']

X_train.shape

In [None]:
# Verify that the columns are the ones we wanted

feature_cols

In [None]:
# Get rid of NaN values
# (Imputer with its default settings; the result is a NumPy array, so X_train
# no longer carries column names after this line)
from sklearn.preprocessing import Imputer 
X_train = Imputer().fit_transform(X_train)

# A fixed random_state makes the forest, and therefore the importances below,
# reproducible across kernel restarts.
clf = ensemble.RandomForestClassifier(random_state=42)
clf.fit(X_train, y)

# Show the feature importances of the fitted forest, one value per column of
# X_train. These are the forest's importance scores, not shares of variance.
clf.feature_importances_

In [None]:
# Take a look at the new shape
# SelectFromModel wraps the already-fitted forest (prefit=True); transform()
# returns X_train reduced to the features the selector keeps.
# NOTE(review): the name `model` is rebound to the decision tree in a later cell.

model = SelectFromModel(clf, prefit=True)
X_new = model.transform(X_train)
X_new.shape

In [None]:
# Build a forest and compute the feature importances

# Source: http://scikit-learn.org/stable/auto_examples/ensemble/plot_forest_importances.html
          #sphx-glr-auto-examples-ensemble-plot-forest-importances-py

import numpy as np

importances = clf.feature_importances_
# Feature indices ordered from the most to the least important
indices = np.argsort(importances)[::-1]

# Print the feature ranking: rank, column index in X_train, importance
print("Feature ranking:")

for f, feature_idx in enumerate(indices[:X_train.shape[1]]):
    print("%d. feature %d (%f)" % (f + 1, feature_idx, importances[feature_idx]))
    
""" Informative features =  'firstsessionduration','uniquemapsplayed', 'totalinfiltrations', 'successes', 'retries',
                           'fails', 'lifespan',  'playerschallenges',  'weaponparts'
"""
# Total variance explained (with informative features) = 78% 

### Reprocessing: Normalize the features

Source

http://machinelearningmastery.com/rescaling-data-for-machine-learning-in-python-with-scikit-learn/

## Using the Spark MLlib Package

Decision trees require almost no data preparation (e.g., normalization) and can handle both categorical and continuous data.

To remedy over-fitting and improve prediction accuracy, decision trees can also be limited to a certain depth or complexity, or bundled into ensembles of trees (e.g., random forests).

A decision tree is a predictive model which maps observations (features) about an item to conclusions about the item's label or class. The model is generated using a top-down approach, where the source dataset is split into subsets using a statistical measure, often in the form of the Gini index or information gain via Shannon entropy. This process is applied recursively until a subset contains only samples with the same target class, or is halted by a predefined stopping criterion.

### Model Training

MLlib classifiers and regressors require data sets in a format of rows of type LabeledPoint, which separates row labels and feature lists, and names them accordingly. The custom labelData() function shown below performs the row parsing. We'll pass it the prepared data set (Churn_train_map, i.e. the feature columns of Churn_train with ischurned moved to the end) and split the result further into training and testing sets.

A decision tree classifier model is then generated using the training data, with maxDepth set to 8. The tree depth can be regarded as an indicator of model complexity: a smaller maxDepth yields a shallower, simpler tree.

In [None]:
# Move ischurned to the end 

# Take an explicit copy of the feature columns. Assigning a new column to a
# plain slice of Churn_train is chained assignment: pandas cannot tell whether
# the write should reach the original frame and emits SettingWithCopyWarning.
Churn_train_map = Churn_train[feature_cols].copy()  # columns excluding categorical variables

Churn_train_map['ischurned'] = Churn_train['ischurned']  # Add ischurned (at the end)

# Did it work?
Churn_train_map.columns

In [None]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

# LabeledPoint is basically the structure saying which is your target variable (label) and your features vector

def labelData(data):
    """Convert a pandas DataFrame into an RDD of MLlib LabeledPoint rows.

    data: pandas DataFrame whose last column is the label and whose remaining
          columns are the features.
    Returns the RDD obtained by mapping every row to
    LabeledPoint(label=row[-1], features=row[:-1]).
    """
    # sqlContext.createDataFrame(pandas_df) converts to a Spark DataFrame.
    spark_df = sqlContext.createDataFrame(data)
    # Map over the underlying RDD explicitly: DataFrame.map() only exists in
    # Spark 1.x, whereas DataFrame.rdd.map() works on both 1.x and 2.x.
    return spark_df.rdd.map(lambda row: LabeledPoint(row[-1], row[:-1]))

# sqlContext.createDataFrame(panda_df) = convert to spark_df
       
# Label the prepared rows, then randomly split them 80% / 20% into training and testing sets
labeled_points = labelData(Churn_train_map)
training_data, testing_data = labeled_points.randomSplit([0.8, 0.2])

# Decision tree settings
tree_params = dict(
    numClasses=2,
    categoricalFeaturesInfo={},  # variables are all continuous
    impurity='gini',
    maxDepth=8,                  # max depth = model complexity
    maxBins=32)
model = DecisionTree.trainClassifier(training_data, **tree_params)

print(model.toDebugString())

The toDebugString() function provides a print of the tree's decision nodes and final prediction outcomes at the end leafs. 

We can see that features 10 and 16 are used for decision making and should thus be considered as having high predictive power to determine a customer's likeliness to churn. 

Decision trees are often used for feature selection because they provide an automated mechanism for determining the most important features (those closest to the tree root).

N.B.: This is in line with the feature selection above.

In [None]:
# Sneak peek at the labeled data.

training_data.toDF().show()

In [None]:
# Look up the column names behind two feature indices of the decision tree.
# Single-argument print() calls work under both Python 2 and Python 3 and
# match the print(...) style used in the other cells.
# NOTE(review): the markdown above cites features 10 and 16 — confirm which
# indices the printed tree actually uses.
for feature_idx in (13, 15):
    print('Feature %d: %s' % (feature_idx, Churn_train_map.columns[feature_idx]))

## Model Evaluation

Predictions of the testing data's churn outcome are made with the model's predict() function and grouped together with the actual churn label of each customer data using getPredictionsLabels().

We'll use MLlib's MulticlassMetrics() for the model evaluation, which takes rows of (prediction, label) tuples as input. It provides metrics such as precision, recall, F1 score and confusion matrix, which have been bundled for printing with the custom printMetrics() function.

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

def getPredictionsLabels(model, test_data):
    """Pair each prediction of `model` with the true label of the same row.

    model:     object exposing predict(), called with the mapped feature vectors.
    test_data: collection exposing map(), whose elements have `.features`
               and `.label` attributes.
    Returns the predictions zipped with the labels, i.e. (prediction, label) pairs.
    """
    feature_vectors = test_data.map(lambda point: point.features)
    true_labels = test_data.map(lambda point: point.label)
    return model.predict(feature_vectors).zip(true_labels)

def printMetrics(predictions_and_labels):
    """Print per-class precision and recall, the overall F-measure and the confusion matrix.

    predictions_and_labels: rows of (prediction, label) pairs, passed straight
                            to MulticlassMetrics.
    The value labelled 'F-1 Score' is metrics.fMeasure() called without a label.
    """
    metrics = MulticlassMetrics(predictions_and_labels)
    report = [
        ('Precision of True', metrics.precision(1)),
        ('Precision of False', metrics.precision(0)),
        ('Recall of True', metrics.recall(1)),
        ('Recall of False', metrics.recall(0)),
        ('F-1 Score', metrics.fMeasure()),
    ]
    # Single-argument print() calls behave the same under Python 2 and
    # Python 3; the previous print statements were Python-2-only syntax.
    # '%-18s %s' reproduces the original column alignment (18-character label,
    # one space, value).
    for label, value in report:
        print('%-18s %s' % (label, value))
    print('Confusion Matrix\n%s' % metrics.confusionMatrix().toArray())

# Score the held-out testing split with the trained tree ...
predictions_and_labels = getPredictionsLabels(model, testing_data)

# ... and print the evaluation metrics for those (prediction, label) pairs
printMetrics(predictions_and_labels)

F-1 Score = Overall accuracy of the model

The precision is the ratio tp / (tp + fp) where tp is the number of true positives and fp the number of false positives. 
The precision is intuitively the ability of the classifier not to label as positive a sample that is negative.

The recall is the ratio tp / (tp + fn) where tp is the number of true positives and fn the number of false negatives. 
The recall is intuitively the ability of the classifier to find all the positive samples.

The recall (aka sensitivity) for the Churn=True samples is high, while the recall for the Churn=False examples is relatively low.

Perhaps the model's sensitivity bias toward Churn=True samples is due to a skewed distribution of the two types of samples.

Let's try grouping the Churn_train DataFrame by the ischurned field and counting the number of instances in each group.

In [None]:
# Number of training rows in each churn class
Churn_train.groupby('ischurned')['ischurned'].count()

There are roughly 7 times as many True churn samples as False churn samples. 

## Stratified Sampling

We can put the two sample types on the same footing using stratified sampling. The Spark DataFrame sampleBy() function does this when provided with the fraction of each sample type to be returned.

In [None]:
# Convert the pandas training frame to a Spark DataFrame so that sampleBy() can be used
Churn_train_strat = sqlContext.createDataFrame(Churn_train)

Here we're keeping all instances of the Churn=False class, but downsampling the Churn=True class to a fraction of 5672/39229.

In [None]:
# Derive the down-sampling fraction from the current class counts. The
# previous hard-coded 5672./39229 was copied from one particular run and goes
# stale whenever the random train/test split (or the table) changes.
class_counts = Churn_train['ischurned'].value_counts()
n_not_churned = int(class_counts.get(0, 0))
n_churned = int(class_counts.get(1, 0))

# Keep every ischurned=0 row; keep ischurned=1 rows with probability
# n_not_churned / n_churned, capped at 1.0. Falls back to 1.0 when there are
# no ischurned=1 rows, which avoids a division by zero.
churned_fraction = min(1.0, float(n_not_churned) / n_churned) if n_churned else 1.0

stratified_Churn_train = Churn_train_strat.sampleBy(
    'ischurned', fractions={0: 1.0, 1: churned_fraction}).cache()

# Class counts after sampling
stratified_Churn_train.groupby('ischurned').count().toPandas()

In [None]:
# Summary statistics of the stratified sample.
# NOTE: Jupyter only displays the last expression of a cell, so this result is
# not shown while the assignment below follows it in the same cell.
stratified_Churn_train.describe().toPandas().transpose()

# Convert the Spark DataFrame to a pandas DataFrame.
# toPandas() collects every row. The previous take(11484) relied on a row
# count copied from one run, but sampleBy() returns a different number of rows
# each time, so rows could be dropped silently.
stratified_Churn_train = stratified_Churn_train.toPandas()

In [None]:
# Move ischurned to the end 

# Explicit copy of the feature columns, so the column assignment below writes
# to an independent frame rather than a slice (avoids SettingWithCopyWarning).
stratified_Churn_train_map = stratified_Churn_train[feature_cols].copy()  # columns excluding categorical variables

stratified_Churn_train_map['ischurned'] = stratified_Churn_train['ischurned']  # Add ischurned (at the end)

# Did it work?
# Inspect the frame built in this cell; the previous check displayed
# Churn_train_map.columns, i.e. the non-stratified frame.
stratified_Churn_train_map.columns

In [None]:
# Create the decision tree on the stratified sample

# Label the stratified rows, then randomly split them 80% / 20% into training and testing sets
stratified_points = labelData(stratified_Churn_train_map)
training_data, testing_data = stratified_points.randomSplit([0.8, 0.2])

# Same tree settings as for the non-stratified model
tree_params = dict(
    numClasses=2,
    categoricalFeaturesInfo={},  # variables are all continuous
    impurity='gini',
    maxDepth=8,                  # max depth = model complexity
    maxBins=32)
model = DecisionTree.trainClassifier(training_data, **tree_params)

print(model.toDebugString())

In [None]:
# Evaluate its accuracy on the held-out part of the stratified sample

predictions_and_labels = getPredictionsLabels(model, testing_data)
printMetrics(predictions_and_labels)

Stratified data was helpful in building a less biased model which will provide more generalized and robust predictions.

But: accuracy greatly decreased!

## Model Selection