In [2]:
import os
os.environ["SPARK_HOME"] = "/Users/zouhairhajji/Documents/dev/spark-2.4.0-bin-hadoop2.7"

In [3]:
from pyspark.sql import SparkSession
import csv
import numpy as np
import random
from math import sqrt

spark = SparkSession.builder \
        .master('local[*]')  \
        .enableHiveSupport() \
        .getOrCreate()

sc = spark.sparkContext

In [4]:

def loadData(namefile) :
    textfile = sc.textFile(namefile)    \
            .zipWithIndex()   \
            .map(lambda x: (x[1], x[0].split(',')  ))  \
            .map(lambda x: (x[0], [float(i) for i in x[1]]  ))
            
    return textfile

def dimension(rdd_load_data) :
    return rdd_load_data.map(lambda x: len(x[1])).reduce(lambda x, y: max(x, y)) 






def closestcentroide(list1):
    cluster = list1[0][0]
    min_dist = list1[0][1]
    for elem in list1:
        if elem[1] < min_dist:
            cluster  = elem[0]
            min_dist = elem[1]
    return (cluster, min_dist)

def computeCentroids(rdd,findcentroid):

    union = findcentroid.join(rdd)
    centroid_point = union.map(lambda x: (x[1][0][0],x[1][1][:-1])) 
    numpoints = centroid_point.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y) 
    sum = centroid_point.reduceByKey(lambda x,y: [x[i]+y[i] for i in range(len(x))]) 
    union2 = sum.join(numpoints)
    newcentroids = union2.map(lambda x : (x[0], mean(x[1][0],x[1][1]))) 

    return newcentroids  


def calculate_distance(x, y):
    s = sqrt(sum([ (a - b)**2 for a,b in zip(x,y)]))
    dis = float('%.3f'%(s))
    return dis



def initCentroids(rdd_flat_data, k):
    seed = random.randint(0, 100)
    
    centroids = rdd_flat_data.takeSample(False, k, seed=seed)
    return  sc.parallelize(centroids)   \
                .zipWithIndex()         \
                .map(lambda x: (x[1], x[0][1])) 

    
def computeIntraClusterDistance(rdd, centroids):
    initdistance = assignToCluster(rdd,centroids) 
    totaldistance = initdistance.map(lambda x: (1,x[1][1])).reduceByKey(lambda x,y: x+y)
    return totaldistance
    
def mean(x,n):
    return [x[i]/n for i in range(len(x))]

flat_data = loadData('iris_input.data')
 
initCentroids(flat_data, 3).collect()




[(0, [6.3, 2.3, 4.4, 1.3]),
 (1, [6.2, 2.2, 4.5, 1.5]),
 (2, [6.7, 3.0, 5.2, 2.3])]

In [5]:
def calculate_distance(x, y):
    x = np.array(x)
    y = np.array(y)
    return np.sqrt(np.sum(np.square( x-y )))  
    
    
def assignToCluster(rdd, centroids):
    cartes = rdd.cartesian(centroids) 
    
    cartes = cartes.map(lambda x: (x[0][0], (x[1][0], calculate_distance(x[1][1], x[0][1]))  ) )
    cartes = cartes.groupByKey().mapValues(closestcentroide)
    return cartes


def closestcentroide(y):
    y = list(y)
    closest = y[0]
    
    for couple in y:
        if(closest[1] > couple[1]): 
            closest = couple
            
    return  closest

def calculate_new_coordinate(x): 
    mean = lambda x: sum(x) / len(x)
    
    return [ round(mean(x), 2) for x in zip(*x)]  

def computeCentroids(rdd, assigned_centroids): 
     return assigned_centroids   \
                .join(rdd)       \
                .map(lambda x: (x[1][0][0], x[1][1]) )       \
                .groupByKey()       \
                .mapValues(calculate_new_coordinate)

def computeIntraClusterDistance(rdd, centroids):
    initdistance = assignToCluster(rdd, centroids) 
    totaldistance = initdistance.map(lambda x: (1, x[1][1]) ).reduceByKey(lambda x,y: x+y)
    return totaldistance   

def kmeans(filename, k, m, d):
    flat_data = loadData(filename)
    dim = dimension(flat_data)
    
    centroids = initCentroids(flat_data, k)
    
    
    
    #assign every point to a centroids
    assigned_points = assignToCluster(flat_data, centroids)
    
    #compute centroids
    old_centroids = computeCentroids(flat_data, assigned_points)
    
    converged = False
    step_count = 0
    step_max = 10
    
    while not converged  and step_count < step_max:
        step_count += 1
        print('running step ', step_count)
        
        
        assigned_points = assignToCluster(flat_data, old_centroids) 
        
        new_centroids   = computeCentroids(flat_data, assigned_points)
        distance_intracluster = computeIntraClusterDistance(flat_data, new_centroids)
        
        
        centroid_changed = old_centroids.join(new_centroids).map(lambda x: np.sqrt(np.sum(np.square(np.array(x[1][0]) - np.array(x[1][1])))) ).reduce(lambda x, y: x+y) 
        print('    ->  ' ,  centroid_changed)
        
        
         
        if centroid_changed == 0:
            converged = True 
        old_centroids = sc.parallelize(new_centroids.collect()) 
            
        
    return (step_count, distance_intracluster.take(1), assigned_points, old_centroids  ) 
     
    

