In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 



# install findspark using pip
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

In [None]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

memory = '8g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

# start a spark session 
spark = SparkSession.builder.master("local[*]").getOrCreate()

# load data with inferred schema 
df = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("Complete_Data_BDT.csv")
#df = spark.read.load("C:/Users/Admin/Downloads/Complete_Data_BDT.csv", format="csv", inferSchema="true", header="true")

# The inferred schema can be seen using .printSchema().
#df.printSchema()

In [None]:
import pandas as pd

#Converting spark df into pandas just for using SMOTE
df = df.toPandas()

#splitting X and Y sets and the test train data
y = df.CHURN
x = df.drop(columns=['CHURN'])

In [None]:
#splitting test and train sets

from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.2, random_state = 1)

Upsample is done only for train dataset and test should be untouched

In [None]:
from imblearn.over_sampling import SMOTE
os = SMOTE(random_state=0)
#columns = X_train.columns
os_data_x,os_data_y=os.fit_resample(x_train, y_train)
x_train = pd.DataFrame(data=os_data_x, )
y_train = pd.DataFrame(data=os_data_y,columns=['CHURN'])
# we can Check the numbers of our data
print("length of oversampled data is ",len(x_train))
print("Number of no subscription in oversampled data",len(y_train[y_train['CHURN']==0]))
print("Number of subscription",len(y_train[y_train['CHURN']==1]))
print("Proportion of no subscription data in oversampled data is ",len(y_train[y_train['CHURN']==0])/len(x_train))
print("Proportion of subscription data in oversampled data is ",len(y_train[y_train['CHURN']==1])/len(x_train))

length of oversampled data is  837618
Number of no subscription in oversampled data 418809
Number of subscription 418809
Proportion of no subscription data in oversampled data is  0.5
Proportion of subscription data in oversampled data is  0.5


Now the pandas df is converted back to spark df for further modelling

In [None]:
combined_train_dataset = pd.concat([x_train,y_train], axis=1, join='inner')
combined_train_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 837618 entries, 0 to 837617
Columns: 138 entries, MONTANT to CHURN
dtypes: float64(10), int32(128)
memory usage: 472.9 MB


In [None]:
combined_test_dataset = pd.concat([x_test,y_test], axis=1, join='inner')
combined_test_dataset.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 107246 entries, 236859 to 110351
Columns: 138 entries, MONTANT to CHURN
dtypes: float64(10), int32(128)
memory usage: 66.4 MB


In [None]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
#converting to spark df
train_dataset = spark.createDataFrame(combined_train_dataset)
test_dataset = spark.createDataFrame(combined_test_dataset)

Now createing feature vector using vectorAssemebler

In [None]:
#vector assembler threw error since one column header contains '.' . removing column headers before passing it to the function
tempList = [] #Edit01
for col in train_dataset.columns:
  new_name = col.strip()
  #new_name = "".join(new_name.split())
  new_name = new_name.replace('.','point') # EDIT
  tempList.append(new_name) #Edit02
#print(tempList) #Just for the sake of it #Edit03

train_dataset = train_dataset.toDF(*tempList)
test_dataset = test_dataset.toDF(*tempList)

In [None]:
import time
from pyspark.ml.feature import VectorAssembler
inputCols = train_dataset.drop('CHURN').columns
outputCol = "features"
assembler = VectorAssembler(inputCols = inputCols, outputCol = outputCol)
df_va_train = assembler.transform(train_dataset)

df_va_test = assembler.transform(test_dataset)

In [None]:
df_va_train.show()

+------------+--------------+------------+------------+------------+------------+------------+------------+------------+-------------+---------------+-------------+---------------+--------------+---------------+------------+------------+------------+------------+------------------+--------------+------------------+------------+-----------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------------------------+--------------------------+------------------------------+------------------+--------------------------+-----------------------+-----------------------------+----------------------------------------------+---------------------------+-------------------------------------------+----------------------------------------+---------------------------------------------+-------------------------------------------------+------------------------------+--------------------------------+----

