In [None]:
# Upgrading pip
! python3.7 -m pip install --upgrade pip
# Downloading pyspark library
! pip install pyspark

In [None]:
# Importing and configuration of Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Churn Prediction with PySpark").getOrCreate()

In [None]:
CV_data = spark.read.csv('/content/churn-bigml-80 (1).csv', header='true', inferSchema='true')

final_test_data = spark.read.csv('/content/churn-bigml-20 (2).csv', header='true', inferSchema='true')

CV_data.printSchema()

In [None]:
print("The training dataset contains {} samples.".format(CV_data.count()))
print("The test dataset contains {} samples.".format(final_test_data.count()))

In [None]:
CV_data.select("*").toPandas().head().transpose()


In [None]:
numeric_features = [t[0] for t in CV_data.dtypes if t[1] == 'int' or t[1] == 'double']

CV_data.describe(numeric_features).toPandas().transpose()

In [None]:
sampled_data = CV_data.select(numeric_features).sample(False, 0.10)

print("The sampled dataset contains {} samples.".format(sampled_data.count()))

In [None]:
import seaborn as sns
sns.set(style="ticks")
g = sns.pairplot(sampled_data.toPandas(), diag_kind="kde")
fig = g.fig
fig.subplots_adjust(top=0.93, wspace=0.3)
fig.suptitle('Churn Attributes Pairwise Plots', fontsize=14, fontweight='bold');

In [None]:
def get_data(df, removeCols):
    df = df.drop(*removeCols) \
        .withColumn("Churn", when(df["Churn"] == 'true', 1.0).otherwise(0.0)) \
        .withColumn('International plan', when(df["International plan"] == 'Yes', 1.0).otherwise(0.0)) \
        .withColumn('Voice mail plan', when(df["Voice mail plan"] == 'Yes', 1.0).otherwise(0.0))
    return df

In [None]:
from pyspark.sql.functions import split, col, round, when

removeCols = ['State', 'Area code', 'Total day charge', 'Total eve charge', 'Total night charge', 'Total intl charge']

CV_data = get_data(CV_data, removeCols=removeCols)
final_test_data = get_data(final_test_data, removeCols=removeCols)

In [None]:
import pandas as pd
pd.DataFrame(CV_data.take(5), columns=CV_data.columns).transpose()

In [None]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

def labelData(data):
    # label: row[end], features: row[0:end-1]
    return data.rdd.map(lambda row: LabeledPoint(row[-1], row[:-1]))

training_data, testing_data = labelData(CV_data).randomSplit([0.8, 0.2])

print("The two first rows of the training data RDD:")
print(training_data.take(2))
print("============================")

model = DecisionTree.trainClassifier(training_data, numClasses=2, maxDepth=2,
                                     categoricalFeaturesInfo={1:2, 2:2},
                                     impurity='gini', maxBins=32)
print(model.toDebugString())

In [23]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Make predictions on the testing data
predictions = model.predict(testing_data.map(lambda x: x.features))

# Create an RDD of (prediction, label) pairs
predictions_and_labels = predictions.zip(testing_data.map(lambda x: x.label))

# Create a MulticlassMetrics object
metrics = MulticlassMetrics(predictions_and_labels)

# Calculate precision for class 1 (True)
precision_true = metrics.precision(1)

# Calculate recall for class 1 (True)
recall_true = metrics.recall(1)

# Calculate F1-score for class 1 (True) manually
f1_score_true = 2 * (precision_true * recall_true) / (precision_true + recall_true)
accuracy = metrics.accuracy
print("Precision (True): {:.4f}".format(precision_true))
print("Recall (True): {:.4f}".format(recall_true))
print("F1-Score (True): {:.4f}".format(f1_score_true))
print("Accuracy: {:.4f}".format(accuracy))



Precision (True): 0.7000
Recall (True): 0.4330
F1-Score (True): 0.5350
Accuracy: 0.8612


In [24]:
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml import Pipeline

