In [1]:
import os, glob, re
import shutil
import random
import json
import pyarrow.parquet as pq
import numpy as np
import h5py
import matplotlib.pyplot as plt
import argparse
import time
import cupy as cp
from multiprocessing import Pool
import argparse

In [2]:
def alphanum_key(s):
    """Break a string into alternating text and integer pieces.

    Runs of ASCII digits become ``int`` values; everything else stays a
    string, e.g. ``"z23a" -> ["z", 23, "a"]``.
    """
    pieces = []
    for token in re.split('([0-9]+)', s):
        if token.isdigit():
            pieces.append(int(token))
        else:
            pieces.append(token)
    return pieces

In [3]:
def create_new_hdf5_file(filename, max_rows_per_file):
    """Create an HDF5 file with pre-sized ``all_jet`` and ``all_meta`` datasets.

    Parameters
    ----------
    filename : str
        Path of the HDF5 file to create. It is opened in ``'w'`` mode.
    max_rows_per_file : int
        Length of the first axis of both datasets.

    Returns
    -------
    h5py.File
        The open file handle. The caller is responsible for closing it.

    Notes
    -----
    ``all_jet`` has shape ``(max_rows_per_file, 13, 125, 125)`` and
    ``all_meta`` has shape ``(max_rows_per_file, 1)``; both are ``float32``
    with ``lzf`` compression.
    """
    hdf5_file = h5py.File(filename, 'w')
    try:
        for name in ('all_jet', 'all_meta'):
            if 'jet' in name:
                shape = (max_rows_per_file, 13, 125, 125)
            else:
                shape = (max_rows_per_file, 1)
            hdf5_file.create_dataset(
                name,
                shape,
                dtype='float32',
                compression='lzf',
            )
    except Exception:
        # Do not leak an open handle if dataset creation fails.
        hdf5_file.close()
        raise
    return hdf5_file

In [4]:
def append_data_to_hdf5(hdf5_file, start_index, end_index, df):
    """Write one batch of jet images and labels into rows ``start_index:end_index``.

    Parameters
    ----------
    hdf5_file : mapping
        Object indexable by ``"all_jet"`` and ``"all_meta"`` whose values
        support slice assignment and expose ``.shape``.
    start_index, end_index : int
        Half-open row range of the target datasets to fill.
    df : pandas.DataFrame
        Batch with an ``X_jet`` column (nested per-row image data) and a
        ``y`` column (per-row label).

    Returns
    -------
    The same ``hdf5_file`` object that was passed in.
    """
    print("Writing to file", hdf5_file)

    # Nothing to write for an empty batch (and nothing to index for a label).
    if df.shape[0] == 0:
        return hdf5_file

    xj = df.columns.get_loc('X_jet')
    yy = df.columns.get_loc('y')

    # The repeated array -> list round-trips flatten nested object arrays into
    # one dense numeric array; kept exactly as in the original conversion.
    im = np.array(np.array(np.array(df.iloc[:, xj].tolist()).tolist()).tolist())
    # One label per row, as a column vector, instead of repeating the first
    # row's label for the whole batch.
    meta = np.array(df.iloc[:, yy].tolist(), dtype='float32').reshape(-1, 1)

    jet_dataset = hdf5_file["all_jet"]
    # Derive the target shape from metadata; slicing the dataset here would
    # read the whole region back just to print its shape.
    target_shape = (end_index - start_index,) + tuple(jet_dataset.shape[1:])

    print("In Append function dataframe shape", df.shape)
    print("shape hdf5", target_shape)
    print("jet image shape", im.shape)
    print(" ")
    jet_dataset[start_index:end_index, :, :, :] = im
    hdf5_file["all_meta"][start_index:end_index, :] = meta

    return hdf5_file

