In [1]:
#pip install scikit-learn

In [1]:
#### IMPORT LIBRARIES #### 
import pickle
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import fetch_kddcup99
from functions import *
from time import time

timeString=str(time())
#### SELECT THE PICKLE FILE ####
pickle_fileP = 'dataP/log1P'+'_'+timeString+'.pkl'
pickle_fileR = 'dataR/log1U'+'_'+timeString+'.pkl'

###### FUNCTIONS #######
###### FUNCTIONS #######
def labelToInt(label):
    """
    Let's define a map from Y (set of strings) into (0,size(Y)) for easier usage
    """
    uniqueLabels=list(np.unique(y))
    return uniqueLabels.index(label)


def deleteBytes(datum):
    x = datum[1]["x"]
    mask = [type(i) != bytes for i in x]
    datum[1]["x"] = np.asarray(x[mask])
    print(x)
    print(mask)
    return datum
    
def localPlusPlusInit(points, k): 
    #print('pointsshape: ', points.shape)
    '''
    Initialization kmeans ++
    points is a numpy array (n,dim)
    '''
    C=points[np.random.choice(points.shape[0])]#sample from array di punti ecc...
    C=C[np.newaxis, :]
    for _ in range(k):
        #points is array (n, dim), C is array(g<=k, dim)
        #probs is array (n,1)
        probs=np.min(np.sum((points[:,:,np.newaxis]-C.T[np.newaxis,:,:])**2, axis=1), axis=1).flatten()
        #probs=[min([sum((point-centroid)**2) for centroid in C]) for point in points] #numpyfy this, or numbafy if left base python
        probs=probs/np.sum(probs)
        nextCentroid=points[np.random.choice(points.shape[0], p=probs)][np.newaxis,:]
        #print('LE FORME',C.shape, nextCentroid.shape)
        C=np.vstack((C, nextCentroid))
    return C


def weightedAverage(group):
    """
    Function to compute the weighted average
    """
    weight_column='weights'
    groupby_column='clusterId'
    columns_to_average = group.columns.difference([weight_column, groupby_column])
    weighted_averages = group[columns_to_average].multiply(group[weight_column], axis=0).sum() / group[weight_column].sum()
    return weighted_averages


def localLloyds(points, k, weights=None, n_iterations=100):
    """
    function that does the Local Lloyds algorithm
    """
    df=pd.DataFrame(points)
    if weights is None:
        weights=np.ones(shape=len(points))
    #print('weights', weights)
    df['weights']=weights
    df['clusterId']=np.zeros(shape=len(points))
    C=localPlusPlusInit(points, k)
    #print('localPlusPluisInit: ', C)
    clusterId=np.argmin(np.sum((points[:,:,np.newaxis]-C.T[np.newaxis,:,:])**2, axis=1), axis=1)
    for iteration in range(n_iterations):
        df['clusterId']=clusterId
        C_df=df.groupby('clusterId')\
            .apply(weightedAverage)\
            .reset_index()
        C_array=C_df[C_df.columns.difference(['weights', 'clusterId'])].reset_index(drop=True).to_numpy()
        clusterId=np.argmin(np.sum((points[:,:,np.newaxis]-C_array.T[np.newaxis,:,:])**2, axis=1), axis=1)
        #print(clusterId)
        
    return C_array   


def minmaxRescale(datum, minS, maxS):
    """
    Rescale a datum in [0,1]
    """
    mask = (minS < maxS).astype(bool)
    feature = datum[1]["x"] 
    feature = (feature[mask] - minS[mask])/(maxS[mask] - minS[mask])
    return (datum[0], {"x": feature, "y": datum[1]["y"], "d2":datum[1]["d2"]}) 


def selectCluster(datum, C, updateDistances=True):
    """
    Associates a datum to its centroid and updates the distance if True
    dimC(k, len(datum))
    """
    distances = np.sum((datum[1]["x"] - C)**2, axis=1)
    print('distances: ',distances)
    clusterId = np.argmin(distances)
    if updateDistances is True:
        return (clusterId, {'x':datum[1]['x'], 'y':datum[1]['y'], 'd2':distances[clusterId]})
    else:
        return (clusterId, datum[1])


def updateCentroids(Rdd):
    """
    update centroids as 'centers of mass' of clusters
    """
    C=Rdd.mapValues(lambda xy: (xy['x'], 1))\
              .reduceByKey(lambda a,b : (a[0]+b[0], a[1]+b[1]))\
              .mapValues(lambda a:a[0]/a[1])\
              .values()\
              .collect() 
    C=np.array(C) #check later more carefully if causes some overhead
    return C


