## Chunk-based parallel dictionary processing

The fastest way to process a large dictionary is often in several large chunks (generally one per core).

- 1) Split the large `dict` into smaller `dict`s with `chunkify()`
- 2) Scatter the `process_chunk()` function across the chunks in parallel with `pmap()`
    - In this case each call returns a `dict`
- 3) Gather the returned `dict`s into a single `dict` using `dechunkify()`

In [16]:
import itertools
import concurrent.futures
from collections import defaultdict

# Example input for the pipeline below. Nested mapping, as iterated by
# process_chunk(): family ID -> fusion accession -> list of component accessions.
fusions = {'F35695': {'221301': ['675442', '771550', '629707'], '221299': ['675442', '771550', '629718'], '221303': ['675442', '726842', '771553', '629719', '629707']}, 'F239376': {'1062597': ['1062602', '1062603', '1062604', '173763'], '1062600': ['1062602', '1062603', '1062604', '1065120', '1065121', '1146085', '173763'], '1062598': ['1062602', '1062603', '1062604', '1065120', '1065121', '1146085', '173763']}, 'F204804': {'1099895': ['944468', '372572', '656463', '656464'], '944496': ['944460', '944468', '372572', '852930', '898333', '944462', '944466', '944474', '944483', '656463', '656464']}}
# Lookup table read (as a module-level global) by process_chunk(): maps every
# accession ID appearing in `fusions` to the label that replaces it in the output.
# NOTE(review): the labels look like species/gene codes -- confirm with the data owner.
accessions_species = {'221301': 'CAPTE30046', '675442': 'LOXAF12298', '771550': 'MYOLU22413', '629707': 'ORYLA11790', '221299': 'CAEEL02237', '629718': 'BRAFL08488', '221303': 'MODNO30045', '726842': 'AMPQE22412', '771553': 'HUMAN22411', '629719': 'AMPQE44521', '1062597': 'AEDAE96834', '1062602': 'NEMVE18476', '1062603': 'ANOGA11254', '1062604': 'ETCHE45469', '173763': 'ANOCA74874', '1062600': 'MNELE13164', '1065120': 'AMPQE45287', '1065121': 'HUMAN36598', '1146085': 'CHICK11478', '1062598': 'MOUSE95987', '1099895': 'NOMLE33658', '944468': 'HUMAN32548', '372572': 'MONDO25489', '656463': 'MOUSE55694', '656464': 'MACEU59968', '944496': 'NOMLE64471', '944460': 'MOUSE19482', '852930': 'CHICK36369', '898333': 'NOMLE37198', '944462': 'LATCH91348', '944466': 'PROCA36198', '944474': 'TAKRU18746', '944483': 'TAEGU28287'}

In [19]:
def pmap(function, iterable, workers=4):
    '''Apply a function to an iterable using parallel worker processes.

    Args:
        function: Callable applied to every item. It is sent to worker
            processes, so it (and each item and result) must be picklable.
        iterable: Items to process; each item becomes one task.
        workers: Maximum number of worker processes in the pool.

    Returns:
        A list of results in the same order as the items of ``iterable``.

    Raises:
        Any exception raised by ``function`` in a worker is re-raised here
        when its result is collected.
    '''
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        # Collect every result before leaving the `with` block, which shuts
        # the pool down; list() replaces the redundant identity comprehension.
        return list(executor.map(function, iterable))

def chunkify(d, n=4):
    '''Create a generator of up to n evenly sized chunks of dictionary d.

    The keys of ``d`` are dealt out in iteration order. Chunk sizes differ by
    at most one item: when ``len(d)`` is not a multiple of ``n`` the leading
    chunks each receive one extra item. Empty chunks are never produced, so
    fewer than ``n`` chunks are yielded when ``d`` has fewer than ``n`` items,
    and nothing is yielded for an empty ``d``.

    Args:
        d: Dictionary to split.
        n: Maximum number of chunks to produce; must be at least 1.

    Yields:
        Dictionaries that together contain every item of ``d`` exactly once.

    Raises:
        ValueError: If ``n`` is smaller than 1 (raised when iteration starts,
            because this is a generator function).
    '''
    if n < 1:
        raise ValueError('n must be at least 1')
    # base items per chunk, plus one extra for each of the first `extra` chunks
    base, extra = divmod(len(d), n)
    keys = iter(d)
    for index in range(n):
        size = base + (1 if index < extra else 0)
        if size == 0:
            # Only happens when len(d) < n: all items are already handed out.
            break
        # The shared iterator makes each islice continue where the last ended.
        yield {k: d[k] for k in itertools.islice(keys, size)}

def dechunkify(chunks):
    '''Flatten an iterable of dicts into one dict.

    Chunks are merged in order; if a key occurs in more than one chunk the
    value from the later chunk is kept.
    '''
    return {key: value for part in chunks for key, value in part.items()}

def process_chunk(chunk):
    '''Translate every accession in a chunk via the global accessions_species.

    ``chunk`` maps family -> {fusion accession -> [component accessions]}.
    The result has the same nesting with each fusion key and each component
    replaced by its ``accessions_species`` entry. Families that contain no
    fusions do not appear in the result. An accession missing from the
    lookup table raises KeyError.
    '''
    result = {}
    for family_id, family_fusions in chunk.items():
        for fusion_id, component_ids in family_fusions.items():
            parent_label = accessions_species[fusion_id]
            member_labels = [accessions_species[cid] for cid in component_ids]
            # Create the per-family dict lazily, on the first fusion stored.
            result.setdefault(family_id, {})[parent_label] = member_labels
    return result

In [18]:
# Scatter/gather driver for the example data.
# 1) Split: lazily cut `fusions` into 3 chunks (a generator, consumed by pmap).
chunks = chunkify(fusions, 3)
# 2) Scatter: run process_chunk on each chunk in a pool of up to 4 worker
#    processes; the result is a list with one translated dict per chunk.
scattered = pmap(process_chunk, chunks, workers=4)
# 3) Gather: merge the per-chunk dicts back into a single dict.
gathered = dechunkify(scattered)