In [1]:
True

True

In [1]:
import karabo_data as kd
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import multiprocessing
from tqdm.auto import tqdm
from time import strftime
import pandas as pd
from matplotlib.colors import LogNorm, BoundaryNorm
from importlib import reload
import os
import time
import warnings
from glob import glob
from karabo_data.read_machinery import find_proposal

kd.__version__

'0.6.2'

In [2]:
%matplotlib widget

In [3]:
# make sure subfolders exist
for f in ['tmp', 'images', 'processed_runs']:
    if not os.path.isdir(f):
        os.mkdir(f)

# worker function for multiprocessing
is saved to file "multiprocess_defs_v3.py"

In [122]:
%%writefile multiprocess_defs_v4.py


# -*- coding: utf-8 -*-
'''
@author: Michael Schneider, with input from XFEL-CAS and SCS beamline staff

Created on October 2, 2019
October 10, 2019: added pulse- and train-based masking

'''

import numpy as np
import xarray as xr
import karabo_data as kd
from karabo_data.read_machinery import find_proposal
import pandas as pd
import os
from glob import glob
from time import strftime, sleep
from tqdm import tqdm


def find_run_dir(proposal, run):
    '''returns the raw data folder for given (integer) proposal and run number'''
    proposal_dir = find_proposal(f'p{proposal:06d}')
    return os.path.join(proposal_dir, f'raw/r{run:04d}')


def load_run_selective(proposal, run_nr, include=None, exclude=None, maxfiles=None):
    '''
    Finds the rundirectory for the given (integer) proposal and run numbers and returns
    a karabo_data.DataCollection instance. Files can be selected or excluded based on
    filenames. Setting any of the optional paramters to None disables that particular
    selection.
    
    Parameters
    ==========
    proposal :: int
        proposal number
    run_nr :: int
        run number
    include :: str
        Load only files that have this string in the filename
    exclude :: str
        Skip files with this string in the filename
    maxfiles :: int
        Load at most this many files
    '''
    rundir = find_run_dir(proposal, run_nr)
    flist = glob(os.path.join(rundir, '*h5'))

    if include is not None:
        flist = [f for f in flist if include in f]

    if exclude is not None:
        flist = [f for f in flist if exclude not in f]
    
    return kd.DataCollection.from_paths(flist[:maxfiles])


def load_scan_variable(run, scan_variable, stepsize=None):
    '''
    Loads the given scan variable and rounds scan positions to integer multiples of "stepsize"
    for consistent grouping. Creates a dummy scan if scan_variable is set to None.
    Parameters:
        run : (karabo_data.DataCollection) RunDirectory instance
        scan_variable : (tuple of str) ("source name", "value path"), examples:
                        ('SCS_ILH_LAS/PHASESHIFTER/DOOCS', 'actualPosition.value')
                        ('SCS_ILH_LAS/DOOCS/PPL_OPT_DELAY', 'actualPosition.value')
                        ('SA3_XTD10_MONO/MDL/PHOTON_ENERGY', 'actualEnergy.value')
                        None creates a dummy file to average over all trains of the run
        stepsize : (float) nominal stepsize of the scan - values of scan_variable will be
                   rounded to integer multiples of this value
    '''
    if scan_variable is not None:
        source, path = scan_variable
        scan = run.get_array(source, path)
        if stepsize is not None:
            scan = stepsize * np.round(scan / stepsize)
    else:
        # dummy scan variable - this will average over all trains
        scan = xr.DataArray(np.ones(len(run.train_ids), dtype=np.int16),
                            dims=['trainId'], coords={'trainId': run.train_ids})
    scan.name = 'scan_variable'
    return scan


def load_xgm(run, print_info=False):
    '''Returns the XGM data from loaded karabo_data.DataCollection'''
    nbunches = run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER', 'sase3.nPulses.value')
    nbunches = np.unique(nbunches)
    if len(nbunches) == 1:
        nbunches = nbunches[0]
    else:
        warnings.warn('not all trains have same length DSSC data')
        print('nbunches: ', nbunches)
        nbunches = max(nbunches)
    if print_info:
        print('SASE3 bunches per train:', nbunches)
    
    xgm = run.get_array('SCS_BLU_XGM/XGM/DOOCS:output', 'data.intensitySa3TD',
                        roi=kd.by_index[:nbunches], extra_dims=['pulse'])
    return xgm