In [None]:
df_va_train.select('features').show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                           |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(137,[0,1,2,3,4,5,6,7,8,9,19,30,53],[-0.853655602,-0.854446657,-0.854349191,-0.594111697,-0.452242668,-0.363400022,-0.531390018,-0.37139143,-0.879382391,-0.792487905,1.0,1.0,1.0])|
|(137,[0,1,2,3,4,5,6,7,8,9,18,30,53],[-0.731654855,-0.010737747,-0.727588549,-0.172071006,-0.351582447,-0.375405239,-0.440016615,-0.327560704,-2.233780515,0.272243328,1.0,1.0,1.0])|
|(137,[0,1,2,3,4,5,6,7,8,9,19,30,71],[0.268751269,2.0011835,0.294037582,1.938132446,0.1262

Decision Tree Classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

dt = DecisionTreeClassifier(labelCol="CHURN", featuresCol="features", maxDepth=25, minInstancesPerNode=30, impurity="gini")
pipeline = Pipeline(stages=[dt])
st = time.time()
model = pipeline.fit(df_va_train)
et = time.time()
print(et-st)

100.11413717269897


In [None]:
y_pred = model.transform(df_va_test)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
tp = y_pred.filter(y_pred.prediction==1.0).filter(y_pred.prediction == y_pred.CHURN).count()
tn = y_pred.filter(y_pred.prediction==0.0).filter(y_pred.prediction == y_pred.CHURN).count()
fp = y_pred.filter(y_pred.prediction==1.0).filter(y_pred.prediction != y_pred.CHURN).count()
fn = y_pred.filter(y_pred.prediction==0.0).filter(y_pred.prediction != y_pred.CHURN).count()
print("Accuracy: ", (tp+tn)/(y_pred.count()))
print("Precision: ", tp/(tp+fp))
print("Recall: ", tp/(tp+fn))
evaluator = BinaryClassificationEvaluator(labelCol="CHURN", metricName='areaUnderROC')
print("AUC: ", evaluator.evaluate(y_pred))

Accuracy:  0.9068030509296384
Precision:  0.14807398932112892
Recall:  0.59433601224646
AUC:  0.6052109242258152


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluatorMulti = MulticlassClassificationEvaluator(labelCol="CHURN", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluatorMulti.evaluate(y_pred)


In [None]:
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.912155, precision: 0.988999, recall: 0.920197, f1: 0.936139, roc_auc: 0.936139


Gradient Boosting classification

In [None]:
from pyspark.ml.classification import GBTClassifier
# define the classifier 
gbt = GBTClassifier(labelCol="CHURN", featuresCol="features")

# train the classifier with train data 
st = time.time()
GBT = gbt.fit(df_va_train)
et = time.time()
print(et-st)
# predict test data 
y_pred_GBT = GBT.transform(df_va_test)

381.95277428627014


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluatorMulti = MulticlassClassificationEvaluator(labelCol="CHURN", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred_GBT, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred_GBT, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred_GBT, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred_GBT, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluatorMulti.evaluate(y_pred_GBT)

In [None]:
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.881497, precision: 0.995248, recall: 0.882752, f1: 0.919041, roc_auc: 0.919041


Random Forest Classification

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="CHURN",featuresCol="features").setImpurity("gini").setMaxDepth(6).setSeed(90)
st = time.time()
RFC = rf.fit(df_va_train)
et = time.time()
print(et-st)
y_pred_RF = RFC.transform(df_va_test)

83.42509007453918


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluatorMulti = MulticlassClassificationEvaluator(labelCol="CHURN", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred_RF, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred_RF, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred_RF, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred_RF, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluatorMulti.evaluate(y_pred_RF)

In [None]:
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.859575, precision: 0.996849, recall: 0.858783, f1: 0.905955, roc_auc: 0.905955


Logistic Regression

In [None]:
from pyspark.ml.classification import GBTClassifier, LogisticRegression

# define the classifier 
log_model = LogisticRegression(labelCol="CHURN", featuresCol="features", maxIter=1000)

# train the classifier with train data 
st = time.time()
LG = log_model.fit(df_va_train)
et = time.time()
print(et-st)

# predict test data 
y_pred_LR = LG.transform(df_va_test)

300.66247844696045


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluatorMulti = MulticlassClassificationEvaluator(labelCol="CHURN", predictionCol="prediction")
acc = evaluatorMulti.evaluate(y_pred_LR, {evaluatorMulti.metricName: "accuracy"})
precision = evaluatorMulti.evaluate(y_pred_LR, {evaluatorMulti.metricName: "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred_LR, {evaluatorMulti.metricName: "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred_LR, {evaluatorMulti.metricName: "f1"})
roc_auc = evaluatorMulti.evaluate(y_pred_LR)

In [None]:
print("accuracy: %f, precision: %f, recall: %f, f1: %f, roc_auc: %f" % (acc, precision, recall, f1, roc_auc))

accuracy: 0.871025, precision: 0.996414, recall: 0.870939, f1: 0.912866, roc_auc: 0.912866
