In [10]:
import json
from typing import Optional
from pathlib import Path
from functools import singledispatchmethod
import numpy as np
import numpy.typing as npt
from dataclasses import dataclass
import awkward as ak
import uproot
import pandas as pd
from hist.hist import Hist
from hist.axis import StrCategory, IntCategory

In [11]:
@dataclass
class LumiBlockChecker:
    """
    https://twiki.cern.ch/twiki/bin/view/CMSPublic/SWGuideGoodLumiSectionsJSONFile
    """
    cert: dict[np.uint32, npt.NDArray[np.uint32]]

    @staticmethod
    def _transform_lumi_ranges(lumi: list[tuple[int, int]]
    ) -> npt.NDArray[np.uint32]:
        """
        """
        flat_lumi = np.array(lumi, dtype=np.uint32).flatten()
        # [first, last] to (first, last]
        flat_lumi[::2] -= 1
        return flat_lumi

    @classmethod
    def from_dict(cls, cert: dict[int, list[tuple[int, int]]]):
        flat_cert = {np.uint32(run): cls._transform_lumi_ranges(lumi_ranges)
                     for run, lumi_ranges in cert.items()}
        return cls(flat_cert)

    @classmethod
    def from_json(cls, path):
        with open(path) as stream:
            cert = json.load(stream)
        return cls.from_dict(cert)

    @staticmethod
    def _get_lumi_mask(lumi_arr: npt.NDArray[np.uint32],
                     ranges: npt.NDArray[np.uint32]
    ) -> npt.NDArray[np.bool_]:
        """
        """
        # odd(even) indices indicate good(bad) lumi blocks
        indices = np.searchsorted(ranges, lumi_arr)
        mask = (indices & 0x1).astype(bool)
        return mask

    @singledispatchmethod
    def get_lumi_mask(self, run, lumi: npt.NDArray[np.uint32]):
        raise NotImplementedError(f'expected np.uint32, npt.NDArray[np.uint32]'
                                  f' or int but got {type(run)}')

    @get_lumi_mask.register(int)
    @get_lumi_mask.register(np.uint32)
    def _(self,
          run: np.uint32,
          lumi: npt.NDArray[np.uint32]
    ) -> npt.NDArray[np.bool_]:
        """
        """
        if isinstance(run, int):
            run = np.uint32(run)

        if run in self.cert:
            mask = self._get_lumi_mask(lumi, ranges=self.cert[run])
        else:
            mask = np.full_like(lumi, fill_value=False, dtype=bool)
        return mask

    @get_lumi_mask.register(np.ndarray)
    def _(self,
          run: npt.NDArray[np.uint32],
          lumi: npt.NDArray[np.uint32]
    ) -> npt.NDArray[np.bool_]:
        """
        """
        mask = np.full_like(lumi, fill_value=False, dtype=bool)
        for each in np.unique(run):
            run_mask = run == each
            mask[run_mask] = self.get_lumi_mask(each, lumi[run_mask])
        return mask

In [12]:
def flatten_nanoaod(
    input_path: Path,
    cert_path: Path,
    output_path: Path,
    input_tree: str = 'Events',
    input_prefix: str = 'rpcTnP',
):
    tree = uproot.open(f'{input_path}:{input_tree}')

    aliases = {
        key.removeprefix(f'{input_prefix}_'): key
        for key in tree.keys() if (key.startswith(input_prefix))
    }

    aliases['n_hit'] = f'n{input_prefix}'
    aliases['lumi_block'] = 'luminosityBlock'
    expressions = list(aliases.keys()) + ['run', 'event']
    cut = f'(n{input_prefix} > 0)'
    
    tree: dict[str, np.ndarray] = tree.arrays(
        expressions=expressions,
        aliases=aliases,
        cut=cut,
        library='np'
    )

    n_hit = tree.pop('n_hit')
    
    lumi_block_checker = LumiBlockChecker.from_json(cert_path)
    lumi_mask = lumi_block_checker.get_lumi_mask(tree['run'], tree['lumi_block'])
    tree = {key: value[lumi_mask] for key, value in tree.items()}

    tree['run'] = np.repeat(tree['run'], n_hit)
    tree['lumi_block'] = np.repeat(tree['lumi_block'], n_hit)
    tree['event'] = np.repeat(tree['event'], n_hit)
    
    for key, values in tree.items():
        if key in ['run', 'lumi_block', 'event']:
            tree[key] = values.astype(np.uint32)
        elif key in ['is_fiducial', 'is_matched']:
            tree[key] = np.concatenate(values).astype(np.int32)
        elif key in ['region', 'ring', 'station', 'sector', 'layer', 'subsector', 'roll']:
            tree[key] = np.concatenate(values).astype(np.int32)
        elif key in ['cls', 'bx']:
            tree[key] = np.concatenate(values).astype(np.int32)
        else:
            tree[key] = np.concatenate(values).astype(np.float32)

    with uproot.recreate(output_path) as root_file:
        root_file["Hits"] = tree

