## Create the graph in parallel

In [1]:
from tqdm import tqdm

In [2]:
import numpy as np
import scipy as sp
import scipy.sparse
from sklearn.neighbors import LSHForest
from multiprocessing import Process, Manager, Value, Lock, cpu_count
from time import time
import pickle
import glob
from operator import mul

In [3]:
L = sp.sparse.load_npz('./data/graph/labeled.npz')
U = sp.sparse.load_npz('./data/graph/unlabeled.npz')
# Labeled rows first, then unlabeled rows. CSR is requested explicitly because the
# worker processes slice contiguous row batches out of M (M[lo:hi]); CSR supports
# efficient row slicing, whereas other sparse formats (e.g. COO) may not.
M = sp.sparse.vstack([L, U], format='csr')
# Rows [0, last_index_l) are labeled, rows [last_index_l, last_index_u) are unlabeled.
last_index_l = L.shape[0]
last_index_u = last_index_l + U.shape[0]

# We only keep this many closest neighbors per node when building the graph.
max_neighs = 5
size = M.shape[0]

# NOTE(review): LSHForest was deprecated in scikit-learn 0.19 and removed in 0.21;
# this cell needs an older scikit-learn or a replacement approximate-NN index.
# Alternative (commented-out) configuration:
#lshf = LSHForest(n_estimators=15, n_candidates=50, n_neighbors=6, random_state=42)
lshf = LSHForest(random_state=42)
lshf.fit(M)

LSHForest(min_hash_match=4, n_candidates=50, n_estimators=10, n_neighbors=5,
     radius=1.0, radius_cutoff_ratio=0.9, random_state=42)

In [4]:
def split_load(size,cores):
    """Partition the index range ``[0, size)`` into ``cores`` contiguous ranges.

    Every range except the last holds ``size // cores`` indices; the last one
    additionally absorbs the remainder ``size % cores``. When ``size < cores``
    all ranges except the last are empty.

    Args:
        size: total number of items to distribute.
        cores: number of ranges (one per worker process) to produce.

    Returns:
        A list of ``range`` objects, in order, that together cover ``0..size-1``.
    """
    step = size // cores
    # Boundaries 0, step, 2*step, ..., (cores-1)*step, closed by `size` so the
    # final range picks up whatever the integer division left over.
    bounds = [k * step for k in range(cores)] + [size]
    return [range(lo, hi) for lo, hi in zip(bounds[:-1], bounds[1:])]

In [56]:
def _top_forward_neighbours(node, sim_row, ind_row, k):
    """Return the best of ``node``'s ``k`` most similar candidates whose id is larger than ``node``.

    Args:
        node: global row index of the query point.
        sim_row: similarities of the query to each candidate (1-D array).
        ind_row: global row indices of the candidates, aligned with ``sim_row``.
        k: how many of the most similar candidates to consider before filtering.

    Returns:
        A list of ``(neighbour_id, similarity)`` pairs, most similar first. Only
        neighbours with ``neighbour_id > node`` are kept, so an edge is never
        recorded from both endpoints. (An edge found only from the higher-id
        endpoint is therefore dropped.)
    """
    # argsort is ascending: reverse it and keep the first k positions.
    best_positions = np.argsort(sim_row)[::-1][:k]
    pairs = []
    for pos in best_positions:
        neighbour = int(ind_row[pos])
        if node < neighbour:
            pairs.append((neighbour, sim_row[pos]))
    return pairs


