In [None]:
import os
import sys
import numpy as np

# make the repository top directory (three levels above the current working
# directory) importable, so that the `tools` package can be imported below
thisdir = os.getcwd()
topdir = os.path.abspath(os.path.join(thisdir, '../../../'))
# guard against adding duplicate entries when this cell is re-run in the same kernel
if topdir not in sys.path:
    sys.path.append(topdir)

import tools.iotools as iotools
import tools.dftools as dftools

In [None]:
# find files

# input location and dataset settings
datadir = '/eos/user/l/llambrec/dialstools-output'
year = '2024'
# era letter -> list of reconstruction versions to process for that era
eras = {
      'A': ['v1'],
      'B': ['v1'],
      'C': ['v1'],
      'D': ['v1'],
      'E': ['v1', 'v2'],
      'F': ['v1'],
      'G': ['v1'],
      'H': ['v1'],
      'I': ['v1', 'v2'],
      'J': ['v1']
}
layers = [1, 2, 3, 4]
dataset = 'ZeroBias'
reco = 'PromptReco'
# monitoring element name prefix; the layer number is appended per file
mebase = ('PixelPhase1-Phase1_MechanicalView-PXBarrel-'
          'clusters_per_SignedModuleCoord_per_SignedLadderCoord_PXLayer_')

# find files corresponding to settings:
# '<era>-<version>' -> {layer: full path of the corresponding parquet file}
files = {
    f'{era}-{version}': {
        layer: os.path.join(
            datadir,
            f'{dataset}-Run{year}{era}-{reco}-{version}-DQMIO-{mebase}{layer}.parquet')
        for layer in layers
    }
    for era, versions in eras.items()
    for version in versions
}

# existence check
# (the per-era mappings are not bound to the name `layers`, so the global
# list of layer numbers is left intact for later cells)
all_files = [f for layer_files in files.values() for f in layer_files.values()]
missing = [f for f in all_files if not os.path.exists(f)]
nfiles = len(all_files) - len(missing)
if missing:
    # FileNotFoundError is an Exception subclass, so existing handlers still apply
    raise FileNotFoundError(f'The following files do not exist: {missing}')
print(f'Found {nfiles} files.')

In [None]:
# define utility function for partitioning a set of runs

def split_in_batches(run_numbers, batch_size):
    """Partition per-entry run numbers into batches that never split a run.

    A new batch is started at the last run change before a multiple of
    ``batch_size`` (counted in entry index) is crossed, so batch sizes are only
    approximately ``batch_size``.

    Parameters
    ----------
    run_numbers : array-like
        Run number of each entry. Entries of the same run are assumed to be
        contiguous (e.g. sorted by run number) -- TODO confirm for the input files.
    batch_size : int
        Target number of entries per batch.

    Returns
    -------
    list of numpy.ndarray
        One array per batch with the sorted unique run numbers in that batch.
    """
    run_numbers = np.asarray(run_numbers)
    # index of the first entry of every new run (positions where the run number changes)
    run_change_ids = np.where(run_numbers[:-1] != run_numbers[1:])[0] + 1
    batch_ids = [0]
    for previous_idx, idx in zip(run_change_ids[:-1], run_change_ids[1:]):
        # a multiple of batch_size lies in (previous_idx, idx]: close the batch at
        # the previous run change; floor division also detects gaps of one full
        # batch_size or more, which a modulo comparison misses
        if idx // batch_size > previous_idx // batch_size:
            batch_ids.append(previous_idx)
    # batch i covers [batch_ids[i], batch_ids[i+1]); the last batch extends to the
    # end of the array (None rather than -1, so the final entry is not dropped)
    batch_ends = batch_ids[1:] + [None]
    return [np.unique(run_numbers[start:end]) for start, end in zip(batch_ids, batch_ends)]

In [None]:
# define utility function to build the output file path for a given part

def output_file(input_file, part):
    """Return the path of output part ``part`` for ``input_file``.

    The file name is split on '-' and the suffix ``-part<part>`` is appended to
    the fourth field (the reconstruction version, e.g. 'v1' in
    'ZeroBias-Run2024A-PromptReco-v1-DQMIO-...'); the directory is kept.
    Raises IndexError if the file name has fewer than four '-'-separated fields.
    """
    directory = os.path.dirname(input_file)
    fields = os.path.basename(input_file).split('-')
    fields[3] = f'{fields[3]}-part{part}'
    return os.path.join(directory, '-'.join(fields))

In [None]:
# loop over files and split each sufficiently large one into smaller parquet files

# files with fewer entries than this are not split
min_entries = 60000
# target number of entries per output file
batch_size = 40000
# decide whether to keep exact batch size ('exact'),
# or split by run number in batches of approximate size ('run')
# (latter approach is probably better but seems to require more memory and crashes)
splitmode = 'exact'
# fail early instead of silently writing nothing for an unrecognized mode
if splitmode not in ('exact', 'run'):
    raise ValueError(f'Unrecognized splitmode: {splitmode}')

# the per-era mapping is named layer_files so the global `layers` list is not overwritten
for era, layer_files in files.items():
    for layer, f in layer_files.items():

        # find number of entries
        dftemp = iotools.read_parquet(f, columns=['run_number'])
        nentries = len(dftemp)

        # small files are left as they are
        if nentries < min_entries: continue
        # ceiling division: number of batch_size chunks needed to cover all entries
        nbatches = (nentries - 1) // batch_size + 1
        print(f'Splitting file {f} ({nentries} entries)'
              + f' into {nbatches} batches of (approximate) size {batch_size}...')

        if splitmode == 'run':

            # get batch parameters
            run_numbers = dftemp['run_number'].values
            batch_run_numbers = split_in_batches(run_numbers, batch_size)

            # printouts for testing
            print(f'  Orig: {run_numbers[0]} - {run_numbers[-1]}')
            print('  Batches:')
            for batch in batch_run_numbers:
                print(f'    - {batch[0]} - {batch[-1]}')

            # loop over batches: read the entries of the runs in each batch and write them out
            for batchidx, batch in enumerate(batch_run_numbers):
                df = iotools.read_runs(f, run_numbers=batch)
                outf = output_file(f, batchidx+1)
                df.to_parquet(outf)
                print(f'  Batch {batchidx+1} written to {outf}.')
                del df

        elif splitmode == 'exact':

            # only the entry count was needed; release the run_number frame before reading batches
            del dftemp
            for batchidx in range(nbatches):
                # presumably reads only chunk number batchidx of batch_size entries -- TODO confirm iotools semantics
                df = iotools.read_parquet(f, batch_size=batch_size, first_batch=batchidx, last_batch=batchidx)
                outf = output_file(f, batchidx+1)
                df.to_parquet(outf)
                print(f'  Batch {batchidx+1} written to {outf}.')
                del df