# Spark Machine Learning - Clustering: K-Means

Clustering ist eine der wichtigsten Methoden des unüberwachten Lernens, bei der im Gegensatz zu überwachten Verfahren keine Zielvariable (Label) vorgegeben ist. Während im überwachten Lernen das Modell eine Zielvariable vorhersagt, sucht das Clustering stattdessen nach natürlichen Gruppierungen in den Daten. In diesem Projekt wird die Clustering-Methode auf einen Churn-Datensatz angewendet, wobei das vorhandene Label bewusst ignoriert wird. Unter den verschiedenen Clustering-Algorithmen ist K-Means einer der am häufigsten verwendeten. Die praktische Anwendung von K-Means wird in diesem Projekt ausführlich demonstriert.

In [1]:
import findspark

In [2]:
findspark.init("/opt/manual/spark")

In [3]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd

Erstellen und Konfigurieren einer SparkSession:

In [4]:
spark = SparkSession.builder \
.appName("KMenas") \
.master("yarn") \
.config("spark.sql.shuffle.partitions","2") \
.getOrCreate()

2022-06-12 23:42:47,742 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-06-12 23:42:49,406 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


## Read Data

Einlesen der CSV-Datei in ein DataFrame:

In [5]:
df = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.load("hdfs://localhost:9000/user/train/datasets/Churn_Modelling.csv")

                                                                                

In [6]:
df.limit(5).toPandas()

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0


In [7]:
df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



Die Übereinstimmung der Datentypen zwischen der Datenvorschau und dem Schema wird überprüft und es wurden keine Unstimmigkeiten festgestellt.

Da der Datensatz nicht sehr groß ist, wird er im Speicher gehalten.:

In [8]:
df.persist()

DataFrame[RowNumber: int, CustomerId: int, Surname: string, CreditScore: int, Geography: string, Gender: string, Age: int, Tenure: int, Balance: double, NumOfProducts: int, HasCrCard: int, IsActiveMember: int, EstimatedSalary: double, Exited: int]

In [9]:
df_count = df.count()
print(df_count)

10000


In [10]:
len(df.columns)

14

## Null Check

In [11]:
for col_name in df.dtypes:
    null_count = df.filter( (F.col(col_name[0]).isNull()) | (F.col(col_name[0]) == "")).count()
    if(  null_count > 0 ):
        print("{} {} type has {} null values % {}".format(col_name, col_type[1], null_count, (null_count/df_count * 100)))

#No null

## Group Columns

In [12]:
categoric_cols = []
numeric_cols = []
discarted_cols = ['RowNumber','CustomerId','Surname']
label_col = ['Exited']

# Die Zielvariable wird beibehalten, jedoch nicht verwendet.

In [13]:
for col_name in df.dtypes:
    if (col_name[0] not in discarted_cols+label_col):
        if col_name[1] == 'string':
            categoric_cols.append(col_name[0])
        else: numeric_cols.append(col_name[0])

In [14]:
print(categoric_cols)
print(len(categoric_cols))

['Geography', 'Gender']
2


In [15]:
print(numeric_cols)
print(len(numeric_cols))

['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary']
8


In [16]:
print(discarted_cols)
print(len(discarted_cols))

['RowNumber', 'CustomerId', 'Surname']
3


In [17]:
print(label_col)
print(len(label_col))

['Exited']
1


Es sind insgesamt 10 Spalten vorhanden (8 numerische und 2 kategoriale). Auf diesen Daten wird K-Means-Clustering angewendet.

## Trim

Führende und nachfolgende Leerzeichen werden aus den Werten in den kategorialen Spalten entfernt.:

In [18]:
for col_name in categoric_cols:
    df1 = df.withColumn(col_name, F.trim(col_name))

In [19]:
df1.select(label_col[0]).groupBy(label_col[0]).count().show()

+------+-----+
|Exited|count|
+------+-----+
|     1| 2037|
|     0| 7963|
+------+-----+



## Examine Categoricals

In [20]:
for col_name in categoric_cols:
    df1.select(col_name).groupBy(col_name).count().show()

+---------+-----+
|Geography|count|
+---------+-----+
|   France| 5014|
|  Germany| 2509|
|    Spain| 2477|
+---------+-----+

+------+-----+
|Gender|count|
+------+-----+
|Female| 4543|
|  Male| 5457|
+------+-----+



## Verify Cols

In [21]:
if len(df1.columns) == (len(categoric_cols)+len(numeric_cols)+len(discarted_cols+label_col)):
    print("Columns verified")
else: print("There is problem with columns")

Columns verified


## Select cols to Hot Coded

Kategoriale Spalten werden für die One-Hot-Codierung ausgewählt, wenn sie mehr als zwei eindeutige Werte enthalten.:

In [22]:
to_be_oheded = []

for col_name in categoric_cols:
    count = df1.select(col_name).distinct().count()
    if count > 2:
        to_be_oheded.append(col_name)

In [23]:
print(to_be_oheded)

['Geography']


## StringIndexer

In [24]:
from pyspark.ml.feature import StringIndexer

