In [1]:
from glob import iglob
from datetime import datetime
from itertools import chain, repeat

import numpy as np
import pandas as pd
from cytoolz import compose, reduce
from h5py import File
from dask import delayed, compute
from dask.bag import from_sequence
from dask.diagnostics import ProgressBar

In [15]:
def read_afile(filename):
    """Lazily yield one record (dict) per shot stored in an HDF5 file.

    The four datasets ``/bunches``, ``/photon_diagnostics/FEL01/I0_monitor/
    iom_sh_a_pc``, ``/vmi/andor`` and ``/user_laser/delay_line/position`` are
    iterated in lockstep with ``zip``, so iteration stops at the shortest one.

    Parameters
    ----------
    filename : str
        Path of the HDF5 file to read.

    Yields
    ------
    dict
        Keys: ``filename``, ``tag``, ``iom_intensity_pc``, ``img`` (the image
        cast to double), ``img_intensity`` (sum over the image as stored,
        taken before the cast) and ``delay`` (delay-line position rounded to
        two decimals).

    Notes
    -----
    Reading is best-effort: any ``Exception`` raised while opening or
    iterating the file is reported on stdout together with the file name and
    the generator then simply ends. Records yielded before the failure are
    kept.
    """
    try:
        with File(filename, 'r') as f:
            shots = zip(f['/bunches'],
                        f['/photon_diagnostics/FEL01'
                          '/I0_monitor/iom_sh_a_pc'],
                        f['/vmi/andor'],
                        f['/user_laser/delay_line/position'])
            for tag, iom, img, dt in shots:
                yield {'filename': filename,
                       'tag': tag,
                       'iom_intensity_pc': iom,
                       'img': img.astype('double'),
                       'img_intensity': img.sum(),
                       'delay': round(dt, 2)}
    except Exception as err:
        # Deliberately non-fatal so one unreadable file does not abort a whole
        # run; include the file name so the failing file can be identified.
        print(f"{filename}: {err}")

In [16]:
# Root directory of the run; raw HDF5 files are globbed from <basename>/rawdata.
basename = "/data/Test/Run_010"

# Collect the raw files and put them in lexicographic order.
filenames = list(iglob(f"{basename}/rawdata/*.h5"))
filenames.sort()

# Lazy pipeline: one bag element per file -> generator of per-shot records
# -> flattened stream of records -> dask DataFrame.
df = from_sequence(filenames).map(read_afile).flatten().to_dataframe()
df

Unnamed: 0_level_0,delay,filename,img,img_intensity,iom_intensity_pc,tag
npartitions=20,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,float64,object,object,int64,float64,int64
,...,...,...,...,...,...
...,...,...,...,...,...,...
,...,...,...,...,...,...
,...,...,...,...,...,...


In [26]:
bg_period, bg_mod = 3, 0  # /Background_Period
# A shot is flagged as background when its tag satisfies
# tag % bg_period == bg_mod.
df['is_bg'] = df['tag'] % bg_period == bg_mod
# Distinct delay values, materialized eagerly so they can drive the loop below.
delays = df['delay'].unique().compute()


def _reduce_group(bg, dt):
    """Build the lazy image count and image sum for one (is_bg, delay) group."""
    selected = (df['is_bg'] == bg) & (df['delay'] == dt)
    return {
        'is_bg': bg, 'delay': dt,
        'count': delayed(df[selected]['img'].to_bag().count()),
        'summed': delayed(df[selected]['img'].to_bag().sum()),
    }


# Evaluate every (is_bg, delay) group in a single compute() call;
# `summed` ends up as a tuple of plain dicts, one per group.
with ProgressBar():
    summed = compute(
        *[_reduce_group(bg, dt) for bg in [True, False] for dt in delays],
    )

[########################################] | 100% Completed | 28.1s


In [50]:
# One row per (delay, is_bg) pair, holding the image count and the summed image.
groupped = pd.DataFrame(list(summed)).set_index(['delay', 'is_bg'])

# Write one reduced file per delay value: background (bg_*), signal (sg_*)
# and signal-minus-background (df_*) averages.
for k in set(groupped.index.get_level_values('delay')):
    with File(f"{basename}/work/reduced_dt={k}.h5", "w") as f:
        # Background shots: count and mean image.
        n_bg = groupped.loc[(k, True), 'count']
        f['bg_n'] = n_bg
        mean_bg = groupped.loc[(k, True), 'summed'] / n_bg
        f['bg_img'] = mean_bg

        # Signal shots: count and mean image.
        n_sg = groupped.loc[(k, False), 'count']
        f['sg_n'] = n_sg
        mean_sg = groupped.loc[(k, False), 'summed'] / n_sg
        f['sg_img'] = mean_sg

        # Difference image: mean signal minus mean background.
        # NOTE(review): df_n stores the signal count only (same value as
        # sg_n) -- confirm this is the intended normalisation count.
        f['df_n'] = n_sg
        f['df_img'] = mean_sg - mean_bg