def updateDistances(Rdd, C):
    """
    update the Rdd with square distances from centroids, given Rdd with centroids already updated
    """
    def datumUpdate(datum, C):
        d2=np.sum((datum[1]['x']-C[datum[0]])**2)
        #return datum
        return (datum[0], {"x": datum[1]["x"], "y": datum[1]["y"], "d2":d2})
    Rdd=Rdd.map(lambda datum:datumUpdate(datum, C))
    return Rdd


def cost(Rdd):
    """
    calculate global cost of X,C from an Rdd with distances from centroids already updated
    """
    my_cost=Rdd.map(lambda datum : datum[1]['d2'])\
               .reduce(lambda a,b: a+b)
    return my_cost 


def kMeans(Rdd, C_init, maxIterations, logParallelKmeans=None):
    """
    kMeans in parallel (?)
    """
    
    t0 = time()
    
    my_kMeansCosts = []
    tIterations = []
    C=C_init

    for t in range(maxIterations):
        t1 = time()
        RddCached = Rdd.map(lambda datum: selectCluster(datum, C)).persist()
        # Now we compute the new centroids by calculating the averages of points belonging to the same cluster.
        # Need to check that all centroids are assigned to at least one point, otherwise k changes!!! Solutions?!
        C=updateCentroids(RddCached)
        my_cost = cost(RddCached)
        
        my_kMeansCosts.append(my_cost)
        t2 = time()
        
        tIteration = t2 - t1
        tIterations.append(tIteration)
        
        #RddCached.unpersist() bad for time efficiency, not necessary due to Python Garbage collector
        

    tEnd = time()
    tTotal = tEnd - t0
    
    if logParallelKmeans is not None:
        logParallelKmeans["CostsKmeans"] = my_kMeansCosts
        logParallelKmeans["tIterations"] = tIterations
        logParallelKmeans["tTotal"] = tTotal
        
    return C


def naiveInitFromSet(Rdd, k, logNaiveInit=None):
    """
    uniform sampling of k points from Rdd
    """
    t0 = time()
    
    kSubset=Rdd.takeSample(False, k)
    # Replacement is set to False to avoid coinciding centroids BUT no guarantees that in the original dataset all points are distinct!!! Check if causes problems in the algorithm (i.e. need to pre-filter) or it's ok
    C_init=np.array([datum[1]['x'] for datum in kSubset])

    tEnd = time()
    
    if logNaiveInit is not None:
        logNaiveInit["tTotal"] = tEnd - t0
        
    return C_init


def naiveInitFromSpace(k, dim):
    """
    #uniform drawing of k points from euclidean space
    #we assume the Rdd has been mapped into a [0,1]^dim space
    """
    C_init=np.random.uniform(size=(k,dim))
    return C_init


def parallelInit(Rdd, k, l, logParallelInit=None):
    """
    Parallel initialization
    """
    t0 = time()
    
    # initialize C as a point in the dataset
    C=naiveInitFromSet(Rdd, 1) 
    
    # associate each datum to the only centroid (computed before) and computed distances and cost
    Rdd=Rdd.map(lambda datum : (0, datum[1]))
    Rdd=updateDistances(Rdd, C).persist()
    my_cost=cost(Rdd)

    # number of iterations (log(cost))
    n_iterations=int(np.log(my_cost))
    if(n_iterations<1): n_iterations=1

    
    tSamples = []
    tCentroids = []
    CostInits = [my_cost]
    # iterative sampling of the centroids
    for _ in range(n_iterations):

        t1=time()
        # sample C' according to the probability
        C_prime=Rdd.filter(lambda datum : np.random.uniform()<l*datum[1]['d2']/my_cost)\
                   .map(lambda datum : datum[1]['x'])\
                   .collect()
        C_prime=np.array(C_prime)
        t2=time()

        # stack C and C', update distances, centroids, and cost
        if (C_prime.shape[0]>0):
            C=np.vstack((C, C_prime))
            Rdd=Rdd.map(lambda datum: selectCluster(datum, C)).persist()
            my_cost=cost(Rdd)
        t3=time()

        tSample = t2 -t1
        tCentroid = t3 - t2
        tSamples.append(tSample)
        tCentroids.append(tCentroid)
        CostInits.append(my_cost)
       
    #erase centroids sampled more than once 
    C=C.astype(float)
    C=np.unique(C, axis=0)
    Rdd=Rdd.map(lambda datum: selectCluster(datum, C))
    
    #compute weights of centroids (sizes of each cluster) and put them in a list whose index is same centroid index as C
    wx=Rdd.countByKey()
    weights=np.zeros(len(C))
    weights[[list(wx.keys())]]=[list(wx.values())]
    
    #subselection of k centroids from C, using local Lloyds algorithm with k-means++ initialization
    if C.shape[0]<=k:
        C_init=C
    else:
        C_init=localLloyds(C, k, weights=weights, n_iterations=100)

    tEnd = time()
    
    if logParallelInit is not None:
        logParallelInit["tSamples"] = tSamples
        logParallelInit["tCentroids"] = tCentroids
        logParallelInit["CostInit"] = CostInits
        logParallelInit["tTotal"] = tEnd - t0
        
    return C_init

