In [1]:
# Conda Environment : miniconda3-mana-3.12
import numpy as np
import bitshuffle.h5
# import scipy.io
import os
import sys
import h5py
from sfdata import SFDataFiles, SFScanInfo, SFProcFile


In [2]:
# Peek at the shared helper scripts that a later cell adds to sys.path and imports.
os.listdir(path="/sf/maloja/data/p22279/work/scripts")

['FindCenter.ipynb',
 'parameters.py',
 'find_center.py',
 'example.ipynb',
 '.ipynb_checkpoints',
 'bad_pixel.py',
 'CMC.py',
 'JFMask.npy',
 'hit_finder.py',
 '__pycache__',
 'img_process.py']

In [3]:
os.getcwd()  # the kernel's current working directory (what the %pwd magic returns)

'/das/work/p22/p22279/Hankai'

In [4]:
work_dir = "/sf/maloja/data/p22279/work/scripts"
# Guard the append so re-running this cell does not keep growing sys.path with duplicates.
if work_dir not in sys.path:
    sys.path.append(work_dir)
import CMC
import bad_pixel
import parameters
import find_center
import hit_finder


import importlib


In [5]:
pgroup = "p22279" # use the correct pgroup number
raw_data_path = os.path.join("/sf/maloja/data", pgroup, "raw")

# raw/ holds more than runs: besides run_info it can contain e.g. the JF_* pedestal
# data that the Jungfrau loader auto-locates there. Keep only runNNNN* directories.
runs = sorted(
    os.path.join(raw_data_path, entry)
    for entry in os.listdir(raw_data_path)
    if entry.startswith('run') and entry[3:7].isdigit()
    and os.path.isdir(os.path.join(raw_data_path, entry))
)
for run_name in reversed(runs):
    print(run_name)

/sf/maloja/data/p22279/raw/run0021-Energy_scan_SASE_15fs_001
/sf/maloja/data/p22279/raw/run0020-NaCl_50um_2lpm_15_fs_FELscan1060_1130_013
/sf/maloja/data/p22279/raw/run0019-NaCl_50um_2lpm_15_fs_FELscan1060_1130_012
/sf/maloja/data/p22279/raw/run0018-NaCl_50um_2lpm_15_fs_FELscan1060_1130_011
/sf/maloja/data/p22279/raw/run0017-NaCl_50um_2lpm_15_fs_FELscan1060_1130_010
/sf/maloja/data/p22279/raw/run0016-NaCl_50um_2lpm_15_fs_FELscan1060_1130_009
/sf/maloja/data/p22279/raw/run0015-NaCl_50um_2lpm_15_fs_FELscan1060_1130_008
/sf/maloja/data/p22279/raw/run0014-NaCl_50um_2lpm_15_fs_FELscan1060_1130_007
/sf/maloja/data/p22279/raw/run0013-NaCl_50um_2lpm_15_fs_FELscan1060_1130_006
/sf/maloja/data/p22279/raw/run0012-NaCl_50um_2lpm_15_fs_FELscan1060_1130_005
/sf/maloja/data/p22279/raw/run0011-NaCl_50um_2lpm_15_fs_FELscan1060_1130_004
/sf/maloja/data/p22279/raw/run0010-NaCl_50um_2lpm_15_fs_FELscan1060_1130_003
/sf/maloja/data/p22279/raw/run0009-NaCl_50um_2lpm_15_fs_FELscan1060_1130_002
/sf/maloja/data

In [6]:
# Select a few runs you want to look at
run_nums = [21]

# Build selected_runs in the same order as run_nums: the reduction cells pair the two
# lists position by position with zip(), so any reordering or silently missing run
# would write one run's data into another run's folder.
selected_runs = []
for num in run_nums:
    # Exact match on the "runNNNN" prefix of the directory name, not a substring of the path.
    matches = [run for run in runs if os.path.basename(run).split('-')[0] == f'run{num:04d}']
    if len(matches) != 1:
        raise ValueError(f'expected exactly one raw directory for run {num}, found {matches}')
    selected_runs.append(matches[0])

