In [1]:
import numpy as np
from zhh import get_runtime_analysis, evaluate_runtime
from typing import Optional
from math import floor, ceil

version = 'v1'
REPO_ROOT = '/afs/desy.de/user/b/bliewert/public/MarlinWorkdirs/ZHH'
DATA_ROOT = f'/nfs/dust/ilc/user/bliewert/zhh/PreselectionRuntime/{version}'
ILD_VERSION = 'ILD_l5_o1_v02'
PROD_NAME = '500-TDR_ws'

PROCESS_INDEX = '/afs/desy.de/user/b/bliewert/nfs/zhh/CreateRawIndex/v1/processes.npy'
SAMPLE_INDEX = '/afs/desy.de/user/b/bliewert/nfs/zhh/CreateRawIndex/v1/samples.npy'

processes = np.load(PROCESS_INDEX)
samples = np.load(SAMPLE_INDEX)

In [2]:
evaluate_runtime(DATA_ROOT, 0)

(0,
 '2f_z_bhabhag',
 19,
 '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/2f_Z_bhabhag/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I250101.P2f_z_bhabhag.eL.pL.n001.d_dstm_10366_0.slcio',
 1721862118.041468,
 1721862162.661763,
 29.62029504776001,
 0)

In [3]:
runtime_analysis = get_runtime_analysis(DATA_ROOT=DATA_ROOT)
runtime_analysis

