In [1]:
from pyspark import SparkContext
import os
import random
from itertools import combinations
from copy import deepcopy
import math
# Shared SparkContext used by every RDD operation in this notebook;
# getOrCreate() reuses an already-running context instead of failing on re-run.
sc = SparkContext.getOrCreate()

In [93]:
# Number of clusters requested from k-means.
n_clusters = 10
# Input chunk files, largest file first.
files = ["data/test1/"+name for name in os.listdir('data/test1')]
files.sort(key=os.path.getsize, reverse=True)
ans = {}
alpha = 3
# Containers for the three point sets maintained across rounds.
discard_set = {}
compression_set = {}
retained_set = []

In [95]:
def getDataLoad(file):
    """Read a comma-separated file of numbers and return its rows in random order.

    Args:
        file: path of a text file whose non-empty lines are comma-separated
            numeric values.

    Returns:
        A list of rows, each a list of floats, shuffled in place with
        ``random.shuffle``.

    Raises:
        ValueError: if a non-empty line contains a field that is not a number.
    """
    data = []
    # The context manager closes the handle even when float() raises mid-file.
    with open(file, "r") as fp:
        for line in fp:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline at EOF) carry no point.
                continue
            data.append([float(x) for x in line.split(',')])
    random.shuffle(data)
    return data

def euclidean(x1, x2):
    """Return the Euclidean distance between two equally indexed sequences.

    Components are paired with ``zip``, so extra trailing components of the
    longer sequence are ignored.
    """
    squared_total = 0
    for left, right in zip(x1, x2):
        diff = left - right
        squared_total += diff**2
    return math.sqrt(squared_total)

def argmin(x):
    """Return the index of the first smallest element of ``x`` (0 if none is below +inf)."""
    best_index = 0
    best_value = float('inf')
    for position, value in enumerate(x):
        if value < best_value:
            best_index = position
            best_value = value
    return best_index

def add_vector( v1, v2 ):
    """Return the element-wise sum of two vectors, truncated to the shorter one."""
    return [left + right for left, right in zip(v1, v2)]

In [2]:
# Directory whose files are listed to build `files` further down.
input_dir = "data/test2/"
# NOTE(review): later cells read `n_clusters` (plural), not `n_cluster` — confirm which name is intended.
n_cluster = 10

In [3]:
# Seed the RNG so shuffling and centre sampling are reproducible.
# (Assigning `random.seed = 10` would only overwrite the function with an int.)
random.seed(10)

In [4]:
# Multiplier applied to sqrt(d) for the distance threshold.
alpha = 3
# Input chunk files in lexicographic path order.
files = [input_dir + name for name in os.listdir(input_dir)]
files.sort()

In [5]:
ans_dict = {}
discard_set = {}
compression_set = {}
# The retained set is a list of raw rows: later cells grow it with
# `retained_set.extend(...)`, which a dict does not support.
retained_set = []

# CSV header followed by one line of statistics per round.
intermedites = ["round_id,nof_cluster_discard,nof_point_discard,nof_cluster_compression,nof_point_compression,nof_point_retained"]

In [None]:
def euclidean(x1, x2):
    """Euclidean distance between ``x1`` and ``x2``, pairing components with ``zip``."""
    acc = 0
    for pair in zip(x1, x2):
        delta = pair[0] - pair[1]
        acc = acc + delta**2
    return math.sqrt(acc)

def add_vector( v1, v2 ):
    """Element-wise sum of ``v1`` and ``v2``; stops at the shorter input."""
    total = []
    for left, right in zip(v1, v2):
        total.append(left + right)
    return total

def argmin(x):
    """Index of the first minimum of ``x``; 0 when ``x`` is empty or never drops below +inf."""
    winner = (float('inf'), 0)
    for idx, candidate in enumerate(x):
        if candidate < winner[0]:
            winner = (candidate, idx)
    return winner[1]

def getCentroid(N, SUM):
    """Return the centroid: each component of ``SUM`` divided by the point count ``N``."""
    centroid = []
    for component in SUM:
        centroid.append(component / N)
    return centroid
def getStd(N, SUM, SUMSQ):
    """Return the per-dimension standard deviation of a cluster summary.

    Args:
        N: number of points in the cluster.
        SUM: per-dimension sum of the points.
        SUMSQ: per-dimension sum of the squared coordinates.

    Returns:
        A list with one value per dimension:
        ``sqrt(SUMSQ[i]/N - (SUM[i]/N)**2)``.
    """
    # max(..., 0.0) absorbs tiny negative variances caused by float round-off.
    return [math.sqrt(max((sq/N) - (s/N)**2, 0.0)) for s, sq in zip(SUM, SUMSQ)]

In [96]:
# First chunk of input points; every row is [id, feature_1, ..., feature_d].
data = getDataLoad(files[0])
# Dimensionality of a point: the leading id column is not a feature.
d = len(data[0])-1
# Distance cut-off alpha * sqrt(d), driven by the `alpha` constant
# rather than a second hard-coded 3.
threshold = alpha*math.sqrt(d)

