In [1]:
# Import the Spark entry points used by the following cells. If PySpark is not
# importable from this kernel, show a hint instead of only a traceback.
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    # `printmd` is not defined in the visible cells, so calling it here would
    # raise NameError and hide the hint; the built-in print() always works.
    print('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')
    # Surface the underlying reason so the user can tell what failed to import.
    print(f'Underlying import error: {e}')

In [2]:
# Get the running SparkContext, or create one configured with master "local[*]".
sc = SparkContext.getOrCreate(
    conf=SparkConf().setMaster("local[*]"),
)

# Get (or create) the SparkSession through the builder.
spark = SparkSession.builder.getOrCreate()

In [20]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Read the CSV with default options (no header, no schema), so the columns are
# auto-named _c0, _c1, ... and arrive un-cast.
iris_raw = spark.read.csv('datasets/normalized/iris.csv')

# Expose the raw frame to Spark SQL under the view name 'df'.
iris_raw.createOrReplaceTempView('df')

# Cast every column to double in ONE projection. Calling withColumn() once per
# column in a loop adds a projection each time and can produce very large query
# plans; a single select() yields the same schema without that cost. The alias
# keeps the original column name (a bare cast would rename the column).
df = iris_raw.select(
    [iris_raw[name].cast(DoubleType()).alias(name) for name in iris_raw.columns]
)

# Pack the three numeric columns into the single vector column KMeans consumes.
vectorAssembler = VectorAssembler(inputCols=["_c0", "_c1", "_c2"],
                                  outputCol="features")

# k=3 clusters; the fixed seed makes the clustering repeatable between runs.
kmeans = KMeans(featuresCol="features").setK(3).setSeed(1)

# Assemble features, then cluster; fit and score on the same frame.
pipeline = Pipeline(stages=[vectorAssembler, kmeans])
model = pipeline.fit(df)
predictions = model.transform(df)

# Score the clustering with ClusteringEvaluator's default settings.
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")


Silhouette with squared euclidean distance = 0.678947550812511


In [21]:
# Print the column names and types of `df` to confirm the casts took effect.
df.printSchema()

root
 |-- _c0: double (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)



In [16]:
from pyspark.sql.types import DoubleType

# Derive a copy of `df` in which only `_c0` is cast to double; every other
# column keeps whatever type it currently has in `df`.
df_ = df.withColumn(
    colName="_c0",
    col=df["_c0"].cast(DoubleType()),
)

In [17]:
# Print the schema of `df_` to see which columns were cast.
df_.printSchema()

root
 |-- _c0: double (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [18]:
# Show the column names of `df`.
df.columns

['_c0', '_c1', '_c2']