In [1]:
from multiprocessing import Pool
import os
import copy
import numpy as np
import scipy as sc
import pandas as pd
import glob
import heapq

from scipy import spatial
from sklearn.metrics import jaccard_similarity_score

class Multiprocessor:
    """Queue function calls and run them in parallel on a ``multiprocessing.Pool``.

    Jobs are collected with ``submit_job`` and executed by ``run_jobs``, which
    returns their results in submission order.  Functions and arguments must
    be picklable, because they are shipped to worker processes.

    Parameters
    ----------
    processors : int or None
        Number of worker processes.  ``None`` lets ``Pool`` pick its default
        (the machine's CPU count).
    """

    def __init__(self, processors=None):
        # Queued jobs as (function, args_tuple, kwargs_dict) triples.
        self.commands = []
        self.processors = processors

    def get_default_workers(self):
        """Return ``os.cpu_count()``, a typical default worker count."""
        return os.cpu_count()

    def submit_job(self, function, *args, **kwargs):
        """Queue ``function(*args, **kwargs)`` for the next ``run_jobs`` call."""
        # Keep the positional args as ONE tuple: splatting them into the
        # command tuple broke the (function, args, kwargs) unpack in the pool
        # runner for any call with other than exactly one positional argument.
        self.commands.append((function, args, kwargs))

    def run_jobs(self):
        """Run every queued job in a process pool; return results in queue order."""
        return self.__multithreader()

    def reset(self, processors=None):
        """Drop all queued jobs and set the worker count to ``processors``."""
        self.commands = []
        self.processors = processors

    def map_multi(self, function, iterator):
        """Return ``[function(*item) for item in iterator]``, computed in parallel.

        Each ``item`` must be an iterable of positional arguments (starmap
        semantics).  Previously queued jobs are discarded, but the configured
        worker count is kept.
        """
        # reset() would otherwise fall back to processors=None and silently
        # ignore the worker count given to the constructor.
        self.reset(self.processors)
        for item in iterator:
            self.submit_job(function, *item)
        return self.run_jobs()

    def __multithreader(self):
        # Despite the name, this runs jobs in worker *processes*, not threads.
        with Pool(processes=self.processors) as pool:
            results = [pool.apply_async(function, args=args, kwds=kwargs)
                       for function, args, kwargs in self.commands]
            # Collect inside the ``with``: leaving it calls pool.terminate().
            retrieved = [result.get() for result in results]
        return retrieved

In [2]:
# Query vector: the row whose nearest neighbours we search for.
# NOTE(review): the first value (4296) looks like a row id / count rather than
# a feature.  Cosine similarity uses every column, so this large value will
# likely dominate the score -- confirm whether it should be excluded.
target = np.array([
    4296,
    0.6191191952196445, 0.22994753333771967, 0.48658640483588744,
    0.31304992283500876, 0.5161600314453116, 0.3784549822237053,
    0.046350669737498906, 0.08215184908019468, 0.49491115260566276,
    0.987288754771809, 0.8462115484051969, 0.782179271843162,
    0.20449087742962202, 0.6122537136776209, 0.318261235925028,
    0.5794273139733943,
])
len(target)

17

In [3]:
def my_metric(a, b, dist_type =None):
    """Similarity between vectors ``a`` and ``b`` (larger means more alike).

    Parameters
    ----------
    a, b : array-like
        Vectors of equal length.
    dist_type : {None, 'cosine', 'jaccard'}
        ``None`` or ``'cosine'``: cosine similarity, ``1 - cosine_distance``.
        ``'jaccard'``: scikit-learn's Jaccard similarity score, which is only
        meaningful for label/binary vectors, not continuous features.

    Returns
    -------
    float
        The similarity score.

    Raises
    ------
    ValueError
        If ``dist_type`` is not one of the values above.  Unknown names used
        to fall back to cosine silently, which hid typos.
    """
    if dist_type is None or dist_type == 'cosine':
        return 1 - spatial.distance.cosine(a, b)
    if dist_type == 'jaccard':
        # NOTE(review): jaccard_similarity_score was removed in scikit-learn
        # 0.23 (superseded by jaccard_score); confirm the installed version.
        return jaccard_similarity_score(a, b, normalize=True)
    raise ValueError(
        "unknown dist_type {!r}; expected None, 'cosine' or 'jaccard'".format(dist_type))

def find_nearest(target, f, k = 10):
    """Return the ``k`` rows of the CSV ``f`` most similar to ``target``.

    Similarity comes from ``my_metric`` (cosine similarity by default), so a
    *larger* score means a *closer* row.

    Parameters
    ----------
    target : array-like
        Query vector; must have the same length as each CSV row.
    f : str, path-like or file-like
        Anything ``pd.read_csv`` accepts.  Every column is used as a feature.
    k : int, default 10
        Number of neighbours to keep (``k <= 0`` returns an empty list).

    Returns
    -------
    list of (float, numpy.ndarray)
        ``(score, row)`` pairs in ascending score order, so the best match is
        last.  A list in this order is also a valid ``heapq`` min-heap.
    """
    df = pd.read_csv(f, engine='python')
    scored = ((my_metric(target, row), row) for row in df.values)
    # Rank on the score alone: comparing whole (score, row) tuples falls
    # through to comparing numpy arrays on a tie, which raises ValueError.
    best = heapq.nlargest(k, scored, key=lambda pair: pair[0])
    best.reverse()
    return best

