In [8]:
import numpy as np

In [1]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook (an already-running session is reused).
session_builder = SparkSession.builder.appName('CustomerChurn')
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/18 18:54:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/18 18:54:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Loading Data

In [14]:

# Read the churn CSV: the first row is the header and column types are inferred.
csv_reader = spark.read.option('header', True).option('inferSchema', True)
df = csv_reader.csv('Customer-Churn-Records.csv')

# Preview the first five rows.
df.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------+------------------+---------+------------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|Complain|Satisfaction Score|Card Type|Point Earned|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+---------+--------------+---------------+------+--------+------------------+---------+------------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|       1|                 2|  DIAMOND|         464|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|       1|                 3|  DIAMOND|         456|
|        3|  15619304|   

### EDA

In [3]:
# Print each column's name, inferred type, and nullability as a tree.
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)
 |-- Complain: integer (nullable = true)
 |-- Satisfaction Score: integer (nullable = true)
 |-- Card Type: string (nullable = true)
 |-- Point Earned: integer (nullable = true)



In [4]:
from pyspark.sql.functions import col,isnan,when,count

# Count missing values per column.
# Every column is checked for NULL; NaN is only checked on float/double columns,
# because isnan() on a string column relies on an implicit string-to-double cast
# (which can raise when ANSI SQL mode is enabled) and integers can never be NaN.
missing_exprs = [
    count(
        when(
            (isnan(col(c)) | col(c).isNull()) if dtype in ('float', 'double') else col(c).isNull(),
            c,
        )
    ).alias(c)
    for c, dtype in df.dtypes
]
df.select(missing_exprs).show()

[Stage 3:>                                                          (0 + 1) / 1]

+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+--------+------------------+---------+------------+
|RowNumber|CustomerId|Surname|CreditScore|Geography|Gender|Age|Tenure|Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|Complain|Satisfaction Score|Card Type|Point Earned|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+--------+------------------+---------+------------+
|        0|         0|      0|          0|        0|     0|  0|     0|      0|            0|        0|             0|              0|     0|       0|                 0|        0|           0|
+---------+----------+-------+-----------+---------+------+---+------+-------+-------------+---------+--------------+---------------+------+--------+------------------+---------+------------+



                                                                                

### Data Preprocessing

In [11]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import StringType


In [12]:
# Collect the names of all string-typed columns; these are the categorical
# columns that get index-encoded below.
categorical_cols = [
    schema_field.name
    for schema_field in df.schema.fields
    if isinstance(schema_field.dataType, StringType)
]
categorical_cols

['Surname', 'Geography', 'Gender', 'Card Type']

In [17]:
# Encode the categorical columns as numeric '<column>_index' columns.
# Only columns still present in df are indexed, so re-running this cell after the
# original string columns have been dropped is a no-op instead of an error.
pending_cols = [column for column in categorical_cols if column in df.columns]
if pending_cols:
    # A single multi-column StringIndexer handles every column with one fit call
    # instead of one fit/transform round trip per column.
    indexer = StringIndexer(
        inputCols=pending_cols,
        outputCols=[column + '_index' for column in pending_cols],
    )
    # Append the index columns, then drop the original categorical columns.
    df = indexer.fit(df).transform(df).drop(*pending_cols)
df.show(5)

+---------+----------+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------+------------------+------------+-------------+---------------+------------+---------------+
|RowNumber|CustomerId|CreditScore|Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|Complain|Satisfaction Score|Point Earned|Surname_index|Geography_index|Gender_index|Card Type_index|
+---------+----------+-----------+---+------+---------+-------------+---------+--------------+---------------+------+--------+------------------+------------+-------------+---------------+------------+---------------+
|        1|  15634602|        619| 42|     2|      0.0|            1|        1|             1|      101348.88|     1|       1|                 2|         464|       1958.0|            0.0|         1.0|            0.0|
|        2|  15647311|        608| 41|     1| 83807.86|            1|        0|             1|      112542.58|     0|       1|  

#### Select Features & Target

In [19]:
from pyspark.ml.feature import VectorAssembler

