the local simhash pooled index tester

In [15]:
# a shallow fork of simhashindex (https://github.com/liangsun/simhash/blob/master/simhash/__init__.py)
# what we need to do, in a db-free test env, is retain more than the string and the hash of
# the string (we have the sha of the source, the string, and the simhash of the string)
# and a way to exclude a sha from the result set - find equivalent objects in another response

# we are now trying parmap

from simhash import Simhash
import multiprocessing 

from itertools import chain
from operator import itemgetter
import os
import time

def _partition(store, workers):
     # chunk out our store into smaller bits
    breakpoint = int(round(len(store) /workers + 0.5))
    i = 0
    while i < len(store):
        yield store[i:i+breakpoint]
        i += breakpoint

def _reduce(mappings):
    near_dupes = list(chain.from_iterable(mappings))

# def _map(arr, simhash, k, f):
#     # make a bucket and compare the simhash to
#     # that local set, return if distance < k
#     bucket = IndexBucket(k=k, f=f)
    
#     print 'init bucket'

#     for i, q in enumerate(arr):
#         bucket.add(*q)

#     # do the comparison
#     near_dupes = bucket.get_near_dups(simhash)

#     # return tuples of object strings, distance scores
#     return [n.split('|') for n in near_dupes]
        
def get_near_dupes(objs, simhash, workers=5):
    # now let's do some stuffs
    # pool = Pool(processes=workers,)
    
    partitions = list(_partition(objs, workers))
#     mapped_buckets = parmap.map(_map, partitions, simhash, 64, 2, pool=pool)    

    task_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.Queue()
    num_jobs = len(partitions)
    
    # consumers = [multiprocessing.Process(target=_map, args=(partition, simhash, 64, 2)) for partition in partitions]

    consumers = [Consumer(task_queue, result_queue) for i in range(num_jobs)]
    for c in consumers:
        c.start()
    
    print 'start consumers {0}'.format(len(consumers))
    
    for partition in partitions:
        task_queue.put(Task(partition, simhash))
        print 'adding partition', len(partition)
        
    for i in xrange(num_jobs):
        task_queue.put(None)
    
    task_queue.join()
    
    print 'joined queue'
    
    mapped_buckets = []
    while num_jobs:
        result = result_queue.get()
        mapped_buckets.append(result)
        print result
        num_jobs -= 1

#     mapped_buckets = [result_queue.get() for result in montecarlos]
    
    print 'returned queue'
    
    near_dupes = _reduce(mapped_buckets)
    return near_dupes

class Task(object):
    def __init__(self, objs, simhash, k=2, f=64):
        self.objs = objs
        self.simhash = simhash
        self.k = k
        self.f = f
    
    def __call__(self):
        bucket = IndexBucket(k=k, f=f)
    
        print 'init bucket'

        s = time.time()
        for i, q in enumerate(arr):
            bucket.add(*q)

        # do the comparison
        near_dupes = bucket.get_near_dups(simhash)
        
        print 'gotten dupes {0}'.format(len(near_dupes))
        
        print 'processing time: ',  time.time() - s

        # return tuples of object strings, distance scores
        return [n.split('|') for n in near_dupes]

    def __str__(self):
        return 'task'

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
    
    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class IndexBucket(object):
    def __init__(self, f=64, k=2):
        self.bucket = {}
        self.k = k
        self.f = f

    def get_near_dups(self, simhash):
        """
        `simhash` is an instance of Simhash
        return a list of obj_id (pipe-delimited string of sha|text|distance)
        """
        assert simhash.f == self.f

        ans = set()

        for key in self.get_keys(simhash):
            dups = self.bucket.get(key, set())

            for dup in dups:
                sim2, obj_blob = dup.split(',', 1)
                sim2 = Simhash(long(sim2, 16), self.f)

                d = simhash.distance(sim2)
                if d <= self.k:
                    ans.add('{0}|{1}'.format(obj_blob, d))
        return list(ans)
    
    def add(self, obj_id, obj_str, simhash):
        """
        `obj_id` is a string
        `simhash` is an instance of Simhash
        """
        assert simhash.f == self.f

        for key in self.get_keys(simhash):
            v = '%x,%s|%s' % (simhash.value, obj_id, obj_str)

            self.bucket.setdefault(key, set())
            self.bucket[key].add(v)

    def delete(self, obj_id, obj_str, simhash):
        """
        `obj_id` is a string
        `simhash` is an instance of Simhash
        """
        assert simhash.f == self.f

        for key in self.get_keys(simhash):
            v = '%x,%s|%s' % (simhash.value, obj_id, obj_str)

            if v in self.bucket.get(key, set()):
                self.bucket[key].remove(v)
    
    @property
    def offsets(self):
        """
        You may optimize this method according to <http://www.wwwconference.org/www2007/papers/paper215.pdf>
        """
        return [self.f // (self.k + 1) * i for i in range(self.k + 1)]

    def get_keys(self, simhash):
        for i, offset in enumerate(self.offsets):
            m = (i == len(self.offsets) - 1 and 2 ** (self.f - offset) - 1 or 2 ** (self.offsets[i + 1] - offset) - 1)
            c = simhash.value >> offset & m
            yield '%x:%x' % (c, i)

    def bucket_size(self):
        return len(self.bucket)


In [16]:
from uuid import uuid4  

# the test is uuid similarity (which is junk but auto-generated junk)
big_list = [str(uuid4()).split('-')[0]for i in xrange(0, 1000)]
big_list = [(b, Simhash(b)) for b in big_list]

# index = HashIndex(big_list, test_item[1])
test_item = big_list[1]
# print 'test item = ', test_item[0], test_item[1].value

s = time.time()
near_dupes = get_near_dupes(big_list, test_item[1])

#near_dupes = get_near_dupes(big_list, test_item[1], 5, 64, 2)
print near_dupes

print time.time() - s


start consumers 5
adding partition 201
adding partition 201
adding partition 201
adding partition 201
adding partition 196


Traceback (most recent call last):
  File "//anaconda/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed


KeyboardInterrupt: 