In [1]:
# GCS object-path template for one sample's zipped Zarr store; {sample_set} and {sample} are filled with str.format below.
gcs_source_zarr_path_template = 'vo_agam_production/v3.x/curation/{sample_set}/snp_genotypes/{sample}.zarr.zip'

In [2]:
import sys
import psutil
import os
from humanize import naturalsize


def mem():
    """Summarise this process's resident memory as a short string.

    Returns the process RSS in human-readable form followed by that RSS as a
    percentage of total system memory, e.g. "95.4 MB (0.6%) RES".
    """
    rss = psutil.Process(os.getpid()).memory_info().rss
    total = psutil.virtual_memory().total
    share = rss * 100 / total
    return f"{naturalsize(rss)} ({share:.1f}%) RES"

    
def debug(*msg):
    """Print the given values followed by the current memory summary from mem().

    stdout is flushed right away so each line shows up as soon as it is printed.
    """
    print(*msg, mem(), file=sys.stdout, flush=True)


In [3]:
# Smoke test: debug() prints its arguments followed by the current memory summary.
debug('test')

test 55.8 MB (0.4%) RES


In [4]:
import gcsfs
import zarr
import dask.array as da
from fsspec.implementations.zip import ZipFileSystem

In [5]:
# Sample set under inspection; fills {sample_set} in the GCS path template.
sample_set = '1244-VO-GH-YAWSON-VMF00051'

In [6]:
# Read block size handed to gcsfs below: 1 << 18 == 2**18 == 262144 (256 KiB, assuming gcsfs counts bytes).
block_size = 1 << 18
block_size

262144

In [7]:
# Passing session credentials via token prevents KilledWorker errors due to authentication failure.
# The GCP project id is named once here instead of being repeated in each constructor.
gcs_project = 'malariagen-jupyterhub'

# First filesystem: authenticate using cached credentials (gcsfs token='cache').
gcs_session = gcsfs.GCSFileSystem(project=gcs_project,
                                  token='cache',
                                  cache_timeout=0)

# Second filesystem: reuse the first one's credentials object explicitly as the token,
# and read with the custom block_size defined above.
# NOTE(review): relies on this gcsfs version exposing `.session.credentials` — confirm on upgrade.
gcs = gcsfs.GCSFileSystem(project=gcs_project,
                          token=gcs_session.session.credentials,
                          cache_timeout=0,
                          block_size=block_size)

In [8]:
# Sanity check: list the first few entries under 'vo_agam_production' to confirm the authenticated filesystem works.
gcs.ls('vo_agam_production')[:3]

['vo_agam_production/accessibility_maps',
 'vo_agam_production/accessibility_summary_stats',
 'vo_agam_production/ag3_data_paper']

In [9]:
from collections.abc import Mapping


class SafeStore(Mapping):
    """Read-only Mapping wrapper around a zarr store that never reports a missing key as KeyError.

    This helps to ensure that no missing data are silently filled in: a KeyError
    from the underlying store is re-raised as RuntimeError so that zarr propagates
    the failure instead of treating the key as absent.

    Parameters
    ----------
    store : Mapping
        Underlying key/value store, e.g. an fsspec mapper over a zipped Zarr.
    verbose : bool, default True
        If true, print each key as it is requested. This shows which
        metadata/chunk keys zarr actually reads.
    """

    def __init__(self, store, verbose=True):
        self.store = store
        self.verbose = verbose

    def __getitem__(self, key):
        if self.verbose:
            print(key)
        try:
            return self.store[key]
        except KeyError as e:
            # Always raise a runtime error to ensure zarr propagates the exception;
            # chain the original KeyError so the traceback still shows the cause.
            raise RuntimeError(e) from e

    def __contains__(self, key):
        # Delegate directly: Mapping's default __contains__ calls __getitem__ and
        # only catches KeyError, so without this override membership tests would
        # raise RuntimeError (and print) for absent keys.
        return key in self.store

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)