In [5]:
def process_files(args):
    """Convert one parquet file into an HDF5 file, one batch at a time.

    Parameters
    ----------
    args : sequence
        ``(file_path, h5py_file)``: the input parquet path and the output
        HDF5 path. They are packed into one argument so the function can be
        passed directly to ``Pool.map``.

    Returns
    -------
    None
    """
    file_path = args[0]
    h5py_file = args[1]
    batch_size = 4096

    print("Processing file",file_path)
    parquet = pq.ParquetFile(file_path)
    print(file_path, parquet.num_row_groups)
    # Size the output by the number of rows. The row-group count only equals
    # the row count when every row group holds exactly one row.
    total_samples = parquet.metadata.num_rows
    hdf5_file = create_new_hdf5_file(h5py_file,total_samples)
    try:
        batch_iter = parquet.iter_batches(batch_size,use_threads=True)

        start_index = 0
        for bat, batch in enumerate(batch_iter):
            df = batch.to_pandas(use_threads=True)
            end_index = start_index + df.shape[0]
            print("File",file_path , " Batch no.", bat, "Data frame shape", df.shape, " Start idx:", start_index, " end idx:", end_index)

            # The datasets hold exactly num_rows rows, so every batch fits and
            # none has to be skipped.
            append_data_to_hdf5(hdf5_file, start_index, end_index, df)
            start_index = end_index
    finally:
        # Always close the output so buffered data is flushed and the handle
        # is released, even if a batch fails.
        hdf5_file.close()


In [None]:
# Input directory (must contain 'signal/' and 'background/' subdirectories of
# parquet files) and the output directory for the converted HDF5 files.
parquet_dir = '/pscratch/sd/r/rchudasa/E2E_samples/ParquetFiles_correctTrackerLayerHits_SecVtxInfoAdded/'
analysis_dir= '/pscratch/sd/r/rchudasa/E2E_samples/tau_hdf5_files/'
h5_name = 'tau_threads.h5'
batch_size = 4096
signal_files = [os.path.join(parquet_dir + 'signal/', f) for f in os.listdir(parquet_dir + 'signal/')]
bkg_files = [os.path.join(parquet_dir + 'background/', f) for f in os.listdir(parquet_dir + 'background/')]

# Shuffle so signal and background files are interleaved across the workers.
combined_files = signal_files+bkg_files
random.shuffle(combined_files)
print(type(combined_files))

counter = 0

inputfile_list = []
outputfile_list = []

for f in combined_files:
    # Output name is "<first token>_<last token>.h5" of the input base name,
    # where tokens are the "_"-separated parts before the first ".".
    opFile       = f.split("/")[-1].split(".")[0]
    proceessName = opFile.split("_")[0]
    processID    = opFile.split("_")[-1]
    h5_file = analysis_dir+proceessName+"_"+processID+".h5"
    inputfile_list.append(f)
    outputfile_list.append(h5_file)

# One (input parquet path, output HDF5 path) pair per conversion task.
args = list(zip(inputfile_list,outputfile_list))
print("----------------------------------------")
print(args)

# Start the timer before the pool runs; without it the elapsed-time print
# below raises NameError.
tic = time.time()
with Pool(10) as p:
    print("**************",args)
    p.map(process_files,args)
toc = time.time()


print("It took ", toc-tic)

<class 'list'>
----------------------------------------
[('/pscratch/sd/r/rchudasa/E2E_samples/ParquetFiles_correctTrackerLayerHits_SecVtxInfoAdded/signal/DYToTauTau_M-50_13TeV-powheg_pythia8_13.parquet', '/pscratch/sd/r/rchudasa/E2E_samples/tau_hdf5_files/DYToTauTau_13.h5'), ('/pscratch/sd/r/rchudasa/E2E_samples/ParquetFiles_correctTrackerLayerHits_SecVtxInfoAdded/background/QCD_Pt-15to7000_TuneCP5_Flat_13TeV_pythia8_64.parquet', '/pscratch/sd/r/rchudasa/E2E_samples/tau_hdf5_files/QCD_64.h5'), ('/pscratch/sd/r/rchudasa/E2E_samples/ParquetFiles_correctTrackerLayerHits_SecVtxInfoAdded/background/QCD_Pt-15to7000_TuneCP5_Flat_13TeV_pythia8_36.parquet', '/pscratch/sd/r/rchudasa/E2E_samples/tau_hdf5_files/QCD_36.h5'), ('/pscratch/sd/r/rchudasa/E2E_samples/ParquetFiles_correctTrackerLayerHits_SecVtxInfoAdded/background/QCD_Pt-15to7000_TuneCP5_Flat_13TeV_pythia8_47.parquet', '/pscratch/sd/r/rchudasa/E2E_samples/tau_hdf5_files/QCD_47.h5'), ('/pscratch/sd/r/rchudasa/E2E_samples/ParquetFiles_cor