In [None]:
# NB: This notebook runs Spark on parquet files converted from
#     baconbits skims. It is a prototype, so there is a lot of boilerplate.
#     We're making it better :-)

import pyspark.sql
import os
import sys

# Executors must use the same interpreter as this driver process; inside a
# virtualenv they would otherwise pick up the wrong Python.
os.environ['PYSPARK_PYTHON'] = sys.executable

# Build (or reuse) the Spark session used by every cell below.
session = (
    pyspark.sql.SparkSession.builder
    .appName("baconbits-spark")
    .config('spark.executor.memory', "16g")
    .config('spark.executor.cores', "4")
    .config('spark.sql.execution.arrow.enabled', "true")
    .config('spark.sql.execution.arrow.maxRecordsPerBatch', 500000)
    .config('spark.driver.maxResultSize', 0)
    .config('spark.dynamicAllocation.minExecutors', 37)
    .config('spark.dynamicAllocation.maxExecutors', 250)
    .config('spark.cores.max', 1000)
    .getOrCreate()
)
spark = session  # alias used by read_df and the final spark.stop()
sc = session.sparkContext
sc.setLogLevel("INFO")

# nparts_per_file scales the partition count when datasets are coalesced;
# thread_workers sizes the driver-side thread pools.
nparts_per_file = 3
thread_workers = 16

In [None]:
import pyspark.sql.functions as fn
from tqdm import tqdm
import json

# Only the 'Hbb_2017' sample group is used; ``datasets`` stays an empty
# mapping when the metadata file does not contain that group.
datasets = {}

with open('metadata/samplefiles.json') as f:
    datasets = json.load(f).get('Hbb_2017', datasets)

In [None]:
import concurrent
from concurrent.futures import ThreadPoolExecutor
import pyspark.sql.functions as fn
from pyspark.sql.types import DoubleType
from columns import gghbbcolumns,gghbbcolumns_mc

# Every input column handed to the analysis UDF: the common columns followed
# by the gghbbcolumns_mc ones (presumably MC-only quantities -- confirm).
allcolumns = gghbbcolumns + gghbbcolumns_mc

# Skim directory name interpolated into the parquet paths below.
skim_root = 'bitsconvert_17042019'

def build_union(uniondf,df):
    """Fold ``df`` into a running union.

    Returns ``df`` itself when ``uniondf`` is None (nothing accumulated yet),
    otherwise the result of ``uniondf.union(df)``.
    """
    if uniondf is not None:
        return uniondf.union(df)
    return df

def read_df(dsloc):
    """Open the parquet directory of dataset ``dsloc`` (under ``skim_root``) as a Spark DataFrame."""
    location = 'hdfs:///store/parquet/zprimebits/%s/%s/'%(skim_root,dsloc)
    reader = spark.read
    return reader.parquet(location)

def count_df(df):
    """Return ``df.count()``; a named function so it can be submitted to a thread pool."""
    nrows = df.count()
    return nrows
    
alldfs = None

# Open every dataset concurrently. Each DataFrame is tagged with its dataset
# name, padded with any gghbbcolumns_mc columns it lacks, coalesced, and then
# stored in ``dfslist`` keyed by dataset name.
dfslist = {}
with ThreadPoolExecutor(max_workers=thread_workers) as executor:
    future_to_ds = {executor.submit(read_df,dataset): dataset for dataset in datasets.keys()}
    for future in tqdm(concurrent.futures.as_completed(future_to_ds),
                       total=len(future_to_ds),
                       desc='loading datasets'):
        dataset = future_to_ds[future]
        df = future.result()
        df = df.withColumn('dataset', fn.lit(dataset))
        # Read the schema once rather than once per candidate column.
        present = set(df.columns)
        missing_mc = [mccol for mccol in gghbbcolumns_mc if mccol not in present]
        # True when at least one MC column had to be filled in (presumably a
        # real-data rather than simulated sample -- confirm). One consistently
        # spelled flag replaces the isData/isdata pair.
        isData = bool(missing_mc)
        for mccol in missing_mc:
            # Constant 0.0 filler so that all datasets share one schema.
            df = df.withColumn(mccol, fn.lit(0.0))
        # Number of directory entries for this dataset as seen through the
        # /mnt/hdfs mount; it scales the partition count.
        nfiles = len(os.listdir('/mnt/hdfs/store/parquet/zprimebits/%s/%s/'%(skim_root,dataset)))
        df = df.coalesce(nparts_per_file*nfiles)
        dfslist[dataset] = df

