In [6]:
from dataclasses import dataclass, asdict
from functools import cache
from functools import cached_property
import numpy as np
import numpy.typing as npt
import pandas as pd
import awkward as ak
from matplotlib.patches import Polygon


@cache
def get_segment(ring: int, station: int, sector: int, subsector: int) -> int:
    """
    https://github.com/cms-sw/cmssw/blob/CMSSW_13_3_0_pre3/Geometry/RPCGeometry/src/RPCGeomServ.cc#L361-L368
    """
    nsub = 3 if ring == 1 and station > 1 else 6
    return subsector + nsub * (sector - 1)


@cache
def get_roll_name(region: int, ring: int, station: int, sector: int, layer: int,
             subsector: int, roll: int
) -> str:
    """
    https://github.com/cms-sw/cmssw/blob/CMSSW_13_3_0_pre3/Geometry/RPCGeometry/src/RPCGeomServ.cc#L11-L87
    """
    if region == 0:
        name = f'W{ring:+d}_RB{station}'

        if station <= 2:
            name += 'in' if layer == 1 else 'out'
        else:
            if sector == 4 and station == 4:
                name += ['--', '-', '+', '++'][subsector - 1]
            elif (station == 3) or (station == 4 and sector not in (4, 9, 11)):
                name += '-' if subsector == 1 else '+'
        name += f'_S{sector:0>2d}_'
        name += ['Backward', 'Middle', 'Forward'][roll - 1]
    else:
        segment = get_segment(ring, station, sector, subsector)
        name = f'RE{station * region:+d}_R{ring}_CH{segment:0>2d}_'
        name += ['A', 'B', 'C', 'D', 'E'][roll - 1]
    return name


@cache
def get_detector_unit(region: int, station: int, layer: int) -> str:
    """
    adapted from https://gitlab.cern.ch/seyang/RPCDPGAnalysis/-/blob/3d71b88e/SegmentAndTrackOnRPC/python/RPCGeom.py#L113-114
    """
    if region == 0:
        detector_unit = f'RB{station:d}'
        if station <= 2:
            detector_unit += 'in' if layer == 1 else 'out'
    else:
        detector_unit = f'RE{region * station:+d}'
    return detector_unit


@dataclass(frozen=True, unsafe_hash=True)
class RPCDetId:
    region: int
    ring: int
    station: int
    sector: int
    layer: int
    subsector: int
    roll: int

    @classmethod
    def from_obj(cls, obj):
        return cls(region=obj.region, ring=obj.ring, station=obj.station,
                   sector=obj.sector, layer=obj.layer, subsector=obj.subsector,
                   roll=obj.roll)

    @property
    def segment(self):
        return get_segment(ring=self.ring, station=self.station,
                           sector=self.sector, subsector=self.subsector)

    @property
    def name(self):
        kwargs = asdict(self)
        return get_roll_name(**kwargs)

    @property
    def detector_unit(self):
        return get_detector_unit(region=self.region, station=self.station,
                                 layer=self.layer)

    @property
    def barrel(self):
        return self.region == 0


@dataclass
class RPCRoll:
    id: RPCDetId
    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @classmethod
    def from_row(cls, row: pd.Series):
        x = row[[f'x{idx}' for idx in range(1, 5)]].to_numpy(np.float64)
        y = row[[f'y{idx}' for idx in range(1, 5)]].to_numpy(np.float64)
        z = row[[f'z{idx}' for idx in range(1, 5)]].to_numpy(np.float64)
        det_id = RPCDetId.from_obj(row)
        return cls(det_id, x, y, z)

    @cached_property
    def phi(self) -> npt.NDArray[np.float64]:
        phi = np.arctan2(self.y, self.x)
        phi[phi < 0] += 2 * np.pi
        if abs(phi[0] - phi[2]) > np.pi:
            phi[phi > np.pi] -= 2 * np.pi
        return phi

    @property
    def polygon(self) -> Polygon:
        # if barrel
        if self.id.barrel:
            xy = np.stack([self.z, self.phi], axis=1)
        else:
            xy = np.stack([self.x, self.y], axis=1)
        return Polygon(xy, closed=True)

    @property
    def polygon_xlabel(self) -> str:
        if self.id.barrel:
            xlabel = r'$z$ [cm]'
        else:
            xlabel = r'$x$ [cm]'
        return xlabel

    @property
    def polygon_ylabel(self) -> str:
        if self.id.barrel:
            ylabel = r'$\phi$ [radian]'
        else:
            ylabel = r'$y$ [cm]'
        return ylabel

    @property
    def polygon_ymax(self):
        if self.id.barrel:
            ymax = 7
        else:
            ymax = None
        return ymax


In [7]:
from typing import Optional, Union
from collections import defaultdict
from pathlib import Path
from typing import Optional, Union
import json
import numpy as np
import numpy.typing as npt
import pandas as pd
import uproot
import matplotlib.pyplot as plt
from matplotlib.colors import Colormap, ListedColormap
from matplotlib.patches import Polygon
from matplotlib.collections import PatchCollection
from mpl_toolkits.axes_grid1 import make_axes_locatable
import mplhep as mh
#from RPCDPGAnalysis.NanoAODTnP.RPCGeomServ import RPCRoll # type: ignore

