## Reproducing the official Spark k-means tutorial
(non-custom distance function)

Tutorial: http://spark.apache.org/docs/latest/mllib-clustering.html

Data: https://github.com/apache/spark/tree/master/data/mllib


In general, using a different distance measure doesn't make sense, because the k-means algorithm (unlike k-medoids) is well defined only for Euclidean distances.

### Why does k-means clustering algorithm use only Euclidean distance metric?

The term "centroid" itself comes from Euclidean geometry: it is the multivariate mean in Euclidean space, and the mean is the point that minimizes the sum of squared Euclidean distances. Euclidean space is defined by Euclidean distances, and non-Euclidean distances will generally not span a Euclidean space. That is why k-means is for Euclidean distances only.


Source: https://stats.stackexchange.com/questions/81481/why-does-k-means-clustering-algorithm-use-only-euclidean-distance-metric
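To make the role of the centroid concrete, here is a minimal pure-Python sketch of one k-means (Lloyd) iteration: assign each point to its nearest center, then replace each center by the mean of its points. The helper names `centroid` and `lloyd_step`, the starting centers, and the six points (chosen to mirror the values in the tutorial's sample file) are assumptions for illustration; this is not how Spark implements it.

```python
from math import dist  # Euclidean distance, Python 3.8+


def centroid(points):
    """Component-wise arithmetic mean of a non-empty list of points."""
    n = len(points)
    return tuple(sum(coords) / n for coords in zip(*points))


def lloyd_step(points, centers):
    """One k-means iteration: assign to the nearest center, then recompute means."""
    clusters = [[] for _ in centers]
    for p in points:
        nearest = min(range(len(centers)), key=lambda i: dist(p, centers[i]))
        clusters[nearest].append(p)
    # Keep the old center if a cluster ends up empty.
    return [centroid(c) if c else centers[i] for i, c in enumerate(clusters)]


# Six illustrative 3-D points (assumption: same values as the sample file).
points = [(v, v, v) for v in (0.0, 0.1, 0.2, 9.0, 9.1, 9.2)]
centers = [points[0], points[3]]  # arbitrary starting centers
for _ in range(10):
    centers = lloyd_step(points, centers)
print(centers)
```

The update step only makes sense because the mean is the minimizer for squared Euclidean distance; with another distance the mean would no longer be the right representative.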

If you are looking for a Manhattan-distance variant of k-means, there is k-medians, because the median is known to be the best L1 estimator (it minimizes the sum of absolute deviations).
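A quick one-dimensional check of that claim, as a sketch: on made-up values with one outlier, the median gives a lower L1 cost than the mean, while the mean gives a lower squared (L2) cost than the median. The data and the names `l1_cost` and `l2_cost` are assumptions for illustration.

```python
from statistics import mean, median

values = [1.0, 2.0, 3.0, 4.0, 100.0]  # illustrative data with one outlier


def l1_cost(center):
    """Sum of absolute deviations from `center`."""
    return sum(abs(v - center) for v in values)


def l2_cost(center):
    """Sum of squared deviations from `center`."""
    return sum((v - center) ** 2 for v in values)


m, med = mean(values), median(values)
print("L1 cost  median:", l1_cost(med), " mean:", l1_cost(m))
print("L2 cost  median:", l2_cost(med), " mean:", l2_cost(m))
```

In several dimensions, k-medians applies the median to each coordinate separately.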

If you want arbitrary distance functions, have a look at k-medoids (also known as PAM, partitioning around medoids). The medoid minimizes arbitrary distances (because it is defined as the minimizer), and there is only a finite number of possible medoids, since each one must be a data point. It is much more expensive to compute than the mean, though.
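As a sketch of what "defined as the minimum" means in practice, the medoid can be found by brute force: try every data point as the candidate and keep the one with the smallest total distance to all points. The function names and the sample points are assumptions for illustration, and this is not the full PAM algorithm.

```python
def medoid(points, distance):
    """Return the member of `points` with the smallest total distance to all points."""
    return min(points, key=lambda c: sum(distance(c, p) for p in points))


def manhattan(a, b):
    """L1 distance; any other distance function could be passed instead."""
    return sum(abs(x - y) for x, y in zip(a, b))


points = [(0, 0), (1, 1), (2, 2), (3, 3), (10, 10)]
print(medoid(points, manhattan))
```

The search evaluates the distance for every pair of points, which is where the extra cost relative to a simple mean comes from.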

In [1]:
# Get the total CPU info (optional)
# !pip install psutil
# import psutil,datetime

In [2]:
#api for k-means algo
from pyspark.ml.clustering import KMeans,KMeansModel

#ClusteringEvaluator
from pyspark.ml.evaluation import ClusteringEvaluator
#https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.ClusteringEvaluator.html
#The metric computes the Silhouette measure using the squared Euclidean distance.

from numpy import array
from math import sqrt

# Entry points for Spark
from pyspark import SparkConf, SparkContext
from pyspark.rdd import RDD
from pyspark.sql import SparkSession
import os
import sys


#Config
# Raw string so that the backslashes in the Windows path are not treated as escape sequences
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jre1.8.0_301'
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


sc = SparkContext(conf=SparkConf().setAppName("Kmeans").setMaster("local"))
print(f'sc:{sc}')
spark = SparkSession(sc)

sc:<SparkContext master=local appName=Kmeans>
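The `ClusteringEvaluator` imported above reports the Silhouette measure using squared Euclidean distance. As a rough illustration of what that number means, here is a naive pure-Python sketch of the mean silhouette coefficient. The function names and the six points (mirroring the sample file's values) are assumptions, it assumes at least two clusters, and Spark computes the measure differently and far more efficiently.

```python
def sq_euclidean(a, b):
    """Squared Euclidean distance between two points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def mean_distance(p, others):
    """Average squared distance from `p` to every point in `others`."""
    return sum(sq_euclidean(p, q) for q in others) / len(others)


def silhouette(points, labels):
    """Mean silhouette coefficient; requires at least two distinct labels."""
    members = {}
    for i, label in enumerate(labels):
        members.setdefault(label, []).append(i)
    scores = []
    for i, (p, label) in enumerate(zip(points, labels)):
        own = [points[j] for j in members[label] if j != i]
        if not own:  # singleton cluster: coefficient taken as 0
            scores.append(0.0)
            continue
        a = mean_distance(p, own)  # cohesion within the point's own cluster
        b = min(mean_distance(p, [points[j] for j in idx])  # nearest other cluster
                for other, idx in members.items() if other != label)
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)


points = [(v, v, v) for v in (0.0, 0.1, 0.2, 9.0, 9.1, 9.2)]
score = silhouette(points, [0, 0, 0, 1, 1, 1])
print(score)
```

Values close to 1 indicate compact, well-separated clusters; values near 0 or below indicate overlapping clusters.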


## Import dataset

In [54]:
# os has no getpath() (it raises AttributeError); os.getcwd() returns the current working directory
os.getcwd()

In [45]:
# Load and parse the data
path = r'C:\Users\LeoShr\p_space\NTHU\MDA\CH3_K_means\dataset\sample_kmeans_data.txt'
#dataset = spark.read.format("libsvm").load(path) # reads a LIBSVM-format file into a DataFrame (label, features)


In [48]:
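# Fails: pyspark.ml's KMeans.fit expects a DataFrame and takes no k or maxIterations arguments.
# The (rdd, k, maxIterations=..., initializationMode=...) call pattern belongs to
# pyspark.mllib.clustering.KMeans.train.
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
clusters = KMeans.fit(parsedData, 2, maxIterations=10, initializationMode="random")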

TypeError: fit() got an unexpected keyword argument 'maxIterations'

In [36]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]  # a plain Python list
print(type(dept))
rdd = spark.sparkContext.parallelize(dept)  # convert the list to an RDD
rdd

<class 'list'>


ParallelCollectionRDD[28] at readRDDFromFile at PythonRDD.scala:274

In [42]:
# Load as an RDD
data = sc.textFile(path)
print(data)
data.collect()

# PySpark provides a toDF() function on RDDs that can be used to convert an RDD into a DataFrame
# Load as a DataFrame
# df_data = data.toDF()
# df_data.collect()

C:\Users\LeoShr\p_space\NTHU\MDA\CH3_K_means\dataset\sample_kmeans_data.txt MapPartitionsRDD[43] at textFile at <unknown>:0


['0 1:0.0 2:0.0 3:0.0',
 '1 1:0.1 2:0.1 3:0.1',
 '2 1:0.2 2:0.2 3:0.2',
 '3 1:9.0 2:9.0 3:9.0',
 '4 1:9.1 2:9.1 3:9.1',
 '5 1:9.2 2:9.2 3:9.2']

In [44]:
# Not working: the file is in LIBSVM format, so tokens such as '1:0.0' cannot be converted with float()
#parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
parsedData = data.map(lambda x: x.split(' '))
parsedData.collect()

[['0', '1:0.0', '2:0.0', '3:0.0'],
 ['1', '1:0.1', '2:0.1', '3:0.1'],
 ['2', '1:0.2', '2:0.2', '3:0.2'],
 ['3', '1:9.0', '2:9.0', '3:9.0'],
 ['4', '1:9.1', '2:9.1', '3:9.1'],
 ['5', '1:9.2', '2:9.2', '3:9.2']]
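The split output above shows why a plain `float()` conversion fails: each line is a label followed by `index:value` pairs (LIBSVM format). A minimal sketch of parsing such a line by hand follows. The helper name `parse_libsvm_line` and its `num_features` parameter are hypothetical, and Spark's own `libsvm` reader is the more robust choice.

```python
def parse_libsvm_line(line, num_features):
    """Parse 'label idx:value ...' into (label, dense feature list)."""
    tokens = line.split()
    label = float(tokens[0])
    features = [0.0] * num_features  # indices that are absent stay at zero
    for token in tokens[1:]:
        index, value = token.split(":")
        features[int(index) - 1] = float(value)  # LIBSVM indices are 1-based
    return label, features


lines = ['0 1:0.0 2:0.0 3:0.0', '3 1:9.0 2:9.0 3:9.0']
for line in lines:
    print(parse_libsvm_line(line, 3))
```

Applied to the RDD from this notebook, the same helper could be used as `data.map(lambda line: parse_libsvm_line(line, 3))`.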

### Build model

In [23]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator


In [26]:
# Build the model (cluster the data)
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(parsedData)

AttributeError: 'PipelinedRDD' object has no attribute '_jdf'


### AttributeError: 'PipelinedRDD' object has no attribute '_jdf'

The Stack Overflow answer linked below hit the same error with MultilayerPerceptronClassifier, and the explanation carries over to KMeans: estimators in pyspark.ml work only with DataFrames, while pyspark.mllib works with RDDs. Calling pyspark.ml's KMeans.fit on an RDD therefore fails, so I have to change the way I load the data in Spark and load it as a DataFrame.

Stack Overflow: https://stackoverflow.com/questions/39643185/pipelinedrdd-object-has-no-attribute-jdf
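Following that explanation, one way to avoid the error is to read the file with the `libsvm` data source, which returns a DataFrame, and fit the pyspark.ml estimator on it. The sketch below wraps this in a hypothetical helper, `fit_kmeans`; the name and the default values for `k` and `seed` are assumptions. It needs a working Spark installation to run.

```python
def fit_kmeans(spark, path, k=2, seed=1):
    """Fit pyspark.ml KMeans on a LIBSVM file; return (model, silhouette score)."""
    # Imported here so that defining the helper does not itself require Spark.
    from pyspark.ml.clustering import KMeans
    from pyspark.ml.evaluation import ClusteringEvaluator

    # The libsvm reader yields a DataFrame with `label` and `features` columns.
    dataset = spark.read.format("libsvm").load(path)

    model = KMeans(k=k, seed=seed).fit(dataset)
    predictions = model.transform(dataset)  # adds a `prediction` column

    # Silhouette with squared Euclidean distance (the evaluator's default).
    score = ClusteringEvaluator().evaluate(predictions)
    return model, score
```

With the `spark` session and `path` defined earlier in this notebook, usage would look like `model, score = fit_kmeans(spark, path)`, after which `model.clusterCenters()` returns the fitted centers.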

In [51]:

# Build the estimator; pyspark.ml equivalent of
# clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
maxIterations = 10
kmeans = KMeans(k=2, maxIter=maxIterations, initMode="random")

In [52]:
kmeans.setSeed(1)

KMeans_628ea7e40eaf

In [53]:
kmeans.setWeightCol("weightCol")  # the DataFrame passed to fit() must contain a column with this name

KMeans_628ea7e40eaf

In [None]:

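# The RDD-based API lives in pyspark.mllib (pyspark.ml.clustering.KMeans has no train()).
from pyspark.mllib.clustering import KMeans, KMeansModel

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))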

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

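# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    # Squared Euclidean distance to the assigned center (no square root,
    # otherwise the total is a sum of distances rather than of squared errors).
    return sum([x**2 for x in (point - center)])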

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
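For reference, the Within Set Sum of Squared Errors is the sum, over all points, of the squared Euclidean distance to the nearest cluster center. The pure-Python sketch below computes it without Spark. The function name `wssse`, the six points (mirroring the sample values shown earlier), and the two centers are assumptions for illustration.

```python
def wssse(points, centers):
    """Sum over all points of the squared distance to the nearest center."""
    def sq_dist(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))

    return sum(min(sq_dist(p, c) for c in centers) for p in points)


points = [(v, v, v) for v in (0.0, 0.1, 0.2, 9.0, 9.1, 9.2)]
centers = [(0.1, 0.1, 0.1), (9.1, 9.1, 9.1)]  # assumed cluster centers
print("Within Set Sum of Squared Error =", wssse(points, centers))
```

A lower value means the points sit closer to their centers, but it always decreases as k grows, so it is mainly useful for comparing runs with the same k.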