In [1]:
import parsl
import os
from parsl.app.app import python_app, bash_app
from parsl.configs.local_threads import config

from parsl.providers import LocalProvider,CondorProvider
from parsl.channels import LocalChannel,SSHChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

from parsl.addresses import address_by_hostname

# Name of the user's grid proxy file; it is shipped to every worker by
# HTCondor (see xfer_files) and exported as X509_USER_PROXY there.
x509_proxy = 'x509up_u%s' % (os.getuid(),)
year = '2018'

# Shell snippet executed on each worker before parsl starts: set up the LCG
# python3 view, expose the transferred ~/.local packages, point
# X509_USER_PROXY at the transferred proxy and create the output directory
# that HTCondor transfers back (transfer_output_files in condor_cfg).
# The template has exactly ONE %s placeholder, so only the proxy name is
# substituted (also passing `year` raised "TypeError: not all arguments
# converted during string formatting").
wrk_init = '''
source /cvmfs/sft.cern.ch/lcg/views/LCG_95apython3/x86_64-centos7-gcc7-opt/setup.sh
export PATH=`pwd`/.local/bin:$PATH
export PYTHONPATH=`pwd`/.local/lib/python3.6/site-packages:$PYTHONPATH

export X509_USER_PROXY=`pwd`/%s
mkdir -p decaf_parsl
''' % (x509_proxy,)

# twoGB = 2048 is memory per core in MB; each HTCondor job requests nproc
# cores and twoGB*nproc of memory.
twoGB = 2048
nproc = 8

condor_cfg = '''
transfer_output_files = decaf_parsl
RequestMemory = %d
RequestCpus = %d
''' % (twoGB*nproc, nproc)

# Inputs HTCondor copies into every job sandbox, both read from $HOME:
# the user's ~/.local python packages and the grid proxy file.
xfer_files = ['%s/%s' % (os.environ['HOME'], name) for name in ('.local', x509_proxy)]

# A single HighThroughputExecutor whose blocks are one-node HTCondor jobs,
# each running nproc single-core workers (matching RequestCpus in condor_cfg).
condor_htex = Config(
    executors=[
        HighThroughputExecutor(
            label="decaf_parsl",
            address=address_by_hostname(),
            prefetch_capacity=0,
            cores_per_worker=1,
            max_workers=nproc,
            worker_logdir_root='./',
            provider=CondorProvider(
                channel=LocalChannel(),
                nodes_per_block=1,
                init_blocks=16,
                max_blocks=200,
                worker_init=wrk_init,
                transfer_input_files=xfer_files,
                scheduler_options=condor_cfg,
            ),
        ),
    ],
    # None disables parsl's dynamic block scaling.
    strategy=None,
)

# Uncomment to stream every parsl log record to stdout while debugging.
#parsl.set_stream_logger()

dfk = parsl.load(condor_htex)

# coffea chunk size used when processing the samples below.
chunksize = 500000


In [None]:
# Integrated luminosity per data-taking year (presumably fb^-1 — confirm).
# Values from https://twiki.cern.ch/twiki/bin/viewauth/CMS/PdmVAnalysisSummaryTable
lumis = {
    '2016': 35.92,
    '2017': 41.53,
    '2018': 59.97,
}
# Scaled by 1000 (presumably fb^-1 -> pb^-1 to match the units of 'xs'; verify).
lumi = float(lumis[year]) * 1000.

In [None]:
import json

# Sample catalogue for the selected year. Each dataset entry provides its
# cross section under 'xs' and its input file list under 'files'.
with open("../harvester/beans/%s.json" % year) as catalogue:
    samplefiles = json.load(catalogue)

# dataset name -> cross section, handed to AnalysisProcessor.
xsec = dict((name, entry['xs']) for name, entry in samplefiles.items())

In [None]:
import os
import time
import gzip
import pickle
import cloudpickle
import numpy as np
from coffea import hist, processor
from analysis.darkhiggs import AnalysisProcessor, samples

# Restrict the number of input files per dataset for quick tests,
# e.g. slice(0, 5); slice(None) processes every file.
fileslice = slice(None)


def _selections_for(dataset):
    """Return the analysis regions with at least one sample pattern contained in `dataset`.

    Regions keep the iteration order of `samples` and appear at most once,
    even when several of their patterns match the dataset name.
    """
    return [selection for selection, patterns in samples.items()
            if any(pattern in dataset for pattern in patterns)]


def _count_bins(output):
    """Return (total bins, bins with positive sumw) over all coffea Hist entries of `output`."""
    hists = [h for h in output.values() if isinstance(h, hist.Hist)]
    nbins = sum(arr.size for h in hists for arr in h._sumw.values())
    nfilled = sum(np.sum(arr > 0) for h in hists for arr in h._sumw.values())
    return nbins, nfilled


outdir = os.path.join("pods", year)
os.makedirs(outdir, exist_ok=True)  # gzip.open below does not create directories

for dataset, info in samplefiles.items():
    #if your_wanted_dataset not in dataset: continue
    # Build the fileset and region list for THIS dataset only. They used to be
    # accumulated across iterations, so every job re-ran all earlier datasets
    # and the cumulative result was saved under the current dataset's name.
    filelist = {dataset: list(info['files'][fileslice])}
    selections = _selections_for(dataset)

    processor_instance = AnalysisProcessor(selected_regions=selections, year=year, xsec=xsec, lumi=lumi)
    tstart = time.time()
    output = processor.run_parsl_job(filelist,
                                     treename='Events',
                                     processor_instance=processor_instance,
                                     executor=processor.parsl_executor,
                                     # NOTE(review): presumably unused since an already-loaded DFK is passed as data_flow — confirm
                                     executor_args={'config': None},
                                     data_flow=dfk,
                                     chunksize=chunksize,  # configured in the setup cell
                                     )

    nbins, nfilled = _count_bins(output)
    print("Filled %.1fM bins" % (nbins/1e6, ))
    if nbins:  # avoid ZeroDivisionError when the output holds no histogram bins
        print("Nonzero bins: %.1f%%" % (100*nfilled/nbins, ))

    # Pickle is not very fast or memory efficient, will be replaced by something better soon
    with gzip.open(os.path.join(outdir, dataset + ".pkl.gz"), "wb") as fout:
        cloudpickle.dump(output, fout)

    dt = time.time() - tstart
    print("%s processed in %.1f s" % (dataset, dt))


In [None]:
# Release the resources held by the active DataFlowKernel, then clear it so
# that a new parsl configuration can be loaded on a re-run.
active_dfk = parsl.dfk()
active_dfk.cleanup()
parsl.clear()