array([(  0, '2f_z_bhabhag', 19, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/2f_Z_bhabhag/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I250101.P2f_z_bhabhag.eL.pL.n001.d_dstm_10366_0.slcio', 1.7218621e+09, 1.7218621e+09,  29.620295 , 0),
       (  1, '2f_z_bhabhag', 19, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/2f_Z_bhabhag/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I250102.P2f_z_bhabhag.eL.pR.n001.d_dstm_10366_0.slcio', 1.7218621e+09, 1.7218621e+09,  21.761604 , 0),
       (  2, '2f_z_bhabhag', 19, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/2f_Z_bhabhag/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I250104.P2f_z_bhabhag.eR.pL.n001.d_dstm_10366_0.slcio', 1.7218621e+09, 1.7218621e+09,  23.54165  , 0),
       (  3, '2f_z_bhabhag', 19, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/2f_Z_bhabhag/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l

In [33]:
a = runtime_analysis['src'] == '/pnfs/desy.de/ilc/prod/ilc/mc-2020/ild/dst-merged/500-TDR_ws/hh/ILD_l5_o1_v02/v02-02-03/00015742/000/rv02-02-03.sv02-02-03.mILD_l5_o1_v02.E500-TDR_ws.I403006.Pn1n1hh.eR.pL.n002.d_dstm_15742_24.slcio'
np.sum(a)

0

In [4]:
tD = runtime_analysis['tDuration']
print(f'{np.min(tD)} : {np.max(tD)}')

9.8073091506958 : 371.6954040527344


In [5]:
def get_runtime_adjusted_weights(runtime_analysis:np.ndarray,
                                 divide_by_min:bool=True,
                                 MAX_CAP:Optional[float]=None,
                                 MIN_CAP:Optional[float]=None):
    unique_processes = np.unique(runtime_analysis['process'])

    dtype = [
        ('process', '<U64'),
        ('tAvg', 'f'),
        ('n_processed', 'i'),
        ('tPE', 'f')]

    results = np.empty(len(unique_processes), dtype=dtype)

    i = 0
    for process in unique_processes:
        # Average for
        mask = runtime_analysis['process'] == process
        tAvg = np.average(runtime_analysis['tDuration'][mask])
        n_processed = int(np.average(runtime_analysis['n_processed'][mask])) # should be equal
        tPE = tAvg/n_processed
        
        results[i] = np.array([ (process, tAvg, n_processed, tPE) ], dtype=dtype)
        i += 1
        
    if divide_by_min:
        results['tPE'] *= 1/results['tPE'].min()
        
    if MAX_CAP is not None:
        results['tPE'] = np.minimum(results['tPE'], MAX_CAP)
        
    if MIN_CAP is not None:
        results['tPE'][results['tPE'] < MIN_CAP] = 1
        
    return results

In [6]:
runtime_adjusted_weights = get_runtime_adjusted_weights(runtime_analysis, True, 5, 2)
runtime_adjusted_weights

array([('2f_z_bhabhag',  24.653614, 19, 1.       ),
       ('2f_z_bhabhang',  26.402027, 19, 2.099731 ),
       ('2f_z_h',  48.915146, 19, 3.8901803),
       ('2f_z_l',  25.599873, 19, 2.0359364),
       ('2f_z_nung',  25.19878 , 19, 2.0040379),
       ('4f_lowmee_sze_l',  33.21747 , 19, 2.6417572),
       ('4f_lowmee_szeorsw_l',  24.588127, 19, 1.       ),
       ('4f_sw_l',  28.427618, 19, 2.2608247),
       ('4f_sw_sl',  30.516903, 19, 2.4269836),
       ('4f_sze_l',  28.276108, 19, 2.248775 ),
       ('4f_sze_sl',  30.81671 , 19, 2.450827 ),
       ('4f_szeorsw_l',  22.734737, 19, 1.       ),
       ('4f_sznu_l',  27.016434, 19, 2.1485944),
       ('4f_sznu_sl',  42.00641 , 19, 3.3407342),
       ('4f_ww_h',  30.166235, 19, 2.3990953),
       ('4f_ww_l',  28.161156, 19, 2.239633 ),
       ('4f_ww_sl',  30.782879, 19, 2.4481366),
       ('4f_zz_h',  85.170906, 19, 5.       ),
       ('4f_zz_l',  28.013412, 19, 2.227883 ),
       ('4f_zz_sl',  83.93462 , 19, 5.       ),
       ('4f_z

In [23]:
n_before = 0
n_after = 0

for process in np.unique(samples['process']):
    n_samples = np.sum(samples["process"] == process)
    n_samples_adj = n_samples*runtime_adjusted_weights['tPE'][runtime_adjusted_weights["process"] == process]
    print(f'{process} -> {n_samples} -> {n_samples_adj}')
    
    n_before += n_samples
    n_after += ceil(n_samples_adj)
    
print(f'Before {n_before} | After {n_after}')

2f_z_bhabhag -> 40 -> [40.]
2f_z_bhabhang -> 646 -> [1356.4261]
2f_z_h -> 230 -> [894.74146]
2f_z_l -> 24 -> [48.862473]
2f_z_nung -> 477 -> [955.9261]
4f_lowmee_sze_l -> 4 -> [10.567029]
4f_lowmee_szeorsw_l -> 4 -> [4.]
4f_sw_l -> 56 -> [126.606186]
4f_sw_sl -> 38 -> [92.22538]
4f_sze_l -> 269 -> [604.9205]
4f_sze_sl -> 36 -> [88.22977]
4f_szeorsw_l -> 36 -> [36.]
4f_sznu_l -> 5 -> [10.742971]
4f_sznu_sl -> 11 -> [36.748077]
4f_ww_h -> 55 -> [131.95024]
4f_ww_l -> 7 -> [15.677431]
4f_ww_sl -> 43 -> [105.269875]
4f_zz_h -> 11 -> [55.]
4f_zz_l -> 2 -> [4.455766]
4f_zz_sl -> 41 -> [205.]
4f_zzorww_h -> 48 -> [100.16298]
4f_zzorww_l -> 15 -> [34.139156]
e1e1hh -> 121 -> [605.]
e1e1qqh -> 80 -> [311.8299]
e2e2hh -> 60 -> [300.]
e2e2qqh -> 42 -> [210.]
e3e3hh -> 59 -> [295.]
e3e3qqh -> 42 -> [157.62547]
eeeeee -> 4 -> [8.292247]
eeeell -> 4 -> [8.447545]
eeeexx -> 4 -> [9.358356]
eeeeyy -> 4 -> [14.075451]
eellxx -> 4 -> [8.145082]
eellyy -> 4 -> [17.728565]
eeveev -> 4 -> [4.]
eevelv -> 4 

In [8]:
samples[samples['process'] == 'eevlev']

array([(108628, 'eevlev', 'eevlev_LR', 14948, -1,  1, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/6f_eeWW/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I108628.Peevlev.eL.pR.n001.d_dstm_10361_0.slcio'),
       (108630, 'eevlev', 'eevlev_RR', 10000,  1,  1, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/6f_eeWW/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I108630.Peevlev.eR.pR.n001.d_dstm_10361_0.slcio'),
       (108627, 'eevlev', 'eevlev_LL', 10000, -1, -1, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/6f_eeWW/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I108627.Peevlev.eL.pL.n001.d_dstm_10361_0.slcio'),
       (108629, 'eevlev', 'eevlev_RL', 10000,  1, -1, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/6f_eeWW/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I108629.Peevlev.eR.pL.n001.d_dstm_10361_0.slcio')],
      dtype=[('run_

In [9]:
#s = samples[0]
s = samples[samples['run_id'] == 108628]
print(s)



[(108628, 'eevlev', 'eevlev_LR', 14948, -1, 1, '/pnfs/desy.de/ilc/prod/ilc/ild/copy/dst-merged/500-TDR_ws/6f_eeWW/ILD_l5_o1_v02/v02-00-01/rv02-00-01.sv02-00-01.mILD_l5_o1_v02.E500-TDR_ws.I108628.Peevlev.eL.pR.n001.d_dstm_10361_0.slcio')]


In [20]:
def get_sample_chunk_splits(samples:np.ndarray,
                      runtime_adjusted_weights:np.ndarray):
    
    dtype = [
        ('process', '<U60'),
        ('proc_pol', '<U64'),
        ('location', '<U512'),
        
        ('chunk_size', 'i'),
        ('n_chunks', 'i'),
        ('chunk_start', 'i')]

    results = np.empty(0, dtype=dtype)
    
    for s in samples:
        weight = runtime_adjusted_weights['tPE'][runtime_adjusted_weights['process'] == s['process']]
        n_events = s['n_events']

        MIN_UNITS_PER_CHUNK = 8000 # one unit = one unit event * one unit of weight; see the process type for which tPE == 1.0
        
        chunk_size = max(1, floor(n_events / weight))
        n_chunks = max(1, ceil(n_events/chunk_size))

        if n_chunks > 1 and (n_events % n_chunks)*weight < MIN_UNITS_PER_CHUNK:
            n_chunks -= 1
            
        # skip:execute
        # 0:chunk_size-1
        # chunk_size:2xchunk_size -1
        
        n_accounted = 0
        chunk_start = 0
        for i in range(n_chunks):
            c_chunk_size = min(chunk_size, n_events - n_accounted)
            n_accounted += chunk_size
            
            results = np.append(results, np.array([
                (s['process'], s['proc_pol'], s['location'], c_chunk_size, n_chunks, chunk_start)
            ], dtype=dtype))
        
    return results

In [21]:
sample_chunk_splits = get_sample_chunk_splits(samples, runtime_adjusted_weights)

In [22]:
len(sample_chunk_splits)

12323

In [25]:
sample_chunk_splits[['chunk_size', 'proc_pol']]

array([(19400, '4f_szeorsw_l_LR'), (18456, '2f_z_nung_LR'),
       (18456, '2f_z_nung_LR'), ..., ( 1418, 'e2e2hh_NN'),
       ( 1418, 'e2e2hh_NN'), (11400, 'eexylv_RR')],
      dtype={'names': ['chunk_size', 'proc_pol'], 'formats': ['<i4', '<U64'], 'offsets': [2544, 240], 'itemsize': 2556})

In [26]:
samples['proc_pol']

array(['4f_szeorsw_l_LR', '2f_z_nung_LR', 'n1n1hh_NN', ..., 'yycyyu_RL',
       'e2e2hh_NN', 'eexylv_RR'], dtype='<U64')