In [15]:
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report
import timeit
import numpy as np
from sklearn.model_selection import RandomizedSearchCV
from pyspark.sql import SparkSession
import findspark
import os
from pyspark.sql.functions import col
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import math
from pyspark.ml.tuning import CrossValidator
from random import random
from pyspark.ml.classification import GBTClassifier





os.environ["JAVA_HOME"] = 'C:/Program Files/Java/jre1.8.0_333/'
os.environ["SPARK_HOME"] = 'C:/spark/spark-3.3.0-bin-hadoop2/'
os.environ["HADOOP_HOME"] = 'C:/hadoop/'
#os.environ["PYSPARK_SUBMIT_ARGS"] = '--master local pyspark-shell'

In [2]:
findspark.find()

'C:/spark/spark-3.3.0-bin-hadoop2/'

In [3]:
findspark.init()

In [4]:
spark = SparkSession\
.builder\
.master("local")\
.config("spark.executor.cores", "4")\
.config("spark.executor.memory", "10g")\
.config("spark.driver.memory", "10g")\
.config("spark.executor.memoryOverhead", "10g")\
.appName("Churn Project").getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [5]:
df = spark.read.csv('C:/Users/paulc/Documents/Data Analysis Courses/MastersChurn/ProjectWork/ChurnModelScaled.csv',inferSchema=True,header=True)
scaled_df = spark.read.csv('C:/Users/paulc/Documents/Data Analysis Courses/MastersChurn/ProjectWork/ChurnModelScaled.csv',inferSchema=True,header=True)

In [6]:
df = df.drop('_c0')
scaled_df = scaled_df.drop('_c0')

In [16]:
df.printSchema()

root
 |-- MonthNumber: double (nullable = true)
 |-- ClientProfileSummary: double (nullable = true)
 |-- Gender: double (nullable = true)
 |-- CashActive_YN: double (nullable = true)
 |-- L1DSBBetCount: double (nullable = true)
 |-- L7DSBBetCount: double (nullable = true)
 |-- L30DSBBetCount: double (nullable = true)
 |-- L90DSBBetCount: double (nullable = true)
 |-- L1DDepositCount: double (nullable = true)
 |-- L7DDepositCount: double (nullable = true)
 |-- L30DDepositCount: double (nullable = true)
 |-- L90DDepositCount: double (nullable = true)
 |-- L7DOtherSportsBetCount: double (nullable = true)
 |-- L30DOtherSportsBetCount: double (nullable = true)
 |-- L90DOtherSportsBetCount: double (nullable = true)
 |-- L90DUnsuccessfulDepositCount: double (nullable = true)
 |-- DaysSinceLastSBCashAPD: double (nullable = true)
 |-- DaysSinceLastSBAPD: double (nullable = true)
 |-- OlderMale40: double (nullable = true)
 |-- CustomerConcession90days: double (nullable = true)
 |-- Active_Next30

Vector Assembler

In [7]:
numericCols = ['MonthNumber', 'ClientProfileSummary', 'Gender', 'CashActive_YN',
       'L1DSBBetCount', 'L7DSBBetCount', 'L30DSBBetCount', 'L90DSBBetCount',
       'L1DDepositCount', 'L7DDepositCount', 'L30DDepositCount',
       'L90DDepositCount', 'L7DOtherSportsBetCount', 'L30DOtherSportsBetCount',
       'L90DOtherSportsBetCount', 'L90DUnsuccessfulDepositCount',
       'DaysSinceLastSBCashAPD', 'DaysSinceLastSBAPD', 'OlderMale40',
       'CustomerConcession90days',
       'L7DSBTurnover', 'US_SportsTurnover7D', 'L30DSBTurnover',
       'US_SportsTurnover30D', 'CustomerConcession30days', 'L90DSBTurnover',
       'L1DSBTurnover', 'US_SportsTurnover1D', 'L30DSBFreeBetsHandle',
       'US_SportsTurnover90D', 'L7DSBFreeBetsHandle']




assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
df = assembler.transform(df)
scaled_df = assembler.transform(scaled_df)

Train/Test Split

In [8]:
train,test = df.randomSplit([0.7,0.3],seed=42)
train_scaled,test_scaled = scaled_df.randomSplit([0.7,0.3],seed=42)

<h3> Run a base Gradient Boosted Tree to benchmark


The implementation is based upon: J.H. Friedman. “Stochastic Gradient Boosting.” 1999.
Soon after the introduction of gradient boosting, Friedman proposed a minor modification to the algorithm, motivated by Breiman's bootstrap aggregation ("bagging") method.[6] Specifically, he proposed that at each iteration of the algorithm, a base learner should be fit on a subsample of the training set drawn at random without replacement.[12] Friedman observed a substantial improvement in gradient boosting's accuracy with this modification.

In [11]:
gbt = GBTClassifier(featuresCol='features',labelCol='Active_Next30Days_Cash_YN',maxIter=20,seed=42,featureSubsetStrategy='sqrt')


gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
#predictions.select('prediction','Active_Next30Days_Cash_YN', 'probability')


#Scaled model

gbt_scaled = GBTClassifier(featuresCol='features',labelCol='Active_Next30Days_Cash_YN',maxIter=20,seed=42,featureSubsetStrategy='sqrt')


gbtModel_scaled = gbt_scaled.fit(train_scaled)
predictions_scaled = gbtModel_scaled.transform(test_scaled)
#predictions_scaled.select('prediction','Active_Next30Days_Cash_YN', 'probability')