for selected_run in selected_runs:
    print(selected_run)

/sf/maloja/data/p22279/raw/run0021-Energy_scan_SASE_15fs_001


In [8]:
# Logical name -> SwissFEL channel name. The reduction cells read all of these through
# step[channels.values()] followed by drop_missing(), so every entry here can influence
# which pulses are kept.
# NOTE(review): gasmon0-3 and diode_full are not used by the reduction cells -- confirm
# they are still needed.
channels = dict(
    # XPS='SATES21-CAMS154-M1.projection_signal',
    # tof='SATES21-GES1:A2_VALUES',
    calci='SATFE10-PEPG046-EVR0:CALCI',
    # laser_off='SAT-CVME-TIFALL5:EvtSet',
    # time_tool='SATES21-CAMS-PATT1.projection_signal',
    gasmon0="SATFE10-LSCP1:CH0:1",
    gasmon1="SATFE10-LSCP1:CH1:1",
    gasmon2="SATFE10-LSCP1:CH2:1",
    gasmon3="SATFE10-LSCP1:CH3:1",
    diode_projection_bkg="SATES24-CAMS161-M1.projection_background",
    diode_projection_sig='SATES24-CAMS161-M1.projection_signal',
    diode_full='SATES21-GES1:A4_VALUES',
    JFdata="JF15T08V01",
)


In [7]:
# CPUs this process may actually run on. os.cpu_count() counts every CPU of the node,
# which overstates what is usable when the process is pinned to a subset; use this
# number when tuning the thread-pool sizes in the reduction cells.
n_cpus = len(os.sched_getaffinity(0)) if hasattr(os, 'sched_getaffinity') else os.cpu_count()
n_cpus

56

In [1]:
import os
import numpy as np
import h5py
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc

def process_batch(calci_batch, pids_batch, JF_batch, dio_sig_batch, dio_bkg_batch, hit_finder,
                  threshold=1, hit_percentage=0.015, bkg_percentage=0.003, max_bkgs=10):
    """Split one batch of shots into hits and a few background frames.

    A shot is a hit when ``hit_finder.is_hit`` accepts its image at ``hit_percentage``.
    It is a background candidate when ``is_hit`` rejects it at ``bkg_percentage``.
    At most ``max_bkgs`` background frames are returned.

    Parameters
    ----------
    calci_batch, JF_batch, dio_sig_batch, dio_bkg_batch : iterables
        Per-shot values of one batch, aligned position by position.
    pids_batch : indexable
        Pulse IDs aligned with the other batch inputs.
    hit_finder : object
        Anything providing ``is_hit(img, bkg=..., threshold=..., percentage=...)``.
    threshold, hit_percentage, bkg_percentage : float
        Forwarded to ``is_hit``; the defaults are the values this notebook used so far.
    max_bkgs : int
        Maximum number of background frames collected for this batch.

    Returns
    -------
    tuple of six lists
        (jf_hits, pid_hits, calci_hits, diode_sig_hits, diode_bkg_hits, bkgs).
        The first five are aligned with each other and keep the input order.
    """
    batch_jf_hits = []
    batch_pid_hits = []
    batch_calci_hits = []
    batch_diode_sig_hits = []
    batch_diode_bkg_hits = []
    batch_bkgs = []

    for img_index, (calci, jf_img, dio_sig, dio_bkg) in enumerate(zip(calci_batch, JF_batch, dio_sig_batch, dio_bkg_batch)):
        jf_img_np = np.array(jf_img)
        if hit_finder.is_hit(jf_img_np, bkg=0, threshold=threshold, percentage=hit_percentage):
            batch_jf_hits.append(jf_img_np)
            batch_pid_hits.append(pids_batch[img_index])
            batch_calci_hits.append(calci)
            batch_diode_sig_hits.append(dio_sig)
            batch_diode_bkg_hits.append(dio_bkg)
        # Only pay for the second is_hit call while background slots are still free.
        if len(batch_bkgs) < max_bkgs and not hit_finder.is_hit(
                jf_img_np, bkg=0, threshold=threshold, percentage=bkg_percentage):
            batch_bkgs.append(jf_img_np)
    return batch_jf_hits, batch_pid_hits, batch_calci_hits, batch_diode_sig_hits, batch_diode_bkg_hits, batch_bkgs

