In [47]:
%load_ext autotime
import numpy as np
import msgpack
import random

The autotime extension is already loaded. To reload it, use:
  %reload_ext autotime
time: 4.93 ms


# "Interchange Dictionary" (InterDict) concept-test implementation


* This is a specialized container representing a dictionary of dictionaries.
* Every write goes (transactionally) to on-disk storage (LMDB).
* The goal is to provide a key-value database of serialized dicts that, when stored on a shared filesystem, allows concurrent access from multiple processes/hosts. (Caveat to verify: the LMDB documentation warns against placing databases on remote/network filesystems.)
* Global write lock; reads do not lock.
* Each unique set of dictionary keys is stored once, under its hash, and every entry references it by that hash (this saves disk space but hurts performance; probably not necessary).
* Pure Python; could probably be sped up with Cython.

In [134]:
import lmdb, msgpack, hashlib
from pyhashxx import hashxx

class InterDict(object):
    """On-disk dictionary of dictionaries backed by LMDB.

    Each stored value is a dict. Its sorted key tuple is serialized with
    ``pack`` and stored once in the ``hashes`` sub-database, indexed by
    ``hash`` of the serialized keys. Each entry then records that hash (in
    ``keys``) and the packed tuple of its values in sorted-key order (in
    ``vals``). Top-level keys are converted with ``np.uint64``, so they must
    be non-negative integers that fit in 64 bits.
    """

    def unpack(self, v):
        """Deserialize a msgpack byte string (or buffer)."""
        return msgpack.unpackb(v)

    def pack(self, v):
        """Serialize ``v`` with msgpack."""
        return msgpack.packb(v)

    def hash(self, v):
        """Hash serialized bytes with xxHash (seed 0), returned as np.uint64."""
        return np.uint64(hashxx(v, seed=0))

    def __init__(self, dbdir):
        """Open (or create) the LMDB environment at ``dbdir``.

        Opens three named sub-databases: ``hashes``, ``keys`` and ``vals``.
        ``map_size`` (the maximum database size) is set to 1e9 bytes.
        """
        # Path kept as bytes; not used elsewhere in this class.
        self.dbdir = dbdir if isinstance(dbdir, bytes) else dbdir.encode()
        self.env = lmdb.open(dbdir, max_dbs=3, map_size=int(1e9))
        self.hashes = self.env.open_db(b'hashes', integerkey=True)
        self.keys = self.env.open_db(b'keys', integerkey=True)
        self.vals = self.env.open_db(b'vals', integerkey=True)

    def _store_keyset(self, txn, dk_hashed, dk_packed):
        """Record serialized key set ``dk_packed`` under ``dk_hashed`` in write txn ``txn``.

        Raises ValueError if a *different* key set is already stored under the
        same hash. Overwriting it would silently corrupt every entry that
        references the old key set.
        """
        existing = txn.get(dk_hashed, db=self.hashes)
        if existing is None:
            txn.put(dk_hashed, dk_packed, db=self.hashes)
        elif bytes(existing) != dk_packed:
            raise ValueError('hash collision between distinct key sets')

    def __setitem__(self, key, val):
        """Store dict ``val`` under integer ``key``, replacing any previous entry.

        All writes happen in one write transaction. On a key-set hash
        collision, ValueError is raised inside it, so py-lmdb aborts the
        transaction.
        """
        key_packed = np.uint64(key)
        dk = tuple(sorted(val.keys()))
        dk_packed = self.pack(dk)
        dk_hashed = self.hash(dk_packed)
        dv_packed = self.pack(tuple(val[k] for k in dk))
        with self.env.begin(write=True, buffers=True) as txn:
            self._store_keyset(txn, dk_hashed, dk_packed)
            txn.put(key_packed, dk_hashed, db=self.keys)
            txn.put(key_packed, dv_packed, db=self.vals)

    def __getitem__(self, key):
        """Return the dict stored under integer ``key``.

        Raises KeyError(key) if the entry, its key set, or its values are missing.
        """
        key_packed = np.uint64(key)
        with self.env.begin(write=False, buffers=True) as txn:
            # With buffers=True the returned buffers are only valid inside the
            # transaction, so everything is unpacked before leaving it.
            dk_hashed = txn.get(key_packed, db=self.keys)
            if dk_hashed is None:
                raise KeyError(key)
            dk_packed = txn.get(dk_hashed, db=self.hashes)
            if dk_packed is None:
                raise KeyError(key)
            dv_packed = txn.get(key_packed, db=self.vals)
            if dv_packed is None:
                raise KeyError(key)
            dk = self.unpack(dk_packed)
            dv = self.unpack(dv_packed)
        return dict(zip(dk, dv))

    def batch_inserter(self, dict_keys):
        """Return a function that bulk-inserts entries sharing one key set.

        ``dict_keys`` is the common set of keys. The key set is registered
        immediately. The returned function takes an iterable of ``(key, val)``
        pairs and writes them all in a single write transaction. Each ``val``
        is indexed as ``val[k]`` for every key in sorted order, so it should
        be a dict with exactly those keys.
        Raises ValueError on a key-set hash collision.
        """
        dk = tuple(sorted(dict_keys))
        dk_packed = self.pack(dk)
        dk_hashed = self.hash(dk_packed)

        # Register the shared key set once, up front.
        with self.env.begin(write=True) as txn:
            self._store_keyset(txn, dk_hashed, dk_packed)

        def inserter_function(itr):
            with self.env.begin(write=True) as txn:
                for key, val in itr:
                    key_packed = np.uint64(key)
                    dv_packed = self.pack(tuple(val[k] for k in dk))
                    txn.put(key_packed, dk_hashed, db=self.keys)
                    txn.put(key_packed, dv_packed, db=self.vals)

        return inserter_function

    def batch_getvalues(self, dict_keys):
        """Return a generator function that reads values for many entries.

        The returned function takes an iterable of integer keys. For each key
        it yields the unpacked values, in sorted-``dict_keys`` order. All reads
        share one read transaction, which stays open while the caller iterates.
        Raises KeyError(key) if a key is missing, was stored with a different
        key set, or has no values.
        """
        dk = tuple(sorted(dict_keys))
        dk_hashed = self.hash(self.pack(dk))

        def getvalues_function(itr):
            with self.env.begin(write=False) as txn:
                for key in itr:
                    key_packed = np.uint64(key)
                    stored_hash = txn.get(key_packed, db=self.keys)
                    # A missing key must raise KeyError, not a TypeError from np.frombuffer(None).
                    if stored_hash is None:
                        raise KeyError(key)
                    if np.frombuffer(stored_hash, dtype='uint64')[0] != dk_hashed:
                        raise KeyError(key)
                    dv_packed = txn.get(key_packed, db=self.vals)
                    if dv_packed is None:
                        raise KeyError(key)
                    yield self.unpack(dv_packed)

        return getvalues_function
                    