def mean_nanoaod(
    input_path: Path,
    cert_path: Path,
    output_path: Path,
    input_tree: str = 'Events',
    input_prefix: str = 'rpcTnP',
):
    tree = uproot.open(f'{input_path}:{input_tree}')

    rpc_tnp_keys = [
        'tag_pt', 'tag_eta', 'tag_phi', 
        'probe_pt', 'probe_eta', 'probe_phi', 
        'probe_time', 'probe_dxdz', 'probe_dydz', 
        'dimuon_pt', 'dimuon_mass', 'cls', 'bx'
    ]

    aliases = {key: f'{input_prefix}_' + key for key in rpc_tnp_keys}
    aliases['lumi_block'] = 'luminosityBlock'

    expressions = list(aliases.keys()) + ['run', 'event']
    cut = f'(n{input_prefix} > 0)'
    
    tree: dict[str, np.ndarray] = tree.arrays(
        expressions=expressions,
        aliases=aliases,
        cut=cut,
        library='np'
    )
    
    lumi_block_checker = LumiBlockChecker.from_json(cert_path)
    lumi_mask = lumi_block_checker.get_lumi_mask(tree['run'], tree['lumi_block'])
    tree = {key: value[lumi_mask] for key, value in tree.items()}

    for key in rpc_tnp_keys:
        if key in ['cls', 'bx']:
            tree[f'mean_{key}'] = np.array([np.mean(tree[key][i][tree[key][i] > -999]) for i in range(len(tree[key]))], dtype=np.float32) 
            tree[f'mean_{key}'][np.isnan(tree[f'mean_{key}'])] = -999
            tree.pop(key)
        else:
            tree[key] = np.array([tree[key][i][0] for i in range(len(tree[key]))], dtype=np.float32)
    
    for key in ['run', 'event', 'lumi_block']:
        tree[key] = tree[key].astype(np.uint32)
    
    with uproot.recreate(output_path) as root_file:
        root_file["Events"] = tree

input_path = '/afs/cern.ch/user/j/joshin/public/Workspace-RPC/Log/NanoAOD-TnP/241017-RPCDPG/flatten/output_155.root'
cert_path = '/afs/cern.ch/user/j/joshin/public/RPCEffTnP/CMSSW_14_1_0/src/RPCDPGAnalysis/NanoAODTnP/data/cert/Run2024/Cert_Collisions2024_eraD_Golden.json'
flat_output_path = '/afs/cern.ch/user/j/joshin/public/Workspace-RPC/Log/NanoAOD-TnP/241017-RPCDPG/flatten/flat_output_155.root'
zip_output_path = '/afs/cern.ch/user/j/joshin/public/Workspace-RPC/Log/NanoAOD-TnP/241017-RPCDPG/flatten/zip_output_155.root'


flat_tnp = flatten_nanoaod(
    input_path=input_path,
    cert_path=cert_path,
    output_path=flat_output_path  
)

zip_tnp = mean_nanoaod(
    input_path=input_path,
    cert_path=cert_path,
    output_path=zip_output_path
)


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