Reduced_Data_Folder = '/das/work/p22/p22279/Reduced_Data'
Batch_Size = 30
Max_Workers = 4  # threads per scan step; tune for your system

# For every selected run and scan step: keep only the Jungfrau hits (plus up to 10
# background frames) and write them to one HDF5 file per acquisition.
for run_num, selected_run in zip(run_nums, selected_runs):
    # zip() pairs by position; refuse to write one run's data into another run's folder.
    if not os.path.basename(selected_run).startswith(f'run{run_num:04d}'):
        raise ValueError(f'run_nums/selected_runs out of step: {run_num} vs {selected_run}')

    dir_name = os.path.join(Reduced_Data_Folder, f'run{run_num:04d}')
    os.makedirs(dir_name, exist_ok=True)

    print(selected_run)
    scan = SFScanInfo(os.path.join(selected_run, 'meta/scan.json'))
    scan_steps = scan.readbacks
    print('scan steps: ', scan_steps)

    for i, step in enumerate(scan):

        subset = step[channels.values()]
        subset.drop_missing()
        # list() materialises every batch of this step up front so the batches can be
        # handed to the thread pool; memory therefore scales with the step size.
        temp_calci = list(subset[channels['calci']].in_batches(Batch_Size))
        temp_pids = subset[channels['calci']].pids
        temp_diode_projection_signal = list(subset[channels['diode_projection_sig']].in_batches(Batch_Size))
        temp_diode_projection_bkg = list(subset[channels['diode_projection_bkg']].in_batches(Batch_Size))
        temp_JF = list(subset[channels['JFdata']].in_batches(Batch_Size))

        h5file = os.path.join(dir_name, f'Run{run_num:04d}_acq{i+1:04d}.h5')

        # One job per batch; the batches of the different channels line up by position.
        jobs = [
            (calci_batch, temp_pids[indices], JF_batch, dio_sig_batch, dio_bkg_batch, hit_finder)
            for (indices, calci_batch), (_, JF_batch), (_, dio_sig_batch), (_, dio_bkg_batch)
            in zip(temp_calci, temp_JF, temp_diode_projection_signal, temp_diode_projection_bkg)
        ]

        jf_hits = []
        pid_hits = []
        calci_hits = []
        diode_sig_hits = []
        diode_bkg_hits = []
        bkgs = []
        with ThreadPoolExecutor(max_workers=Max_Workers) as executor:
            futures = [executor.submit(process_batch, *job) for job in jobs]
            # Collect in submission order (not as_completed): the written datasets then
            # follow the input pulse order and the stored backgrounds do not depend on
            # which thread finished first.
            for future in futures:
                (batch_jf_hits, batch_pid_hits, batch_calci_hits,
                 batch_diode_sig_hits, batch_diode_bkg_hits, batch_bkgs) = future.result()
                jf_hits.extend(batch_jf_hits)
                pid_hits.extend(batch_pid_hits)
                calci_hits.extend(batch_calci_hits)
                diode_sig_hits.extend(batch_diode_sig_hits)
                diode_bkg_hits.extend(batch_diode_bkg_hits)
                bkgs.extend(batch_bkgs[:10 - len(bkgs)])  # keep at most 10 backgrounds
        # The futures and temp_* lists still reference every image of this step; release
        # them now instead of keeping them alive until the next step reassigns the names.
        del jobs, futures, temp_calci, temp_JF, temp_diode_projection_signal, temp_diode_projection_bkg
        gc.collect()

        # Write hits to HDF5 file (only if at least 1 hit)
        if jf_hits:
            with h5py.File(h5file, 'w') as f:
                f.create_dataset('ReducedJF', data=np.stack(jf_hits), compression='gzip')
                f.create_dataset('Pulse_ID', data=np.array(pid_hits), compression='gzip')
                f.create_dataset('Calci', data=np.array(calci_hits), compression='gzip')
                f.create_dataset('Backgrounds', data=np.array(bkgs), compression='gzip')
                f.create_dataset('Diode_Projection_Signal', data=np.array(diode_sig_hits), compression='gzip')
                f.create_dataset('Diode_Projection_Background', data=np.array(diode_bkg_hits), compression='gzip')
        print(f'acq{i+1:04d}: {len(jf_hits)} hits')
        del jf_hits, pid_hits, calci_hits, diode_sig_hits, diode_bkg_hits, bkgs
        gc.collect()


