In [24]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler, MinMaxScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import count, isnan, when, col, sum

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Bank Customer Churn") \
    .getOrCreate()

# Load the dataset
df = spark.read.csv("Churn_Modelling.csv", header=True, inferSchema=True)

df.show(5) 
df.printSchema()


# Check for missing values
missing_values = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Show the count of missing values for each column
missing_values.show()



+---------+----------+--------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender| Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female|42.0|     2|      0.0|            1|        1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female|41.0|     1| 83807.86|            1|        0|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female|42.0|     8| 159660.8|            3|        1|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female|39.0|     1|      0.0|            2|        0|             0|       93

In [28]:
# Drop rows with missing value
df = df.dropna()

# Show the shape of the DataFrame
print("df shape:", (df.count(), len(df.columns)))

# Check for missing values
missing_values = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Show the count of missing values for each column
missing_values.show()


df shape: (9998, 14)
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname|CreditScore|Geography|Gender|Age|Tenure|Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+
|        0|         0|      0|          0|        0|     0|  0|     0|      0|            0|        0|             0|              0|     0|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+



In [3]:
#check number of rows
df.count()
print('Number of Rows: ', df.count())

#check number of columns
len(df.columns)
print('Number of columns: ', len(df.columns))

df.columns

Number of Rows:  9998
Number of columns:  14


['RowNumber',
 'CustomerId',
 'Surname',
 'CreditScore',
 'Geography',
 'Gender',
 'Age',
 'Tenure',
 'Balance',
 'NumOfProducts',
 'HasCrCard',
 'IsActiveMember',
 'EstimatedSalary',
 'Exited']

In [4]:
# Compute descriptive statistics for all numerical columns
summary = df.describe()

# Transpose the summary DataFrame
summary_transposed = summary.select(*[col(c).alias(c) for c in summary.columns]).toPandas().transpose()

# Show the transposed summary DataFrame
print(summary_transposed)

                     0                     1                    2         3  \
summary          count                  mean               stddev       min   
RowNumber         9998       5003.4974994999   2886.3212745107903         1   
CustomerId        9998  1.5690938307161432E7    71923.98792778245  15565701   
Surname           9998                  None                 None     Abazu   
CreditScore       9998     650.5296059211843    96.63300292778429       350   
Geography         9998                  None                 None    France   
Gender            9998                  None                 None    Female   
Age               9998    38.920287057411485   10.487985680801955      18.0   
Tenure            9998     5.013002600520104   2.8921515050722646         0   
Balance           9998     76481.49081916342    62393.18703475571       0.0   
NumOfProducts     9998    1.5302060412082417   0.5816693900656227         1   
HasCrCard         9998    0.7055411082216443  0.4558

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct

unique_value_counts = df.agg(*[approx_count_distinct(c).alias(c) for c in df.columns])

# Show the unique value counts for each column
unique_value_counts.show()

+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+
|RowNumber|CustomerId|Surname|CreditScore|Geography|Gender|Age|Tenure|Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+
|     9823|      9889|   2792|        469|        3|     2| 72|    11|   7099|            4|        2|             2|           9628|     2|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+



In [6]:
# Define the StringIndexer for Geography
geography_indexer = StringIndexer(inputCol="Geography", outputCol="GeographyIndex")

# Define the StringIndexer for Gender
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")

# Fit and transform the DataFrame
df = geography_indexer.fit(df).transform(df)
df = gender_indexer.fit(df).transform(df)

# Show the transformed DataFrame
df.show()

#columns to drop
# Drop columns
columns_to_drop = ['RowNumber', 'CustomerId', 'Surname', 'Geography','Gender']
df = df.drop(*columns_to_drop)

# Show DataFrame
df.show()

+---------+----------+---------+-----------+---------+------+-----+------+---------+-------------+---------+--------------+---------------+------+--------------+-----------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|  Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|GeographyIndex|GenderIndex|
+---------+----------+---------+-----------+---------+------+-----+------+---------+-------------+---------+--------------+---------------+------+--------------+-----------+
|        1|  15634602| Hargrave|        619|   France|Female| 42.0|     2|      0.0|            1|        1|             1|      101348.88|     1|           0.0|        1.0|
|        2|  15647311|     Hill|        608|    Spain|Female| 41.0|     1| 83807.86|            1|        0|             1|      112542.58|     0|           2.0|        1.0|
|        3|  15619304|     Onio|        502|   France|Female| 42.0|     8| 159660.8|            3|        1|             0|      1

# Without PCA 

In [20]:
# Feature Selection
feature_cols = ['CreditScore', 'Age', 'Tenure', 'Balance','NumOfProducts', 'HasCrCard', 
                'IsActiveMember', 'EstimatedSalary', 'GeographyIndex', 'GenderIndex']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="unscaled_features")