def predictedCentroidsLabeler(C_expected, C_predicted):
    distMatrix=np.sum((C_expected[:,:,np.newaxis]-C_predicted.T[np.newaxis, :,:])**2,axis=1)
    #the labeler i-th entry j, tells that i-th centroid of C_expected is associated to j-th element of C_predicted
    labeler=np.argmin(distMatrix,axis=1)
    #square distance of element of C_expected to nearest point in C_predicted
    distances=distMatrix[np.arange(len(distMatrix)),labeler]
    return labeler, distances



#### SET UP SPARK ####

# import the python libraries to create/connect to a Spark Session
from pyspark.sql import SparkSession

# build a SparkSession 
#   connect to the master node on the port where the master node is listening (7077)
#   declare the app name 
#   configure the executor memory to 512 MB
#   either *connect* or *create* a new Spark Context
spark = SparkSession.builder \
    .master("spark://spark-master:7077")\
    .appName("My first spark application")\
    .config("spark.executor.memory", "512m")\
    .getOrCreate()

# create a spark context
sc = spark.sparkContext

#### IMPORT THE DATA SET ####

data = fetch_kddcup99(return_X_y = True, percent10 = True) # default percent10=True

# collect samples and features (target)
x = data[0]
y = data[1] 

# cut the data fro memory reasons
subLen = 1000
x = x[:subLen,]
y = y[:subLen]

#### PARALLEL

# setting up the output information
totalLogParallelInit = {}
totalLogParallelKmeans = {}
tDurationsParallel = {}
tPreOperationsParallel = {}

# cycle over num_slices to be run
# nSlices = [2, 4, 8, 16, 32, 64]
nSlices = [1, 2]

for nSlice in nSlices:

    tInit = time() # compute the time of the beginning of the iteration over the number of partitions
    print(f"The iteration with {nSlice} number of partition started at time {tInit}")
    
    # parallelize over nSlice partitions
    Rdd = sc.parallelize([(None, {"x": x[i],"y": y[i], "d2":None}) for i in range(len(y))], numSlices = nSlice)

    # cut the categorical attributes
    Rdd = Rdd.map(deleteBytes)\
             .persist()

    # setting the theoretical number of clusters
    kTrue = Rdd.map(lambda datum: datum[1]["y"])\
               .distinct()\
               .count()
    
    # rescale the RDD over the max
    maxS = Rdd.map(lambda datum: datum[1]["x"])\
           .reduce(lambda a, b: np.maximum(a, b))
    minS = Rdd.map(lambda datum: datum[1]["x"])\
           .reduce(lambda a, b: np.minimum(a, b))

    Rdd = Rdd.map(lambda datum: minmaxRescale(datum, minS, maxS))\
             .persist()
    
    # setting up the input and output information for the alghoritm
    logParallelInit = {}
    logParallelKmeans = {}

    k=kTrue
    l=k*2 # rescaling probability to have more centroids than k

    tInitI = time()

    tPreOperation = tInitI - tInit
    print(f"Finished the pre-steps after {tPreOperation} seconds")
          
    # initialization kMeans //
    C_init = parallelInit(Rdd, k, l, logParallelInit)
    
    tInitialization = time() - tInitI
    print(f"Finished the initialization after {tInitialization} seconds")
    
    # run the k-means alghoritm
    C = kMeans(Rdd, C_init, 15, logParallelKmeans)
    
    # time information
    tEnd = time() # compute the time of the end of the iteration over the number of partitions
    tDuration = tEnd - tInit
    
    print(f"The iteration with {nSlice} number of partition ended at time {tEnd} after {tDuration} seconds")

    # output in the correct memory adresses
    totalLogParallelInit[f"Number of partition" + str(nSlice)] = logParallelInit
    totalLogParallelKmeans[f"Number of partition" + str(nSlice)] = logParallelKmeans
    tDurationsParallel[f"Number of partition" + str(nSlice)] = tDuration
    tPreOperationsParallel[f"Number of partition" + str(nSlice)] = tPreOperation

