In [15]:
import numpy as np
import math
from operator import mul
from pyspark import SparkContext

In [16]:
def data_parallel_region(data, func, *params):
    return data.map(func)


def data_parallel_region_with_reduction(data, func, reduce_func, *params):
    prods = data.map(func)    
    print("*params = ", *params)
    print("params = ", params)
    print("params[0] = ", params[0])
    return prods.fold(*params, reduce_func)

In [17]:
def list_prod(factors):
    prod = 1
    for x in factors:
        prod *= x
    return prod


def gaussian(x, mu, sigma):
    f = math.sqrt((2 * math.pi) ** len(sigma)) * list_prod(sigma)
    return f * np.exp(-scaled_sq_norm_diff(x, mu, sigma)/2)


def sq_sum(x):
    return sum((y**2 for y in x))


def sq_norm_diff(x, y):
    return sq_sum((z[0]-z[1] for z in zip(x, y)))


def scaled_sq_norm_diff(x, y, s):
    return sq_sum(((z[0]-z[1])/z[2] for z in zip(x, y, s)))

In [18]:
def component_responsibility(x, params):
    probs = [w * gaussian(x, m, v) for w, m, v in params]
    return [p/sum(probs) for p in probs]

In [19]:
def weighted_terms(point, resp):
    return [resp,
            [[p * x for x in point] for p in resp],
            [[p * x**2 for x in point] for p in resp]
           ]

In [20]:
def aggregate_products(prod1, prod2):
    resp_sum = [r1 + r2 for r1, r2 in zip(prod1[0], prod2[0])]
    weighted_point_sum = [[wx1 + wx2 for wx1, wx2 in zip(wp1, wp2)] for wp1, wp2 in zip(prod1[1], prod2[1])]
    weighted_sq_point_sum = [[wsqx1 + wsqx2 for wsqx1, wsqx2 in zip(wsq1, wsq2)]
                             for wsq1, wsq2 in zip(prod1[2], prod2[2])]
    return [resp_sum, weighted_point_sum, weighted_sq_point_sum]

In [21]:
def errf(s1, s2):
    return sum([sq_norm_diff(z1, z2) for z1, z2 in zip(s1, s2)])

In [22]:
def em(k, ds, n, dim, comps, eps):
    print("################################")
    print("EM started.")
    err = 1.
    iter = 0
    while err > eps:
        sum_of_prods = data_parallel_region_with_reduction(ds,
                                                           lambda x: weighted_terms(x, component_responsibility(x, comps)),
                                                           aggregate_products,
                                                           [(0.,) * k, [(0,) * dim] * k, [(0,) * dim] * k])
        mix_weights = [w/n for w in sum_of_prods[0]]
        means = [[y/z for y, z in zip(x[1], (x[0],) * dim)] for x in zip(sum_of_prods[0], sum_of_prods[1])]
        vars = [[y[0]/y[1] - y[2]**2 for y in zip(x[0], (x[1],) * dim , x[2])]
                for x in zip(sum_of_prods[2], sum_of_prods[0], means)]
        last_comps = comps.copy()                                # save parameters
        comps = [x for x in zip(mix_weights, means, vars)]       # update parameters
        err = errf([x[1] for x in last_comps], [x[1] for x in comps])
        iter += 1
        print("iteration ", iter, "completed.")
        print("################")
    return [err, comps]

In [23]:
import csv
import seaborn as sns

In [24]:
sc = SparkContext()
data_file_name = "Data/mixture_3_unbalanced.csv"
data = sc.textFile(data_file_name)
sep = ','
d = data.map(lambda l: list(map(lambda x: float(x), l.split(sep))))
K = 3                    # Number of components
dim = len(d.first())
print("dimensionality = ", dim)
n = d.count()
print("data set size = ", n)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-10-e2b131645a9e>:1 

In [None]:
csvfile = open(data_file_name)
myreader = csv.reader(csvfile)
d_list = [[float(x) for x in u] for u in myreader]
if dim == 2:
    sns.scatterplot(x=list(zip(*d_list))[0], y=list(zip(*d_list))[1])

In [None]:
# Initialize distribution & mixture parameters
# random.seed(1234567890)  # set a seed for testing
# random weights
mix_weights = [1./K] * K
param_means = [[-2,2],[3,2],[-2,-2]]
param_vars = [[.5,.5]] * K
comps = [p for p in zip(mix_weights, param_means, param_vars)]
print("Initial parameters: [[weight_1, mean_1, var_1], ..., [weight_k, mean_k, var_k]] =")
print(comps)

In [None]:
emresult = em(K, d, n, dim, comps, 0.01)
print(emresult)

In [None]:
gaussian_components = emresult[1]
weights, means, vars = list(zip(*(emresult[1])))
means_x = list(zip(*means))[0]
means_y = list(zip(*means))[1]
if dim == 2:
    sns.scatterplot(x=list(zip(*d_list))[0], y=list(zip(*d_list))[1])
    sns.scatterplot(x=means_x, y=means_y)