In [1]:
import cooler
import numpy as np
import h5py
import dask
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from dask.diagnostics import ProgressBar

In [9]:
###### Idea 1: Read in pixel table as hdf5 and convert to dask dataframe
cool = 's01_0hr_cl97_MicroC_Granta519.mcool'
res = 5000
f = h5py.File(cool, 'r')
bin1 = da.from_array(f['/resolutions/{}/pixels/bin1_id'.format(res)], chunks=(100000,))
bin2 = da.from_array(f['/resolutions/{}/pixels/bin2_id'.format(res)], chunks=(100000,))
count = da.from_array(f['/resolutions/{}/pixels/count'.format(res)], chunks=(100000,))
df = dd.concat([dd.from_dask_array(d) for d in [bin1, bin2, count]], axis=1)
cols = ['bin1','bin2','counts']
df_bins = df.rename(columns=dict(zip(df.columns, cols)))
df_bins

Unnamed: 0_level_0,bin1,bin2,counts
npartitions=1844,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,int32
100000,...,...,...
...,...,...,...
184300000,...,...,...
184314150,...,...,...


In [3]:
## 1.1: Group by bin ID and sum the counts to get row sums
# If a df.loc[bin_id] = ERROR, sum is 0
# need to compute data size (axis size) here
with ProgressBar():
    df = df_bins.groupby('bin1').counts.sum().reindex(fill_value=0)

[########################################] | 100% Completed | 39.71 s


In [4]:
df_bins

Unnamed: 0_level_0,bin1,bin2,counts
npartitions=1844,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,int32
100000,...,...,...
...,...,...,...
184300000,...,...,...
184314150,...,...,...


In [5]:
df

bin1
2          34
10         13
11         18
12         11
13          6
         ... 
619077    160
619078    880
619079    245
619080    735
619081    363
Name: counts, Length: 561645, dtype: int32

In [6]:
def get_VC_norm_factor(x):
    i = x['bin1']
    j = x['bin2']
    c = x['counts']
    try:
        rf = df.iloc[i]
    except:
        rf = 0
    try:
        cf = df.iloc[j]
    except:
        cf = 0
    return c / (cf*rf)

In [7]:
import warnings
warnings.filterwarnings("ignore")

#dask.config.set(scheduler='processes') # Causes error 'h5py objects cant be picked'
vc_facts = df_bins.apply(get_VC_norm_factor, axis=1)
with ProgressBar():
    vc_facts.compute(num_workers=100)

[                                        ] | 1% Completed | 52.90 sms



KeyboardInterrupt