#### TOTAL OUTPUT ON FILE ####

# compute the total log 
logParallel = {"totalLogParallelInit": totalLogParallelInit, "totalLogParallelKmeans": totalLogParallelKmeans, "tDurationsParallel": tDurationsParallel, "tPreOperationsParallel": tPreOperationsParallel}

# save the log file
if not os.path.exists('dataP'): # create a directory if it doesnt exist
    os.makedirs('dataP')

with open(pickle_fileP, "wb") as file:
    pickle.dump(logParallel, file)




print("Starting the naive inizialization part")




#### NAIVE RANDOM

# setting up some dictionaries
totalLogNaiveInit = {}
totalLogNaiveKmeans = {}
tDurationsNaive = {}
tPreOperationsNaive = {}

# cycle over num_slices to be run
# nSlices = [2, 4, 8, 16, 32, 64]
nSlices = [1, 2]

for nSlice in nSlices:

    tInit = time() # compute the time of the beginning of the iteration over the number of partitions
    print(f"The iteration with {nSlice} number of partition started at time {tInit}")
    
    # parallelize over nSlice partitions
    Rdd = sc.parallelize([(None, {"x": x[i],"y": y[i], "d2":None}) for i in range(len(y))], numSlices = nSlice)

    # cut the categorical attributes
    Rdd = Rdd.map(deleteBytes)\
             .persist()

    # setting the theoretical number of clusters
    kTrue = Rdd.map(lambda datum: datum[1]["y"])\
               .distinct()\
               .count()
    
    # rescale the RDD over the max
    maxS = Rdd.map(lambda datum: datum[1]["x"])\
           .reduce(lambda a, b: np.maximum(a, b))
    minS = Rdd.map(lambda datum: datum[1]["x"])\
           .reduce(lambda a, b: np.minimum(a, b))

    Rdd = Rdd.map(lambda datum: minmaxRescale(datum, minS, maxS))\
             .persist()
    
    # setting up the input and output information for the alghoritm
    logNaiveInit = {}
    logNaiveKmeans = {}

    k=kTrue
    l=k*2 # rescaling probability to have more centroids than k

    tInitI = time()

    tPreOperation = tInitI - tInit
    print(f"Finished the pre-steps after {tPreOperation} seconds")
          
    # initialization kMeans //
    C_init = naiveInitFromSet(Rdd, k, logNaiveInit)
    
    tInitialization = time() - tInitI
    print(f"Finished the initialization after {tInitialization} seconds")
    
    # run the k-means alghoritm
    C = kMeans(Rdd, C_init, 15, logNaiveKmeans)
    
    # time information
    tEnd = time() # compute the time of the end of the iteration over the number of partitions
    tDuration = tEnd - tInit
    
    print(f"The iteration with {nSlice} number of partition ended at time {tEnd} after {tDuration} seconds")

    # output in the correct memory adresses
    totalLogNaiveInit[f"Number of partition" + str(nSlice)] = logNaiveInit
    totalLogNaiveKmeans[f"Number of partition" + str(nSlice)] = logNaiveKmeans
    tDurationsNaive[f"Number of partition" + str(nSlice)] = tDuration
    tPreOperationsNaive[f"Number of partition" + str(nSlice)] = tPreOperation

#### TOTAL OUTPUT ON FILE ####

# compute the total log 
logNaive = {"totalLogNaiveInit": totalLogNaiveInit, "totalLogNaiveKmeans": totalLogNaiveKmeans, "tDurationsNaive": tDurationsNaive, "tPreOperationsNaive": tPreOperationsNaive}

# save the log file
if not os.path.exists('dataR'): # create a directory if it doesnt exist
    os.makedirs('dataR')

with open(pickle_fileR, "wb") as filer:
    pickle.dump(logNaive, filer)

ModuleNotFoundError: No module named 'sklearn'