# Four-Muon Spectrum with Apache Spark

This notebook is another showcase of the awkward-array toolset, using FCAT histograms together with more advanced functionality.
It shows the analysis-object syntax implemented by the FCAT `JaggedCandidateArray`, a multi-tiered physics selection, and the use of an accumulator class provided by FCAT. We now also add the concept of corrections, which apply in the case of a Monte Carlo sample.

Instead of a local multithreading executor for scale-out, we use Apache Spark.

In [None]:
import os
import sys
import pyspark.sql

from fnal_column_analysis_tools.processor.spark.detail import (_spark_initialize,
                                                               _spark_stop)

# Point PySpark at the same Python interpreter that is running this notebook.
os.environ.update(PYSPARK_PYTHON=sys.executable)

# Configuring Spark
The Spark session is configured by specifying a 'master', which tells Spark where to run the jobs (for example the URL of a cluster manager, or `local` to run on this computer).
In this case the master is `local[*]`, which requests that Spark use all available cores on your computer for computation. The `spark.sql.execution.arrow` settings enable optimizations within Apache Spark that allow columnar data processing to be done efficiently as vectorized operations.

On the ACCRE cluster, if the master is not specified, you will instead launch jobs that can use all the cores available in the system (up to nearly 1000). However, for today's demonstrations we just want to show how to use Spark for the basics of an analysis.

In [None]:
# Describe the Spark session: run locally on every available core ('local[*]')
# and enable Arrow-based columnar transfer with at most 200000 records per batch.
spark_config = (
    pyspark.sql.SparkSession.builder
    .appName('spark-executor-test')
    .master('local[*]')
    .config('spark.sql.execution.arrow.enabled', 'true')
    .config('spark.sql.execution.arrow.maxRecordsPerBatch', 200000)
)

# Create the Spark session from the configuration above, reporting only
# ERROR-level log messages and with spark_progress disabled.
spark = _spark_initialize(
    config=spark_config,
    log_level='ERROR',
    spark_progress=False,
)

In [None]:
import time

from fnal_column_analysis_tools import hist
from fnal_column_analysis_tools.hist import plot
from fnal_column_analysis_tools.analysis_objects import JaggedCandidateArray
import fnal_column_analysis_tools.processor as processor
from awkward import JaggedArray
import numpy as np

In [None]:
# Look at ProcessorABC to see the expected methods and what they are supposed to do
class FancyDimuonProcessor(processor.ProcessorABC):
    """Four-muon selection that histograms dimuon-pair masses and muon pTs.

    Events are required to contain exactly two opposite-charge dimuon
    candidates (built from soft-ID muons) with mass above ``MIN_DIMUON_MASS``.
    For those events the processor fills the summed four-muon mass, the masses
    of the dimuons nearest to / farthest from ``Z_MASS``, and the leading and
    trailing muon pT of each selected pair, plus a cutflow of event counts.

    Parameters
    ----------
    columns : iterable of str, optional
        Branch names this processor needs; exposed through ``columns``.
        Defaults to an empty list.
    """

    # Nominal Z boson mass in GeV (PDG value); used to rank dimuon candidates.
    Z_MASS = 91.1876
    # Minimum dimuon invariant mass in GeV kept after the opposite-charge cut.
    MIN_DIMUON_MASS = 35.

    def __init__(self, columns=None):
        # Use None as the default so instances never share one mutable list.
        self._columns = [] if columns is None else list(columns)
        dataset_axis = hist.Cat("dataset", "Primary dataset")
        mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 600, 0.25, 300)
        pt_axis = hist.Bin("pt", r"$p_{T,\mu}$ [GeV]", 3000, 0.25, 300)

        self._accumulator = processor.dict_accumulator({
            'mass': hist.Hist("Counts", dataset_axis, mass_axis),
            'mass_near': hist.Hist("Counts", dataset_axis, mass_axis),
            'mass_far': hist.Hist("Counts", dataset_axis, mass_axis),
            'pt_lead': hist.Hist("Counts", dataset_axis, pt_axis),
            'pt_trail': hist.Hist("Counts", dataset_axis, pt_axis),
            'cutflow': processor.defaultdict_accumulator(int),
        })

    @property
    def accumulator(self):
        """Template accumulator: the histograms plus an int-valued cutflow."""
        return self._accumulator

    @property
    def columns(self):
        """List of branch names passed to the constructor."""
        return self._columns

    def process(self, df):
        """Apply the selection to one chunk of events and fill histograms.

        ``df`` must provide ``'dataset'`` and the muon branches ``nMuon``,
        ``Muon_pt``, ``Muon_eta``, ``Muon_phi``, ``Muon_mass``,
        ``Muon_charge``, ``Muon_softId`` and ``Muon_tightId``.
        Returns a fresh accumulator holding this chunk's results.
        """
        output = self.accumulator.identity()

        dataset = df['dataset']
        muons = JaggedCandidateArray.candidatesfromcounts(
            df['nMuon'],
            pt=df['Muon_pt'],
            eta=df['Muon_eta'],
            phi=df['Muon_phi'],
            mass=df['Muon_mass'],
            charge=df['Muon_charge'],
            softId=df['Muon_softId'],
            tightId=df['Muon_tightId'],
        )

        output['cutflow']['all events'] += muons.size

        # Keep soft-ID muons; count events that still have at least one.
        soft_id = (muons.softId > 0)
        muons = muons[soft_id]
        output['cutflow']['soft id'] += soft_id.any().sum()

        twomuons = (muons.counts >= 2)
        output['cutflow']['two muons'] += twomuons.sum()

        # Every distinct muon pair in each event is a dimuon candidate.
        dimuons = muons[twomuons].distincts()

        twodimuons = (dimuons.counts >= 2)
        output['cutflow']['>= two dimuons'] += twodimuons.sum()
        dimuons = dimuons[twodimuons]

        # The product of the two charges is -1 only for opposite-sign pairs.
        opposite_charge = (dimuons.i0['charge'] * dimuons.i1['charge'] == -1)
        dimuons = dimuons[opposite_charge]
        output['cutflow']['opposite charge'] += opposite_charge.any().sum()

        above_min_mass = (dimuons.mass > self.MIN_DIMUON_MASS)
        dimuons = dimuons[above_min_mass]

        exactlytwodimuons = (dimuons.counts == 2)
        output['cutflow']['== two dimuons'] += exactlytwodimuons.sum()
        # compact() so the flat .content/.offsets used below cover only the
        # selected pairs.
        dimuons = dimuons[exactlytwodimuons].compact()

        # Order each pair's muons by pT (on a tie, i1 is treated as leading).
        leading_mu = (dimuons.i0.pt.content > dimuons.i1.pt.content)
        pt_lead = JaggedArray.fromoffsets(
            dimuons.offsets,
            np.where(leading_mu, dimuons.i0.pt.content, dimuons.i1.pt.content))
        pt_trail = JaggedArray.fromoffsets(
            dimuons.offsets,
            np.where(~leading_mu, dimuons.i0.pt.content, dimuons.i1.pt.content))

        # Per event: index of the dimuon closest to / farthest from the Z mass.
        z_distance = np.abs(dimuons.mass - self.Z_MASS)
        near_z = z_distance.argmin()
        far_z = z_distance.argmax()

        # NOTE(review): the two selected dimuons are not required to be
        # disjoint, so this p4 sum may count a shared muon twice — confirm
        # whether a non-overlap requirement is intended.
        output['mass'].fill(dataset=dataset,
                            mass=dimuons.p4.sum().mass)
        output['mass_near'].fill(dataset=dataset,
                                 mass=dimuons.mass[near_z].flatten())
        output['mass_far'].fill(dataset=dataset,
                                mass=dimuons.mass[far_z].flatten())
        output['pt_lead'].fill(dataset=dataset,
                               pt=pt_lead.flatten())
        output['pt_trail'].fill(dataset=dataset,
                                pt=pt_trail.flatten())
        return output

    def postprocess(self, accumulator):
        """Return the merged accumulator unchanged (no post-processing)."""
        return accumulator