def heap_reduce(heaps, s_path):
    """Merge per-worker neighbour heaps into a single CSV written to ``s_path``.

    Each heap holds ``(score, row)`` entries.  The scores are dropped, and
    every row becomes one CSV line: no index column, with column labels taken
    from ``pd.DataFrame``'s defaults.
    """
    rows = []
    for worker_heap in heaps:
        for entry in worker_heap:
            rows.append(entry[1])  # keep the row vector, discard its score
    frame = pd.DataFrame(rows)
    frame.to_csv(s_path, index=False)
    

def prettify(H):
    """Turn ``(score, row)`` pairs into a DataFrame with one row per pair.

    Rows are ordered by ascending score, so the best (highest-similarity)
    neighbour is the last row.  ``H`` itself is not modified.

    Parameters
    ----------
    H : list of (float, array-like)
        Typically the heap returned by ``find_nearest``; any order is accepted.

    Returns
    -------
    pandas.DataFrame
        Shape ``(len(H), row_length)``, or an empty frame when ``H`` is empty.
    """
    # Sort on the score alone: heappop compares whole tuples and, on a score
    # tie, falls through to comparing numpy arrays, which raises ValueError.
    ordered = sorted(H, key=lambda pair: pair[0])
    if not ordered:
        return pd.DataFrame()
    # Stack once (not row by row) and always as 2-D, so a single pair still
    # becomes one DataFrame row instead of a one-column frame.
    return pd.DataFrame(np.vstack([pair[1] for pair in ordered]))

In [4]:
# Input shards for the search.  glob returns paths in arbitrary, OS-dependent
# order; sort them so every run visits the files (and emits results) in the
# same order.
files = sorted(glob.glob('/home/winston/data/test_csvs/*.csv'))

In [5]:
def build_args(files, target):
    """Pair ``target`` with each file: ``[(target, f0), (target, f1), ...]``.

    Returns a list rather than a one-shot ``zip`` iterator, so the result can
    be iterated more than once (e.g. when a notebook cell is re-run).  Each
    tuple matches the ``(target, f)`` positional arguments of ``find_nearest``.
    ``files`` may be any iterable, not only a sized sequence.
    """
    return [(target, f) for f in files]

In [6]:
# Materialize the (target, file) pairs so they survive more than one pass
# (a bare iterator would be empty after the first map_multi call).
args_kargs = list(build_args(files, target))

In [7]:
%%time

mult = Multiprocessor()
returnVal = mult.map_multi(find_nearest, args_kargs)

s_path='/home/winston/data/red/reduced.csv'

heap_reduce(returnVal, s_path)

reduced_heap = find_nearest(target, s_path)
x = prettify(reduced_heap)

CPU times: user 38 ms, sys: 38.9 ms, total: 76.9 ms
Wall time: 24.8 s


In [8]:
# Display the reduced top-k neighbours from the parallel run (In [7]).
# Rows are in ascending similarity, so the closest match is the last row.
x

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16
0,5351.0,0.592752,0.215083,0.638454,0.553878,0.687891,0.617117,0.207878,0.054362,0.469386,0.971452,0.70944,0.992798,0.525112,0.810227,0.46917,0.145391
1,5153.0,0.341559,0.195996,0.433277,0.488066,0.620416,0.211123,0.325472,0.043006,0.586827,0.937057,0.999101,0.690419,0.410081,0.529628,0.717833,0.850448
2,4525.0,0.803341,0.145267,0.160885,0.314462,0.90341,0.492967,0.193777,0.294179,0.666641,0.988953,0.796325,0.914523,0.335762,0.580792,0.034296,0.566159
3,4386.0,0.576754,0.173225,0.420942,0.211277,0.429687,0.291281,0.157809,0.1239,0.812451,0.645199,0.611108,0.75468,0.257485,0.670794,0.060368,0.786081
4,4166.0,0.772289,0.130665,0.164632,0.329942,0.398508,0.337309,0.254052,0.101968,0.517591,0.753905,0.500334,0.50477,0.150206,0.557664,0.397756,0.556216
5,4505.0,0.767954,0.335554,0.469294,0.553499,0.382116,0.55489,0.294615,0.081687,0.765371,0.846256,0.644924,0.559226,0.372366,0.731183,0.421956,0.684049
6,3393.0,0.338501,0.276433,0.617404,0.260002,0.479972,0.215195,0.244456,0.17458,0.316761,0.667616,0.440283,0.709336,0.240191,0.616797,0.253428,0.442477
7,4737.0,0.545166,0.049416,0.441662,0.165413,0.49978,0.739911,0.196824,0.128719,0.398228,0.859229,0.803503,0.664353,0.103914,0.643581,0.299095,0.336585
8,4799.0,0.446117,0.185634,0.451194,0.23657,0.541061,0.118872,0.097573,0.235412,0.552066,0.998042,0.843263,0.896599,0.132445,0.505277,0.294368,0.194992
9,4296.0,0.619119,0.229948,0.486586,0.31305,0.51616,0.378455,0.046351,0.082152,0.494911,0.987289,0.846212,0.782179,0.204491,0.612254,0.318261,0.579427


