In [1]:
import os
from multiprocessing.pool import ThreadPool
from typing import Iterator, List, Optional, Union

import anndata
import cooler
import pandas as pd
from joblib import Parallel, delayed

In [2]:
# Concurrency cap shared by the joblib executor (n_jobs) and the ThreadPool in fast_concat.
max_workers: int = 48

def get_duo(
    thelist: List[Union[pd.DataFrame, pd.Series]],
) -> Iterator[Union[pd.DataFrame, pd.Series, List[Union[pd.DataFrame, pd.Series]]]]:
    """Yield consecutive pairs from ``thelist``; a trailing odd element is yielded alone.

    ``[a, b, c, d, e]`` yields ``[a, b]``, ``[c, d]``, ``e``, and a one-element
    list yields that single element. Pairs are slices of ``thelist`` (lists when
    ``thelist`` is a list), which is what ``concat`` checks for to decide between
    joining and passing an item through.

    Raises
    ------
    ValueError
        If ``thelist`` is empty. Because this is a generator, the error surfaces
        when iteration starts, not at call time.
    """
    if len(thelist) == 0:
        raise ValueError("No element to yield!")
    # No special case for a single element: a bare `return value` inside a
    # generator yields nothing, so the loop below handles length 1 as well.
    for j in range(0, len(thelist), 2):
        if j == len(thelist) - 1:
            # odd length: the last element has no partner
            yield thelist[-1]
        else:
            yield thelist[j : j + 2]


def concat(
    duo: Union[pd.DataFrame, pd.Series, List[Union[pd.DataFrame, pd.Series]]],
) -> Union[pd.Series, pd.DataFrame]:
    """Join one group produced by ``get_duo`` column-wise; pass a lone item through.

    Parameters
    ----------
    duo : list of pd.DataFrame / pd.Series, or a single pd.DataFrame / pd.Series
        A list is joined side by side with ``pd.concat(axis=1)``. All of its
        items are used, not only the first two. Any non-list value is returned
        unchanged.

    Returns
    -------
    pd.DataFrame or pd.Series
        The joined frame, or ``duo`` itself when it is not a list.
    """
    if isinstance(duo, list):
        # pd.concat's default join='outer' unions the row indexes; labels absent
        # from one side become NaN.
        return pd.concat(duo, axis=1)
    return duo


def fast_concat(features: List, n_workers: Optional[int] = None) -> pd.DataFrame:
    """Join many column blocks side by side using a parallel pairwise (tree) reduction.

    Each round pairs neighbouring items with ``get_duo`` and joins every pair in a
    thread pool with ``concat``. This roughly halves the list each round, until
    one object remains.

    Parameters
    ----------
    features : list of pd.DataFrame / pd.Series
        Blocks to join column-wise (outer join on the index). The caller's list is
        not modified; it is copied into a list, so tuples and other sequences work too.
    n_workers : int, optional
        Thread-pool size. Defaults to the module-level ``max_workers``.

    Returns
    -------
    pd.DataFrame
        The joined frame. Columns keep the input order, because results are
        collected with the ordered ``imap``. A single-element input is returned
        as-is, so it may be a Series.

    Raises
    ------
    ValueError
        If ``features`` is empty.
    """
    if len(features) == 0:
        raise ValueError("No features to concatenate!")
    # get_duo must produce *list* slices for concat's isinstance(list) check.
    features = list(features)
    processes = max_workers if n_workers is None else n_workers
    # One pool serves every round instead of a fresh pool per round.
    with ThreadPool(processes=processes) as pool:
        while len(features) > 1:
            # Ordered imap (not imap_unordered) keeps the column order deterministic.
            features = list(pool.imap(concat, get_duo(features)))
    return features[0]

In [None]:
# Directory of cooler files to load (Lee 2019, 10 kb bins — per the folder name).
# Set the LEE2019_COOL_DIR environment variable to point elsewhere instead of
# editing this absolute path; the default keeps the original location.
folder_path = os.environ.get(
    'LEE2019_COOL_DIR',
    '/home/micl/workspace/lmh_data/Lee2019/Human_single_cell_10kb_cool',
)

# Process-based (loky) joblib executor used by load_coolers; verbose=1 prints progress.
parallel = Parallel(n_jobs=max_workers, backend='loky', verbose=1)