time: 169 ms


# Simple performance comparisons

Compares serialization and deserialization times against:

* a plain Python dictionary
* a Python dictionary of msgpack-serialized dictionaries

In [86]:
import shutil

# Benchmark parameters: number of stored dicts and number of keys per dict.
ndicts = 10000
nkeys = 10
DB_DIR = '/tmp/test-interdict'
random.seed(0)  # reproducible benchmark data

test_dict = {}
test_msgpack = {}
dummy = None

# Start from an empty database. ignore_errors makes this safe on a fresh
# machine where the directory does not exist yet (a plain `rm -r` errors there).
shutil.rmtree(DB_DIR, ignore_errors=True)
interd = InterDict(DB_DIR)

time: 146 ms


## plain python dict storage

In [87]:
# Baseline: ndicts in-memory dicts, each holding nkeys random-string values.
for idx in range(ndicts):
    test_dict[idx] = dict((key, str(random.random())) for key in range(nkeys))

time: 232 ms


## dict storing msgpack bytestrings

In [88]:
# Same data shape, but each dict is kept as a msgpack byte string.
for idx in range(ndicts):
    test_msgpack[idx] = msgpack.packb(
        dict((key, str(random.random())) for key in range(nkeys))
    )

time: 292 ms


## serialized storage using the InterDict on-disk dict

In [89]:
# One write transaction per dict via InterDict.__setitem__ (compare with the batch cell below).
for idx in range(ndicts):
    interd[idx] = dict((key, str(random.random())) for key in range(nkeys))

time: 5.17 s


## batch insertion

In [91]:
# Bulk-insert ndicts records sharing one key set, all in a single write transaction.
batch_inserter = interd.batch_inserter(list(range(nkeys)))

def itr(ndicts, nkeys):
    """Yield (key, dict) pairs, each dict holding nkeys random-string values.

    Values are dicts, matching what InterDict stores. The inserter looks each
    value up by key (``val[k]``), so plain tuples only worked before because
    the keys happened to be 0..nkeys-1.
    """
    for i in range(ndicts):
        yield (i, {j: str(random.random()) for j in range(nkeys)})

batch_inserter(itr(ndicts, nkeys))

time: 427 ms


# Deserialization

## plain python dict (random)

In [92]:
# Random reads from the in-memory baseline; `dummy` only holds the last result.
for _ in range(ndicts):
    dummy = test_dict[random.randint(0, ndicts - 1)]

time: 50 ms


## msgpack dict (random)

In [93]:
# Random reads plus msgpack decoding of each stored byte string.
for _ in range(ndicts):
    dummy = msgpack.unpackb(test_msgpack[random.randint(0, ndicts - 1)])

time: 87.6 ms


## InterDict (random)

In [94]:
# Random reads through InterDict.__getitem__ (one read transaction per lookup).
for _ in range(ndicts):
    dummy = interd[random.randint(0, ndicts - 1)]

time: 256 ms


## InterDict (sequential)

In [95]:
# Sequential reads over every stored key, in ascending key order.
for idx in range(ndicts):
    dummy = interd[idx]

time: 203 ms


## InterDict batch access (random)

In [96]:
# Random reads via the batch API: one read transaction shared by all lookups.
batch_getvalues = interd.batch_getvalues(tuple(range(nkeys)))

def random_idx(ndicts):
    """Yield `ndicts` keys drawn uniformly from [0, ndicts - 1]."""
    for _ in range(ndicts):
        yield random.randint(0, ndicts - 1)

for values in batch_getvalues(random_idx(ndicts)):
    dummy = values

time: 226 ms


## InterDict batch access (sequential)

In [97]:
# Sequential reads via the batch API; `dummy` ends up holding the last value list.
batch_getvalues = interd.batch_getvalues(tuple(range(nkeys)))
for values in batch_getvalues(range(ndicts)):
    dummy = values

time: 172 ms
