In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt

In [None]:
# NOTE(review): image_size and batch_size are not referenced by any other cell
# visible in this notebook; presumably leftovers from another project. Confirm before removing.
image_size = 256
batch_size = 32

In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

import matplotlib.pyplot as plt
import plotly.express as px

import pandas as pd

# pyspark SQL functions
from pyspark.sql.functions import col, when, count, udf

# pyspark data preprocessing modules
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder

# pyspark data modeling and model evaluation modules
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [None]:
# Start (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("Customer_Churn_Prediction")
    .getOrCreate()
)
spark

In [None]:
# Read the churn CSV: the first row is the header and column types are inferred.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/drive/MyDrive/data for project /dataset_pyspark_churn.csv")
)
data.show()

In [None]:
# Print the column names and the data types inferred while reading the CSV
data.printSchema()

In [None]:
# Data dimension as (number of rows, number of columns).
# Only the last expression of a cell is displayed, so both values are returned
# together instead of silently discarding the row count.
(data.count(), len(data.columns))

**EDA**

In [None]:
# Split the column names by their Spark dtype: double/int are numerical, string is categorical.
numerical_columns = [col_name for col_name, dtype in data.dtypes if dtype in ("double", "int")]
categorical_columns = [col_name for col_name, dtype in data.dtypes if dtype == "string"]

data.select(numerical_columns).show()

In [None]:
# Collect the numerical columns into a pandas DataFrame for exploratory analysis.
df = data.select(*numerical_columns).toPandas()
df.head()

In [None]:
# Summary statistics of the numerical columns held in the pandas frame
df.describe()

In [None]:
# Column dtypes and non-null counts of the pandas frame
df.info()

In [None]:
# Histogram of every numerical column.
# Let pandas create the subplot grid itself: handing it one pre-made Axes for a
# multi-column frame makes pandas clear that figure and emit a UserWarning.
# The trailing semicolon suppresses the array-of-Axes repr.
df.hist(figsize=(15, 10), bins=20);


In [None]:
# Pairwise correlation between the numerical columns
df.corr()

In [None]:
# Show the value counts of every categorical variable, one table per column.
for cat_col in categorical_columns:
    data.groupBy(cat_col).count().show()
categorical_columns  # display the list of categorical column names


In [None]:
# Count the null values of every column in a single Spark job (one aggregate
# per column) instead of launching a separate job for each column.
# vertical=True prints one "column: count" line per column, which stays
# readable even though the result is a single wide row.
data.select(
    [count(when(col(column).isNull(), column)).alias(column) for column in data.columns]
).show(vertical=True)

### preprocessing of data

In [None]:
# Columns whose missing values will be imputed.
# NOTE(review): "colums" is a typo for "columns"; the name is kept because other cells reference it.
colums_with_missing_values = ['TotalCharges']

In [None]:
# Imputer that fills missing values with the column mean; the output columns
# are the input columns, so the values are replaced in place.
imputer = Imputer(
    strategy="mean",
    inputCols=colums_with_missing_values,
    outputCols=colums_with_missing_values,
)

In [None]:
# Fit the imputer and fill the missing values.
# The fitted model gets its own name so that `imputer` stays the unfitted
# estimator and this cell can be re-run (a fitted model has no `.fit`).
imputer_model = imputer.fit(data)
data = imputer_model.transform(data)

In [None]:
# Re-check for missing values after imputation.
# All per-column null counts are computed in a single Spark job and printed as
# one "column: count" line per column.
data.select(
    [count(when(col(column).isNull(), column)).alias(column) for column in data.columns]
).show(vertical=True)

### removing the outliers
Let's find customers with a tenure higher than 100.

In [None]:
# Rows whose tenure exceeds 100 are the outlier candidates.
data.filter(data.tenure > 100).show()

