In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import StandardScaler

In [None]:
# Create (or reuse) the Spark session that every later cell reads from.
spark = (
    SparkSession.builder
    .appName('RespiratoryDisease')
    .getOrCreate()
)

# **Part 1 - Data Preprocessing**

In [None]:
# Read the metadata CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = spark.read.csv(
    '/FileStore/tables/metadata.csv',
    header=True,
    inferSchema=True,
)

df.show(5)

**1.1 Cleaning Null Values**

In [None]:
# Count the missing entries (NaN or NULL) in every column.
# The survival label turns out to be mostly null, and this is an unsupervised
# task anyway, so it is removed below together with _c9, which is entirely null.
missing_per_column = [
    count(when(col(c).isNull() | isnan(c), c)).alias(c)
    for c in df.columns
]
df.select(missing_per_column).show()

In [None]:
# Discard the mostly-null survival label and the entirely-null _c9 column.
unwanted_columns = ('survival', '_c9')
df = df.drop(*unwanted_columns)
df.columns  # survival and _c9 no longer appear in this list

In [None]:
# how='any' removes every row that contains at least one null value, so only
# fully populated rows are kept for the steps that follow.
df = df.dropna(how='any')
df.describe().show()

**1.2 Encoding Categorical Data**


In [None]:
# Index each categorical string column into a new "<name>_cat" column.
# The indexers are applied one after another: every step fits on, and adds
# its column to, the DataFrame produced by the previous step.

# sex -> sex_cat (fitted on the cleaned df)
encoder_gen = StringIndexer(inputCol="sex", outputCol="sex_cat")
encoded = encoder_gen.fit(df).transform(df)

# finding -> finding_cat
encoder_find = StringIndexer(inputCol="finding", outputCol="finding_cat")
encoded = encoder_find.fit(encoded).transform(encoded)

# view -> view_cat
encoder_view = StringIndexer(inputCol="view", outputCol="view_cat")
encoded = encoder_view.fit(encoded).transform(encoded)

# modality -> modality_cat
encoder_mod = StringIndexer(inputCol="modality", outputCol="modality_cat")
encoded = encoder_mod.fit(encoded).transform(encoded)

# location -> location_cat
encoder_loc = StringIndexer(inputCol="location", outputCol="location_cat")
encoded = encoder_loc.fit(encoded).transform(encoded)

encoded.show(5) #notice the five new *_cat columns that hold the indexed categorical values

**1.3 Creating Features Column**


In [None]:
# Columns that make up the clustering input: the two numeric columns plus the
# five indexed categorical columns created above.
feature_columns = [
    'offset',
    'age',
    'sex_cat',
    'finding_cat',
    'view_cat',
    'modality_cat',
    'location_cat',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Apply the assembler to the encoded data; this adds the 'features' vector column.
output = assembler.transform(encoded)



In [None]:
# Peek at the first four rows of the assembled column.
output.select('features').head(4) #notice features is a single vector holding all the columns we combined in the assembler

**1.4 Scaling Features**


In [None]:
# Rescale every feature to unit standard deviation (withStd=True) without
# centering it (withMean=False).
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=False,
    withStd=True,
)

# Fit the scaling statistics on the assembled data ...
scalerModel = scaler.fit(output)

# ... and apply them, adding the 'scaledFeatures' column used for clustering.
cluster_final = scalerModel.transform(output)
cluster_final.show(5)

# **Part 2 - Creating Clusters**


In [None]:
# Fit k-means with k=3 and k=2 on the scaled features and compare the two
# clusterings by their silhouette score.
KMEANS_SEED = 42  # fixed seed so the clustering is reproducible across runs

kmeans3 = KMeans(featuresCol='scaledFeatures', k=3, seed=KMEANS_SEED)
kmeans2 = KMeans(featuresCol='scaledFeatures', k=2, seed=KMEANS_SEED)

model_k3 = kmeans3.fit(cluster_final)
model_k2 = kmeans2.fit(cluster_final)

# transform() adds the cluster assignment column to each row.
pred_k3 = model_k3.transform(cluster_final)
pred_k2 = model_k2.transform(cluster_final)

# Score on the same column the models were trained on. Left at its default,
# the evaluator reads the unscaled 'features' column instead, so the reported
# silhouette would not describe the clustering that was actually fitted.
evaluator = ClusteringEvaluator(featuresCol='scaledFeatures')

silhouette_k3 = evaluator.evaluate(pred_k3)
silhouette_k2 = evaluator.evaluate(pred_k2)

# Label each score with its k so the two output lines can be told apart.
print("Silhouette with squared euclidean distance (k=3) = " + str(silhouette_k3))
print("Silhouette with squared euclidean distance (k=2) = " + str(silhouette_k2))

In [None]:
# Centroids of the k=3 model, printed one per line. They are expressed in the
# scaledFeatures space the model was fitted on.
centers_k3 = model_k3.clusterCenters()
print("Cluster Centers: ")
for centroid in centers_k3:
    print(centroid)

In [None]:
# Centroids of the k=2 model, printed one per line. They are expressed in the
# scaledFeatures space the model was fitted on.
centers_k2 = model_k2.clusterCenters()
print("Cluster Centers: ")
for centroid in centers_k2:
    print(centroid)