# Count the rows of every dataset concurrently; ``dfcounts`` maps the dataset
# name to its row count.
dfcounts = {}
with ThreadPoolExecutor(max_workers=thread_workers) as executor:
    # Key the futures by dataset name, as the loading and analysis pools do,
    # rather than by the DataFrame object itself.
    future_to_ds = {executor.submit(count_df,df): ds for ds,df in dfslist.items()}
    for future in tqdm(concurrent.futures.as_completed(future_to_ds),
                       total=len(future_to_ds),
                       desc='counting events'):
        dfcounts[future_to_ds[future]] = future.result()

In [None]:
# Total number of rows over all loaded datasets.
n_events_total = sum(dfcounts.values())
print('N events to process:',n_events_total)

In [None]:
#get the hbb analysis worker from the cloudpickle file
import cloudpickle as cpkl
import lz4.frame as lz4f

processor_pkl = 'boostedHbbProcessor.cpkl.lz4'
processor_instance = None
# NOTE(review): unpickling runs arbitrary code from the file; only load
# processor files that come from a trusted source.
with lz4f.open(processor_pkl, mode="rb") as pkl_stream:
    processor_instance = cpkl.load(pkl_stream)

In [None]:
import time
import pandas as pd
import numpy as np
import lz4.frame as lz4f
import pyspark.sql.functions as fn
from pyspark.sql.types import BinaryType
from fnal_column_analysis_tools import processor

lz4_clevel = 1

