In [1]:
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet 
!apt install openjdk-8-jdk-headless &> /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

import pyspark
from pyspark.sql import SparkSession

sc = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[K     |████████████████████████████████| 198 kB 58.7 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
cleaned_data = sc.read \
  .option('header', 'True')\
  .option('inferSchema', 'True')\
  .option('sep', ',')\
  .csv('/content/drive/MyDrive/cleaned_data_without_duplicate.csv')
  # .csv('/content/drive/MyDrive/cleaned_data.csv')

In [4]:
cleaned_data.count()

7630

# balancing dataset

In [5]:
from pyspark.sql.functions import col, explode, array, lit

major_df = cleaned_data.filter(col("label") == 'No')
minor_df = cleaned_data.filter(col("label") == 'Yes')
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
a = range(ratio)

oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

combined_df = major_df.unionAll(oversampled_df)

cleaned_data = combined_df

ratio: 2


In [6]:
cleaned_data = cleaned_data.drop('gender')
cleaned_data = cleaned_data.drop('_c0')
cleaned_data=cleaned_data.where((col('SeniorCitizen') != 14.0) | (col('SeniorCitizen') != 17.0) )
cleaned_data=cleaned_data.where(cleaned_data['tenure']> 0)

In [7]:
cleaned_data.count()

9562

In [8]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer, StandardScaler)
from pyspark.ml import Pipeline

stages = []

numeric_features = ['SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges']
# numeric_features = ['SeniorCitizen', 'tenure', 'TotalCharges']
categorical_columns =  [item[0] for item in cleaned_data.dtypes if item[1].startswith('string')]
categorical_columns.remove('Label')

for feature in categorical_columns:
  stringIndexer = StringIndexer(inputCol=feature, outputCol=feature + 'Index')
  encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],outputCols=[feature + 'Vec'])
  stages+=[stringIndexer, encoder]

labelIndexer = StringIndexer(inputCol='Label', outputCol='label')
stages += [labelIndexer]
input_features = [c + 'Vec' for c in categorical_columns]+ numeric_features
assembler = VectorAssembler(inputCols= input_features, outputCol='features')
stages+=[assembler]
scaler = StandardScaler(inputCol='features', outputCol='standard_features')
stages+=[scaler]

pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(cleaned_data)
assembler_df = pipelineModel.transform(cleaned_data)

# training model

In [11]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

log_reg= LogisticRegression(featuresCol='features',labelCol='label')

train_data, test_data = assembler_df.randomSplit([0.8, 0.2])
fit_model = log_reg.fit(train_data)
results = fit_model.transform(test_data)

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='label')
results.select('label','prediction')
AUC = my_eval.evaluate(results)
print("AUC score is : ",AUC)

accuracy = results.filter(results.label == results.prediction).count() / float(results.count())
print("Accuracy : ",accuracy)

AUC score is :  0.8489109148695582
Accuracy :  0.8340471092077087


In [12]:
TN = results.filter('prediction = 0 AND Label = prediction').count()
TP = results.filter('prediction = 1 AND Label = prediction').count()
FN = results.filter('prediction = 0 AND Label = 1').count()
FP = results.filter('prediction = 1 AND Label = 0').count()
Precision = TP/(TP+FP)
Recall = TP/(TP+FN)
F1=(2*Precision*Recall)/(Precision+Recall)
print("Precision : "+str(Precision))
print("Recall: : "+str(Recall))
print("F1: "+str(F1))

Precision : 0.7335423197492164
Recall: : 0.9273447820343461
F1: 0.8191365227537923
