# Spark Machine Learning - Classification: CHURN MODELLING

This project uses Spark ML for a classification analysis that predicts customer churn.

## Read Data

Import the libraries and initialize Spark (`findspark` makes the Spark installation under `/opt/manual/spark` importable):

In [1]:
import findspark

In [2]:
findspark.init("/opt/manual/spark")

In [3]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd

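Create and configure a SparkSession that runs on YARN. Because the dataset is small, `spark.sql.shuffle.partitions` is lowered from its default of 200 to 2: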

In [4]:
spark = SparkSession.builder \
.appName("Churn Scoring with GBT") \
.master("yarn") \
.config("spark.sql.shuffle.partitions", "2") \
.getOrCreate()

2022-06-12 23:25:30,114 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-06-12 23:25:31,833 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


Read the CSV file into a DataFrame, taking the column names from the header row and inferring the column types:

In [8]:
df = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.load("/user/train/datasets/Churn_Modelling.csv")

In [9]:
df.limit(5).toPandas()

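|   | RowNumber | CustomerId | Surname | CreditScore | Geography | Gender | Age | Tenure | Balance | NumOfProducts | HasCrCard | IsActiveMember | EstimatedSalary | Exited |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 15634602 | Hargrave | 619 | France | Female | 42 | 2 | 0.0 | 1 | 1 | 1 | 101348.88 | 1 |
| 1 | 2 | 15647311 | Hill | 608 | Spain | Female | 41 | 1 | 83807.86 | 1 | 0 | 1 | 112542.58 | 0 |
| 2 | 3 | 15619304 | Onio | 502 | France | Female | 42 | 8 | 159660.8 | 3 | 1 | 0 | 113931.57 | 1 |
| 3 | 4 | 15701354 | Boni | 699 | France | Female | 39 | 1 | 0.0 | 2 | 0 | 0 | 93826.63 | 0 |
| 4 | 5 | 15737888 | Mitchell | 850 | Spain | Female | 43 | 2 | 125510.82 | 1 | 1 | 1 | 79084.1 | 0 |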


In [10]:
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



The data types in the schema were compared with the data preview; no inconsistencies were found.

## Data Exploration

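Since the dataset is small, it is persisted so that the repeated actions below do not re-read the CSV file each time. `persist()` is lazy: the data is cached on the first action (here `count()`):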

In [11]:
df.persist()

DataFrame[RowNumber: int, CustomerId: int, Surname: string, CreditScore: int, Geography: string, Gender: string, Age: int, Tenure: int, Balance: double, NumOfProducts: int, HasCrCard: int, IsActiveMember: int, EstimatedSalary: double, Exited: int]

In [12]:
df_count = df.count()
print(df_count)

10000



In [13]:
len(df.columns)

14

## Null Check

In [14]:
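# Count null or empty-string values per column; only affected columns are printed
for col_name, col_type in df.dtypes:
    null_count = df.filter(F.col(col_name).isNull() | (F.col(col_name) == "")).count()

    if null_count > 0:
        print("{} ({}): {} null values ({:.2f} %)".format(col_name, col_type, null_count, null_count / df_count * 100))

The cell prints nothing, so none of the columns contains null or empty values.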

## Group Features by Type and Process Categorical Features

In [15]:
categoric_cols = []

numeric_cols = []

discarted_cols = ['RowNumber', 'CustomerId', 'Surname']

label_col = ['Exited']


In [16]:
for col_name, col_type in df.dtypes:
    # Identifier columns and the label are not features
    if col_name not in discarted_cols + label_col:
        if col_type == 'string':
            categoric_cols.append(col_name)
        else:
            numeric_cols.append(col_name)
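The grouping rule can be checked without a cluster. The following plain-Python sketch applies the same logic to a hypothetical `dtypes` list that copies the `(name, type)` pairs of the printed schema, which is the shape `df.dtypes` returns; the helper name `group_columns` is illustrative only.

```python
# Hypothetical stand-in for df.dtypes: (column name, Spark type name) pairs
# copied from the printed schema above.
dtypes = [
    ("RowNumber", "int"), ("CustomerId", "int"), ("Surname", "string"),
    ("CreditScore", "int"), ("Geography", "string"), ("Gender", "string"),
    ("Age", "int"), ("Tenure", "int"), ("Balance", "double"),
    ("NumOfProducts", "int"), ("HasCrCard", "int"), ("IsActiveMember", "int"),
    ("EstimatedSalary", "double"), ("Exited", "int"),
]


def group_columns(dtypes, discarded, label):
    """Split feature columns into categorical (string) and numeric lists."""
    categorical, numeric = [], []
    for name, dtype in dtypes:
        if name in discarded or name in label:
            continue  # identifiers and the label are not features
        (categorical if dtype == "string" else numeric).append(name)
    return categorical, numeric


categorical, numeric = group_columns(
    dtypes, discarded=["RowNumber", "CustomerId", "Surname"], label=["Exited"]
)
print(categorical)   # ['Geography', 'Gender']
print(len(numeric))  # 8
```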

In [17]:
print(categoric_cols)
print(len(categoric_cols))

['Geography', 'Gender']
2


In [18]:
print(numeric_cols)
print(len(numeric_cols))


['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']
8


In [19]:
print(discarted_cols)
print(len(discarted_cols))

['RowNumber', 'CustomerId', 'Surname']
3


In [20]:
print(label_col)
print(len(label_col))

['Exited']
1


## Trim

Leading and trailing whitespace is removed from the values in the categorical columns:

In [21]:
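# Start from df and chain the trims: each withColumn returns a new DataFrame,
# so reassigning from df in every iteration would keep only the last trim
df1 = df
for col_name in categoric_cols:
    df1 = df1.withColumn(col_name, F.trim(F.col(col_name)))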

Class distribution of the label column:

In [22]:
df.select(label_col[0]).groupBy(label_col[0]).count().show()

+------+-----+
|Exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+
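The label is imbalanced. A short calculation on the counts above, sketched here in plain Python, gives the churn rate and the accuracy a trivial model would reach by always predicting "no churn"; a classifier should be judged against that baseline rather than against 50 %.

```python
# Label counts copied from the groupBy output above
counts = {1: 2037, 0: 7963}
total = sum(counts.values())

churn_rate = counts[1] / total
# A model that always predicts the majority class ("no churn") reaches this accuracy
baseline_accuracy = max(counts.values()) / total

print(f"churn rate: {churn_rate:.2%}")                # churn rate: 20.37%
print(f"majority baseline: {baseline_accuracy:.2%}")  # majority baseline: 79.63%
```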



## Examine Categoricals

In [23]:
for col_name in categoric_cols:
    df1.select(col_name).groupBy(col_name).count().show()


+---------+-----+
|Geography|count|
+---------+-----+
|   France| 5014|
|  Germany| 2509|
|    Spain| 2477|
+---------+-----+

+------+-----+
|Gender|count|
+------+-----+
|Female| 4543|
|  Male| 5457|
+------+-----+



## Verify Cols

In [24]:
if len(df1.columns) == len(categoric_cols) + len(numeric_cols) + len(discarted_cols) + len(label_col):
    print("Columns verified")
else:
    print("There is a problem with the columns")

Columns verified


## StringIndexer, OneHotEncoder, VectorAssembler, Scaler & Estimator

### Select Columns to One-Hot Encode

Categorical columns are selected for one-hot encoding if they contain more than two distinct values. A binary column such as `Gender` needs no encoding beyond its 0/1 index:

In [25]:
to_be_oheded = []
for col_name in categoric_cols:
    count = df1.select(col_name).distinct().count()

    if count > 2:
        to_be_oheded.append(col_name)

In [26]:
print(to_be_oheded)

['Geography']


### StringIndexer

In [27]:
from pyspark.ml.feature import StringIndexer

In [28]:
# Create and configure a `StringIndexer` for every categorical column and collect the
# output column names in lists. handleInvalid="skip" drops rows whose label was not
# seen during fitting. Columns with more than two distinct values are additionally
# registered as inputs/outputs for one-hot encoding.

my_dict = {}

string_indexer_obj = []

string_indexer_output_names = []

ohe_input_names = []

ohe_output_names = []

for col_name in categoric_cols:
    my_dict[col_name+"_indexedobj"] = StringIndexer() \
                                    .setHandleInvalid("skip") \
                                    .setInputCol(col_name) \
                                    .setOutputCol(col_name+"_indexed")
    string_indexer_obj.append(my_dict.get(col_name+"_indexedobj"))
    string_indexer_output_names.append(col_name+"_indexed")

    if col_name in to_be_oheded:
        ohe_input_names.append(col_name+"_indexed")
        ohe_output_names.append(col_name+"_ohe")

In [29]:
print(string_indexer_obj)

[StringIndexer_5f77ece94341, StringIndexer_aaa92e9f1464]


In [30]:
print(string_indexer_output_names)

['Geography_indexed', 'Gender_indexed']


In [31]:
print(ohe_input_names)

['Geography_indexed']


In [32]:
print(ohe_output_names)

['Geography_ohe']
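By default, `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent value receives index 0. The plain-Python sketch below imitates that rule on the full-dataset counts from the "Examine Categoricals" step. The alphabetical tie-break and the helper name `string_index` are assumptions, and because the pipeline is fitted on `train_df`, the actual indices are derived from the training split's counts.

```python
def string_index(counts):
    """Map each label to an index, most frequent first; ties broken alphabetically (assumed)."""
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}


# Full-dataset counts from the "Examine Categoricals" output above
geography = string_index({"France": 5014, "Germany": 2509, "Spain": 2477})
gender = string_index({"Female": 4543, "Male": 5457})
print(geography)  # {'France': 0, 'Germany': 1, 'Spain': 2}
print(gender)     # {'Male': 0, 'Female': 1}
```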


### OneHotEncoder

In [33]:
from pyspark.ml.feature import OneHotEncoder

In [34]:
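# Indexed columns that are not one-hot encoded (binary categoricals); a list
# comprehension keeps the original column order, unlike a set difference
string_indexer_ohe_excluded = [c for c in string_indexer_output_names if c not in ohe_input_names]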
print(string_indexer_ohe_excluded)

['Gender_indexed']


In [35]:
encoder = OneHotEncoder().setInputCols(ohe_input_names).setOutputCols(ohe_output_names)
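`OneHotEncoder` uses `dropLast=True` by default, so a column with k categories is encoded into k - 1 slots and the last category becomes the all-zero vector. The following sketch (hypothetical helper `one_hot`, dense lists instead of the sparse vectors Spark produces) shows what this means for the three `Geography` indices:

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Geography has three categories, so the encoded vector has two slots
for idx in range(3):
    print(idx, one_hot(idx, 3))
# 0 [1.0, 0.0]
# 1 [0.0, 1.0]
# 2 [0.0, 0.0]
```

Dropping the last slot avoids a redundant column: the last category is implied when all other slots are zero.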

### Vector Assembler

In [36]:
from pyspark.ml.feature import VectorAssembler

In [37]:
assembler = VectorAssembler().setInputCols(numeric_cols+string_indexer_ohe_excluded+ohe_output_names) \
.setOutputCol("unscaled_features")
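`VectorAssembler` concatenates its input columns in the given order, expanding vector columns such as `Geography_ohe` into their individual slots. Assuming `Geography` has three categories (two slots after `dropLast`), the assembled `unscaled_features` vector has 8 + 1 + 2 = 11 entries. The sketch below, with the hypothetical helper `feature_layout`, lists which source column feeds each position, which helps when interpreting per-feature outputs such as the fitted GBT model's `featureImportances`.

```python
numeric_cols = ["CreditScore", "Age", "Tenure", "Balance", "NumOfProducts",
                "HasCrCard", "IsActiveMember", "EstimatedSalary"]
index_only_cols = ["Gender_indexed"]  # one slot each
ohe_sizes = {"Geography_ohe": 2}      # 3 categories with dropLast -> 2 slots (assumed)


def feature_layout(numeric, index_only, ohe_sizes):
    """List (source column, position) pairs in VectorAssembler input order."""
    layout = []
    for col in numeric + index_only:
        layout.append((col, len(layout)))
    for col, size in ohe_sizes.items():
        for k in range(size):
            layout.append((f"{col}[{k}]", len(layout)))
    return layout


layout = feature_layout(numeric_cols, index_only_cols, ohe_sizes)
print(len(layout))  # 11
```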

### Scaler

In [38]:
from pyspark.ml.feature import StandardScaler

In [39]:
scaler = StandardScaler().setInputCol("unscaled_features").setOutputCol("features")
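With its defaults (`withMean=False`, `withStd=True`), `StandardScaler` divides each feature by its standard deviation without centering it, which keeps sparse vectors sparse. The sketch below reproduces this on a single column in plain Python; it assumes the sample (n - 1) standard deviation and a zero output for constant features, and the helper name `standard_scale` is hypothetical.

```python
import statistics


def standard_scale(values, with_mean=False, with_std=True):
    """Scale one feature column; the defaults mirror StandardScaler (no centering)."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (n - 1)
    out = []
    for v in values:
        if with_mean:
            v = v - mean
        if with_std:
            v = v / std if std != 0 else 0.0  # constant feature -> 0.0
        out.append(v)
    return out


scaled = standard_scale([1.0, 2.0, 3.0, 4.0])
print([round(v, 3) for v in scaled])
```

Gradient-boosted trees split on thresholds and are generally insensitive to this kind of per-feature scaling, so the scaler mainly matters if the estimator is later swapped for a distance- or gradient-based linear model.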

### Estimator

In [40]:
from pyspark.ml.classification import GBTClassifier

In [41]:
estimator = GBTClassifier() \
.setFeaturesCol("features") \
.setLabelCol(label_col[0])
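`GBTClassifier` fits a sequence of shallow regression trees, each trained on the gradient of the loss with respect to the current ensemble's predictions, and adds them with a step size (`stepSize`) for a number of iterations (`maxIter`). The sketch below is a generic illustration of that idea in plain Python, using one-split trees (stumps) and the log-loss gradient `y - p`. It is not Spark's implementation; the names `fit_stump`, `gradient_boost`, `n_iter` and `step` are hypothetical, and it assumes at least two distinct feature values.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def fit_stump(xs, targets):
    """Fit a one-split regression tree to targets by minimizing squared error."""
    best = None
    for t in sorted(set(xs))[:-1]:  # a split must leave points on both sides
        left = [r for x, r in zip(xs, targets) if x <= t]
        right = [r for x, r in zip(xs, targets) if x > t]
        lm, rm = sum(left) / len(left), sum(right) / len(right)
        sse = sum((r - lm) ** 2 for r in left) + sum((r - rm) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, t, lm, rm)
    _, t, lm, rm = best
    return lambda x: lm if x <= t else rm


def gradient_boost(xs, ys, n_iter=20, step=0.1):
    """Boost stumps on the negative gradient of log loss (y - p)."""
    trees = []

    def margin(x):
        return sum(step * tree(x) for tree in trees)

    for _ in range(n_iter):
        residuals = [y - sigmoid(margin(x)) for x, y in zip(xs, ys)]
        trees.append(fit_stump(xs, residuals))
    return lambda x: sigmoid(margin(x))


predict = gradient_boost([1, 2, 3, 4, 5, 6], [0, 0, 0, 1, 1, 1])
print(predict(1) < 0.5, predict(6) > 0.5)  # True True
```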

## Model Training, Testing and Evaluation

### Pipeline

In [42]:
from pyspark.ml import Pipeline

In [43]:
pipeline_obj = Pipeline().setStages(string_indexer_obj+[encoder, assembler, scaler, estimator])
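`Pipeline.fit` runs the stages in order: each estimator is fitted on the output of the preceding stages and replaced by its fitted model, and the resulting `PipelineModel` applies the same learned state (string indices, scaler deviations, trees) to any later data. Fitting the whole pipeline on `train_df` only therefore keeps test information out of the preprocessing. A minimal plain-Python sketch of this control flow, with the hypothetical toy stages `MeanCenterer` and `double`:

```python
class Estimator:
    def fit(self, rows):
        raise NotImplementedError


class MeanCenterer(Estimator):
    """Hypothetical toy estimator: learns the mean, returns a transformer."""

    def fit(self, rows):
        mean = sum(rows) / len(rows)
        return lambda data: [x - mean for x in data]


def double(data):
    """Hypothetical toy transformer: a plain function with no fitting step."""
    return [2 * x for x in data]


def fit_pipeline(stages, rows):
    """Fit stages in order, transforming the data before it reaches the next stage."""
    fitted = []
    for stage in stages:
        transform = stage.fit(rows) if isinstance(stage, Estimator) else stage
        rows = transform(rows)
        fitted.append(transform)
    return fitted


def transform_pipeline(fitted, rows):
    for transform in fitted:
        rows = transform(rows)
    return rows


model = fit_pipeline([MeanCenterer(), double], [1.0, 2.0, 3.0])
print(transform_pipeline(model, [4.0]))  # [4.0]
```

The mean learned during fitting (2.0) is reused for new data, just as the fitted pipeline reuses training statistics on `test_df`.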

### Split Data

In [44]:
train_df, test_df = df1.randomSplit([.8, .2], seed=142)

In [45]:
print(train_df.count())

7994
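`randomSplit` samples rows randomly rather than cutting the data at exact positions, which is why the training set has 7994 rows instead of exactly 8000. The sketch below illustrates the idea with independent per-row sampling in plain Python (hypothetical helper `random_split`); Spark samples per partition with its own random generator, so the actual row assignment for `seed=142` will differ.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability given by weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b or i == len(bounds) - 1:  # last split absorbs rounding error
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(10_000), [0.8, 0.2], seed=142)
print(len(train), len(test))  # close to, but usually not exactly, 8000 / 2000
```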


### Train Model

In [46]:
pipeline_model = pipeline_obj.fit(train_df)

### Prediction

In [47]:
transformed_df = pipeline_model.transform(test_df)

In [48]:
transformed_df.select("Exited", "prediction").show(10)

+------+----------+
|Exited|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       1.0|
|     0|       0.0|
|     0|       0.0|
|     1|       0.0|
|     0|       0.0|
+------+----------+
only showing top 10 rows
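AUC summarizes ranking quality, while a confusion matrix shows how the hard `prediction` column behaves under the model's default decision rule. The sketch below counts true/false positives and negatives in plain Python on the ten rows printed above, for illustration only; on the full test set the same counts can be obtained with `transformed_df.groupBy("Exited", "prediction").count()`. The helper name `confusion` is hypothetical.

```python
# Labels and predictions copied from the ten rows shown above (illustration only)
labels = [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]
preds = [0, 0, 0, 0, 0, 1, 0, 0, 0, 0]


def confusion(labels, preds):
    """Count (TP, FP, TN, FN) for a binary classifier with positive class 1."""
    tp = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 1)
    tn = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 0)
    fn = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 0)
    return tp, fp, tn, fn


tp, fp, tn, fn = confusion(labels, preds)
print(tp, fp, tn, fn)  # 0 1 8 1
```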



### Evaluate Model

In [49]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [50]:
evaluator = BinaryClassificationEvaluator(labelCol=label_col[0])

In [51]:
evaluator.evaluate(transformed_df)

0.8591799680023813

In [52]:
evaluator.getMetricName()

'areaUnderROC'
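`BinaryClassificationEvaluator` reads the `rawPrediction` column by default, not the hard `prediction`, so `areaUnderROC` measures how well the model's scores rank churners above non-churners across all thresholds. Equivalently, the AUC is the probability that a randomly chosen positive receives a higher score than a randomly chosen negative, so a value of about 0.859 means roughly 86 % of such pairs are ordered correctly. The pair-counting sketch below (hypothetical helper `auc_roc`) computes this definition directly; Spark derives the value from a (possibly binned) ROC curve, so small numerical differences are possible.

```python
def auc_roc(labels, scores):
    """Area under ROC = probability a random positive outscores a random negative."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half
    return wins / (len(pos) * len(neg))


print(auc_roc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```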