In [None]:
from fnal_column_analysis_tools.processor import run_spark_job
from fnal_column_analysis_tools.processor.spark.spark_executor import spark_executor

tstart = time.time()

# Input ROOT files, grouped by dataset name (the key is used as the
# 'dataset' category in the histograms).
fileset = {
    'DoubleMuon': [
        'data/Run2012B_DoubleMuParked.root',
        'data/Run2012C_DoubleMuParked.root',
    ],
    'ZZ to 4mu': [
        'data/ZZTo4mu.root',
    ],
}

# Branches the processor reads from each event.
columns = ['nMuon', 'Muon_pt', 'Muon_eta', 'Muon_phi', 'Muon_mass',
           'Muon_charge', 'Muon_softId', 'Muon_tightId']
proc = FancyDimuonProcessor(columns=columns)

# Run the processor over the fileset defined above on the Spark session.
output = run_spark_job(fileset, processor_instance=proc, executor=spark_executor,
                       spark=spark, thread_workers=2)

# Wall-clock time for setup plus processing; used for the events/s printout.
elapsed = time.time() - tstart
print(output)

In [None]:
# Shut down the Spark session now that the job has finished; the plots below
# only use the already-accumulated `output`.
_spark_stop(spark)

In [None]:
# Summed four-muon mass of the selected events, overlaying the datasets.
fig, ax, _ = plot.plot1d(output['mass'], overlay='dataset')
ax.set_xlim(70,150)
ax.set_ylim(0, 3000)

In [None]:
# Mass of the dimuon closest to the Z mass in each event, overlaying the datasets.
fig, ax, _ = plot.plot1d(output['mass_near'], overlay='dataset')
# Uncomment for log-scale axes.
#ax.set_xscale('log')
#ax.set_yscale('log')
ax.set_xlim(60,120)
ax.set_ylim(0.1, 7500)

In [None]:
# Mass of the dimuon farthest from the Z mass in each event, overlaying the datasets.
fig, ax, _ = plot.plot1d(output['mass_far'], overlay='dataset')
# Uncomment for log-scale axes.
#ax.set_xscale('log')
#ax.set_yscale('log')
ax.set_ylim(0.1, 8000)

In [None]:
# Leading-muon pT of each selected dimuon, log-scale counts, overlaying the datasets.
fig, ax, _ = plot.plot1d(output['pt_lead'], overlay='dataset')
#ax.set_xscale('log')
ax.set_yscale('log')
ax.set_ylim(0.1, 5e3)

In [None]:
# Trailing-muon pT of each selected dimuon, log-scale counts, overlaying the datasets.
fig, ax, _ = plot.plot1d(output['pt_trail'], overlay='dataset')
#ax.set_xscale('log')
ax.set_yscale('log')
ax.set_ylim(0.1, 2e4)

In [None]:
# Throughput: all input events divided by the wall-clock time of the run cell.
events_per_second = output['cutflow']['all events'] / elapsed
print(f"Events/s: {events_per_second}")