In [None]:
# Drop the outlier rows.
# Outliers are the customers with tenure > 100, so everything with
# tenure <= 100 is kept (a strict `< 100` would also drop tenure == 100).
print("Before removing the outliers", data.count())
data = data.filter(data.tenure <= 100)
print("After removing the outliers", data.count())

**vector assembling**

In [None]:
# Pack the numerical columns into a single vector column (input to the scaler).
numerical_vector_assembler = VectorAssembler(
    inputCols=numerical_columns,
    outputCol="numerical_features_vector",
)
data = numerical_vector_assembler.transform(data)
data.show()

In [None]:
# Numerical scaling: centre on the mean and scale by the standard deviation.
# NOTE(review): the scaler is fit on the full dataset, i.e. before the
# train/test split performed further down in the notebook.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="numerical_features_vector",
    outputCol="scaled_numerical_features",
)
data = scaler.fit(data).transform(data)
data.show()

**Feature preparation**

String indexing: converting string columns to numerical indices

In [None]:
# Display the names of the string-typed (categorical) columns
categorical_columns

In [None]:
# Index every string column except the customerID identifier; each indexed
# column is named "<original>_Indexed".
columns_to_index = [name for name in categorical_columns if name != "customerID"]
categorical_columns_indexed = [f"{name}_Indexed" for name in columns_to_index]
indexer = StringIndexer(inputCols=columns_to_index, outputCols=categorical_columns_indexed)
data = indexer.fit(data).transform(data)
data.show()

In [None]:
# Combine the indexed categorical features into one vector column.
# The indexed Churn column is the label, so it is left out of the features.
categorical_feature_columns = [
    name for name in categorical_columns_indexed if name != "Churn_Indexed"
]
categorical_vector_assembler = VectorAssembler(
    inputCols=categorical_feature_columns,
    outputCol="categorical_features_vector",
)
data = categorical_vector_assembler.transform(data)
data.show()

In [None]:
# Concatenate the scaled numerical vector and the categorical vector into the
# final feature vector used by the model.
final_vector_assembler = VectorAssembler(
    inputCols=["scaled_numerical_features", "categorical_features_vector"],
    outputCol="final_features_vector",
)
data = final_vector_assembler.transform(data)
data.select("final_features_vector", "Churn_Indexed").show(truncate=False)

In [None]:
# Preview the DataFrame with all assembled feature columns
data.show()

In [None]:
#data.select(["final_features_vector", "Churn_Indexed"]).show(truncate=False)

In [None]:
# Split the data 70/30 into train and test sets (fixed seed for
# reproducibility) and print the row count of each split.
train, test = data.randomSplit(weights=[0.7, 0.3], seed=100)
for split_df in (train, test):
    print(split_df.count())

In [None]:
# Preview the first rows of the training split
train.show()

In [None]:
# Decision tree classifier on the final feature vector; maxDepth is the
# hyperparameter explored in the tuning section of this notebook.
dt = DecisionTreeClassifier(
    maxDepth=6,
    labelCol="Churn_Indexed",
    featuresCol="final_features_vector",
)
model = dt.fit(train)

In [None]:
# Score the held-out test split and compare the original label with the prediction.
predictions_test = model.transform(test)
predictions_test.select("Churn", "prediction").show()

In [None]:
# Evaluate the model on the test split (area under the ROC curve).
# The printed label names the split so the number cannot be confused with the
# AUC printed for the training split.
evaluator = BinaryClassificationEvaluator(labelCol="Churn_Indexed")
auc_test = evaluator.evaluate(predictions_test, {evaluator.metricName: "areaUnderROC"})
print("Test AUC:", auc_test)

In [None]:
# Same metric on the training split; comparing it with the test AUC shows how
# much the tree overfits. The label names the split to keep the outputs distinguishable.
predictions_train = model.transform(train)
auc_train = evaluator.evaluate(predictions_train, {evaluator.metricName: "areaUnderROC"})
print("Train AUC:", auc_train)

**hyperparameter tuning**


