In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext

sc = SparkContext("local", "first app")

In [3]:
import matplotlib.pyplot as plt
import numpy as np

In [4]:
class CFG : 
    data_num = 200
    block_num = 4
    threshold = 0.01

In [5]:
def create_dataset(N, D = 2, minval = [0, 0], maxval = [1, 1]) : 
    """
    D차원 벡터 N개 생성
    """
    
    dataset = np.zeros((N, D + 1))
    
    # gen IDs
    dataset[:, 0] = np.arange(N, dtype = 'int')
    
    # gen values
    for i in range(D) : 
        vec = np.random.rand(N) * (maxval[i] - minval[i]) + minval[i]
        dataset[:, i + 1] = vec
    
    return dataset

In [6]:
dataset200 = create_dataset(200)

In [7]:
def map_func(input_data) : 
    x, vec = input_data[0], [input_data[1], input_data[2]]
    b = x // (CFG.data_num // CFG.block_num) + 1
    
    tmp_list = []
    for i in range(CFG.block_num) : 
        if i < b : 
            tmp_list.append(((i + 1, b), ((i + 1, b), x, vec)))
        else :
            tmp_list.append(((b, i + 1), ((b, i + 1), x, vec)))
        
    return tmp_list

In [8]:
test = sc.parallelize(dataset200, 1)

In [9]:
test.take(5)

[array([0.        , 0.28943461, 0.12429405]),
 array([1.        , 0.7768364 , 0.62959563]),
 array([2.        , 0.75849072, 0.23909181]),
 array([3.        , 0.51767681, 0.48866389]),
 array([4.        , 0.22737381, 0.48906431])]

In [10]:
test2 = test.flatMap(map_func)

In [11]:
test2.take(20)

[((1, 1.0), ((1, 1.0), 0.0, [0.2894346060761158, 0.1242940548243906])),
 ((1.0, 2), ((1.0, 2), 0.0, [0.2894346060761158, 0.1242940548243906])),
 ((1.0, 3), ((1.0, 3), 0.0, [0.2894346060761158, 0.1242940548243906])),
 ((1.0, 4), ((1.0, 4), 0.0, [0.2894346060761158, 0.1242940548243906])),
 ((1, 1.0), ((1, 1.0), 1.0, [0.7768364049746603, 0.6295956304491573])),
 ((1.0, 2), ((1.0, 2), 1.0, [0.7768364049746603, 0.6295956304491573])),
 ((1.0, 3), ((1.0, 3), 1.0, [0.7768364049746603, 0.6295956304491573])),
 ((1.0, 4), ((1.0, 4), 1.0, [0.7768364049746603, 0.6295956304491573])),
 ((1, 1.0), ((1, 1.0), 2.0, [0.7584907182409724, 0.23909180872417324])),
 ((1.0, 2), ((1.0, 2), 2.0, [0.7584907182409724, 0.23909180872417324])),
 ((1.0, 3), ((1.0, 3), 2.0, [0.7584907182409724, 0.23909180872417324])),
 ((1.0, 4), ((1.0, 4), 2.0, [0.7584907182409724, 0.23909180872417324])),
 ((1, 1.0), ((1, 1.0), 3.0, [0.5176768058344245, 0.4886638932421393])),
 ((1.0, 2), ((1.0, 2), 3.0, [0.5176768058344245, 0.488663893

In [12]:
def dist(vec1, vec2) : 
    d = (vec1[0] - vec2[0]) ** 2 + (vec1[1] - vec2[0]) ** 2
    return d

In [15]:
def reduce_func(tlist) : 
    result = []
    x = 0
    y = 0
    for (key, _, _) in tlist : 
        x, y = key
    if (x == y) : 
        for (_, id_r, vec_r) in tlist :
            for (_, id_s, vec_s) in tlist : 
                if dist(vec_r, vec_s) < CFG.threshold and (id_s != id_r) : 
                    result.append(((id_r, id_s), (vec_r, vec_s)))
    else : 
        list_r = []
        list_s = []
        for (_, id_rs, vec_rs) in tlist :
            b = id_rs // (CFG.data_num // CFG.block_num) + 1
            if (b == x) : 
                list_r.append((id_rs, vec_rs))
            elif (b == y) : 
                list_s.append((id_rs, vec_rs))
        for (id_r, vec_r) in list_r :
            for (id_s, vec_s) in list_s : 
                if dist(vec_r, vec_s) < CFG.threshold : 
                    result.append(((id_r, id_s), (vec_r, vec_s)))
    return result

In [29]:
test2.groupByKey().mapValues(reduce_func).flatMap(lambda x : x[1]).count()

569