@fn.pandas_udf(BinaryType(), fn.PandasUDFType.SCALAR)
def compute(dataset,
            AK4Puppijet0_dPhi08,AK4Puppijet0_dR08,AK4Puppijet0_deepcsvb,AK4Puppijet0_pt,AK4Puppijet1_dPhi08,
            AK4Puppijet1_dR08,AK4Puppijet1_deepcsvb,AK4Puppijet1_pt,AK4Puppijet2_dPhi08,AK4Puppijet2_dR08,
            AK4Puppijet2_deepcsvb,AK4Puppijet2_pt,AK4Puppijet3_dPhi08,AK4Puppijet3_dR08,AK4Puppijet3_deepcsvb,
            AK4Puppijet3_pt,AK8Puppijet0_N2sdb1,AK8Puppijet0_deepdoubleb,AK8Puppijet0_deepdoublec,
            AK8Puppijet0_deepdoublecvb,AK8Puppijet0_eta,AK8Puppijet0_isHadronicV,AK8Puppijet0_isTightVJet,
            AK8Puppijet0_msd,AK8Puppijet0_phi,AK8Puppijet0_pt,AK8Puppijet0_pt_JERDown,AK8Puppijet0_pt_JERUp,
            AK8Puppijet0_pt_JESDown,AK8Puppijet0_pt_JESUp,AK8Puppijet1_e3_v1_sdb1,AK8Puppijet1_e4_v2_sdb1,
            AK8Puppijet1_msd,AK8Puppijet1_phi,AK8Puppijet1_tau32,MetXCorrjerDown,MetXCorrjerUp,MetXCorrjesDown,
            MetXCorrjesUp,MetYCorrjerDown,MetYCorrjerUp,MetYCorrjesDown,MetYCorrjesUp,nAK4PuppijetsPt30,
            neleLoose,nmuLoose,npu,ntau,passJson,pfmet,pfmetphi,scale1fb,triggerBits,vmuoLoose0_eta,
            vmuoLoose0_phi,vmuoLoose0_pt,kfactorEWK,kfactorQCD,genVPhi,genVMass,genVPt):
    """Run the analysis processor over one batch of rows and return its output as a blob.

    Every argument is a pandas Series holding one input column for the batch.
    The arguments after ``dataset`` are paired by position with the names in
    ``allcolumns``, so their order must match that list.

    Returns a Series with one entry per input row: the first entry is the
    lz4-compressed cloudpickle of ``processor_instance.process(...)`` and all
    other entries are empty byte strings. An empty batch yields an empty
    Series without invoking the processor.
    """
    columns = [AK4Puppijet0_dPhi08,AK4Puppijet0_dR08,AK4Puppijet0_deepcsvb,AK4Puppijet0_pt,AK4Puppijet1_dPhi08,
               AK4Puppijet1_dR08,AK4Puppijet1_deepcsvb,AK4Puppijet1_pt,AK4Puppijet2_dPhi08,AK4Puppijet2_dR08,
               AK4Puppijet2_deepcsvb,AK4Puppijet2_pt,AK4Puppijet3_dPhi08,AK4Puppijet3_dR08,AK4Puppijet3_deepcsvb,
               AK4Puppijet3_pt,AK8Puppijet0_N2sdb1,AK8Puppijet0_deepdoubleb,AK8Puppijet0_deepdoublec,
               AK8Puppijet0_deepdoublecvb,AK8Puppijet0_eta,AK8Puppijet0_isHadronicV,AK8Puppijet0_isTightVJet,
               AK8Puppijet0_msd,AK8Puppijet0_phi,AK8Puppijet0_pt,AK8Puppijet0_pt_JERDown,AK8Puppijet0_pt_JERUp,
               AK8Puppijet0_pt_JESDown,AK8Puppijet0_pt_JESUp,AK8Puppijet1_e3_v1_sdb1,AK8Puppijet1_e4_v2_sdb1,
               AK8Puppijet1_msd,AK8Puppijet1_phi,AK8Puppijet1_tau32,MetXCorrjerDown,MetXCorrjerUp,MetXCorrjesDown,
               MetXCorrjesUp,MetYCorrjerDown,MetYCorrjerUp,MetYCorrjesDown,MetYCorrjesUp,nAK4PuppijetsPt30,
               neleLoose,nmuLoose,npu,ntau,passJson,pfmet,pfmetphi,scale1fb,triggerBits,vmuoLoose0_eta,
               vmuoLoose0_phi,vmuoLoose0_pt,kfactorEWK,kfactorQCD,genVPhi,genVMass,genVPt]

    size = AK4Puppijet0_dPhi08.values.size
    # One output slot per input row; all but the first stay empty.
    outs = np.full(size, b'', dtype='O')
    if size == 0:
        # No first row to read the dataset name from or to hold the blob.
        return pd.Series(outs)

    items = {name:col.values for name,col in zip(allcolumns,columns)}
    df = processor.PreloadedDataFrame(size=size, items=items)
    # Positional access: take the dataset name from the first row whatever
    # the Series' index labels are.
    df['dataset'] = dataset.iloc[0]

    hists = processor_instance.process(df)

    # One blob per batch, stored in the first row; agg_histos drops the
    # zero-length entries when merging.
    outs[0] = lz4f.compress(cpkl.dumps(hists),compression_level=lz4_clevel)

    return pd.Series(outs)

@fn.pandas_udf(BinaryType(), fn.PandasUDFType.GROUPED_AGG)
def agg_histos(df):
    """Merge the histogram blobs of one group into a single blob.

    ``df`` is a pandas Series of byte strings. Zero-length entries are
    ignored; every other entry is lz4-decompressed and unpickled, and the
    resulting mappings are summed key by key with ``+=``.

    Returns the lz4-compressed cloudpickle of the merged mapping, or of
    ``None`` when the group held no usable blob.
    """
    blobs = df[df.str.len() > 0]
    merged = None
    for blob in blobs:
        partial = cpkl.loads(lz4f.decompress(blob))
        if partial is None:
            # An upstream aggregation with nothing to merge pickles None;
            # skip it wherever it appears instead of only when it comes first.
            continue
        if merged is None:
            merged = partial
            continue
        for key,val in partial.items():
            if key in merged:
                merged[key] += val
            else:
                # Key absent from the first partial: adopt it rather than fail.
                merged[key] = val
    return lz4f.compress(cpkl.dumps(merged),compression_level=lz4_clevel)