In [10]:
# Echo the sample set being processed.
sample_set

'1244-VO-GH-YAWSON-VMF00051'

In [11]:
# Generic alias for the path template, plus the single sample to inspect below.
input_pattern = gcs_source_zarr_path_template
sample = 'VBS19051-5563STDY7800136'

In [12]:
# Resolve the template to this sample's object path and display it.
gcs_path = input_pattern.format_map(dict(sample_set=sample_set, sample=sample))
gcs_path

'vo_agam_production/v3.x/curation/1244-VO-GH-YAWSON-VMF00051/snp_genotypes/VBS19051-5563STDY7800136.zarr.zip'

In [13]:
%%time
# Open the zipped Zarr store on GCS as a file object (fsspec default mode, read/binary);
# the two debug() calls bracket the open so the printed RSS shows its memory cost.
# NOTE(review): zip_file is not closed in the visible cells; it must stay open while
# zip_fs / store below are in use.
debug()
zip_file = gcs.open(gcs_path)
debug()

95.4 MB (0.6%) RES
95.4 MB (0.6%) RES
CPU times: user 6.84 ms, sys: 2.29 ms, total: 9.13 ms
Wall time: 26 ms


In [14]:
%%time
# Treat the open GCS file as a zip archive via fsspec's ZipFileSystem so its members
# can be addressed as paths. The RSS delta printed by debug() is the construction cost
# (presumably reading the zip's directory listing — confirm).
debug()
zip_fs = ZipFileSystem(zip_file)
debug()

95.4 MB (0.6%) RES
98.4 MB (0.6%) RES
CPU times: user 41 ms, sys: 7.9 ms, total: 48.9 ms
Wall time: 204 ms


In [15]:
%%time
# Expose the archive root as an fsspec key/value mapper, the store interface zarr reads from.
debug()
store = zip_fs.get_mapper("/")
debug()

98.4 MB (0.6%) RES
98.4 MB (0.6%) RES
CPU times: user 4.07 ms, sys: 1.37 ms, total: 5.44 ms
Wall time: 3.69 ms


In [16]:
%%time
# Wrap the mapper in SafeStore so a missing key raises RuntimeError instead of
# KeyError, ensuring zarr propagates it rather than silently filling data.
debug()
zarr_store = SafeStore(store)
debug()

98.4 MB (0.6%) RES
98.5 MB (0.6%) RES
CPU times: user 3.66 ms, sys: 878 µs, total: 4.53 ms
Wall time: 3.23 ms


In [17]:
%%time
# Open the store read-only with zarr. SafeStore echoes every key it is asked for,
# so the output shows which keys this reads (only `.zgroup` here).
debug()
callset = zarr.open(store=zarr_store, mode='r')
debug()

98.5 MB (0.6%) RES
.zgroup
103.4 MB (0.7%) RES
CPU times: user 119 ms, sys: 9.92 ms, total: 129 ms
Wall time: 245 ms


In [18]:
%%time
# Look up this sample's X/calldata/GT array (presumably genotype calls for contig X).
# Per the printed key, only the array metadata (.zarray) is fetched here, no chunks.
debug()
z = callset[f'{sample}/X/calldata/GT']
debug()

103.4 MB (0.7%) RES
VBS19051-5563STDY7800136/X/calldata/GT/.zarray
103.4 MB (0.7%) RES
CPU times: user 34.1 ms, sys: 4.26 ms, total: 38.4 ms
Wall time: 68.7 ms


In [19]:
%%time
# Wrap the zarr array as a dask array reusing the zarr chunking; no keys are printed
# by SafeStore, so no chunk data is read at this point.
debug()
a = da.from_array(z, chunks=z.chunks)
debug()

103.4 MB (0.7%) RES
103.8 MB (0.7%) RES
CPU times: user 3.86 ms, sys: 3.3 ms, total: 7.15 ms
Wall time: 5.54 ms
