In [None]:
import awkward as ak
import hist
from coffea import processor
import corrections


class MyZPeak(processor.ProcessorABC):
    """Select opposite-charge dimuon events and histogram their invariant mass.

    For every chunk of events the processor records a cutflow, the sum of
    generator weights (0 for real data) and a 120-bin histogram of the
    dimuon mass between 0 and 120.
    """

    def process(self, events):
        """Apply the dimuon selection to one chunk of events.

        Parameters
        ----------
        events
            Event array providing ``metadata['dataset']``, the ``Muon`` and
            ``HLT`` collections, and ``run``/``luminosityBlock`` (used for
            real data) or ``genWeight`` (present only in simulation).

        Returns
        -------
        dict
            ``{dataset: {"entries", "sumw", "cutflow", "mass"}}`` where
            ``entries`` is the number of events passing every cut, ``sumw``
            is the generator-weight sum taken before any cut, ``cutflow``
            maps each selection step to the surviving event count, and
            ``mass`` is the filled histogram.
        """
        dataset = events.metadata["dataset"]
        # Samples without a genWeight branch are treated as real data.
        isRealData = "genWeight" not in events.fields
        # Taken before any selection so it reflects the full input chunk.
        sumw = 0. if isRealData else ak.sum(events.genWeight)
        cutflow = {"start": len(events)}

        if isRealData:
            # Keep only the (run, luminosityBlock) pairs accepted by the
            # project's lumi mask.
            events = events[
                corrections.lumimask(events.run, events.luminosityBlock)
            ]
            cutflow["lumimask"] = len(events)

        # Muons with pt >= 20 that pass the tight ID flag.
        events["goodmuons"] = events.Muon[
            (events.Muon.pt >= 20.)
            & events.Muon.tightId
        ]

        # Exactly two good muons whose charges sum to zero.
        events = events[
            (ak.num(events.goodmuons) == 2)
            & (ak.sum(events.goodmuons.charge, axis=1) == 0)
        ]
        cutflow["ossf"] = len(events)

        # add first and second muon p4 in every event together
        events["zcand"] = events.goodmuons[:, 0] + events.goodmuons[:, 1]

        # require trigger
        events = events[
            # https://twiki.cern.ch/twiki/bin/view/CMS/MuonHLT2018
            events.HLT.Mu17_TrkIsoVVL_Mu8_TrkIsoVVL_DZ_Mass3p8
        ]
        cutflow["trigger"] = len(events)

        return {
            dataset: {
                "entries": len(events),
                "sumw": sumw,
                "cutflow": cutflow,
                "mass": (
                    hist.Hist.new
                    # Raw string: "\m" is not a valid escape sequence, so a
                    # plain literal triggers an invalid-escape warning.
                    .Reg(120, 0., 120., label=r"$m_{\mu\mu}$ [GeV]")
                    .Double()
                    # NOTE(review): filled without per-event weights, also
                    # for simulation -- confirm this is intended.
                    .fill(events.zcand.mass)
                )
            }
        }

    def postprocess(self, accumulator):
        """Return the accumulated output unchanged."""
        return accumulator

In [None]:
from dask.distributed import Client

# Connect to a Dask scheduler over TLS on port 8786.
# NOTE(review): the "..." host looks like a placeholder -- fill in the real
# scheduler address before running.
client = Client("tls://...:8786")
# Bare expression so the notebook displays the client summary.
client

In [None]:
import shutil
# Pack the local "corrections" directory into corrections.zip in the current
# working directory.
shutil.make_archive("corrections", "zip", base_dir="corrections")
# Send the archive to the Dask workers so the `corrections` module imported
# by the processor is available there too.
client.upload_file("corrections.zip")

In [None]:
# Run MyZPeak on the Dask cluster over the "Events" tree of every file listed
# in fileset.json; the output is kept in `result`, keyed by dataset name.
result = processor.run_uproot_job(
    fileset="fileset.json",
    treename="Events",
    processor_instance=MyZPeak(),
    executor=processor.dask_executor,
    executor_args={
        "schema": processor.NanoAODSchema,
        "client": client,
    },
    # Uncomment to cap the number of chunks for a quick test run.
    # maxchunks=5,
)

In [None]:
# Dimuon mass histogram of the data sample.
data = result["DoubleMuon2018A"]["mass"]

# NOTE(review): presumably the integrated luminosity in fb^-1 -- confirm.
lumi = 14.0
# Per-event scale factor for the simulation: lumi * 1e3 * 6225.42 divided by
# the number of simulated events before any selection.
# NOTE(review): 6225.42 looks like the Z+jets cross section in pb and 1e3 the
# fb^-1 -> pb^-1 conversion -- confirm. The denominator is the raw event count
# ("start"), not the "sumw" the processor also returns; verify this is what is
# wanted if the sample carries non-unit generator weights.
xsweight = lumi * 1e3 * 6225.42 / result["ZJets2018"]["cutflow"]["start"]
# Simulated mass histogram scaled to the expected yield.
sim = result["ZJets2018"]["mass"] * xsweight

In [None]:
import matplotlib.pyplot as plt

# Overlay the scaled simulation (filled) and the data (black error bars) on
# one set of axes.
fig, ax = plt.subplots()
# Simulation is drawn first so the data points are plotted on top of it.
sim.plot(ax=ax, histtype="fill", label="Z+jets")
data.plot(ax=ax, histtype="errorbar", color="k", label="Data")
# Show only the 60-120 part of the mass axis.
ax.set_xlim(60, 120)
ax.legend()

In [None]:
# Display the per-step event counts recorded for the data sample.
result["DoubleMuon2018A"]["cutflow"]