In [10]:
sc

In [157]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors

In [158]:
#Input 
# Loads data.
raw_dataset = spark.read.format("csv").option("header", "true").load("iris.csv")

In [159]:
#Split dataset
train_ds, test_ds = raw_dataset.randomSplit([0.75, 0.25], seed=12345)
len(train_ds.rdd.collect()), len(test_ds.rdd.collect())

(125, 25)

In [185]:
class Knn:
    
    def __init__(self, dataset):
        self.dataset = dataset
        
    def __row_dist(self, input, row):
        return (row[-1], Vectors.squared_distance(Vectors.dense(input), row[:-1]))
    
    def predict(self, input, k):
        
        cluster_dists = [self.__row_dist(Vectors.dense(input), row) for row in self.dataset.rdd.collect()]
        knearest = sorted(cluster_dists, key=lambda tup: tup[1])[:k]

        occurences = {}
        for e in knearest:
            if e[0] in occurences:
                occurences[e[0]] += 1
            else: 
                occurences[e[0]] = 1
                
        return max(occurences, key=occurences.get)
    
    def predict_all(self, inputs, k):
        return [(input, self.predict(input[:-1], k)) for input in inputs]
    

In [197]:
#EKnn model class

class EKnn:
    
    def __init__(self, dataset):
        self.dataset = dataset
        
    def __closest_cluster(self, row):
        dists = [Vectors.squared_distance(Vectors.dense(row[0:-1]), self.centers[i]) for i in range(len(self.centers))]
        return dists.index(min(dists))
    
    def __row_dist(self, input, row):
        return (row[-1], Vectors.squared_distance(Vectors.dense(input), row[:-1]))
    
    def train(self, k):
        self.kmeans_k = k
        tds = self.dataset.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])
        # Trains a k-means model.
        kmeans = KMeans().setK(k) #.setSeed(1)
        model = kmeans.fit(tds)
        self.centers = model.clusterCenters()
        
        self.clusters = [[] for i in range(len(self.centers))]
        for row in self.dataset.rdd.collect():
            min_idx = self.__closest_cluster(row)
            self.clusters[min_idx].append(row)

        return self.clusters
    
    def predict(self, input, k):
        
        ## Find closest cluster
        centers_dists = [Vectors.squared_distance(Vectors.dense(input), self.centers[i]) for i in range(len(self.centers))]
        min_idx = centers_dists.index(min(centers_dists))
        cluster = self.clusters[min_idx]
        
        cluster_dists = [self.__row_dist(Vectors.dense(input), row) for row in self.dataset.rdd.collect()]
        knearest = sorted(cluster_dists, key=lambda tup: tup[1])[:k]

        occurences = {}
        for e in knearest:
            if e[0] in occurences:
                occurences[e[0]] += 1
            else: 
                occurences[e[0]] = 1
                
        return max(occurences, key=occurences.get)
    
    def predict_all(self, inputs, k):
        return [(input, self.predict(input[:-1], k)) for input in inputs]
    
    
            
        
        
        

In [200]:
model_k = EKnn(train_ds)
clusters = model_k.train(3)
k_predictions  = model_k.predict_all(test_ds.rdd.collect(), 10)

k_correct_predictions = 0
for p in k_predictions:
    if p[0].variety == p[1]:
        k_correct_predictions += 1

k_accuracy = k_correct_predictions / len(k_predictions) * 100

print(k_accuracy)

model_1 = Knn(train_ds)
predictions = model_1.predict_all(test_ds.rdd.collect(), 10)

correct_predictions = 0
for p in predictions:
    if p[0].variety == p[1]:
        correct_predictions += 1

accuracy = correct_predictions / len(predictions) * 100

print(accuracy)

100.0
100.0


In [190]:
test_ds.rdd.collect()[0][:-1]

('4.6', '3.2', '1.4', '.2')