In [1]:
from coffea.nanoevents import NanoAODSchema
from coffea.dataset_tools import apply_to_fileset, max_chunks, max_files, preprocess

import dask
import numpy as np
import awkward as ak

from template_processor import TestProcessor

from dask.distributed import Client

Issue: coffea.nanoevents.methods.vector will be removed and replaced with scikit-hep vector. Nanoevents schemas internal to coffea will be migrated. Otherwise please consider using that package!.
  from coffea.nanoevents.methods import vector


In [2]:
import gzip
import json
import os
# Define the base directory where the preprocessed files are stored (preprocessed files point to sample on DAS and define slicing of root files by event (chunks)
base_dir = "../dataset_tools/preprocessing/preprocessed"
sample = "2023_ttbar_100000_preprocessed_available.json.gz"
#sample = "2023_SlepSnu_MN1_220_100000_preprocessed_available.json.gz" # These preprocessed files are generated one time in advance, stored until needed now in analysis
file_path = os.path.join(base_dir, sample)

ntuple_name = sample.replace("_100000_preprocessed_available.json.gz", "_dwg_ntuple")

with gzip.open(file_path, "rt") as file:
    preprocessed_available = json.load(file)


In [3]:
#client = Client("tls://localhost:8786")
#client

In [4]:
### SWITCH HERE ###

reduced_computation = True

num_files = 1 # number of root files from DAS to run over
num_chunks = 3 # number of events (chunks) per root file to run over (chunksize set during preprocessing, my default is 1 chunk = 100,000 events)

###################

In [5]:
if reduced_computation:
    
    test_preprocessed_files = max_files(preprocessed_available, num_files)
    test_preprocessed = max_chunks(test_preprocessed_files, num_chunks)

    small_tg, small_rep = apply_to_fileset(
        data_manipulation=TestProcessor(),
        fileset=test_preprocessed,
        schemaclass=NanoAODSchema,
        uproot_options={"allow_read_errors_with_report": (OSError, KeyError)},
    )
    computed, rep = dask.compute(small_tg, small_rep)

    
else:
    full_tg, full_rep = apply_to_fileset(
        data_manipulation=TestProcessor(),
        fileset=preprocessed_available,
        schemaclass=NanoAODSchema,
        uproot_options={"allow_read_errors_with_report": (OSError, KeyError)},
    )
    computed, rep = dask.compute(full_tg, full_rep)


In [6]:
computed.keys()

dict_keys(['/TTto2L2Nu_TuneCP5_13p6TeV_powheg-pythia8/Run3Summer23NanoAODv12-130X_mcRun3_2023_realistic_v14-v2/NANOAODSIM'])

In [7]:
sample_name = next(iter(computed))
sample_name 

'/TTto2L2Nu_TuneCP5_13p6TeV_powheg-pythia8/Run3Summer23NanoAODv12-130X_mcRun3_2023_realistic_v14-v2/NANOAODSIM'

In [8]:
results = computed[sample_name]

In [9]:
results #did this to get in front of the massive sample name, work with the results dictionary from here forward for simplicity