def get_dummy(df, numericCols, labelCol):
    # Combining a given list of columns into a single vector column features
    assembler = VectorAssembler(inputCols=numericCols, outputCol="features")

    # Index labels, adding metadata to the label column
    indexer = StringIndexer(inputCol=labelCol, outputCol='indexedLabel')

    # Automatically identify categorical features and index them
    featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=2)

    pipeline = Pipeline(stages = [assembler] + [indexer] + [featureIndexer])

    model = pipeline.fit(df)
    data = model.transform(df)

    data = data.withColumn('label', col(labelCol))

    return data.select('features', 'label', 'indexedFeatures', 'indexedLabel')

In [25]:
numericCols = CV_data.columns
numericCols.remove("Churn")

vectorized_CV_data = get_dummy(CV_data, numericCols, "Churn")
vectorized_stratified_CV_data = get_dummy(CV_data, numericCols, "Churn")
vectorized_final_test_data = get_dummy(final_test_data, numericCols, "Churn")

In [26]:
vectorized_stratified_CV_data.show(2, False)

+---------------------------------------------------------------------+-----+---------------------------------------------------------------------+------------+
|features                                                             |label|indexedFeatures                                                      |indexedLabel|
+---------------------------------------------------------------------+-----+---------------------------------------------------------------------+------------+
|[128.0,0.0,1.0,25.0,265.1,110.0,197.4,99.0,244.7,91.0,10.0,3.0,1.0]  |0.0  |[128.0,0.0,1.0,25.0,265.1,110.0,197.4,99.0,244.7,91.0,10.0,3.0,1.0]  |0.0         |
|[107.0,0.0,1.0,26.0,161.6,123.0,195.5,103.0,254.4,103.0,13.7,3.0,1.0]|0.0  |[107.0,0.0,1.0,26.0,161.6,123.0,195.5,103.0,254.4,103.0,13.7,3.0,1.0]|0.0         |
+---------------------------------------------------------------------+-----+---------------------------------------------------------------------+------------+
only showing top 2 rows



In [27]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

def cvComparing(vectorized_train_data, vectorized_test_data, classifiers, paramGrid, numFolds, roundLevel, seed):
    evaluatorB = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="indexedLabel")
    evaluatorf1 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
    evaluatorwp = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedPrecision")
    evaluatorwr = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedRecall")
    evaluatoracc = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

    results = []

    for name, clf in classifiers:
        cv = CrossValidator(estimator=clf, estimatorParamMaps=paramGrid, evaluator=evaluatorf1, numFolds=numFolds, seed=seed, collectSubModels=True)
        cvModel = cv.fit(vectorized_train_data)
        predict_train = cvModel.transform(vectorized_train_data)
        predict_test = cvModel.transform(vectorized_test_data)

        underROC_train = evaluatorB.evaluate(predict_train)
        underROC_test = evaluatorB.evaluate(predict_test)
        acc_train = evaluatoracc.evaluate(predict_train)
        acc_test = evaluatoracc.evaluate(predict_test)
        f1_train = evaluatorf1.evaluate(predict_train)
        f1_test = evaluatorf1.evaluate(predict_test)
        wp_train = evaluatorwp.evaluate(predict_train)
        wp_test = evaluatorwp.evaluate(predict_test)
        wr_train = evaluatorwr.evaluate(predict_train)
        wr_test = evaluatorwr.evaluate(predict_test)

        results.append((name, underROC_train, underROC_test, acc_train, acc_test, f1_train, f1_test, wp_train, wp_test, wr_train, wr_test))

    schema = ['Classifier name', 'underROC_train', 'underROC_test', 'Accuracy_train', 'Accuracy_test', 'f1_train', 'f1_test', 'wPrecision_train', 'wPrecision_test', 'wRecall_train', 'wRecall_test']

    cvResults = spark.createDataFrame(results, schema=schema)

    for t in cvResults.dtypes:
        if t[1] == 'double':
            cvResults = cvResults.withColumn(t[0], round(cvResults[t[0]], roundLevel))

    return cvResults


In [28]:
from pyspark.ml.classification import LogisticRegression, NaiveBayes, LinearSVC, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder().build()