data = assembler.transform(df)

scaler = MinMaxScaler(inputCol="unscaled_features", outputCol="features")
scaler_model = scaler.fit(data)
scaled_data = scaler_model.transform(data)

# Split the data into training and testing sets
(train_data, test_data) = scaled_data.randomSplit([0.8, 0.2], seed=42)

# Model Building - Logistic Regression
lr = LogisticRegression(labelCol='Exited', featuresCol="features")
lr_model = lr.fit(train_data)

# Model Building - Random Forest
rf = RandomForestClassifier(labelCol='Exited', featuresCol="features")
rf_model = rf.fit(train_data)

# Model Building - SVM
svm = LinearSVC(labelCol='Exited', featuresCol="features")
svm_model = svm.fit(train_data)

# Model Evaluation
def evaluate_model(model, test_data):
    predictions = model.transform(test_data)
    evaluator = BinaryClassificationEvaluator(labelCol='Exited')
    accuracy = evaluator.evaluate(predictions)
    return accuracy

# Evaluate Logistic Regression model
lr_accuracy = evaluate_model(lr_model, test_data)

# Evaluate Random Forest model
rf_accuracy = evaluate_model(rf_model, test_data)

# Evaluate SVM model
svm_accuracy = evaluate_model(svm_model, test_data)

# Print accuracies
print("Logistic Regression Accuracy:", lr_accuracy)
print("Random Forest Accuracy:", rf_accuracy)
print("SVM Accuracy:", svm_accuracy)

# Stop SparkSession
#spark.stop()


Logistic Regression Accuracy: 0.7458584788777394
Random Forest Accuracy: 0.8359586280857412
SVM Accuracy: 0.7416869829343347


# with PCA

In [23]:
# Feature Selection
feature_cols = ['CreditScore', 'Age', 'Tenure', 'Balance','NumOfProducts', 'HasCrCard', 
                'IsActiveMember', 'EstimatedSalary', 'GeographyIndex', 'GenderIndex']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="unscaled_features")
data = assembler.transform(df)

# Apply MinMaxScaler to standadize the data
scaler = MinMaxScaler(inputCol="unscaled_features", outputCol="features")
scaler_model = scaler.fit(data)
scaled_data = scaler_model.transform(data)

from pyspark.ml.feature import PCA
# Apply PCA
pca = PCA(k=5, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(scaled_data)
pca_data = pca_model.transform(scaled_data)

# Split the data into training and testing sets
(train_data, test_data) = pca_data.randomSplit([0.8, 0.2], seed=42)

# Model Building - Logistic Regression
lr = LogisticRegression(labelCol='Exited', featuresCol="pca_features")
lr_model = lr.fit(train_data)

# Model Building - Random Forest
rf = RandomForestClassifier(labelCol='Exited', featuresCol="pca_features")
rf_model = rf.fit(train_data)

# Model Building - SVM
svm = LinearSVC(labelCol='Exited', featuresCol="pca_features")
svm_model = svm.fit(train_data)

# Model Evaluation
def evaluate_model(model, test_data):
    predictions = model.transform(test_data)
    evaluator = BinaryClassificationEvaluator(labelCol='Exited')
    accuracy = evaluator.evaluate(predictions)
    return accuracy

# Evaluate Logistic Regression model
lr_accuracy = evaluate_model(lr_model, test_data)
# Evaluate Random Forest model
rf_accuracy = evaluate_model(rf_model, test_data)
# Evaluate SVM model
svm_accuracy = evaluate_model(svm_model, test_data)

# Print accuracies
print("Logistic Regression Accuracy:", lr_accuracy)
print("Random Forest Accuracy:", rf_accuracy)
print("SVM Accuracy:", svm_accuracy)


# Stop SparkSession
spark.stop()


Logistic Regression Accuracy: 0.6463037714986054
Random Forest Accuracy: 0.6800953865419713
SVM Accuracy: 0.5488847701264251


In [37]:
# Basic summary statistics
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+-------------------+------------------+-------------------+
|summary|      CreditScore|               Age|            Tenure|          Balance|     NumOfProducts|          HasCrCard|    IsActiveMember|   EstimatedSalary|             Exited|    GeographyIndex|        GenderIndex|
+-------+-----------------+------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+-------------------+------------------+-------------------+
|  count|             9998|              9998|              9998|             9998|              9998|               9998|              9998|              9998|               9998|              9998|               9998|
|   mean|650.5296059211843|38.920287057411485| 5.013002600520104|76481.49081916342|1.5302060412082417| 0.705541108221644