def load_TIM(run, apd='MCP2apd'):
    '''
    Load TIM traces and match them to SASE3 pulses. "run" is a karabo_data.RunDirectory instance.
    '''
    import ToolBox as tb
    
    fields = ["sase1", "sase3", "npulses_sase3", "npulses_sase1", apd, "SCS_SA3", "nrj"]
    timdata = xr.Dataset()
    for f in fields:
        m = tb.mnemonics[f]
        timdata[f] = run.get_array(m['source'], m['key'], extra_dims=m['dim'])

    timdata.attrs['run'] = run
    timdata = tb.matchXgmTimPulseId(timdata)
    return timdata.rename({'sa3_pId': 'pulse'})[apd]


def prepare_module_empty(scan_variable, framepattern):
    '''Create empty (zero-valued) DataArray for a single DSSC module to iteratively add data to'''
    len_scan = len(np.unique(scan_variable))
    dims = ['scan_variable', 'x', 'y']
    coords = {'scan_variable': np.unique(scan_variable)}
    shape = [len_scan, 128, 512]
        
    empty = xr.DataArray(np.zeros(shape, dtype=float), dims=dims, coords=coords)
    module_data = xr.Dataset()
    for name in framepattern:
        module_data[name] = empty.copy()
    
    module_data['sum_count'] = xr.DataArray(np.zeros(len_scan, dtype=int), dims=['scan_variable'])
    return module_data


def load_dssc_info(proposal, run_nr):
    '''Loads the first data file for DSSC module 0 (this is hardcoded) and
    returns the detector_info dictionary'''
    dssc_module = load_run_selective(proposal, run_nr, include='DSSC00', maxfiles=1)
    dssc_info = dssc_module.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')
    return dssc_info


def load_chunk_data(sel, sourcename):
    '''Load DSSC data (sel is a DataCollection or a subset of a DataCollection
    obtained by its select_trains() method). The flattened multi-index (trains+pulses)
    is unraveled before returning the data.
    '''
    fpt = sel.detector_info(sourcename)['frames_per_train']
    data = sel.get_array(sourcename, 'image.data', extra_dims=['_empty_', 'x', 'y']).squeeze()
    
    tids = np.unique(data.trainId)
    data = data.rename(dict(trainId='trainId_pulse'))
    
    midx = pd.MultiIndex.from_product([sorted(tids), range(fpt)], names=('trainId', 'pulse'))
    data = xr.DataArray(data, dict(trainId_pulse=midx)).unstack('trainId_pulse')
    data = data.transpose('trainId', 'pulse', 'x', 'y')
    return data


def merge_chunk_data(module_data, chunk_data, framepattern):
    '''Merge chunk data with prepared dataset for entire module.
    Aligns on "scan_variable" and sums values for variables
    ['pumped', 'unpumped', 'sum_count']'''
    where = dict(scan_variable=chunk_data.scan_variable)
    for var in framepattern + ['sum_count']:
        # module_data[var].loc[where] = module_data[var].loc[where] + chunk_data[var]
        # previous line doesn't convert to larger dtypes when necessary
        # the next line concatenates the data along a new dimension ('tmp') and uses
        # the sum() method, which supports automatic conversion
        summed = xr.concat([module_data[var].loc[where], chunk_data[var]], dim='tmp').sum('tmp')
        module_data[var].loc[where] = summed
    return module_data


def split_frames(data, pattern, prefix=''):
    '''Split frames according to "pattern" and average over resulting splits.
    "pattern" is a list of frame names (order matters!). Examples:
        pattern = ['pumped', 'pumped_dark', 'unpumped', 'unpumped_dark']  # 4 DSSC frames, 2 FEL pulses
        pattern = ['pumped', 'unpumped']  # 2 FEL frames, no intermediate darks
        pattern = ['image']  # no splitting, average over all frames
    Returns a dataset with data variables named prefix + framename
    '''
    n = len(pattern)
    dataset = xr.Dataset()
    for i, name in enumerate(pattern):
        dataset[prefix + name] = data.loc[{'pulse': np.s_[i::n]}].mean('pulse')
    return dataset


def calc_xgm_frame_indices(nbunches, framepattern):
    '''
    Returns a coordinate array for XGM data. The coordinates correspond to DSSC
    frame numbers and depend on the number of FEL pulses per train ("nbunches")
    and the framepattern. In framepattern, dark DSSC frame names (i.e., without
    FEL pulse) _must_ include "dark" as a substring.
    '''
    n_frames = len(framepattern)
    n_data_frames = np.sum(['dark' not in p for p in framepattern])
    frame_max = nbunches * n_frames // n_data_frames

    frame_indices = []
    for i, p in enumerate(framepattern):
        if 'dark' not in p:
            frame_indices.append(np.arange(i, frame_max, n_frames))

    return np.sort(np.concatenate(frame_indices))