In [18]:
# List every column's StructField (name, type, nullability) after encoding.
list(df.schema.fields)

[StructField('RowNumber', IntegerType(), True),
 StructField('CustomerId', IntegerType(), True),
 StructField('CreditScore', IntegerType(), True),
 StructField('Age', IntegerType(), True),
 StructField('Tenure', IntegerType(), True),
 StructField('Balance', DoubleType(), True),
 StructField('NumOfProducts', IntegerType(), True),
 StructField('HasCrCard', IntegerType(), True),
 StructField('IsActiveMember', IntegerType(), True),
 StructField('EstimatedSalary', DoubleType(), True),
 StructField('Exited', IntegerType(), True),
 StructField('Complain', IntegerType(), True),
 StructField('Satisfaction Score', IntegerType(), True),
 StructField('Point Earned', IntegerType(), True),
 StructField('Surname_index', DoubleType(), False),
 StructField('Geography_index', DoubleType(), False),
 StructField('Gender_index', DoubleType(), False),
 StructField('Card Type_index', DoubleType(), False)]

In [20]:
# Define the feature columns.
# NOTE(review): the test AUC printed at the end of this notebook is 1.00, which
# usually points to target leakage. 'Complain' is the most likely culprit --
# confirm its correlation with 'Exited' and consider removing it.
# NOTE(review): 'Geography_index' is fed to the model as a plain number, which
# imposes an arbitrary order on a nominal category; one-hot encoding may be
# more appropriate -- confirm.
feature_cols = [
    'CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard',
    'IsActiveMember', 'EstimatedSalary', 'Complain', 'Satisfaction Score',
    'Point Earned', 'Geography_index',
]

# Assemble the feature vector.
# Any 'features' column left over from a previous run of this cell is dropped first
# (drop() is a no-op when the column is absent); otherwise transform() fails
# because the output column already exists.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df = assembler.transform(df.drop('features'))

# Final dataset: just the feature vector and the target label.
final_df = df.select('features', 'Exited')
final_df.show(5)

+--------------------+------+
|            features|Exited|
+--------------------+------+
|[619.0,42.0,2.0,0...|     1|
|[608.0,41.0,1.0,8...|     0|
|[502.0,42.0,8.0,1...|     1|
|[699.0,39.0,1.0,0...|     0|
|[850.0,43.0,2.0,1...|     0|
+--------------------+------+
only showing top 5 rows



### Train the Model

In [22]:
from pyspark.ml.classification import LogisticRegression

# Split the assembled dataset 80/20 into train and test sets, with a fixed seed.
split_weights = [0.8, 0.2]
train_data, test_data = final_df.randomSplit(split_weights, seed=42)

# Initialize the logistic regression model: 'features' is the input vector
# and 'Exited' is the label.
lr = LogisticRegression(featuresCol='features', labelCol='Exited')

# Fit the model on the training split.
lr_model = lr.fit(train_data)

25/03/18 19:35:40 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


### Evaluate the model

In [23]:
# Score the held-out test rows with the fitted model.
predictions = lr_model.transform(test_data)

# Preview a few rows: feature vector, true label, and predicted label.
preview_cols = ['features', 'Exited', 'prediction']
predictions.select(preview_cols).show(5)

+--------------------+------+----------+
|            features|Exited|prediction|
+--------------------+------+----------+
|(12,[0,1,4,7,9,10...|     0|       0.0|
|(12,[0,1,4,7,9,10...|     0|       0.0|
|(12,[0,1,4,7,9,10...|     0|       0.0|
|[350.0,54.0,1.0,1...|     1|       1.0|
|[365.0,30.0,0.0,1...|     1|       1.0|
+--------------------+------+----------+
only showing top 5 rows



### Model Performance Metrics

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator that scores predictions against the 'Exited' label using area under the ROC curve.
evaluator = BinaryClassificationEvaluator(
    metricName='areaUnderROC',
    labelCol='Exited',
)

# Compute the AUC on the test-set predictions and print it to two decimals.
auc = evaluator.evaluate(predictions)
print(f"Model AUC:  {auc:.2f}")

Model AUC:  1.00