def load_coolers(folder_path):
    """Load the cis contacts of every file under ``folder_path`` into one wide count table.

    Each file is read in its own joblib task (via the module-level ``parallel``
    executor) and reduced to its intra-chromosomal pixels. It becomes a
    one-column frame indexed by ``(chrom, start, end)``. The per-file frames are
    joined column-wise with ``fast_concat``; missing contacts are filled with 0
    and the rows are sorted.

    Parameters
    ----------
    folder_path : str
        Root directory, walked recursively. Every file found is opened with
        ``cooler.Cooler``, so it should contain only cooler files.

    Returns
    -------
    pd.DataFrame
        Index is a ``(chrom, start, end)`` MultiIndex, where ``start``/``end`` are
        the ``start1``/``start2`` columns of each pixel. Columns are one per file,
        named by its path relative to ``folder_path``; for a flat folder that is
        just the file name.

    Raises
    ------
    FileNotFoundError
        If no files are found under ``folder_path``.
    """
    def load_cooler(file_path, column_name):
        # Read one cooler and return its cis counts as a single named column.
        c = cooler.Cooler(file_path)
        pixels = c.pixels(join=True)[:]
        # keep only intra-chromosomal (cis) contacts
        pixels = pixels[pixels['chrom1'] == pixels['chrom2']]

        # NOTE: 'end' holds start2 (start of the second bin), not the end of bin 1;
        # the name is kept because later cells index on it.
        counts = (
            pixels[['chrom1', 'start1', 'start2', 'count']]
            .rename(columns={'chrom1': 'chrom', 'start1': 'start', 'start2': 'end'})
            .set_index(['chrom', 'start', 'end'])
        )
        return counts.astype('int').rename(columns={'count': column_name})

    # Join each file with the directory it was found in (`root`), not with
    # folder_path, so files in subdirectories resolve. Sorting makes the
    # column order independent of the filesystem's listing order.
    file_paths = sorted(
        os.path.join(root, file_name)
        for root, _dirs, files in os.walk(folder_path)
        for file_name in files
    )
    if not file_paths:
        raise FileNotFoundError(f"No files found under {folder_path!r}")

    joblist = [
        delayed(load_cooler)(file_path, os.path.relpath(file_path, folder_path))
        for file_path in file_paths
    ]
    infos = parallel(joblist)
    infos = fast_concat(infos)
    return infos.fillna(0).sort_index()

# Rows: cis contacts (chrom, start, end); columns: one per loaded file.
infos = load_coolers(folder_path=folder_path)
infos

[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.
[Parallel(n_jobs=48)]: Done 104 tasks      | elapsed:    3.9s
[Parallel(n_jobs=48)]: Done 354 tasks      | elapsed:    9.2s
[Parallel(n_jobs=48)]: Done 704 tasks      | elapsed:   17.0s
[Parallel(n_jobs=48)]: Done 1154 tasks      | elapsed:   26.9s
[Parallel(n_jobs=48)]: Done 1704 tasks      | elapsed:   39.2s
[Parallel(n_jobs=48)]: Done 2354 tasks      | elapsed:   53.5s
[Parallel(n_jobs=48)]: Done 3104 tasks      | elapsed:  1.2min
[Parallel(n_jobs=48)]: Done 3954 tasks      | elapsed:  1.5min
[Parallel(n_jobs=48)]: Done 4238 out of 4238 | elapsed:  1.6min finished


In [None]:
# Observations: one per cell (column of the contact table), all in the 'scHiC' domain.
obs = pd.DataFrame({'domain': 'scHiC'}, index=pd.Index(infos.columns, name='cells'))

# Variables: one per cis contact. The name must include `end` (start2). With
# only chrom_start, every contact sharing its first bin got the same name, which
# made var_names non-unique.
var_names = infos.index.map('{0[0]}_{0[1]}_{0[2]}'.format)
var = infos.index.to_frame(index=False).set_index(var_names)

# X rows/columns must line up with obs/var, so relabel rows on a copy rather than
# mutating infos.index in place. Note that `infos` is rebound to the AnnData below,
# so re-running this cell requires re-running the loading cell first.
infos = anndata.AnnData(X=infos.set_axis(var_names, axis=0).T, obs=obs, var=var)

In [None]:
# infos.write("/home/micl/workspace/lmh_data/Lee2019/scHiC.h5ad", compression="gzip")