In [12]:
evaluation = MulticlassClassificationEvaluator(labelCol='Active_Next30Days_Cash_YN', predictionCol='prediction',metricName='accuracy')
accuracy = evaluation.evaluate(predictions)
print("The base GradientBoostedClassifier Test error is:", round((1.0 - accuracy),4))

evaluation_scaled = MulticlassClassificationEvaluator(labelCol='Active_Next30Days_Cash_YN', predictionCol='prediction',metricName='accuracy')
accuracy_scaled = evaluation_scaled.evaluate(predictions_scaled)
print("The base ScaledGradientBoostedClassifier Test error is:", round((1.0 - accuracy_scaled),4))

The base GradientBoostedClassifier Test error is: 0.1678
The base ScaledGradientBoostedClassifier Test error is: 0.1678


In [14]:
print("The Accuracy of the base GradientBoostedClassifier is:", round(accuracy,4))
print("The Accuracy of the base ScaledGradientBoostedClassifier is:", round(accuracy_scaled,4))
print("Improvement as a result of scaling is:", round(0.8354-0.8354,4))

The Accuracy of the base GradientBoostedClassifier is: 0.8322
The Accuracy of the base ScaledGradientBoostedClassifier is: 0.8322
Improvement as a result of scaling is: 0.0


In [25]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, lit

# replicate the spark dataframe into multiple copies
w = Window().orderBy()
replication_df = train.withColumn('replication_id', lit())

TypeError: lit() takes 1 positional argument but 2 were given

In [24]:
replication_df

DataFrame[MonthNumber: double, ClientProfileSummary: double, Gender: double, CashActive_YN: double, L1DSBBetCount: double, L7DSBBetCount: double, L30DSBBetCount: double, L90DSBBetCount: double, L1DDepositCount: double, L7DDepositCount: double, L30DDepositCount: double, L90DDepositCount: double, L7DOtherSportsBetCount: double, L30DOtherSportsBetCount: double, L90DOtherSportsBetCount: double, L90DUnsuccessfulDepositCount: double, DaysSinceLastSBCashAPD: double, DaysSinceLastSBAPD: double, OlderMale40: double, CustomerConcession90days: double, Active_Next30Days_Cash_YN: double, L7DSBTurnover: double, US_SportsTurnover7D: double, L30DSBTurnover: double, US_SportsTurnover30D: double, CustomerConcession30days: double, L90DSBTurnover: double, L1DSBTurnover: double, US_SportsTurnover1D: double, L30DSBFreeBetsHandle: double, US_SportsTurnover90D: double, L7DSBFreeBetsHandle: double, features: vector, replication_id: int]

<h3> Randomized Hyperparameter search


In [24]:
class RandomGridBuilder:
  '''Grid builder for random search. Sets up grids for use in CrossValidator in Spark using values randomly sampled from user-provided distributions.
  Distributions should be provided as lambda functions, so that the numbers are generated at call time.

  Parameters:
    num_models: Integer (Python) - number of models to generate hyperparameters for
    seed: Integer (Python) - seed (optional, default is None)

  Returns:
    param_map: list of parameter maps to use in cross validation.

  Example usage:
    from pyspark.ml.classification import LogisticRegression
    lr = LogisticRegression()
    paramGrid = RandomGridBuilder(2)\
               .addDistr(lr.regParam, lambda: np.random.rand()) \
               .addDistr(lr.maxIter, lambda : np.random.randint(10))\
               .build()

    Returns similar output as Spark ML class ParamGridBuilder and can be used in its place. The above paramGrid provides random hyperparameters for 2 models.
    '''

  def __init__(self, num_models, seed=None):
    self._param_grid = {}
    self.num_models = num_models
    self.seed = seed

  def addDistr(self, param, distr_generator):
    '''Add distribution based on dictionary generated by function passed to addDistr.'''

    if 'pyspark.ml.param.Param' in str(type(param)):
      self._param_grid[param] = distr_generator
    else:
      raise TypeError('param must be an instance of Param')

    return self

  def build(self):
    param_map = []
    for n in range(self.num_models):
      if self.seed:
        # Set seeds for both numpy and random in case either is used for the random distribution
        np.random.seed(self.seed + n)
        random.seed(self.seed + n)
      param_dict = {}
      for param, distr in self._param_grid.items():
        param_dict[param] = distr()
      param_map.append(param_dict)

    return param_map

In [30]:
gbtRS = GBTClassifier(featuresCol='features',labelCol='Active_Next30Days_Cash_YN',featureSubsetStrategy='sqrt')

# Set up cross validation with random search
randomParams = RandomGridBuilder(num_models=1)\
                 .addDistr(gbt.maxIter, lambda : np.random.randint(10,40))\
                 .addDistr(gbt.maxDepth, lambda : np.random.randint(5,30))\
                 .build()


In [33]:
evaluator = MulticlassClassificationEvaluator(labelCol='Active_Next30Days_Cash_YN', predictionCol='prediction',metricName='accuracy')

# Cross validator with random search
cv = CrossValidator(
                    estimator = gbtRS,
                    estimatorParamMaps = randomParams,
                    evaluator = evaluator,
                    parallelism = 10
                   )

cvModel = cv.fit(train)

KeyboardInterrupt: 

In [40]:
cvModel.bestModel

NameError: name 'cvModel' is not defined

In [None]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml import Pipeline

gbt = GBTClassifier(featuresCol='features',labelCol='Active_Next30Days_Cash_YN',featureSubsetStrategy='sqrt')
pipeline = Pipeline(stages=[gbt])
paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth, [10, 30])\
                              .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

cvModel = crossval.fit(train)