In [9]:
# Echo the query vector; it matches (up to display rounding) the last,
# best-matching row of the neighbour table above.
target

array([4.29600000e+03, 6.19119195e-01, 2.29947533e-01, 4.86586405e-01,
       3.13049923e-01, 5.16160031e-01, 3.78454982e-01, 4.63506697e-02,
       8.21518491e-02, 4.94911153e-01, 9.87288755e-01, 8.46211548e-01,
       7.82179272e-01, 2.04490877e-01, 6.12253714e-01, 3.18261236e-01,
       5.79427314e-01])

In [10]:
def find_nearest(target, files, k = 10):
    """Return the ``k`` rows, across all CSV ``files``, most similar to ``target``.

    Serial counterpart of the map/reduce search: every file is scanned in
    this process.  Similarity comes from ``my_metric`` (cosine similarity by
    default), so a *larger* score means a *closer* row.

    Parameters
    ----------
    target : array-like
        Query vector; must have the same length as each CSV row.
    files : str, path-like, file-like, or an iterable of those
        A single path or file-like object is treated as a one-element list.
        This definition shadows the earlier single-file ``find_nearest``, so
        it must keep accepting ``find_nearest(target, path)`` calls too.
    k : int, default 10
        Number of neighbours to keep (``k <= 0`` returns an empty list).

    Returns
    -------
    list of (float, numpy.ndarray)
        ``(score, row)`` pairs in ascending score order, so the best match is
        last.  A list in this order is also a valid ``heapq`` min-heap.
    """
    if isinstance(files, (str, os.PathLike)) or hasattr(files, 'read'):
        files = [files]

    def scored_rows():
        # Stream one file at a time so only a single DataFrame is in memory.
        for f in files:
            df = pd.read_csv(f, engine='python')
            for row in df.values:
                yield my_metric(target, row), row

    # Rank on the score alone: comparing whole (score, row) tuples falls
    # through to comparing numpy arrays on a tie, which raises ValueError.
    best = heapq.nlargest(k, scored_rows(), key=lambda pair: pair[0])
    best.reverse()
    return best

In [11]:
%%time 
# Serial baseline: scan every CSV in this process with the multi-file
# find_nearest defined in In [10], for a timing comparison against the pooled
# map/reduce run in In [7].  Note this rebinds `x`, replacing In [7]'s result.
h = find_nearest(target, files)
x = prettify(h)

CPU times: user 3min 1s, sys: 4.25 s, total: 3min 5s
Wall time: 2min 14s


In [12]:
# Display the serial-baseline neighbours from In [11].  They match the
# parallel result shown in In [8], row for row.
x

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16
0,5351.0,0.592752,0.215083,0.638454,0.553878,0.687891,0.617117,0.207878,0.054362,0.469386,0.971452,0.70944,0.992798,0.525112,0.810227,0.46917,0.145391
1,5153.0,0.341559,0.195996,0.433277,0.488066,0.620416,0.211123,0.325472,0.043006,0.586827,0.937057,0.999101,0.690419,0.410081,0.529628,0.717833,0.850448
2,4525.0,0.803341,0.145267,0.160885,0.314462,0.90341,0.492967,0.193777,0.294179,0.666641,0.988953,0.796325,0.914523,0.335762,0.580792,0.034296,0.566159
3,4386.0,0.576754,0.173225,0.420942,0.211277,0.429687,0.291281,0.157809,0.1239,0.812451,0.645199,0.611108,0.75468,0.257485,0.670794,0.060368,0.786081
4,4166.0,0.772289,0.130665,0.164632,0.329942,0.398508,0.337309,0.254052,0.101968,0.517591,0.753905,0.500334,0.50477,0.150206,0.557664,0.397756,0.556216
5,4505.0,0.767954,0.335554,0.469294,0.553499,0.382116,0.55489,0.294615,0.081687,0.765371,0.846256,0.644924,0.559226,0.372366,0.731183,0.421956,0.684049
6,3393.0,0.338501,0.276433,0.617404,0.260002,0.479972,0.215195,0.244456,0.17458,0.316761,0.667616,0.440283,0.709336,0.240191,0.616797,0.253428,0.442477
7,4737.0,0.545166,0.049416,0.441662,0.165413,0.49978,0.739911,0.196824,0.128719,0.398228,0.859229,0.803503,0.664353,0.103914,0.643581,0.299095,0.336585
8,4799.0,0.446117,0.185634,0.451194,0.23657,0.541061,0.118872,0.097573,0.235412,0.552066,0.998042,0.843263,0.896599,0.132445,0.505277,0.294368,0.194992
9,4296.0,0.619119,0.229948,0.486586,0.31305,0.51616,0.378455,0.046351,0.082152,0.494911,0.987289,0.846212,0.782179,0.204491,0.612254,0.318261,0.579427
