In [1]:
# parallel I/O 
##############

import numpy as np
import h5py
import glob
import os
import sys
import timeit
import concurrent.futures

data_files = glob.glob("/data/shared/LCDLargeWindow/fixedangle/ChPiEscan/temp/*.h5")[:10]
data_files_comp = glob.glob("/data/shared/LCDLargeWindow/fixedangle/ChPiEscan/ChPiEscan*.h5")[:10]
features = ['ECAL', 'HCAL', 'pdgID', 'ECAL_E', 'HCAL_E', 'HCAL_ECAL_ERatio', 'energy']
data_dict = {}

def timer(test_code, setup, iter_num):
    total_access_time = timeit.Timer(stmt=test_code, 
                                  setup=setup).timeit(number=iter_num)
    return (total_access_time/iter_num)

def access(file):
    for feat in features: 
        with h5py.File(file, 'r') as f:
            data_dict[f.filename + feat] = np.array(f[feat])

  from ._conv import register_converters as _register_converters


In [2]:
def pIO():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for _ in executor.map(access, data_files):
            print("P - Job Done")
            
def IO():
    for file in data_files_comp:
        access(file)
        print("R - Job Done")

In [3]:
setup = "from __main__ import pIO"
test_code = "pIO()"

tpio = timer(test_code, setup, 1)

data_dict = {}

setup = "from __main__ import IO"
test_code = "IO()"

tio = timer(test_code, setup, 1)

print("Parallel %.3f s"%tpio)
print("Regular %.3f s"%tio)

P - Job Done
P - Job Done
P - Job Done
P - Job Done
P - Job Done
P - Job Done
P - Job Done
P - Job Done
P - Job Done
P - Job Done
R - Job Done
R - Job Done
R - Job Done
R - Job Done
R - Job Done
R - Job Done
R - Job Done
R - Job Done
R - Job Done
R - Job Done
Parallel 54.930 s
Regular 408.937 s


In [6]:
print("Parallel I/O + recompression is ~ %.0f times faster than current I/O speed."%(tio / tpio))

Parallel I/O + recompression is ~ 7 times faster than current I/O speed.