In [25]:
# Es werden `StringIndexer`-Objekte für jede kategoriale Spalte erstellt und konfiguriert, deren Ausgabespaltennamen werden in 
# Listen gespeichert. Zudem wird eine Liste von Spalten für die One-Hot-Codierung vorbereitet, falls diese Spalten mehr als 
# zwei eindeutige Werte enthalten.



my_dict = {}

string_indexer_obj = []

string_indexer_output_names = []

ohe_input_names = []

ohe_output_names = []

for col_name in categoric_cols:
    my_dict[col_name+"_indexedobj"] = StringIndexer() \
                                    .setHandleInvalid("skip") \
                                    .setInputCol(col_name) \
                                    .setOutputCol(col_name+"_indexed")
    string_indexer_obj.append(my_dict.get(col_name+"_indexedobj"))
    string_indexer_output_names.append(col_name+"_indexed")

    if col_name in to_be_oheded:
        ohe_input_names.append(col_name+"_indexed")
        ohe_output_names.append(col_name+"_ohe")
    

In [26]:
print(string_indexer_obj)

[StringIndexer_1bdecbee1a55, StringIndexer_4d08677fb931]


In [27]:
print(string_indexer_output_names)

['Geography_indexed', 'Gender_indexed']


In [28]:
print(ohe_input_names)

['Geography_indexed']


In [29]:
print(ohe_output_names)

['Geography_ohe']


## OneHotEncoder

In [30]:
from pyspark.ml.feature import OneHotEncoder

In [31]:
string_indexer_ohe_excluded = list(set(string_indexer_output_names).difference(set(ohe_input_names)))
print(string_indexer_ohe_excluded)

['Gender_indexed']


In [32]:
encoder = OneHotEncoder().setInputCols(ohe_input_names).setOutputCols(ohe_output_names)

## Vector Assembler

In [33]:
from pyspark.ml.feature import VectorAssembler

In [34]:
assembler = VectorAssembler().setInputCols(numeric_cols+string_indexer_ohe_excluded+ohe_output_names) \
.setOutputCol("unscaled_features")

## Scaler

In [35]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler().setInputCol("unscaled_features").setOutputCol("features")

Die Daten sind wie bei der Klassifikation bereit und unverändert. Bis zu diesem Punkt ist der Code in diesem Clustering-Projekt identisch mit dem Klassifikationsprojekt. Der Unterschied liegt im verwendeten Algorithmus: Beim Clustering, einer unüberwachten Lernmethode, kommen Cluster-Algorithmen wie K-Means zum Einsatz. Bei der Klassifikation werden hingegen Estimatoren wie Entscheidungsbäume oder Random Forest verwendet, um Modelle für die Klassifizierung zu trainieren. Daher wird in diesem Clustering-Projekt der Estimator ganz weggelassen.

## Pipeline

In [36]:
from pyspark.ml import Pipeline

In [37]:
pipeline_obj = Pipeline().setStages(string_indexer_obj+[encoder, assembler, scaler])

In [38]:
pipeline_model = pipeline_obj.fit(df1)

                                                                                

In [39]:
pipeline_df = pipeline_model.transform(df1)

In [40]:
pipeline_df.limit(5).toPandas()

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited,Geography_indexed,Gender_indexed,Geography_ohe,unscaled_features,features
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0,1.0,"(1.0, 0.0)","[619.0, 42.0, 2.0, 0.0, 1.0, 1.0, 1.0, 101348....","[6.404333924389993, 4.0046505619078925, 0.6915..."
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,2.0,1.0,"(0.0, 0.0)","[608.0, 41.0, 1.0, 83807.86, 1.0, 0.0, 1.0, 11...","[6.2905250824379895, 3.9093017390053237, 0.345..."
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0,1.0,"(1.0, 0.0)","[502.0, 42.0, 8.0, 159660.8, 3.0, 1.0, 0.0, 11...","[5.193821696355051, 4.0046505619078925, 2.7660..."
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0,1.0,"(1.0, 0.0)","[699.0, 39.0, 1.0, 0.0, 2.0, 0.0, 0.0, 93826.6...","[7.232034593131834, 3.718604093200186, 0.34576..."
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,2.0,1.0,"(0.0, 0.0)","[850.0, 43.0, 2.0, 125510.82, 1.0, 1.0, 1.0, 7...","[8.794319605382057, 4.099999384810461, 0.69152..."


## Train Model

In [41]:
from pyspark.ml.clustering import KMeans

In [42]:
 def compute_kmeans_model(df, k):
    kmeansObject = KMeans() \
        .setSeed(142) \
        .setK(k)
    
    return kmeansObject.fit(df)

## Evaluator

In [43]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [44]:
evaluator = ClusteringEvaluator()

Bestimmung des optimalen Werts für k:

In [45]:
for k in range(2,11):
    kmeans_model = compute_kmeans_model(pipeline_df, k)
    
    transformed_df = kmeans_model.transform(pipeline_df)
    
    score = evaluator.evaluate(transformed_df)
    
    print("k: {}, score: {}" .format(k, score))

2022-06-12 23:44:46,824 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2022-06-12 23:44:46,824 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