classifiers = []
##############################################
classifiers.append(('LR', LogisticRegression(labelCol='indexedLabel', featuresCol='indexedFeatures')))
classifiers.append(('NB', NaiveBayes(labelCol='indexedLabel', featuresCol='indexedFeatures')))
classifiers.append(('SVC', LinearSVC(labelCol='indexedLabel', featuresCol='indexedFeatures')))
classifiers.append(('DT', DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')))
classifiers.append(('RF', RandomForestClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')))
classifiers.append(('GBT', GBTClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')))
##############################################


cvResults_CV_data = cvComparing(vectorized_train_data=vectorized_CV_data, vectorized_test_data=vectorized_final_test_data,
                                classifiers=classifiers, paramGrid=paramGrid, numFolds=5, roundLevel=3, seed=123)

cvResults_stratified_CV_data = cvComparing(vectorized_train_data=vectorized_stratified_CV_data,
                                           vectorized_test_data=vectorized_final_test_data, classifiers=classifiers,
                                           paramGrid=paramGrid, numFolds=5, roundLevel=3, seed=123)

In [29]:
cvResults_CV_data.toPandas().set_index('Classifier name')

Unnamed: 0_level_0,underROC_train,underROC_test,Accuracy_train,Accuracy_test,f1_train,f1_test,wPrecision_train,wPrecision_test,wRecall_train,wRecall_test
Classifier name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
LR,0.597,0.577,0.863,0.855,0.836,0.827,0.835,0.82,0.863,0.855
NB,0.61,0.61,0.632,0.625,0.686,0.681,0.8,0.804,0.632,0.625
SVC,0.5,0.5,0.854,0.858,0.787,0.792,0.73,0.735,0.854,0.858
DT,0.841,0.818,0.948,0.945,0.945,0.94,0.948,0.945,0.948,0.945
RF,0.808,0.758,0.941,0.924,0.936,0.915,0.943,0.922,0.941,0.924
GBT,0.921,0.859,0.976,0.954,0.975,0.951,0.976,0.953,0.976,0.954


In [30]:
cvResults_stratified_CV_data.toPandas().set_index('Classifier name')

Unnamed: 0_level_0,underROC_train,underROC_test,Accuracy_train,Accuracy_test,f1_train,f1_test,wPrecision_train,wPrecision_test,wRecall_train,wRecall_test
Classifier name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
LR,0.597,0.577,0.863,0.855,0.836,0.827,0.835,0.82,0.863,0.855
NB,0.61,0.61,0.632,0.625,0.686,0.681,0.8,0.804,0.632,0.625
SVC,0.5,0.5,0.854,0.858,0.787,0.792,0.73,0.735,0.854,0.858
DT,0.841,0.818,0.948,0.945,0.945,0.94,0.948,0.945,0.948,0.945
RF,0.808,0.758,0.941,0.924,0.936,0.915,0.943,0.922,0.941,0.924
GBT,0.921,0.859,0.976,0.954,0.975,0.951,0.976,0.953,0.976,0.954


In [35]:
evaluatorf1 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
evaluatorwp = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedPrecision")
evaluatorwr = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedRecall")
evaluatorac = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

clf = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')
# clf = NaiveBayes(labelCol='indexedLabel', featuresCol='indexedFeatures')

clfModel = clf.fit(vectorized_stratified_CV_data)
pred_train = clfModel.transform(vectorized_stratified_CV_data)

print('Weighted Precision   ', evaluatorwp.evaluate(pred_train))
print('Weighted Recall      ', evaluatorwr.evaluate(pred_train))
print('F1                   ', evaluatorf1.evaluate(pred_train))
print('Accuracy             ', evaluatorac.evaluate(pred_train))






Weighted Precision    0.947758168313307
Weighted Recall       0.9482370592648162
F1                    0.9448870264090747
Accuracy              0.9482370592648162


In [36]:
print("The metric name used by the BinaryClassificationEvaluator is {}.".format(BinaryClassificationEvaluator().getMetricName()))

The metric name used by the BinaryClassificationEvaluator is areaUnderROC.