In [109]:
class KMeans():
    """Lloyd-style k-means over a Spark RDD that also returns per-cluster summaries.

    Relies on names defined at notebook level: the SparkContext ``sc`` and the
    helpers ``euclidean``, ``argmin`` and ``add_vector``.
    """

    def __init__(self, n_clusters=10, max_iterations=float('inf')):
        """Store the requested number of clusters and the iteration cap.

        With the default cap (infinity) ``fit`` iterates until the centres
        stop changing.
        """
        self.k = n_clusters
        self.max_it = max_iterations

    def cluster_changed(self, old, new):
        """Return True if the two lists of centres differ in length or in any centre."""
        if len(old) != len(new):
            return True
        for o, n in zip(old, new):
            if o != n: return True
        return False

    def initialize_cluster(self, x):
        """Sample the initial centres from the points ``x`` without replacement.

        At most ``len(x)`` centres are drawn, so fewer points than ``self.k``
        yields fewer centres instead of a ``ValueError`` from ``random.sample``.
        """
        return random.sample(x, min(self.k, len(x)))

    def initialize_cluster_later(self, x, n_clusters):
        """Pick ``n_clusters`` centres by a farthest-point heuristic.

        The first centre is a random point; each further centre is the point
        whose summed Euclidean distance to the centres chosen so far is largest.
        """
        cluster_centers = [random.sample( x, 1 )[0]]
        for i in range(1, n_clusters):
            dist, idx = 0, 0
            for j in range(len(x)):
                curr = 0
                for k in range(i):
                    curr += euclidean( x[j], cluster_centers[k]  )
                if curr > dist: dist, idx = curr, j
            cluster_centers.append(x[idx])
        return cluster_centers

    def fit(self, data):
        """Cluster ``data`` and return ``(point_cluster, summary, cluster_points)``.

        Args:
            data: list of rows ``[id, feature_1, ..., feature_d]``.

        Returns:
            point_cluster: dict mapping point id (``str(int(id))``) to cluster index.
            summary: dict mapping cluster index to the list ``[N, SUM, SUMSQ]``
                (point count, per-dimension sum, per-dimension sum of squares).
            cluster_points: dict mapping cluster index to a list of
                ``(id, features)`` tuples.

        All three results are computed from the final centres, so they agree
        with each other even when the iteration cap stops the loop early.
        """
        initial = sc.parallelize(data).map(lambda x: ( str(int(x[0])), x[1:] ))
        cluster_centers = self.initialize_cluster([row[1:] for row in data])
        i = 0
        while i != self.max_it:
            # Assign every point to its nearest centre and average each cluster.
            means = initial.map(lambda x: (argmin([euclidean(x[1], center) for center in cluster_centers]), (x[1], 1)) ) \
                           .reduceByKey( lambda x,y: (add_vector(x[0],y[0]), x[1]+y[1]) ) \
                           .mapValues( lambda x: [y/x[1] for y in x[0]] ) \
                           .collectAsMap()

            # Rebuild the centre list by index. A cluster that received no
            # points keeps its previous centre, so indices never shift.
            new_cluster_centers = [ means.get(idx, center) for idx, center in enumerate(cluster_centers) ]

            changed = self.cluster_changed(cluster_centers, new_cluster_centers)
            cluster_centers = new_cluster_centers
            if not changed:
                break
            i+=1

        # (id, (cluster index, features)) under the final centres.
        assigned = initial.mapValues(
            lambda x: (argmin([euclidean(x, center) for center in cluster_centers]), x) )

        point_cluster = assigned.mapValues(lambda x: x[0]).collectAsMap() #(id, cluster_id)

        # Every entry starts as a fresh list so single-point clusters are
        # mutable lists too, not tuples sharing the point's feature list.
        summary = assigned.map( lambda x: (x[1][0], [1, list(x[1][1]), [v**2 for v in x[1][1]]]) ) \
                          .reduceByKey( lambda x,y: [x[0]+y[0], add_vector(x[1],y[1]), add_vector(x[2],y[2])] ) \
                          .collectAsMap()

        cluster_points = assigned.map( lambda x: (x[1][0], (x[0], x[1][1])) ) \
                                 .groupByKey().mapValues(list).collectAsMap()

        return point_cluster, summary, cluster_points

In [38]:
def seperate_retained( ans, summary,  cluster_points):
    """Move single-point clusters out of the clustering result into a retained list.

    Every cluster whose point count (``summary[k][0]``) is at most 1 is removed
    from ``summary`` and ``cluster_points``. For a count of exactly 1 the lone
    point is also removed from ``ans`` and appended to the retained list as
    ``[float(id)] + features``. The three dicts are modified in place.

    Returns:
        ``(ans, summary, cluster_points, retained)``.
    """
    retained = []
    # Snapshot the qualifying ids first; the dicts are edited inside the loop.
    tiny_clusters = [cid for cid, stats in summary.items() if stats[0] <= 1]
    for cid in tiny_clusters:
        if summary[cid][0] == 1:
            point_id, features = cluster_points[cid][0]
            retained.append([float(point_id)] + features)
            ans.pop(point_id)
        summary.pop(cid)
        cluster_points.pop(cid)
    return ans, summary, cluster_points, retained
            

