In this demo, we first show how to use the Arrow `Dataset` API with the `SkyhookFileFormat` API to scan Parquet files by pushing scan operations down into Ceph. We then show how to use the `Dataset` API to process Parquet files containing NanoEvents stored in Ceph, in parallel, through Coffea using Dask.

## Exploring SkyhookFileFormat with PyArrow

We import the Dataset API and the Parquet API from PyArrow.

In [1]:
import pyarrow
import pyarrow.dataset as ds
import pyarrow.parquet as pq

Now we instantiate `SkyhookFileFormat`. Instantiation opens the connection to the Ceph cluster under the hood, and the connection is closed automatically when the object is destroyed. `SkyhookFileFormat` currently takes the path to the Ceph configuration file as input. It inherits from the `FileFormat` API and uses the `DirectObjectAccess` API under the hood to interact with the underlying objects that make up a file in CephFS. Because we mount CephFS, it appears as just another directory of Parquet files, so we can use the `FileSystemDataset` that ships with Apache Arrow to instantiate our dataset. To start pushing scan operations down to our Parquet files, we only need to pass `SkyhookFileFormat` as the `format` parameter.

In [2]:
dataset = ds.dataset("file:///mnt/cephfs/nyc", format=ds.SkyhookFileFormat("parquet", "/opt/ceph/ceph.conf", "cephfs-data0"))

AttributeError: module 'pyarrow.dataset' has no attribute 'SkyhookFileFormat'

Now we apply some projections and filters on the dataset.

In [3]:
dataset.to_table(columns=["total_amount", "fare_amount"], filter=(ds.field("trip_distance") > 20.0)).to_pandas()

Unnamed: 0,total_amount,fare_amount
0,75.84,52.00
1,69.99,52.00
2,59.84,53.00
3,68.50,53.50
4,70.01,52.00
...,...,...
376,78.88,67.00
377,64.84,58.50
378,0.31,0.01
379,58.80,57.50


## Import the required modules

Import `uproot`, `awkward`, `coffea`.

In [4]:
import uproot
import awkward as ak
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from coffea import processor, hist

ImportError: Numba needs NumPy 1.21 or less

## Define a Processor instance

The processor implementation given below has been taken from [here](https://github.com/CoffeaTeam/coffea/blob/master/binder/nanoevents.ipynb).

In [5]:
class MyZPeak(processor.ProcessorABC):
    def __init__(self):
        self._histo = hist.Hist(
            "Events",
            hist.Cat("dataset", "Dataset"),
            hist.Bin("mass", "Z mass", 60, 60, 120),
        )
    
    @property
    def accumulator(self):
        return self._histo
    
    # we will receive a NanoEvents instead of a coffea DataFrame
    def process(self, events):
        out = self.accumulator.identity()
        mmevents = events[
            (ak.num(events.Muon) == 2)
            & (ak.sum(events.Muon.charge, axis=1) == 0)
        ]
        zmm = mmevents.Muon[:, 0] + mmevents.Muon[:, 1]
        out.fill(
            dataset=events.metadata["dataset"],
            mass=zmm.mass,
        )
        return out
    
    def postprocess(self, accumulator):
        return accumulator
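
To make the selection in `process` concrete, here is a plain-Python sketch of what it computes for a single event: keep the event only if it has exactly two muons whose charges sum to zero, then add the two four-momenta and take the invariant mass. The helper names and the dictionary layout are hypothetical and are not part of coffea; the processor above does the same thing on whole arrays at once.

```python
import math


def four_momentum(pt, eta, phi, mass):
    """Convert (pt, eta, phi, mass) to Cartesian (E, px, py, pz)."""
    px = pt * math.cos(phi)
    py = pt * math.sin(phi)
    pz = pt * math.sinh(eta)
    energy = math.sqrt(px**2 + py**2 + pz**2 + mass**2)
    return energy, px, py, pz


def dimuon_mass(muons):
    """Return the invariant mass of a two-muon, zero-net-charge event, else None."""
    if len(muons) != 2 or sum(m["charge"] for m in muons) != 0:
        return None
    vectors = [four_momentum(m["pt"], m["eta"], m["phi"], m["mass"]) for m in muons]
    # Sum the two four-momenta component by component.
    energy, px, py, pz = (sum(component) for component in zip(*vectors))
    # Clamp at zero to guard against tiny negative values from rounding.
    return math.sqrt(max(energy**2 - px**2 - py**2 - pz**2, 0.0))


# Hypothetical event: two back-to-back muons with opposite charge.
event = [
    {"pt": 45.6, "eta": 0.0, "phi": 0.0, "mass": 0.105658, "charge": 1},
    {"pt": 45.6, "eta": 0.0, "phi": math.pi, "mass": 0.105658, "charge": -1},
]
z_mass = dimuon_mass(event)

# A same-sign pair fails the charge requirement and is rejected.
same_sign = dimuon_mass([dict(event[0]), dict(event[0])])
```

In this example the pair reconstructs to a mass close to 91.2, which falls inside the 60 to 120 range of the histogram's `mass` axis.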

## Write some NanoEvents Parquet files to CephFS

Here we write the NanoEvents to Parquet files and populate the CephFS-mounted directory with them. In this version, each individual file must be smaller than 4 MB, the default object size of Ceph, to ensure a one-to-one mapping of files to objects, which the current multiple-file design requires.

In [6]:
import os

ak.to_parquet(
    uproot.lazy("nano_dy.root:Events"),
    "nano_dy.parquet",
    list_to32=True,
    use_dictionary=False,
    compression="GZIP",
    compression_level=1,
)

ak.to_parquet(
    uproot.lazy("nano_dimuon.root:Events"),
    "nano_dimuon.parquet",
    list_to32=True,
    use_dictionary=False,
    compression="GZIP",
    compression_level=1,
)

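import shutil

# Copy five replicas of each file into the CephFS-mounted dataset directories.
# shutil.copy raises on failure, whereas os.system("cp ...") fails silently.
os.makedirs("/mnt/cephfs/nanoevents/ZJets", exist_ok=True)
os.makedirs("/mnt/cephfs/nanoevents/Data", exist_ok=True)
for i in range(5):
    shutil.copy("nano_dy.parquet", f"/mnt/cephfs/nanoevents/ZJets/nano_dy.{i}.parquet")
    shutil.copy("nano_dimuon.parquet", f"/mnt/cephfs/nanoevents/Data/nano_dimuon.{i}.parquet")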
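
Since the one-to-one mapping of files to objects depends on the file size, it can be useful to check a directory before running a job on it. The sketch below lists the Parquet files that are not under the limit. The helper name is hypothetical, and reading "4 MB" as 4 MiB (4 × 1024 × 1024 bytes) is an assumption; adjust the constant if your cluster uses a different object size. The demo runs on a temporary directory rather than on `/mnt/cephfs`.

```python
import os
import tempfile

# Assumption: "4 MB" is read as 4 MiB; change this if your object size differs.
OBJECT_SIZE_LIMIT = 4 * 1024 * 1024


def oversized_files(directory, limit=OBJECT_SIZE_LIMIT):
    """Return (name, size) pairs for Parquet files that are not under the limit."""
    found = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if name.endswith(".parquet") and os.path.isfile(path):
            size = os.path.getsize(path)
            if size >= limit:
                found.append((name, size))
    return found


# Demo on a temporary directory with one small and one oversized dummy file.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "small.parquet"), "wb") as f:
        f.write(b"\0" * 1024)
    with open(os.path.join(tmp, "large.parquet"), "wb") as f:
        f.write(b"\0" * (OBJECT_SIZE_LIMIT + 1))
    too_big = oversized_files(tmp)

print(too_big)
```

An empty list means every Parquet file in the directory is under the limit.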

In [7]:
!ls /mnt/cephfs/nanoevents/Data

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
nano_dimuon.0.parquet  nano_dimuon.3.parquet  nano_dimuon.parquet
nano_dimuon.1.parquet  nano_dimuon.4.parquet
nano_dimuon.2.parquet  nano_dimuon.5.parquet


In [8]:
!ls /mnt/cephfs/nanoevents/ZJets

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
nano_dy.0.parquet  nano_dy.2.parquet  nano_dy.4.parquet
nano_dy.1.parquet  nano_dy.3.parquet  nano_dy.parquet


## Reading NanoEvents using SkyhookFileFormat

In [None]:
events_skyhook = NanoEventsFactory.from_parquet("/mnt/cephfs/nanoevents/ZJets/nano_dy.0.parquet", skyhook_options = {"ceph_config_path": "/opt/ceph/ceph.conf", "ceph_data_pool": "cephfs_data0"}).events()
# Flatten the per-event muon pt lists into one array; events without muons contribute nothing.
ak.flatten(events_skyhook.Muon.pt).to_numpy()

## Running a job in parallel using Dask

A `LocalCluster()` creates a pool of worker processes on the machine running the notebook; in the setup described here, the worker count equals the number of available cores and each worker is single-threaded. A `LocalCluster` can be replaced by other cluster managers from the Dask ecosystem, such as `KubeCluster` or `YarnCluster`. In the cell below, we get a client handle by connecting to an already running scheduler at its address.

In [9]:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:35507")
client

Connection method: Direct
Dashboard: /user/oksana.shadura@cern.ch/proxy/8787/status

Comm: tcp://127.0.0.1:35507
Workers: 1
Dashboard: /user/oksana.shadura@cern.ch/proxy/8787/status
Total threads: 6
Started: 1 minute ago
Total memory: 23.42 GiB

Comm: tcp://127.0.0.1:36480
Total threads: 6
Dashboard: /user/oksana.shadura@cern.ch/proxy/38067/status
Memory: 23.42 GiB
Nanny: tcp://127.0.0.1:40557
Local directory: /home/cms-jovyan/dask-worker-space/worker-i2oiu_xt
Tasks executing: 0
Tasks in memory: 0
Tasks ready: 0
Tasks in flight: 0
CPU usage: 2.0%
Last seen: Just now
Memory usage: 132.79 MiB
Spilled bytes: 0 B
Read bytes: 14.02 kiB
Write bytes: 18.01 kiB


We have added a new function called `run_parquet_job` to the executor API in coffea to run jobs on Parquet files using the Arrow Dataset API under the hood.
This API takes an optional `ceph_config_path` parameter, the path to the configuration file of the Ceph cluster, which instructs the function to read from RADOS using `SkyhookFileFormat` (which allows pushdown) instead of the out-of-the-box `ParquetFileFormat` API. It also accepts a single directory path, in which case the Dataset API performs dataset discovery by itself. The calls to the Dataset API are launched in parallel, with one Dataset API call per file. In the cell below, these options are passed to `processor.Runner` through `use_skyhook` and `skyhook_options`.
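
The pattern described above (discover the files under each directory, then issue one call per file, either in parallel or one after another) can be sketched with the standard library alone. This is an illustration of the idea, not coffea's implementation: the function names are hypothetical, `scan_file` is a stand-in that only returns the file size where a real job would scan the file through the Dataset API, and a thread pool stands in for the Dask workers.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def discover(directory):
    """List the Parquet files directly under a directory, in a stable order."""
    return sorted(str(p) for p in Path(directory).glob("*.parquet"))


def scan_file(path):
    """Hypothetical stand-in for a per-file scan; here it only returns the size."""
    return Path(path).stat().st_size


def run_per_file(fileset, parallel=True, max_workers=4):
    """Run one scan_file call per discovered file and sum the results per dataset."""
    work = [(name, path) for name, d in fileset.items() for path in discover(d)]
    paths = [path for _, path in work]
    if parallel:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            results = list(pool.map(scan_file, paths))
    else:
        results = [scan_file(path) for path in paths]
    totals = {}
    for (name, _), value in zip(work, results):
        totals[name] = totals.get(name, 0) + value
    return totals


# Demo: two temporary dataset directories filled with small dummy files.
with tempfile.TemporaryDirectory() as root:
    fileset = {}
    for name, n_files in (("ZJets", 3), ("Data", 2)):
        directory = Path(root) / name
        directory.mkdir()
        for i in range(n_files):
            (directory / f"part.{i}.parquet").write_bytes(b"x" * (i + 1))
        fileset[name] = str(directory)

    parallel_totals = run_per_file(fileset, parallel=True)
    sequential_totals = run_per_file(fileset, parallel=False)

print(parallel_totals)
```

Both modes visit the same files and produce the same totals; only the scheduling of the per-file calls differs.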

In [12]:
%%time
from coffea import processor
from coffea.nanoevents import schemas

executor = processor.DaskExecutor(client=client)

run = processor.Runner(
    executor=executor,
    use_skyhook=True,
    savemetrics=True,
    skyhook_options = {"ceph_config_path": "/opt/ceph/ceph.conf", "ceph_data_pool": "cephfs_data0"},
    format="parquet",
    schema=schemas.NanoAODSchema,
)

hists = run(
    {
        "ZJets": "/mnt/cephfs/nanoevents/ZJets",
        "Data": "/mnt/cephfs/nanoevents/Data",
    },
    "Events",
    processor_instance=MyZPeak(),
)

The history saving thread hit an unexpected error (OperationalError('database or disk is full')).History will not be written to the database.
[##                                      ] | 6% Completed |  8.0s

RuntimeError: Work item WorkItem(dataset='Data', filename='/opt/ceph/ceph.conf:cephfs_data0:/mnt/cephfs/nanoevents/Data/nano_dimuon.parquet', treename='Events', entrystart=0, entrystop=0, fileuuid='', usermeta=None) caused a KilledWorker exception (likely a segfault or out-of-memory issue)

## Running iteratively using the `IterativeExecutor`

Run the same job again, but now iteratively. The calls to the Dataset API will now be sequential.

In [None]:
%%time
from coffea import processor
from coffea.nanoevents import schemas

executor = processor.IterativeExecutor()

run = processor.Runner(
    executor=executor,
    use_skyhook=True,
    savemetrics=True,
    skyhook_options = {"ceph_config_path": "/opt/ceph/ceph.conf", "ceph_data_pool": "cephfs_data0"},
    format="parquet",
    schema=schemas.NanoAODSchema,
)

hists = run(
    {
        "ZJets": "/mnt/cephfs/nanoevents/ZJets",
        "Data": "/mnt/cephfs/nanoevents/Data",
    },
    "Events",
    processor_instance=MyZPeak(),
)

Processing:   0%|          | 0/13 [00:00<?, ?chunk/s]

## Running iteratively without Skyhook using the `IterativeExecutor`

Run the same job again, but now iteratively without Skyhook. The calls to the Dataset API will now be sequential.

In [None]:
%%time
executor = processor.IterativeExecutor()

run = processor.Runner(
    executor=executor,
    use_skyhook=False,
    savemetrics=True,
    skyhook_options = {"ceph_config_path": "/opt/ceph/ceph.conf", "ceph_data_pool": "cephfs_data0"},
    format="parquet",
    schema=schemas.NanoAODSchema,
)

hists = run(
    {
        "ZJets": "/mnt/cephfs/nanoevents/ZJets",
        "Data": "/mnt/cephfs/nanoevents/Data",
    },
    "Events",
    processor_instance=MyZPeak(),
)

As expected, this is much slower than running with Dask.

## Plotting the results



In [None]:
%matplotlib inline

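# `hists` holds the Runner result; with savemetrics=True it is an (output, metrics) pair.
output, metrics = hists
hist.plot1d(output)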