In [None]:
# Keep only the columns the UDF consumes and mark each DataFrame for caching.
selected_columns = tuple(["dataset"]+allcolumns)
cacheddfs = {
    name: frame.select(*selected_columns).cache()
    for name, frame in dfslist.items()
}

In [None]:
def runanalysis(df):
    """Run the histogramming UDFs over ``df`` and return the merged blob as a pandas DataFrame.

    ``compute`` produces per-batch blobs, which are merged first within each
    Spark partition and then across all partitions with ``agg_histos``.
    """
    udf_args = tuple(["dataset"]+allcolumns)
    per_row = (
        df.withColumn('partid', fn.spark_partition_id())
          .withColumn('histos', compute(*udf_args))
          .select('partid','histos')
    )
    per_partition = per_row.groupBy('partid').agg(agg_histos('histos'))
    # 'agg_histos(histos)' is the name the un-aliased aggregate column above
    # ends up with; the second aggregation reads it by that name.
    merged = per_partition.groupBy().agg(agg_histos('agg_histos(histos)'))
    return merged.toPandas()

# Run the analysis of all datasets concurrently and time the whole pass;
# ``theresults`` maps the dataset name to the pandas DataFrame that
# runanalysis returned for it.
tic = time.time()
theresults = {}
with ThreadPoolExecutor(max_workers=thread_workers) as executor:
    future_to_ds = {executor.submit(runanalysis,df):ds for ds,df in cacheddfs.items()}
    finished_jobs = concurrent.futures.as_completed(future_to_ds)
    for future in tqdm(finished_jobs, total=len(future_to_ds), desc='analysis jobs'):
        finished_ds = future_to_ds[future]
        theresults[finished_ds] = future.result()
dt = time.time() - tic

In [None]:
# Throughput summary for the analysis pass timed above.
nevt = sum(dfcounts.values())
minutes_elapsed = dt/60
us_per_event = dt/nevt*1e6
mevents_per_second = nevt/dt/1e6
print('total time: ',minutes_elapsed)
print("μs/evt", us_per_event)
print("Mevt/s", mevents_per_second)

In [None]:
from copy import deepcopy

#unpack the returned histogram bytestrings
# Every per-dataset result is treated the same way: empty frames and missing
# blobs are skipped, the first usable one seeds the accumulator and the rest
# are added key by key. Results are merged in ``theresults`` order.
final_accumulator = None
for result in theresults.values():
    if result.empty:
        continue
    blob = result[result.columns[0]][0]
    if not blob:
        continue
    partial = cpkl.loads(lz4f.decompress(blob))
    if partial is None:
        continue
    if final_accumulator is None:
        final_accumulator = partial
        continue
    for key,ahist in partial.items():
        final_accumulator[key] += ahist

if final_accumulator is None:
    raise RuntimeError('no analysis job returned any histograms')

#fill sumw with the normalizations of the full samples
normlist = None
with lz4f.open('correction_files/sumw_mc.cpkl.lz4','rb') as fin:
    normlist = cpkl.load(fin)

final_accumulator['sumw'] = deepcopy(normlist)

In [None]:
from fnal_column_analysis_tools import hist
import gzip
import pickle
import numexpr

processor_instance.postprocess(final_accumulator)

# Collect the _sumw arrays of every hist.Hist entry once, then derive both
# bin statistics from that list.
sumw_arrays = [
    arr
    for h in final_accumulator.values() if isinstance(h, hist.Hist)
    for arr in h._sumw.values()
]
nbins = sum(arr.size for arr in sumw_arrays)
nfilled = sum(np.sum(arr>0) for arr in sumw_arrays)
print("Processed %.1fM events" % (nevt/1e6, ))
print("Filled %.1fM bins" % (nbins/1e6, ))
print("Nonzero bins: %.1f%%" % (100*nfilled/nbins, ))

# Pickle is not very fast or memory efficient, will be replaced by something better soon
with lz4f.open("hists.cpkl.lz4", mode="wb", compression_level=6) as hist_file:
    cpkl.dump(final_accumulator, hist_file)

#dt = time.time() - tstart
#print("%.2f us*cpu/event overall" % (1e6*dt*nworkers/final_accumulators['nentries'], ))

In [None]:
# Shut down the Spark session created in the first cell.
spark.stop()