### Step 2

In [39]:
# Size of the initial sample: 30% of the loaded chunk, capped at 30000 rows.
fraction = min(30000, int(len(data)*0.3))
sample = data[:fraction]

# Cluster the sample into n_clusters groups; the third result (points per cluster) is not needed here.
# NOTE(review): the ds_ prefix presumably stands for "discard set" — confirm.
ds_ans, ds_summary, _ = KMeans(n_clusters = n_clusters).fit(sample)
len(ds_ans), len(ds_summary)

(6395, 10)

In [40]:
# Remaining rows of the chunk, clustered with three times as many clusters and at most 10 iterations.
rest_data = data[fraction:]
point_cluster, summary, cluster_points = KMeans(n_clusters = n_clusters*3, max_iterations = 10).fit(rest_data)

# Clusters with a single point are pulled out and their points appended to the retained set.
# NOTE(review): the cs_ prefix presumably stands for "compression set" — confirm.
cs_map, cs_summary, _, retain = seperate_retained(point_cluster, summary, cluster_points)
retained_set.extend(retain)
len(cs_map), len(cs_summary), len(retained_set)

(14918, 25, 5)

In [59]:
def mahalanobis_distance( point, N, SUM, SUMSQ ):
    """Return the normalised (axis-aligned Mahalanobis) distance of a point to a cluster.

    Args:
        point: feature vector of the point.
        N: number of points summarised by the cluster.
        SUM: per-dimension sum of the cluster's points.
        SUMSQ: per-dimension sum of squared coordinates of the cluster's points.

    Returns:
        ``sqrt(sum(((point[i] - centroid[i]) / std[i])**2))`` where
        ``std[i] = sqrt(SUMSQ[i]/N - (SUM[i]/N)**2)``. A dimension with zero
        spread contributes its raw offset from the centroid.
    """
    mh = 0
    # Iterating over the vectors themselves removes the dependency on the
    # notebook-level dimension `d`.
    for coordinate, s, sq in zip(point, SUM, SUMSQ):
        centroid = s/N
        variance = (sq/N) - centroid**2
        # Normalise by the standard deviation, not the variance; round-off can
        # make the variance slightly negative, which is treated as zero spread.
        std = math.sqrt(variance) if variance > 0 else 0
        if std == 0: normalized = coordinate-centroid
        else: normalized = (coordinate-centroid)/std
        mh += (normalized**2)
    return math.sqrt(mh)


def assign_to_cluster( point, threshold, summary ):
    """Return the key of the nearest cluster, or -1 if none is within ``threshold``.

    ``summary`` maps a cluster key to a sequence starting with ``N, SUM, SUMSQ``;
    distance is measured with ``mahalanobis_distance``. A point is accepted only
    when the smallest distance is strictly below ``threshold``.
    """
    best_key = 0
    best_distance = float('inf')

    for cluster_key, stats in summary.items():
        distance = mahalanobis_distance(point, stats[0], stats[1], stats[2])
        if distance < best_distance:
            best_distance = distance
            best_key = cluster_key

    return best_key if best_distance < threshold else -1
    
def updateSummary(old_sum, updates):
    """Add the statistics in ``updates`` onto the matching entries of ``old_sum`` in place.

    Args:
        old_sum: dict mapping cluster key to ``[N, SUM, SUMSQ]``.
        updates: dict mapping cluster key to the ``[N, SUM, SUMSQ]`` increments.

    Raises:
        KeyError: if ``updates`` contains a key that is missing from ``old_sum``.
    """
    for idx, summary in updates.items():
        entry = old_sum[idx]
        if not isinstance(entry, list):
            # Tuple entries are immutable; swap in an equivalent list so the
            # in-place accumulation below works.
            entry = old_sum[idx] = [entry[0], list(entry[1]), list(entry[2])]
        entry[0] += summary[0]
        # Walk the increment vectors directly instead of relying on a global dimension.
        for i, (s, sq) in enumerate(zip(summary[1], summary[2])):
            entry[1][i] += s
            entry[2][i] += sq

### Step 3

In [48]:
# Load the second chunk and tag every point with the key of the nearest ds_summary cluster
# within `threshold`, or -1 when no cluster is close enough.
# Resulting records are (cluster key or -1, point id, features).
data = getDataLoad(files[1])
rdd = sc.parallelize(data).map( lambda x: (str(int(x[0])), x[1:]) ) \
        .map(lambda x: (assign_to_cluster(x[1], threshold, ds_summary), x[0], x[1] ) )