In [0]:
# Framework
# Import dataset
# Define float variables
# Impute missing data
# Prepare data using pipeline
# Perform clustering

In [0]:
# Load the raw CSV from DBFS into a Spark DataFrame
fileType = "csv"
fileLocation = "dbfs:/FileStore/shared_uploads/akbarazad@outlook.sg/CC_GENERAL.csv"

# Reader options: schema inference is off, and the first row supplies column names
inferSchema = "false"
firstRowIsHeader = "true"

# Parenthesised chain instead of backslash line continuations
df = (
  spark.read.format(fileType)
  .option("inferSchema", inferSchema)
  .option("header", firstRowIsHeader)
  .load(fileLocation)
)

In [0]:
# Print metadata: column names and types of the freshly loaded DataFrame
df.printSchema()

In [0]:
# Casting multiple variables
from pyspark.sql.types import *


In [0]:
# Identifying and assigning list of variables
# Every column except the customer identifier is cast to float. A list
# comprehension (rather than set arithmetic) keeps the DataFrame's own column
# order, so the list is identical on every run.
floatVars = [name for name in df.columns if name != 'CUST_ID'] # Remove CUST_ID
for column in floatVars:
  df = df.withColumn(column, df[column].cast(FloatType()))

# printSchema has to be *called*; a bare `df.printSchema` only displays the
# bound method's repr and never prints the schema.
df.printSchema()

In [0]:
# Quick look at the data after the float cast
df.head()

In [0]:
# Imputing data
from pyspark.ml.feature import Imputer

# Suffix used for the temporary output columns produced by the Imputer
imputedSuffix = "_imputed"

# Identifying variables to perform mean imputation.
# Built with a list comprehension so the columns keep their DataFrame order;
# set arithmetic would yield an arbitrary order, which would then carry over
# into dfImputed.columns and into any feature vector assembled from it.
inputCols = [name for name in df.columns if name != 'CUST_ID']
outputCols = [name + imputedSuffix for name in inputCols]

# Pass parameters to impute function (no strategy given, so Imputer's default applies)
imputer = Imputer(inputCols = inputCols, outputCols = outputCols)

# Applying imputation
dfImputed = imputer.fit(df).transform(df) # Returns spark dataframe

# Drop the original columns; CUST_ID and the "_imputed" columns remain
dfImputed = dfImputed.drop(*inputCols)

# Rename columns: strip the suffix only where it terminates the name, so a
# column that merely contains "_imputed" elsewhere is left untouched
newColumnNames = [
  name[:-len(imputedSuffix)] if name.endswith(imputedSuffix) else name
  for name in dfImputed.columns
]
dfImputed = dfImputed.toDF(*newColumnNames)
dfImputed.columns

In [0]:
# Data preparation
from pyspark.ml import Pipeline
from pyspark.ml.feature import Normalizer, VectorAssembler

# Columns that must not enter the model
ignore = ['CUST_ID']

# Stage 1: pack every remaining column into a single 'features' vector
assembler = VectorAssembler(
  inputCols = [name for name in dfImputed.columns if name not in ignore],
  outputCol = 'features',
)

# Stage 2: rescale each row's feature vector to unit p-norm (p = 1.0 here)
normaliser = Normalizer(
  inputCol = 'features',
  outputCol = 'normFeatures',
  p = 1.0,
)

# Chain both stages, fit on the imputed data, then apply to that same data
pipeline = Pipeline(stages = [assembler, normaliser])
transformations = pipeline.fit(dfImputed)
dfUpdated = transformations.transform(dfImputed)
dfUpdated.columns

In [0]:
# Building model
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Build the model input once and reuse it for both fit and transform:
# the normalised vectors, renamed to 'features' so that the estimator and the
# evaluator can be used without overriding their feature-column setting.
modelInput = dfUpdated.select('normFeatures').withColumnRenamed('normFeatures', 'features')

# Trains a bisecting k-means model
bkm = BisectingKMeans().setK(2).setSeed(1) # Set number of clusters; seed fixed explicitly
model = bkm.fit(modelInput)

# Make predictions
predictions = model.transform(modelInput)

# Evaluate clustering
evaluator = ClusteringEvaluator()
# NOTE: the variable keeps its original (misspelled) name so that any later
# cell referring to `silhoutte` continues to work; only the printed text is fixed.
silhoutte = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance: {silhoutte}")

# Show results
print("Cluster centres: ")
centres = model.clusterCenters()
for centre in centres:
  print(centre)

In [0]:
# Show cluster assignment
# (show() with no arguments prints only the first rows of the DataFrame, not all of it)
predictions.show()