In [None]:
def evaluate_dt(mode_params):
    """Train one decision tree per ``maxDepth`` value and score each with AUC.

    For every depth in ``mode_params`` a ``DecisionTreeClassifier`` is fit on
    the notebook-level ``train`` DataFrame, and the area under the ROC curve is
    computed on the notebook-level ``test`` and ``train`` DataFrames.

    Parameters
    ----------
    mode_params : iterable of int
        Candidate ``maxDepth`` values, evaluated in the given order.

    Returns
    -------
    tuple of (list, list)
        ``(test_accuracies, train_accuracies)`` with one entry per depth.
        Despite the names, the values are areaUnderROC scores, not accuracies.
    """
    test_accuracies = []
    train_accuracies = []

    depths = list(mode_params)
    if not depths:
        # Nothing to evaluate: return the empty results without touching Spark.
        return (test_accuracies, train_accuracies)

    # The evaluator does not depend on the depth, so it is built once and
    # reused for both splits instead of being recreated twice per iteration.
    evaluator = BinaryClassificationEvaluator(
        labelCol="Churn_Indexed", metricName="areaUnderROC"
    )

    for maxD in depths:
        # Train the model for this depth.
        decision_tree = DecisionTreeClassifier(
            featuresCol="final_features_vector",
            labelCol="Churn_Indexed",
            maxDepth=maxD,
        )
        dtModel = decision_tree.fit(train)

        # AUC on the held-out test split.
        test_accuracies.append(evaluator.evaluate(dtModel.transform(test)))

        # AUC on the training split (to spot overfitting as depth grows).
        train_accuracies.append(evaluator.evaluate(dtModel.transform(train)))

    return (test_accuracies, train_accuracies)

In [None]:
# Evaluate every tree depth from 2 to 20 (inclusive).
maxDepths = list(range(2, 21))
test_accs, train_accs = evaluate_dt(maxDepths)
print(train_accs)
print(test_accs)

In [None]:
# Tabulate the train/test AUC obtained for each maxDepth.
df = pd.DataFrame(
    {
        "maxDepth": maxDepths,
        "test_AUC": test_accs,
        "train_AUC": train_accs,
    }
)

df

In [None]:
# Line chart of train and test AUC against maxDepth
px.line(df, x= "maxDepth", y =["train_AUC","test_AUC"]) # one line per AUC column

**Model Development**

In [None]:
# Feature importances of the fitted decision tree stored in `model`
feature_importance = model.featureImportances
print(feature_importance)

In [None]:
# Convert the feature-importance vector into a plain Python list of scores.
importance_array = feature_importance.toArray()
feature_importances_list = importance_array.tolist()

# Feature names in the order they were assembled into final_features_vector:
# numerical columns first, then the indexed categorical columns without the label.
indexed_feature_columns = [name for name in categorical_columns_indexed if name != "Churn_Indexed"]
feature_names = numerical_columns + indexed_feature_columns

df = pd.DataFrame({"score": feature_importances_list}, index=feature_names)
df

In [None]:
# Same importance table as above, built by iterating the importance vector directly.
print(feature_importance)
score = list(feature_importance)
# Feature names in the order used by final_features_vector; the label column and
# the customerID index are excluded.
non_feature_columns = ("Churn_Indexed", "customerID_Indexed")
feature_names = numerical_columns + [
    name for name in categorical_columns_indexed if name not in non_feature_columns
]
df = pd.DataFrame({"score": score}, index=feature_names)
df

In [None]:
# Bar chart of the feature-importance scores held in df
px.bar(df, x= "score")

In [None]:
# Number of customers per (Contract, Churn) pair, plotted as bars coloured by Churn.
# A bare `df` in the middle of a cell displays nothing, so only the chart is the output.
df = (
    data
    .groupBy("Contract", "Churn")
    .count()
    .toPandas()
)
px.bar(df, x="Contract", y="count", color="Churn")