def process_dssc_module(job):
    '''Aggregate DSSC data (chunked, to fit into memory) for a single module.
    Groups by "scan_variable" in given scanfile - use dummy scan_variable to average
    over all trains. This implies, that only trains found in the scanfile are considered.
    Designed for the multiprocessing module - expects a job dictionary with the following keys:
      proposal : (int) proposal number
      run : (int) run number
      module : (int) DSSC module to process
      chunksize : (int) number of trains to process simultaneously
      scanfile : (str) name of hdf5 file with xarray.DataArray containing the scan variable and trainIds
      framepattern : (list of str) names for the (possibly repeating) intra-train pulses. See split_dssc_data
      pulsemask : (str) name of hdf5 file with boolean xarray.DataArray to select/reject trains and pulses
    '''
    proposal = job['proposal']
    run_nr = job['run_nr']
    module = job['module']
    chunksize = job['chunksize']
    scanfile = job['scanfile']
    framepattern = job['framepattern']
    maskfile = job.get('maskfile', None)
    
    sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
    
    collection = load_run_selective(proposal, run_nr, include=f'DSSC{module:02d}')
        
    ntrains = len(collection.train_ids)
    
    # read preprocessed scan variable from file - selection and (possibly) rounding already done.
    scan = xr.open_dataarray(scanfile, 'data', autoclose=True)

    # read binary pulse/train mask - e.g. from XGM thresholding
    if maskfile is not None:
        pulsemask = xr.open_dataarray(maskfile, 'data', autoclose=True)
    else:
        pulsemask = None
    
    module_data = prepare_module_empty(scan, framepattern)
    chunks = np.arange(ntrains, step=chunksize)
    if module == 15:
        pbar = tqdm(total=len(chunks) + 1)
    for start_index in chunks:
        sel = collection.select_trains(kd.by_index[start_index:start_index + chunksize])
        data = load_chunk_data(sel, sourcename)
        if pulsemask is not None:
            data = data.where(pulsemask)

        data = split_frames(data, framepattern)
        data['sum_count'] = xr.full_like(data.trainId, fill_value=1)
        data['scan_variable'] = scan  # aligns on trainId, drops non-matching trains 
        data = data.groupby('scan_variable').sum('trainId')
        module_data = merge_chunk_data(module_data, data, framepattern)
        if module == 15:
            pbar.update(1)
    
    for name in framepattern:
        module_data[name] = module_data[name] / module_data.sum_count
    if module == 15:
            pbar.update(1)
    return module_data
        

Overwriting multiprocess_defs_v4.py


In [123]:
import multiprocess_defs_v4 as process

process = reload(process)

# setup processing and index non-DSSC data

In [77]:
%%time

# basic run information
proposal = 2280
run_nr = 132
is_dark = False

# DSSC frame names - make sure, "dark" is included in names for frames without FEL
framepattern = ['unpumped', 'unpumped_dark', 'pumped', 'pumped_dark']

# scan settings (set scan_variable to None for static data)
stepsize = None
# scan_variable = ('SCS_ILH_LAS/PHASESHIFTER/DOOCS', 'actualPosition.value')
# scan_variable = ('SA3_XTD10_MONO/MDL/PHOTON_ENERGY', 'actualEnergy.value')
scan_variable = ('SCS_ILH_LAS/DOOCS/PPL_OPT_DELAY', 'actualPosition.value')

scan_variable = None if is_dark else scan_variable

# index non-DSSC data
run = process.load_run_selective(proposal, run_nr, exclude='DSSC')

CPU times: user 459 ms, sys: 165 ms, total: 624 ms
Wall time: 627 ms


# prepare scan variable and write to file for subprocesses

In [109]:
scanfile = './tmp/scan.h5'
maskfile = './tmp/mask.h5'

for fname in [scanfile, maskfile]:
    if os.path.isfile(fname):
        os.remove(fname)

dssc_info = process.load_dssc_info(proposal, run_nr)
fpt = dssc_info['frames_per_train']
print('DSSC frames per train:', fpt)

scan = process.load_scan_variable(run, scan_variable, stepsize)

DSSC frames per train: 4


## optional: discard groups with low number of trains

In [112]:
n_min = 10

grouped = scan.groupby(scan)

for val, grp in grouped:
    if len(grp) < 10:
        scan.loc[{'trainId': grp.trainId.values}] = np.nan

scan = scan.dropna('trainId')

# save to hdf5

In [114]:
scan.to_netcdf(scanfile, group='data', mode='w')

