In [1]:
import findspark
# Point findspark at the local Spark installation before pyspark is imported.
findspark.init(spark_home="/opt/manual/spark/")

In [2]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd

In [3]:
# Create (or reuse) the Hive-enabled session for this notebook, submitted to YARN.
spark = (
    SparkSession.builder
    .appName("classification")
    .master("yarn")
    .config("spark.sql.shuffle.partitions", "2")
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
# Read data: CSV with a header row; column types are inferred by Spark.
df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("/user/train/datasets/Churn_Modelling.csv")
)

In [5]:
# Peek at the first five rows as a pandas frame (only these rows are collected).
df.limit(num=5).toPandas()

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0


In [6]:
# Schema check: print the inferred column names, types and nullability
# so the inferSchema result can be reviewed before modelling.
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [7]:
# Persist df with Spark's default storage level; the cells below run many
# separate actions (count, per-column filters) against this same DataFrame.
df.persist()

DataFrame[RowNumber: int, CustomerId: int, Surname: string, CreditScore: int, Geography: string, Gender: string, Age: int, Tenure: int, Balance: double, NumOfProducts: int, HasCrCard: int, IsActiveMember: int, EstimatedSalary: double, Exited: int]

In [8]:
# Total row count; reused below as the denominator of the missing-value percentages.
df_count = df.count()
print(df_count)

10000


In [9]:
# Number of columns in the raw dataset.
len(df.schema.names)

14

In [10]:
# null check
# Report every column that has missing values. A value counts as missing when
# it is NULL, or - for string columns only - when it is the empty string.
# Numeric columns are not compared with "": that comparison forces a
# string-to-number cast, which can raise under ANSI SQL mode and can never
# match anyway.
for col_name, col_type in df.dtypes:
    is_missing = F.col(col_name).isNull()
    if col_type == "string":
        is_missing = is_missing | (F.col(col_name) == "")

    null_count = df.filter(is_missing).count()

    if null_count > 0:
        print(f"{col_name} {col_type} type null values: {null_count} % {null_count/df_count * 100}")

In [11]:
# Grouping by attribute types
# Fixed groups first: identifier-like columns that are ignored, and the target.
discarted_cols = ["RowNumber", "CustomerId", "Surname"]
label_col = ["Exited"]

# Feature groups start empty and are filled from the DataFrame dtypes afterwards.
cat_cols, num_cols = [], []

In [12]:
# Route every remaining column into the categorical (string) or numeric bucket.
excluded = label_col + discarted_cols
for name, dtype in df.dtypes:
    if name in excluded:
        continue  # label and identifier columns are not features
    bucket = cat_cols if dtype == "string" else num_cols
    bucket.append(name)

In [13]:
# Categorical feature names, then how many there are.
print(cat_cols, len(cat_cols), sep="\n")

['Geography', 'Gender']
2


In [14]:
# Numeric feature names, then how many there are.
print(num_cols, len(num_cols), sep="\n")

['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']
8


In [15]:
# Trim
# Strip surrounding whitespace from every categorical column.
# The result is accumulated in df1: starting each iteration from df would keep
# only the last column's trim, and seeding df1 with df keeps it defined even
# when there are no categorical columns.
df1 = df
for col_name in cat_cols:
    df1 = df1.withColumn(col_name, F.trim(col_name))

In [16]:
# Class balance of the target column.
df1.groupBy(label_col[0]).count().show()

+------+-----+
|Exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



In [17]:
# Examine Categoricals
# Show the value counts of each categorical column.
# show() prints the table itself and returns None, so it must not be wrapped
# in print() - that only adds a stray "None" line after every table.
for col_name in cat_cols:
    df1.select(col_name).groupBy(col_name).count().show()

+---------+-----+
|Geography|count|
+---------+-----+
|   France| 5014|
|  Germany| 2509|
|    Spain| 2477|
+---------+-----+

None
+------+-----+
|Gender|count|
+------+-----+
|Female| 4543|
|  Male| 5457|
+------+-----+

None


In [18]:
# Verify columns: the sizes of the four column groups must add up to the
# number of columns in df1.
column_groups = (cat_cols, num_cols, label_col, discarted_cols)
len(df1.columns) == sum(len(group) for group in column_groups)

True

In [19]:
# Stage order: StringIndexer > OneHotEncoding > VectorAssembler > Scaler > Estimator > Transformer > Evaluator

# Select the columns to one-hot encode: only categorical columns with more
# than two distinct values qualify.
to_be_oheded = [
    col_name
    for col_name in cat_cols
    if df1.select(col_name).distinct().count() > 2
]
        

In [20]:
print(to_be_oheded)

['Geography']


In [21]:
# StringIndexer

from pyspark.ml.feature import StringIndexer