step_count, distance, points, centroids= kmeans('iris_input.data', 4, 4, 10)
print("distance : " , distance)
print("step_coup : " , step_count)
print("points : " , points.take(10))
print("centroids : ", centroids.take(2))


running step  1
    ->   1.1684194380199828
running step  2
    ->   0.2228188009885354
running step  3
    ->   0.0
distance :  [(1, 83.74462988190915)]
step_coup :  3
points :  [(0, (0, 0.14035668847618196)), (8, (0, 0.8047981113297915)), (16, (0, 0.6585590330410785)), (24, (0, 0.4895916665957458)), (32, (0, 0.7209022125087422)), (40, (0, 0.1889444362769118)), (48, (0, 0.40706264874095255)), (56, (2, 0.45923850012820133)), (64, (1, 0.45497252664309273)), (72, (2, 0.39102429592034293))]
centroids :  [(0, [5.01, 3.42, 1.46, 0.24]), (1, [5.53, 2.64, 3.96, 1.23])]


In [10]:
points.collect()

[(0, (0, 0.14035668847618196)),
 (8, (0, 0.8047981113297915)),
 (16, (0, 0.6585590330410785)),
 (24, (0, 0.4895916665957458)),
 (32, (0, 0.7209022125087422)),
 (40, (0, 0.1889444362769118)),
 (48, (0, 0.40706264874095255)),
 (56, (2, 0.45923850012820133)),
 (64, (1, 0.45497252664309273)),
 (72, (2, 0.39102429592034293)),
 (80, (1, 0.3178049716414143)),
 (88, (1, 0.3987480407475374)),
 (96, (1, 0.39874804074753767)),
 (104, (3, 0.43069710934716077)),
 (112, (3, 0.38144462245521277)),
 (120, (3, 0.24799193535274452)),
 (128, (3, 0.643039656630911)),
 (136, (3, 0.7729812416870155)),
 (144, (3, 0.4934571916590129)),
 (1, (0, 0.4401136216933076)),
 (9, (0, 0.36837480912787696)),
 (17, (0, 0.14730919862656233)),
 (25, (0, 0.44463468150831414)),
 (33, (0, 0.9239588735436229)),
 (41, (0, 1.2424572427250766)),
 (49, (0, 0.1403566884761821)),
 (57, (1, 0.9710818709048172)),
 (65, (2, 0.6963476143421476)),
 (73, (2, 0.46572524088780104)),
 (81, (1, 0.4230839160261236)),
 (89, (1, 0.16431676725155

#  ml lib


In [8]:

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler

In [9]:
def kmeans_mllib(filename, k):
    flat_data = loadData(filename).map(lambda x: x[1])  
    
    features = ["c1", "c2", "c3", "c4"]
    dataset = spark.createDataFrame(flat_data, features)

     
    
    transformed_dataset = VectorAssembler(inputCols=features, outputCol="features").transform(dataset) 
    
    
    kmeans = KMeans(k=k, seed=1) 
    model = kmeans.fit(transformed_dataset.select('features'))
    
    return model.transform(transformed_dataset) 
    
kmeans_mllib('iris_input.data', 4).show(150)

 

+---+---+---+---+-----------------+----------+
| c1| c2| c3| c4|         features|prediction|
+---+---+---+---+-----------------+----------+
|5.1|3.5|1.4|0.2|[5.1,3.5,1.4,0.2]|         0|
|4.9|3.0|1.4|0.2|[4.9,3.0,1.4,0.2]|         0|
|4.7|3.2|1.3|0.2|[4.7,3.2,1.3,0.2]|         0|
|4.6|3.1|1.5|0.2|[4.6,3.1,1.5,0.2]|         0|
|5.0|3.6|1.4|0.2|[5.0,3.6,1.4,0.2]|         0|
|5.4|3.9|1.7|0.4|[5.4,3.9,1.7,0.4]|         0|
|4.6|3.4|1.4|0.3|[4.6,3.4,1.4,0.3]|         0|
|5.0|3.4|1.5|0.2|[5.0,3.4,1.5,0.2]|         0|
|4.4|2.9|1.4|0.2|[4.4,2.9,1.4,0.2]|         0|
|4.9|3.1|1.5|0.1|[4.9,3.1,1.5,0.1]|         0|
|5.4|3.7|1.5|0.2|[5.4,3.7,1.5,0.2]|         0|
|4.8|3.4|1.6|0.2|[4.8,3.4,1.6,0.2]|         0|
|4.8|3.0|1.4|0.1|[4.8,3.0,1.4,0.1]|         0|
|4.3|3.0|1.1|0.1|[4.3,3.0,1.1,0.1]|         0|
|5.8|4.0|1.2|0.2|[5.8,4.0,1.2,0.2]|         0|
|5.7|4.4|1.5|0.4|[5.7,4.4,1.5,0.4]|         0|
|5.4|3.9|1.3|0.4|[5.4,3.9,1.3,0.4]|         0|
|5.1|3.5|1.4|0.3|[5.1,3.5,1.4,0.3]|         0|
|5.7|3.8|1.7|

fez 2