## mask individual pulses based on XGM thresholding
It is possible to pass a 2d binary mask (DataArray dimensions 'trainId' and 'pulse) to (de)select trains and/or pulses.
Broadcasting is supported, so passing just a 1d DataArray with one of the two dimensions is also possible.
Care has to be taken to match the frame numbers of the DSSC data - especially when intermediate dark frames are recorded, the number of XGM pulse numbers do not necessarily match the DSSC frame numbers.
This functionality may also be used to limit the number of DSSC frames processed, e.g., when more frames than FEL pulses were recorded (take care that dark runs (i.e., possibly without any XGM data) still get a correct mask in that case!).

The following example selects based on XGM threshold, but you can of course build your own selection mask based on any other information as well. Just save the final xarray.DataArray to "maskfile" as shown.

In [115]:
# default mask - all pulses and trains included
pulsemask = xr.DataArray(np.ones([len(run.train_ids), fpt], dtype=bool),
                         dims=['trainId', 'pulse'],
                         coords={'trainId': run.train_ids, 'pulse': range(fpt)})

In [116]:
xgm_threshold = 0
xgm_max = np.inf  #1.2e4
if not is_dark:
    xgm = process.load_xgm(run, print_info=True)
    xgm_frame_coords = process.calc_xgm_frame_indices(xgm.shape[1], framepattern)
    xgm['pulse'] = xgm_frame_coords
    n_frames_dark = len([p for p in framepattern if 'dark' in p])
#     n_dssc_frames = xgm.shape[1] * len(framepattern) // n_frames_dark
    valid = (xgm > xgm_threshold) * (xgm < xgm_max)
    pulsemask = valid.combine_first(pulsemask).astype(bool)
    pulsemask = (pulsemask.where(pulsemask)).dropna('trainId', how='any').astype(bool)  # rejects entire train if any pulse < threshold
    nrejected = int(valid.size - valid.sum())
    percent_rejected = 100 * nrejected / valid.size
    print(f'rejecting {nrejected} out of {valid.size} pulses ({percent_rejected:.1f}%) due to xgm threshold')
    
pulsemask.to_netcdf(maskfile, group='data', mode='w')

SASE3 bunches per train: 2
rejecting 0 out of 36018 pulses (0.0%) due to xgm threshold


## plot XGM and threshold

In [117]:
fig, [ax1, ax2] = plt.subplots(nrows=2, sharex=True, figsize=[6, 3])

# ax1.plot(scan.xgm.mean('dim_0'), label='pumped')
ax1.plot(xgm.trainId, xgm, 'o', c='C0', ms=1)
if xgm_threshold is not None:
    ax1.axhline(xgm_threshold, c='r', lw=1)
    ax1.axhline(xgm_max, c='r', lw=1)
ax1.set_ylabel('xgm')

ax2.plot(scan.trainId, scan)
ax2.set_ylabel('scan variable')
ax2.set_xlabel('trainId')

ax1.set_title(f'run: {run_nr}')

tstamp = strftime('%y%m%d_%H%M')
# fig.savefig(f'images/run{run_nr}_{tstamp}_scan.png', dpi=200)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

## plot number of trains per step

In [118]:
counts = xr.DataArray(np.ones(len(scan)),
                      dims=['scan_variable'],
                      coords={'scan_variable': scan.values},
                      name='counts')

counts = counts.groupby('scan_variable').sum()

fig, ax = plt.subplots(figsize=[5, 2.2])
ax.plot(counts.scan_variable, counts, 'o', ms=2)
ax.set_xlabel('scan variable')
ax.set_ylabel('number of trains')
ax.set_title(f'run {run_nr}')
ax.grid(True)
# plt.tight_layout()

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

# create joblist for multiprocessing
This is a conservative estimate for the maximum number of trains to process simultaneously without using more than "max_GB" gigabytes of memory.

In [119]:
max_GB = 500

# max_GB / (8byte * 16modules * 128px * 512px * N_pulses)
chunksize = int(max_GB * 128 // fpt)
chunksize = min(512, chunksize)  # more than 512 trains doesn't seem to give any performance benefit
print('processing', chunksize, 'trains per chunk')

processing 512 trains per chunk


In [120]:
jobs = []
for m in range(16):
    jobs.append(dict(
        proposal=proposal,
        run_nr=run_nr,
        module=m,
        chunksize=chunksize,
        scanfile=scanfile,
        framepattern=framepattern,
        maskfile=maskfile,
    ))

# create multiprocessing pool and execute

In [121]:
%%time

timestamp = time.strftime('%X')
print(f'start time: {timestamp}')

with multiprocessing.Pool(16) as pool:
    module_data = pool.map(process.process_dssc_module, jobs)
    
print('finished:', time.strftime('%X'))

module_data = xr.concat(module_data, dim='module')
module_data = module_data.dropna('scan_variable')
module_data['run'] = run_nr

start time: 13:26:49


100%|██████████| 37/37 [04:45<00:00,  4.36s/it]

finished: 13:32:15
CPU times: user 10.3 s, sys: 21.3 s, total: 31.6 s
Wall time: 5min 40s


# merge processed data with scan variable and normalization data
If trains/ pulses were filtered, this is a good place to add additional data that has to be filtered in the same way (e.g., TIM). Just use the same binary mask ("pulsemask") before grouping and adding it to the processed DSSC data.

## optional: add TIM data

In [124]:
tim = process.load_TIM(run)
tim['pulse'] = xgm_frame_coords

In [125]:
fig, ax = plt.subplots()
ax.plot(xgm[:, 0], tim[:, 0], 'o', ms=2, label='pumped')
ax.plot(xgm[:, 1], tim[:, 1], 'o', ms=2, label='unpumped')

ax.legend()
ax.set_title(f'run {run_nr}')
ax.set_xlabel('XGM')
ax.set_ylabel('TIM (MCP2apd)')

tstamp = strftime('%y%m%d_%H%M')
# fig.savefig(f'images/run{run_nr}_{tstamp}_TIM.png', dpi=200)

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [126]:
if not is_dark:
    pulses_no_dark = [p for p in framepattern if 'dark' not in p]
    
    xgm = xgm.where(valid).dropna('trainId', how='any')  # IMPORTANT: use the same mask for normalization data!
    xgm = process.split_frames(xgm, pulses_no_dark, prefix='xgm_')
    xgm['scan_variable'] = scan
    xgm = xgm.groupby('scan_variable').mean('trainId')
    module_data = xr.merge([module_data, xgm])
    
    tim = tim.where(valid).dropna('trainId', how='any')
    tim = process.split_frames(tim, pulses_no_dark, prefix='tim_')
    tim['scan_variable'] = scan
    tim = tim.groupby('scan_variable').mean('trainId')
    module_data = xr.merge([module_data, tim])
    
module_data = module_data.transpose('scan_variable', 'module', 'x', 'y')

# save to hdf5

In [127]:
overwrite = True

save_folder = './processed_runs/'

if is_dark:
    fname = f'run{run_nr}.h5'  # no scan
else:
    fname = f'run{run_nr}_by-delay.h5'  # run with delay scan (change for other scan types!)


save_path = os.path.join(save_folder, fname)
file_exists = os.path.isfile(save_path)

if (not file_exists) or (file_exists and overwrite):
    if file_exists:
        os.remove(save_path)
    module_data.to_netcdf(save_path, group='data')
    os.chmod(os.path.join(save_folder, fname), 664)
    print('saving: ', save_path)
else:
    print('file', save_path, 'exists and overwrite is False')

saving:  ./processed_runs/run132_by-delay.h5


# scratch

In [23]:
%load_ext memory_profiler
%load_ext line_profiler

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler


In [None]:
%lprun -T profile.txt -f process.process_dssc_module process.process_dssc_module(job)

In [13]:
from multiprocess_defs_v4 import *
job = jobs[0]

In [14]:
proposal = job['proposal']
run_nr = job['run_nr']
module = job['module']
chunksize = job['chunksize']
scanfile = job['scanfile']
framepattern = job['framepattern']
maskfile = job.get('maskfile', None)

sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
collection = load_run_selective(proposal, run_nr, include=f'DSSC{module:02d}')
ntrains = len(collection.train_ids)
    
# read preprocessed scan variable from file - selection and (possibly) rounding already done.
scan = xr.open_dataarray(scanfile, 'data', autoclose=True, cache=False)
# read binary pulse/train mask - e.g. from XGM thresholding
if maskfile is not None:
    pulsemask = xr.open_dataarray(maskfile, 'data', autoclose=True)
    pulsemask = pulsemask.where(pulsemask)  # boolean to [1, NaN]
else:
    pulsemask = None

module_data = prepare_module_empty(scan, framepattern)
chunks = np.arange(ntrains, step=chunksize)

In [15]:
start_index = 0
sel = collection.select_trains(kd.by_index[start_index:start_index + chunksize])
data = load_chunk_data(sel, sourcename)

In [37]:
if pulsemask is not None:
    data = (data * pulsemask)

In [27]:
data = split_frames(data, framepattern)
data['sum_count'] = xr.full_like(data.trainId, fill_value=1)

In [114]:
data['scan_variable'] = scan  # aligns on trainId -> NaN for non-matching trains 
data = data.groupby('scan_variable').sum('trainId')
module_data = merge_chunk_data(module_data, data, framepattern)
    

# data = process.split_frames(data, framepattern)