k: 2, score: 0.24590625242752776
k: 3, score: 0.22645984369837002
k: 4, score: 0.21109416946633186
k: 5, score: 0.2016827064122783
k: 6, score: 0.18932939099895102
k: 7, score: 0.1852897706405449
k: 8, score: 0.1758095043959162


2022-06-12 23:45:04,295 WARN storage.BlockManager: Asked to remove block broadcast_693, which does not exist


k: 9, score: 0.1855896305798431
k: 10, score: 0.167440153785295


In [46]:
kmeans_model = compute_kmeans_model(pipeline_df, 2)

## Prediction

In [47]:
transformed_df = kmeans_model.transform(pipeline_df)

In [48]:
transformed_df.limit(10).toPandas()

Unnamed: 0,RowNumber,CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited,Geography_indexed,Gender_indexed,Geography_ohe,unscaled_features,features,prediction
0,1,15634602,Hargrave,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0,1.0,"(1.0, 0.0)","[619.0, 42.0, 2.0, 0.0, 1.0, 1.0, 1.0, 101348....","[6.404333924389993, 4.0046505619078925, 0.6915...",1
1,2,15647311,Hill,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,2.0,1.0,"(0.0, 0.0)","[608.0, 41.0, 1.0, 83807.86, 1.0, 0.0, 1.0, 11...","[6.2905250824379895, 3.9093017390053237, 0.345...",0
2,3,15619304,Onio,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0,1.0,"(1.0, 0.0)","[502.0, 42.0, 8.0, 159660.8, 3.0, 1.0, 0.0, 11...","[5.193821696355051, 4.0046505619078925, 2.7660...",1
3,4,15701354,Boni,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0,1.0,"(1.0, 0.0)","[699.0, 39.0, 1.0, 0.0, 2.0, 0.0, 0.0, 93826.6...","[7.232034593131834, 3.718604093200186, 0.34576...",1
4,5,15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,2.0,1.0,"(0.0, 0.0)","[850.0, 43.0, 2.0, 125510.82, 1.0, 1.0, 1.0, 7...","[8.794319605382057, 4.099999384810461, 0.69152...",0
5,6,15574012,Chu,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1,2.0,0.0,"(0.0, 0.0)","[645.0, 44.0, 8.0, 113755.78, 2.0, 1.0, 0.0, 1...","[6.673336641731091, 4.19534820771303, 2.766084...",0
6,7,15592531,Bartlett,822,France,Male,50,7,0.0,2,1,1,10062.8,0,0.0,0.0,"(1.0, 0.0)","[822.0, 50.0, 7.0, 0.0, 2.0, 1.0, 1.0, 10062.8...","[8.504624371322414, 4.767441145128443, 2.42032...",1
7,8,15656148,Obinna,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1,1.0,1.0,"(0.0, 1.0)","[376.0, 29.0, 4.0, 115046.74, 4.0, 1.0, 0.0, 1...","[3.8901931430866514, 2.765115864174497, 1.3830...",0
8,9,15792365,He,501,France,Male,44,4,142051.07,2,0,1,74940.5,0,0.0,0.0,"(1.0, 0.0)","[501.0, 44.0, 4.0, 142051.07, 2.0, 0.0, 1.0, 7...","[5.183475437995778, 4.19534820771303, 1.383042...",1
9,10,15592389,H?,684,France,Male,27,2,134603.88,1,1,1,71725.73,0,0.0,0.0,"(1.0, 0.0)","[684.0, 27.0, 2.0, 134603.88, 1.0, 1.0, 1.0, 7...","[7.076840717742739, 2.5744182183693596, 0.6915...",1


In der letzten Spalte wird die „prediction“ angezeigt, die angibt, zu welchem Cluster die Datenpunkte gehören. Zum Vergleich dieser Vorhersagen mit den tatsächlichen Klassenzuweisungen in der Spalte „Exited“ kann der folgende Code verwendet werden:

In [49]:
transformed_df.select("Exited","prediction").limit(20).toPandas()

Unnamed: 0,Exited,prediction
0,1,1
1,0,0
2,1,1
3,0,1
4,0,0
5,1,0
6,0,1
7,1,0
8,0,1
9,0,1


Es zeigt sich, dass viele „Exited“-Werte (Churn) und „prediction“-Werte nicht übereinstimmen. Eine vollständige Übereinstimmung zwischen Clustering und Klassifizierung ist nicht immer zu erwarten, da Clusterzuweisungen nicht exakt mit Klassenzuweisungen übereinstimmen müssen. Stattdessen kann Clustering dazu verwendet werden, Daten nach anderen Kriterien zu gruppieren.

Eine exakte Übereinstimmung der Cluster mit den „Exited“-Werten ist unrealistisch. Bei der Analyse physischer oder biologischer Daten kann Clustering jedoch näher an der Klassifikation liegen. Zum Beispiel kann im Iris-Datensatz, der drei Klassen enthält, ein Clustering-Algorithmus jede Blume korrekt einer bestimmten Klasse zuordnen. Die Qualität der Ergebnisse hängt von der Natur der Daten ab und kann je nach Anwendungsfall variieren.