{'ntuple': {'MET': {'awk_info': None,
   'pt': <Array [97.8, 72.7, 108, ..., 28, 66, 67.9] type='183000 * float32[paramete...'>,
   'phi': <Array [-2.4, 1.1, -2.31, ..., -1.33, 1.38] type='183000 * float32[paramete...'>},
  'Electron': {'isGold': <Array [True, False, True, True, ..., False, True, True] type='118542 * bool'>,
   'isSilver': <Array [False, False, False, ..., False, False, False] type='118542 * bool'>,
   'isBronze': <Array [False, True, False, ..., True, False, False] type='118542 * bool'>,
   'awk_info': <Array [1, 1, 0, 0, 2, 0, 1, 0, ..., 1, 1, 0, 1, 0, 1, 1] type='183000 * int64'>,
   'pt': <Array [45.7, 10.6, 102, ..., 12.8, 156, 122] type='118542 * float32[parame...'>,
   'eta': <Array [-0.0922, 0.0165, ..., -0.048, 1.96] type='118542 * float32[paramete...'>,
   'phi': <Array [-0.397, 2.92, ..., -0.411, -1.32] type='118542 * float32[parameters...'>,
   'mass': <Array [3.58e-07, 1.29e-07, ..., -5.39e-06] type='118542 * float32[paramete...'>,
   'charge': <Array [-1,

In [10]:
results.keys()

dict_keys(['ntuple'])

In [11]:
results['ntuple']["LowPtElectron"]["isGold"]

In [12]:
ak.unflatten(results['Electron']['pt'], results['Electron']['awk_info'])

KeyError: 'Electron'

In [None]:
results["Electron"]["awk_info"]

In [None]:
results["LowPtElectron"]["awk_info"]

In [None]:
results["Muon"]["awk_info"]

In [None]:
ak.sum(
results['ntuple']['LowPtElectron']['isGold']
)

In [None]:

ak.flatten(results['ntuple']['LowPtElectron']['isGold'])


In [None]:
results['ntuple'].keys()

In [None]:
ak.sum(ak.num(results['ntuple']['Electron']['pt']))

In [None]:
results['ntuple']['Electron']['pt'][:10]

In [None]:
%%time
chunk_size = 100000
ntuple = results['ntuple']
nentries = ntuple['num_tot_Events']
total_chunks = (nentries + chunk_size - 1) // chunk_size

os.makedirs(ntuple_name, exist_ok=True)

for chunk_idx, start in enumerate(range(0, nentries, chunk_size)):
    end = min(start + chunk_size, nentries)


    #sliced_events = {
    #    key: val[start:end]
    #    for key, val in ntuple["Event"].items()
    #}
    
    sliced_electron = {
        key: val[start:end]
        for key, val in ntuple["Electron"].items()
    }
    #sliced_electron["nEle"] = ak.sum(ak.num(sliced_electron["pt"], axis=1))

    sliced_lpte = {
        key: val[start:end]
        for key, val in ntuple["LowPtElectron"].items()
    }
    #sliced_lpte["nLpte"] = ak.sum(ak.num(sliced_lpte["pt"], axis=1))
    
    sliced_muon = {
        key: val[start:end]
        for key, val in ntuple["Muon"].items()
    }
    #sliced_muon["nMu"] = ak.sum(ak.num(sliced_muon["pt"], axis=1))
    
    slice_ntuple = {
        "numEvents": end - start,
        "dataset": ntuple["dataset"],
        #"Events": sliced_events,
        "Electron": sliced_electron,
        "Muon": sliced_muon,
        "LowPtElectron": sliced_lpte,
        
    }
    
    
    
    # Create output filename
    filename = f"{ntuple_name}/{ntuple_name}_{slice_ntuple['numEvents']}_events_chunk_{chunk_idx:03d}.parquet"
    
    # Save to Parquet
    ak.to_parquet(slice_ntuple, filename, compression="SNAPPY")
    print(f"Saved {filename}")
    print(f"chunk {chunk_idx} of {total_chunks}")
    with open(f"{ntuple_name}/upload_log.txt", "a") as log_file:
        log_file.write(
            f"Chunk_index: {chunk_idx} of: {total_chunks - 2} saved successfully. "
            f"Skim generated over {nentries} events of dataset: {ntuple['dataset']}.\n"
    )


In [None]:
parquet_files = sorted(glob.glob("2023_ttbar_dwg_ntuple/*.parquet"))

test_ntuple = {}
for file in parquet_files:
    record = ak.from_parquet(file)        # load the ak.Record
    my_dict = ak.to_list(record)          # ✅ deep conversion to native dict
    test_ntuple |= my_dict    

test_ntuple.type

In [None]:
test_ntuple.keys()

In [None]:
test_ntuple.type

In [None]:
#parquet_files = sorted(glob.glob("2023_ttbar_dwg_ntuple/*.parquet"))

#for file in parquet_files:
    
test_ntuple = ak.from_parquet("2023_ttbar_dwg_ntuple/2023_ttbar_dwg_ntuple_100000_events_chunk_000.parquet")   


test_ntuple

In [None]:
test_ntuple['Muon'].dxy

In [None]:
max(ak.flatten(test_ntuple.Muon.pt))

In [None]:
import sys
print(sys.version)


In [None]:
#init voms proxy in terminal before running this cell:
# voms-proxy-init -voms cms -vomses /etc/vomses
#!xrdcp -r {ntuple_name}/ root://xrootd-local.unl.edu:1094//store/user/dgrove/my_ntuple/


In [None]:
test_ntuple.Electron.nEle

In [None]:
test_ntuple.Muon.nMu

In [None]:
test_ntuple.LowPtElectron.nLpte