In [16]:
def plot_patches(patches: list[Polygon],
                 values: npt.NDArray[np.float32],
                 mask: Optional[npt.NDArray[np.bool_]] = None,
                 cmap: Union[Colormap, str] = ListedColormap([
                    "red", "orangered", "tomato", "darkorange", "orange",
                    "gold", "yellow", "greenyellow", "lawngreen", "lime"
                 ]),
                 edgecolor: str = 'black',
                 ax: Optional[plt.Axes] = None,
                 vmin: Optional[Union[float, np.float32]] = None,
                 vmax: Optional[Union[float, np.float32]] = None,
                 lw: float = 1,
) -> plt.Figure:
    """
    """
    ax = ax or plt.gca()
    if vmin is None:
        vmin = np.nanmin(values)
    if vmax is None:
        vmax = np.nanmax(values)
    cmap = plt.get_cmap(cmap)

    normalized_values = (values - vmin) / (vmax - vmin)
    color = cmap(normalized_values)
    if mask is not None:
        color[mask] = np.nan

    collection = PatchCollection(patches)
    collection.set_color(color)
    collection.set_edgecolor(edgecolor)
    collection.set_linewidth(lw)
    ax.add_collection(collection)

    # add colobar
    ax.autoscale_view()
    scalar_mappable = plt.cm.ScalarMappable(
        cmap=cmap,
        norm=plt.Normalize(vmin=vmin, vmax=vmax) # FIXME
    )
    scalar_mappable.set_array([])
    axes_divider = make_axes_locatable(ax)
    cax = axes_divider.append_axes("right", size="5%", pad=0.2)
    cax.figure.colorbar(scalar_mappable, cax=cax, pad=0.1)

    return ax.figure


def plot_occ_detector_unit(h_total,
                           detector_unit: str,
                           roll_list: list[RPCRoll],
                           label: str,
                           year: Union[int, str],
                           com: float,
                           output_path: Optional[Path],
                           close: bool,
):
    """
    plot occupancy
    """
    name_list = [each.id.name for each in roll_list]

    total = h_total[name_list].values() # type: ignore

    patches = [each.polygon for each in roll_list]
    
    
    values = total
    inactive_roll_mask = total < 1

    vmin = 0
    vmax = np.max(total)

    fig, ax = plt.subplots(figsize=(12, 10))
    fig = plot_patches(
        patches=patches,
        values=values,
        mask=inactive_roll_mask,
        vmin=vmin,
        vmax=vmax,
        ax=ax
    )
    _, cax = fig.get_axes()

    xlabel = roll_list[0].polygon_xlabel
    ylabel = roll_list[0].polygon_ylabel
    ymax = roll_list[0].polygon_ymax

    ax.set_xlabel(xlabel, fontsize=30) # type: ignore
    ax.set_ylabel(ylabel, fontsize=30) # type: ignore

    cax_ylabel = 'Occupancy'

    cax.set_ylabel(cax_ylabel)
    cax.set_ylim(vmin, vmax)
    ax.set_ylim(None, ymax) # type: ignore
    ax.annotate(detector_unit, (0.05, 0.925), weight='bold',
                xycoords='axes fraction', fontsize=26) # type: ignore
    mh.cms.label(ax=ax, llabel="Work in Progress", com=com, year=year, fontsize=26)

    if output_path is not None:
        #for suffix in ['.png', '.pdf']:
        for suffix in ['.png']:
            fig.savefig(output_path.with_suffix(suffix))

    if close:
        plt.close(fig)

    return fig


def plot_occ_detector(input_path: Path,
                      geom_path: Path,
                      output_dir: Path,
                      com: float,
                      year: Union[int, str],
                      label: str,
                      roll_blacklist_path: Optional[Path] = None,
):
    input_file = uproot.open(input_path)

    if roll_blacklist_path is None:
        roll_blacklist = set()
    else:
        with open(roll_blacklist_path) as stream:
            roll_blacklist = set(json.load(stream))

    h_total: Hist = input_file['total'].to_hist() # type: ignore
    h_passed: Hist = input_file['passed'].to_hist() # type: ignore

    geom = pd.read_csv(geom_path)
    roll_list = [RPCRoll.from_row(row)
                 for _, row in geom.iterrows()
                 if row.roll_name not in roll_blacklist]

    if not output_dir.exists():
        output_dir.mkdir(parents=True)

    # wheel (or disk) to rolls
    unit_to_rolls = defaultdict(list)
    for roll in roll_list:
        unit_to_rolls[roll.id.detector_unit].append(roll)

    
    for detector_unit, roll_list in unit_to_rolls.items():
        output_path = output_dir / detector_unit
        plot_occ_detector_unit(
            h_total,
            detector_unit=detector_unit,
            roll_list=roll_list,
            label=label,
            year=year,
            com=com,
            output_path=output_path,
            close=True
        )


In [18]:
from pathlib import Path
import argparse
import matplotlib as mpl
import mplhep as mh
#from RPCDPGAnalysis.NanoAODTnP.Plotting import plot_eff_detector # type: ignore

mpl.use('agg')
mh.style.use(mh.styles.CMS)

working_dir = Path('/u/user/sjws5411/Workspace/Efficiency/CMSSW_14_1_0_pre2/Workspace-RPC/240425-TnP_RPC24/TnP_Plotting')

data_list = [
    'Run2022B', 'Run2022C', 'Run2022D', 'Run2022E', 'Run2022F', 'Run2022G',
    'Run2023B', 'Run2023C', 'Run2023D',
    'Run2022', 'Run2023'
]

for data in data_list:
    plot_occ_detector(
        input_path = working_dir / 'data' / f'{data}.root',
        geom_path = working_dir / 'geometry' / 'run3.csv',
        output_dir = working_dir / 'plotting' / 'OccByDet' / data,
        com = 13.6,
        year = data[3:],
        label = 'Work in Progress',
        roll_blacklist_path = working_dir / 'blacklist' / 'roll-blacklist.json',
    )
    

  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
  return super().__getitem__(self._index_transform(index))