NameError: name 'scan' is not defined

In [10]:
import os
import numpy as np
import h5py
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc

def process_image(jf_img, calci, pid, dio_sig, dio_bkg, hit_finder,
                  threshold=1, hit_percentage=0.015, bkg_percentage=0.003):
    """Classify a single shot as hit and/or background candidate.

    Parameters
    ----------
    jf_img : array-like
        Detector image of the shot; converted with ``np.array``.
    calci, pid, dio_sig, dio_bkg :
        Per-shot values passed through unchanged.
    hit_finder : object
        Anything providing ``is_hit(img, bkg=..., threshold=..., percentage=...)``.
    threshold, hit_percentage, bkg_percentage : float
        Forwarded to ``is_hit``; the defaults are the values this notebook used so far.

    Returns
    -------
    tuple
        ``(hit, jf_img_np, pid, calci, dio_sig, dio_bkg, bkg)`` where ``hit`` is the
        ``is_hit`` result at ``hit_percentage`` and ``bkg`` is the negated ``is_hit``
        result at ``bkg_percentage``.
    """
    jf_img_np = np.array(jf_img)
    hit = hit_finder.is_hit(jf_img_np, bkg=0, threshold=threshold, percentage=hit_percentage)
    bkg = not hit_finder.is_hit(jf_img_np, bkg=0, threshold=threshold, percentage=bkg_percentage)
    return hit, jf_img_np, pid, calci, dio_sig, dio_bkg, bkg

Reduced_Data_Folder = '/das/work/p22/p22279/Reduced_Data'
Batch_Size = 200
Max_Workers = 40  # threads shared by all batches; tune as your memory allows