In [22]:
# Build one StringIndexer per categorical column and record the column names
# that the later stages (one-hot encoder, assembler) will consume.
my_dict = {}                      # "<col>_indexedobj" -> StringIndexer
string_indexer_obj = []           # indexer stages, in cat_cols order
string_indexer_output_names = []  # "<col>_indexed" for every categorical column
ohe_input_names = []              # indexed columns that will be one-hot encoded
ohe_output_names = []             # matching "<col>_ohe" output names

for col_name in cat_cols:
    indexed_name = col_name + "_indexed"

    # handleInvalid="skip" is the value configured on every indexer here.
    indexer = StringIndexer(
        inputCol=col_name,
        outputCol=indexed_name,
        handleInvalid="skip",
    )
    my_dict[col_name + "_indexedobj"] = indexer

    string_indexer_obj.append(indexer)
    string_indexer_output_names.append(indexed_name)

    if col_name in to_be_oheded:
        ohe_input_names.append(indexed_name)
        ohe_output_names.append(col_name + "_ohe")

In [23]:
# Print each bookkeeping list on its own line.
for listing in (
    string_indexer_obj,
    string_indexer_output_names,
    ohe_input_names,
    ohe_output_names,
):
    print(listing)

[StringIndexer_6e52d821b4c9, StringIndexer_ce2e2372a1a4]
['Geography_indexed', 'Gender_indexed']
['Geography_indexed']
['Geography_ohe']


In [24]:
# OneHotEncoder
from pyspark.ml.feature import OneHotEncoder

In [25]:
# Indexed columns that are NOT one-hot encoded and therefore go into the
# assembler as plain indices.
# Filtering the list (rather than taking a set difference) keeps the original
# column order, so the layout of the assembled feature vector is the same on
# every run even when several columns end up here.
string_indexer_ohe_excluded = [
    name for name in string_indexer_output_names if name not in ohe_input_names
]
print(string_indexer_ohe_excluded)

['Gender_indexed']


In [26]:
# One-hot encode the selected indexed columns into their "<col>_ohe" outputs.
encoder = OneHotEncoder(
    inputCols=ohe_input_names,
    outputCols=ohe_output_names,
)

In [27]:
#vector assembler
from pyspark.ml.feature import VectorAssembler

In [28]:
# Assemble numeric columns, plain-indexed categoricals and one-hot vectors
# (in that order) into a single unscaled feature vector.
assembler = VectorAssembler(
    inputCols=num_cols + string_indexer_ohe_excluded + ohe_output_names,
    outputCol="unscaled_features",
)

In [29]:
#Scaler
from pyspark.ml.feature import StandardScaler

In [30]:
# Scale the assembled vector; the output column is what the estimator reads.
scaler = StandardScaler(
    inputCol="unscaled_features",
    outputCol="features",
)

In [31]:
#Model training, test and evaluate
#Estimator

from pyspark.ml.classification import GBTClassifier

In [32]:
# Gradient-boosted trees trained on the scaled features against the label column.
estimator = GBTClassifier(
    featuresCol="features",
    labelCol=label_col[0],
)

In [33]:
#pipeline
from pyspark.ml import Pipeline

In [34]:
# Chain all stages: indexers first, then encoder, assembler, scaler and model.
pipeline_obj = Pipeline(
    stages=[*string_indexer_obj, encoder, assembler, scaler, estimator],
)

In [35]:
# Split data: 80% train / 20% test, with a fixed seed.
train_df, test_df = df1.randomSplit(weights=[.8, .2], seed=142)

In [36]:
# Row counts of the train and test partitions, in that order.
print(*(part.count() for part in (train_df, test_df)))

7994 2006


In [37]:
# Train model: fit every pipeline stage on the training partition only.
pipeline_model = pipeline_obj.fit(dataset=train_df)

In [38]:
# Prediction: run the fitted pipeline over the held-out test partition.
transformed_df = pipeline_model.transform(dataset=test_df)

In [39]:
# Compare actual labels with predictions for the first ten test rows.
transformed_df.select(["Exited", "prediction"]).show(n=10)

+------+----------+
|Exited|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       1.0|
|     0|       0.0|
|     0|       0.0|
|     1|       0.0|
|     0|       0.0|
+------+----------+
only showing top 10 rows



In [40]:
#model evaluate
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [41]:
# Binary evaluator scored against the label column; other settings stay at their defaults.
evaluator = BinaryClassificationEvaluator().setLabelCol(label_col[0])

In [42]:
# Score the test-set predictions with the evaluator's configured metric.
evaluator.evaluate(dataset=transformed_df)

0.8591799680023813

In [43]:
# Name of the metric that the score above refers to.
evaluator.getOrDefault(evaluator.metricName)

'areaUnderROC'

In [44]:
# Stop the Spark session now that the analysis is finished.
spark.stop()