def compute_graph_for_embedding(graph, edges_weights, edges_ll, edges_lu, edges_uu, chunk, counter, lock):
    """Build the approximate k-NN graph for one contiguous chunk of rows of ``M``.

    Intended to run in a worker process. Reads the module-level globals ``lshf``
    (fitted LSHForest), ``M`` (stacked labeled + unlabeled matrix), ``max_neighs``,
    ``last_index_l`` and ``last_index_u``. It presumably relies on the ``fork`` start
    method for children to see them; verify on non-Linux platforms.

    Args:
        graph: shared dict; receives ``{node: [neighbour ids]}`` for every node in ``chunk``.
        edges_weights: shared dict; receives ``{(node, neighbour): similarity}``.
        edges_ll: shared list; receives labeled-labeled ``(node, neighbour)`` edges.
        edges_lu: shared list; receives labeled-unlabeled edges.
        edges_uu: shared list; receives all remaining edges.
        chunk: contiguous ``range`` of row indices into ``M`` (as produced by ``split_load``).
        counter: shared integer, incremented once per processed node (progress).
        lock: serialises the counter update and the progress print.

    Side effects:
        Pickles the raw ``[similarities, indices]`` lists to
        ``./data/graph/approx_nn.<first row of chunk>.p``.
    """
    if len(chunk) == 0:
        # split_load yields empty ranges when size < cores; nothing to do.
        return

    start = chunk[0]
    end = chunk[-1] + 1
    batch_size = 1000

    sims, inds = [], []
    for b, lo in enumerate(range(start, end, batch_size)):
        hi = min(lo + batch_size, end)
        t_str = time()
        # Ask for one extra neighbour because the query point can come back as its own match.
        distances, indices = lshf.kneighbors(M[lo:hi], n_neighbors=max_neighs + 1)
        # Push self-matches to distance 2.0 (similarity -1) so they rank last.
        is_self = indices == np.arange(lo, hi)[:, None]
        distances[is_self] = 2.0
        sims.extend(1 - distances)
        inds.extend(indices)
        print(b, time() - t_str, end='\r')
    print()
    with open("./data/graph/approx_nn.%i.p" % start, "wb") as fh:
        pickle.dump([sims, inds], fh)

    # Accumulate locally and push to the manager proxies once at the end: every
    # proxy call is a round-trip to the manager process.
    local_graph, local_weights = {}, {}
    local_ll, local_lu, local_uu = [], [], []
    for i, sim_row, ind_row in zip(chunk, sims, inds):
        neighbours = _top_forward_neighbours(i, sim_row, ind_row, max_neighs)
        # Store global neighbour ids (previously the column positions inside
        # ind_row were stored by mistake, corrupting graph/edges/weights).
        local_graph[i] = [j for j, _ in neighbours]
        for j, weight in neighbours:
            local_weights[(i, j)] = weight
            if (0 <= i < last_index_l) and (0 <= j < last_index_l):
                local_ll.append((i, j))
            elif (0 <= i < last_index_l) and (last_index_l <= j < last_index_u):
                local_lu.append((i, j))
            else:
                local_uu.append((i, j))

        with lock:
            counter.value += 1
            print(str(counter.value))

    graph.update(local_graph)
    edges_weights.update(local_weights)
    edges_ll.extend(local_ll)
    edges_lu.extend(local_lu)
    edges_uu.extend(local_uu)

In [54]:
# Manager-backed containers: every worker process writes its results through these proxies.
manager = Manager()
graph, edges_weights = manager.dict(), manager.dict()
edges_ll, edges_lu, edges_uu = manager.list(), manager.list(), manager.list()

# Shared progress counter; `lock` serialises the increment and the progress print.
counter, lock = Value('i', 0), Lock()

num_of_cpu = cpu_count()
processes = []

# One contiguous index range per CPU core.
chunks = split_load(size, num_of_cpu)

In [None]:
# One worker per chunk; all workers write into the same manager-backed containers.
for chunk in chunks:
    p = Process(target=compute_graph_for_embedding,
                args=(graph, edges_weights, edges_ll, edges_lu, edges_uu, chunk, counter, lock))
    processes.append(p)

for p in processes:
    p.start()
for p in processes:
    p.join()

# A crashed worker only leaves a non-zero exit code behind; fail loudly here
# instead of silently saving an incomplete graph in the next cell.
failed = [p.exitcode for p in processes if p.exitcode != 0]
if failed:
    raise RuntimeError("%d of %d workers failed (exit codes %s); the graph is incomplete"
                       % (len(failed), len(processes), failed))

In [None]:
# Save the data structures we worked so hard to compute. Each manager proxy is
# copied into a plain dict/list (one at a time, to bound peak memory) and written
# through a context manager so every file is flushed and closed.
for path, proxy, to_plain in (
    ("./data/graph/graph.p", graph, dict),
    ("./data/graph/edges_weights.p", edges_weights, dict),
    ("./data/graph/edges_ll.p", edges_ll, list),
    ("./data/graph/edges_lu.p", edges_lu, list),
    ("./data/graph/edges_uu.p", edges_uu, list),
):
    with open(path, "wb") as fh:
        pickle.dump(to_plain(proxy), fh)

## c