# Same reduction as the batch-parallel cell, but parallelised per image inside each
# batch. One thread pool is reused for the whole run instead of being rebuilt per batch.
with ThreadPoolExecutor(max_workers=Max_Workers) as executor:
    for run_num, selected_run in zip(run_nums, selected_runs):
        # zip() pairs by position; refuse to write one run's data into another run's folder.
        if not os.path.basename(selected_run).startswith(f'run{run_num:04d}'):
            raise ValueError(f'run_nums/selected_runs out of step: {run_num} vs {selected_run}')

        dir_name = os.path.join(Reduced_Data_Folder, f'run{run_num:04d}')
        os.makedirs(dir_name, exist_ok=True)

        print(selected_run)
        scan = SFScanInfo(os.path.join(selected_run, 'meta/scan.json'))
        scan_steps = scan.readbacks
        print('scan steps: ', scan_steps)

        for i, step in enumerate(scan):
            subset = step[channels.values()]
            subset.drop_missing()
            temp_calci = subset[channels['calci']].in_batches(Batch_Size)
            temp_pids = subset[channels['calci']].pids
            temp_dio_sig = subset[channels['diode_projection_sig']].in_batches(Batch_Size)
            temp_dio_bkg = subset[channels['diode_projection_bkg']].in_batches(Batch_Size)
            temp_JF = subset[channels['JFdata']].in_batches(Batch_Size)

            h5file = os.path.join(dir_name, f'Run{run_num:04d}_acq{i+1:04d}.h5')

            jf_hits = []
            pid_hits = []
            calci_hits = []
            diode_sig_hits = []
            diode_bkg_hits = []
            bkgs = []

            for (indices, calci_batch), (_, JF_batch), (_, dio_sig_batch), (_, dio_bkg_batch) in zip(
                    temp_calci, temp_JF, temp_dio_sig, temp_dio_bkg):

                pids_batch = temp_pids[indices]
                batch_args = [
                    (jf_img, calci, pid, dio_sig, dio_bkg, hit_finder)
                    for jf_img, calci, pid, dio_sig, dio_bkg
                    in zip(JF_batch, calci_batch, pids_batch, dio_sig_batch, dio_bkg_batch)
                ]

                # executor.map yields results in input order (as_completed did not), so the
                # written datasets follow the input pulse order and the chosen background
                # frames are reproducible.
                for hit, jf_img_np, pid, calci, dio_sig, dio_bkg, bkg in executor.map(
                        lambda args: process_image(*args), batch_args):
                    if hit:
                        jf_hits.append(jf_img_np)
                        pid_hits.append(pid)
                        calci_hits.append(calci)
                        diode_sig_hits.append(dio_sig)
                        diode_bkg_hits.append(dio_bkg)
                    if bkg and len(bkgs) < 10:
                        bkgs.append(jf_img_np)

                del batch_args

            # Write hits to HDF5 file (only if at least 1 hit)
            if jf_hits:
                with h5py.File(h5file, 'w') as f:
                    f.create_dataset('ReducedJF', data=np.stack(jf_hits), compression='gzip')
                    f.create_dataset('Pulse_ID', data=np.array(pid_hits), compression='gzip')
                    f.create_dataset('Calci', data=np.array(calci_hits), compression='gzip')
                    f.create_dataset('Backgrounds', data=np.array(bkgs), compression='gzip')
                    f.create_dataset('Diode_Projection_Signal', data=np.array(diode_sig_hits), compression='gzip')
                    f.create_dataset('Diode_Projection_Background', data=np.array(diode_bkg_hits), compression='gzip')
            print(f'acq{i+1:04d}: {len(jf_hits)} hits')
            del jf_hits, pid_hits, calci_hits, diode_sig_hits, diode_bkg_hits, bkgs
            gc.collect()


/sf/maloja/data/p22279/raw/run0021-Energy_scan_SASE_15fs_001
scan steps:  [1059.67093607 1060.68509218 1061.69669985 1062.68785355 1063.683689
 1064.68660213 1065.70814402 1066.6863924  1067.70714405 1068.68887305
 1069.69392258 1070.67701328 1071.68470556 1072.66693419 1073.68411268
 1074.69046199 1075.68833703 1076.68337931 1077.70472423 1078.68035547
 1079.6984762  1080.70213796 1081.6785282  1082.69234665 1083.68266757
 1084.68432677 1085.7004825  1086.67669169 1087.68226356 1088.68200883
 1089.68208718 1090.67647257 1091.68075819 1092.68686562 1093.68865884
 1094.69454198 1095.67676531 1096.67861629 1097.68601286 1098.66969364
 1099.67688665 1100.68269995 1101.66577075 1102.68104234 1103.69511177
 1104.65946183 1105.6740973  1106.70905558 1107.68152191 1108.68572675
 1109.66613791 1110.66929268 1111.67931604 1112.68598396 1113.68002505
 1114.67889982]
Auto-located gain file: /sf/jungfrau/config/gainMaps/JF15T08V01/gains.h5
Auto-located pedestal file: /sf/maloja/data/p22279/raw/JF_


KeyboardInterrupt



56