# Fetching many columns and asynchronous calls

This demo uses ServiceX to fetch 101 columns of data from a 10 TB dataset (expect this to take a while). It also shows how to submit several queries concurrently using Python's asynchronous features.

## Setup

This demo requires a running ServiceX deployment with two of its ports forwarded (the configuration below expects MinIO on `localhost:9000`). We begin by setting up a ServiceX configuration file named `.servicex`. The file should be located either in the execution directory or in the user's home directory (`~` on Linux and macOS, or the user's folder under `C:\Users` on Windows).
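As a rough illustration of that lookup, the sketch below checks the execution directory first and then the home directory. The function name `find_servicex_config` and the search order are assumptions made for this example; the `servicex` package may resolve the file differently.

```python
from pathlib import Path
from typing import Optional


# Hypothetical helper: NOT part of the servicex package.
# Assumed search order: current working directory first, then the home directory.
def find_servicex_config(cwd: Optional[Path] = None,
                         home: Optional[Path] = None,
                         name: str = ".servicex") -> Optional[Path]:
    cwd = cwd if cwd is not None else Path.cwd()
    home = home if home is not None else Path.home()
    for directory in (cwd, home):
        candidate = directory / name
        if candidate.is_file():
            return candidate
    # No configuration file found in either location
    return None


print(find_servicex_config())  # path of the first .servicex found, or None
```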

The contents of the file should be:

```
api_endpoint:
  endpoint: endpoint here
  username: username here
  password: password here

  minio_endpoint: localhost:9000
  minio_username: miniouser
  minio_password: leftfoot1
```

Replace the placeholder fields with your ServiceX endpoint and credentials (if you don't have credentials, you can request them at `http://rc1-xaod-servicex.uc.ssl-hep.org/`). Next, we import our dependencies.

In [1]:
import asyncio                                   # runs several queries concurrently
from servicex import ServiceXDataset             # handle to a dataset served by ServiceX
from func_adl_xAOD import ServiceXDatasetSource  # func_adl query source backed by ServiceX

Next, we choose the dataset we are interested in. Note the `max_workers` argument, which sets the maximum number of workers ServiceX may use to transform the data. ServiceX scales the number of workers automatically, but never beyond this limit.

In [None]:
dataset = ServiceXDataset('data17_13TeV:data17_13TeV.periodK.physics_Main.PhysCont.DAOD_STDM7.grp23_v01_p4030', max_workers=400)

Next, we define the query function. This is a long block of code that pulls 101 columns from 10 collections in each event and returns them as an awkward array. Note the structure of the `func_adl` query: the first `.Select` picks, for each event, a tuple of the 10 collections of interest; the second `.Select` maps that tuple (`ls`) to the output columns. Inside it, each nested `.Select` walks over the objects in one collection (for example, `ls[0]` holds the jets) and extracts a single property from every object.
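To make the two-stage structure concrete, here is a plain-Python analogue that uses ordinary lists and dicts in place of xAOD collections. The toy events, the `select` helper, and the dictionary field names are all invented for illustration; this only mimics the shape of the `func_adl` query and is not how ServiceX executes it.

```python
# Toy events: each event maps a collection name to a list of objects (dicts).
# These values are made up for illustration.
events = [
    {"Jets": [{"pt": 75000.0, "eta": 0.5}, {"pt": 30000.0, "eta": -1.2}],
     "Electrons": [{"pt": 42000.0, "eta": 1.1}]},
    {"Jets": [{"pt": 120000.0, "eta": 2.0}],
     "Electrons": []},
]


def select(seq, fn):
    """Hypothetical stand-in for func_adl's Select: apply fn to every element."""
    return [fn(x) for x in seq]


# Stage 1: per event, pick the collections of interest as a tuple (like `ls`).
stage1 = select(events, lambda e: (e["Jets"], e["Electrons"]))

# Stage 2: per event, turn each collection into one column per property.
stage2 = select(stage1, lambda ls: (select(ls[0], lambda jet: jet["pt"]),
                                    select(ls[0], lambda jet: jet["eta"]),
                                    select(ls[1], lambda ele: ele["pt"])))

for row in stage2:
    print(row)
```

Each entry of `stage2` is one event, and each position in its tuple corresponds to one of the names later passed to `.AsAwkwardArray`.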

For instance, we can follow the statement:

```
    .Select('lambda e: (e.Jets("AntiKt4EMTopoJets"), \
    ...
    .Select('lambda ls: (ls[0].Where(lambda jet: jet.pt()/1000 > 60).Select(lambda jet: jet.pt()), \
```

The first line shown selects the `AntiKt4EMTopoJets` jet collection and stores it as the first item of the per-event tuple. The second line shown has three components.

The first, `.Select('lambda ls: (ls[0]`, refers to the first item of that tuple, which holds the `AntiKt4EMTopoJets` jets.

The second, `.Where(lambda jet: jet.pt()/1000 > 60)`, filters the jets by transverse momentum, keeping only those with pT above 60 GeV (jet pT is stored in MeV, hence the division by 1000).

The third, `.Select(lambda jet: jet.pt())`, retrieves the transverse momentum of every jet that passed the filter.

Because the cut is applied only in this column, `JetPt` contains just the jets above 60 GeV, while the other jet columns (`JetE`, `JetEta`, and so on) still contain every jet in the event.

In [None]:
def retrieve_101_columns(dataset):
    
    query = ServiceXDatasetSource(dataset) \
        .Select('lambda e: (e.Jets("AntiKt4EMTopoJets"), \
                            e.Jets("AntiKt4PV0TrackJets"), \
                            e.BTagging("BTagging_AntiKt4EMTopo"), \
                            e.BTagging("BTagging_AntiKt4Track"), \
                            e.Clusters("CaloCalTopoClusters"), \
                            e.Tracks("CombinedMuonTrackParticles"), \
                            e.Electrons("Electrons"), \
                            e.Tracks("GSFTrackParticles"), \
                            e.Muons("Muons"), \
                            e.Photons("Photons"))') \
        .Select('lambda ls: (ls[0].Select(lambda jet: jet.e()), \
                             ls[0].Select(lambda jet: jet.eta()), \
                             ls[0].Select(lambda jet: jet.index()), \
                             ls[0].Select(lambda jet: jet.m()), \
                             ls[0].Select(lambda jet: jet.phi()), \
                             ls[0].Where(lambda jet: jet.pt()/1000 > 60).Select(lambda jet: jet.pt()), \
                             ls[0].Select(lambda jet: jet.px()), \
                             ls[0].Select(lambda jet: jet.py()), \
                             ls[0].Select(lambda jet: jet.pz()), \
                             ls[0].Select(lambda jet: jet.rapidity()), \
                             ls[1].Select(lambda trj: trj.e()), \
                             ls[1].Select(lambda trj: trj.eta()), \
                             ls[1].Select(lambda trj: trj.index()), \
                             ls[1].Select(lambda trj: trj.m()), \
                             ls[1].Select(lambda trj: trj.phi()), \
                             ls[1].Select(lambda trj: trj.pt()), \
                             ls[1].Select(lambda trj: trj.px()), \
                             ls[1].Select(lambda trj: trj.py()), \
                             ls[1].Select(lambda trj: trj.pz()), \
                             ls[1].Select(lambda trj: trj.rapidity()), \
                             ls[2].Select(lambda bto: bto.index()), \
                             ls[2].Select(lambda bto: bto.nIP2D_TrackParticles()), \
                             ls[2].Select(lambda bto: bto.nIP3D_TrackParticles()), \
                             ls[2].Select(lambda bto: bto.nSV0_TrackParticles()), \
                             ls[2].Select(lambda bto: bto.nSV1_TrackParticles()), \
                             ls[3].Select(lambda btr: btr.index()), \
                             ls[3].Select(lambda btr: btr.nIP2D_TrackParticles()), \
                             ls[3].Select(lambda btr: btr.nIP3D_TrackParticles()), \
                             ls[3].Select(lambda btr: btr.nSV0_TrackParticles()), \
                             ls[3].Select(lambda btr: btr.nSV1_TrackParticles()), \
                             ls[4].Select(lambda clu: clu.calE()), \
                             ls[4].Select(lambda clu: clu.calEta()), \
                             ls[4].Select(lambda clu: clu.calM()), \
                             ls[4].Select(lambda clu: clu.calPhi()), \
                             ls[4].Select(lambda clu: clu.e()), \
                             ls[4].Select(lambda clu: clu.eta()), \
                             ls[4].Select(lambda clu: clu.index()), \
                             ls[4].Select(lambda clu: clu.m()), \
                             ls[4].Select(lambda clu: clu.nSamples()), \
                             ls[4].Select(lambda clu: clu.phi()), \
                             ls[4].Select(lambda clu: clu.rapidity()), \
                             ls[4].Select(lambda clu: clu.rawE()), \
                             ls[4].Select(lambda clu: clu.rawEta()), \
                             ls[4].Select(lambda clu: clu.rawM()), \
                             ls[4].Select(lambda clu: clu.rawPhi()), \
                             ls[5].Select(lambda mtp: mtp.charge()), \
                             ls[5].Select(lambda mtp: mtp.d0()), \
                             ls[5].Select(lambda mtp: mtp.e()), \
                             ls[5].Select(lambda mtp: mtp.eta()), \
                             ls[5].Select(lambda mtp: mtp.index()), \
                             ls[5].Select(lambda mtp: mtp.m()), \
                             ls[5].Select(lambda mtp: mtp.phi()), \
                             ls[5].Select(lambda mtp: mtp.pt()), \
                             ls[5].Select(lambda mtp: mtp.qOverP()), \
                             ls[5].Select(lambda mtp: mtp.rapidity()), \
                             ls[5].Select(lambda mtp: mtp.theta()), \
                             ls[5].Select(lambda mtp: mtp.vz()), \
                             ls[5].Select(lambda mtp: mtp.z0()), \
                             ls[6].Select(lambda ele: ele.charge()), \
                             ls[6].Select(lambda ele: ele.e()), \
                             ls[6].Select(lambda ele: ele.eta()), \
                             ls[6].Select(lambda ele: ele.index()), \
                             ls[6].Select(lambda ele: ele.m()), \
                             ls[6].Select(lambda ele: ele.nCaloClusters()), \
                             ls[6].Select(lambda ele: ele.nTrackParticles()), \
                             ls[6].Select(lambda ele: ele.phi()), \
                             ls[6].Select(lambda ele: ele.pt()), \
                             ls[6].Select(lambda ele: ele.rapidity()), \
                             ls[7].Select(lambda gsf: gsf.charge()), \
                             ls[7].Select(lambda gsf: gsf.d0()), \
                             ls[7].Select(lambda gsf: gsf.e()), \
                             ls[7].Select(lambda gsf: gsf.eta()), \
                             ls[7].Select(lambda gsf: gsf.index()), \
                             ls[7].Select(lambda gsf: gsf.m()), \
                             ls[7].Select(lambda gsf: gsf.phi()), \
                             ls[7].Select(lambda gsf: gsf.pt()), \
                             ls[7].Select(lambda gsf: gsf.qOverP()), \
                             ls[7].Select(lambda gsf: gsf.rapidity()), \
                             ls[7].Select(lambda gsf: gsf.theta()), \
                             ls[7].Select(lambda gsf: gsf.vz()), \
                             ls[7].Select(lambda gsf: gsf.z0()), \
                             ls[8].Select(lambda muo: muo.charge()), \
                             ls[8].Select(lambda muo: muo.e()), \
                             ls[8].Select(lambda muo: muo.eta()), \
                             ls[8].Select(lambda muo: muo.index()), \
                             ls[8].Select(lambda muo: muo.m()), \
                             ls[8].Select(lambda muo: muo.nMuonSegments()), \
                             ls[8].Select(lambda muo: muo.phi()), \
                             ls[8].Select(lambda muo: muo.pt()), \
                             ls[8].Select(lambda muo: muo.quality()), \
                             ls[8].Select(lambda muo: muo.rapidity()), \
                             ls[9].Select(lambda ptn: ptn.conversionRadius()), \
                             ls[9].Select(lambda ptn: ptn.e()), \
                             ls[9].Select(lambda ptn: ptn.eta()), \
                             ls[9].Select(lambda ptn: ptn.index()), \
                             ls[9].Select(lambda ptn: ptn.m()), \
                             ls[9].Select(lambda ptn: ptn.nCaloClusters()), \
                             ls[9].Select(lambda ptn: ptn.nVertices()), \
                             ls[9].Select(lambda ptn: ptn.phi()), \
                             ls[9].Select(lambda ptn: ptn.pt()), \
                             ls[9].Select(lambda ptn: ptn.rapidity())') \
        .AsAwkwardArray(("JetE", \
                         "JetEta", \
                         "JetIndex", \
                         "JetM", \
                         "JetPhi", \
                         "JetPt", \
                         "JetPx", \
                         "JetPy", \
                         "JetPz", \
                         "JetRapidity", \
                         "TrackE", \
                         "TrackEta", \
                         "TrackIndex", \
                         "TrackM", \
                         "TrackPhi", \
                         "TrackPt", \
                         "TrackPx", \
                         "TrackPy", \
                         "TrackPz", \
                         "TrackRapidity", \
                         "BTopoIndex", \
                         "BTopo2DTrack", \
                         "BTopo3DTrack", \
                         "BTopoSV0", \
                         "BTopoSV1", \
                         "BTrackIndex", \
                         "BTrack2DTrack", \
                         "BTrack3DTrack", \
                         "BTrackSV0", \
                         "BTrackSV1", \
                         "ClustersCalE", \
                         "ClustersCalEta", \
                         "ClustersCalM", \
                         "ClustersCalPhi", \
                         "ClustersE", \
                         "ClustersEta", \
                         "ClustersIndex", \
                         "ClustersM", \
                         "ClustersNSamples", \
                         "ClustersPhi", \
                         "ClustersRapidity", \
                         "ClustersRawE", \
                         "ClustersRawEta", \
                         "ClustersRawM", \
                         "ClustersRawPhi", \
                         "MuonTrackCharge", \
                         "MuonTrackD0", \
                         "MuonTrackE", \
                         "MuonTrackEta", \
                         "MuonTrackIndex", \
                         "MuonTrackM", \
                         "MuonTrackPhi", \
                         "MuonTrackPt", \
                         "MuonTrackQOverP", \
                         "MuonTrackRapidity", \
                         "MuonTrackTheta", \
                         "MuonTrackVZ", \
                         "MuonTrackZ0", \
                         "EleCharge", \
                         "EleE", \
                         "EleEta", \
                         "EleIndex", \
                         "EleM", \
                         "EleNClusters", \
                         "EleNTrack", \
                         "ElePhi", \
                         "ElePt", \
                         "EleRapidity", \
                         "GSFCharge", \
                         "GSFD0", \
                         "GSFE", \
                         "GSFEta", \
                         "GSFIndex", \
                         "GSFM", \
                         "GSFPhi", \
                         "GSFPt", \
                         "GSFQOverP", \
                         "GSFRapidity", \
                         "GSFTheta", \
                         "GSFVZ", \
                         "GSFZ0", \
                         "MuonCharge", \
                         "MuonE", \
                         "MuonEta", \
                         "MuonIndex", \
                         "MuonM", \
                         "MuonN", \
                         "MuonPhi", \
                         "MuonPt", \
                         "MuonQuality", \
                         "MuonRapidity", \
                         "PhotonRadius", \
                         "PhotonE", \
                         "PhotonEta", \
                         "PhotonIndex", \
                         "PhotonM", \
                         "PhotonNClusters", \
                         "PhotonNVertices", \
                         "PhotonPhi", \
                         "PhotonPt", \
                         "PhotonRapidity")) \
        .value()
    
    return query
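
The jet-pT cut in `retrieve_101_columns` (keep jets with pT/1000 > 60, then read their pT) can be mimicked in plain Python for a single event. The jet values below are invented, and `jets_pt_above` is a hypothetical helper; the point is the order of operations (filter the objects, then extract the property) and the MeV-to-GeV conversion.

```python
# One toy event's jets; pT values are in MeV, as in xAOD. Values are made up.
jets = [{"pt": 75000.0, "eta": 0.5},
        {"pt": 30000.0, "eta": -1.2},
        {"pt": 61000.0, "eta": 2.1}]


def jets_pt_above(jets, threshold_gev):
    """Hypothetical helper mirroring .Where(...).Select(lambda jet: jet.pt())."""
    passing = [jet for jet in jets if jet["pt"] / 1000 > threshold_gev]  # Where
    return [jet["pt"] for jet in passing]                                 # Select


jet_pt = jets_pt_above(jets, 60)        # filtered column
jet_eta = [jet["eta"] for jet in jets]  # unfiltered column
print(jet_pt)   # [75000.0, 61000.0]
print(jet_eta)  # [0.5, -1.2, 2.1]
```

Because the cut is applied only to the pT column, `jet_pt` and `jet_eta` no longer line up object by object; keeping the columns aligned would require applying the same `.Where` to every jet column.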

Finally, we actually submit our request.

In [None]:
one_hundred_and_one_dalmations = retrieve_101_columns(dataset)

print(one_hundred_and_one_dalmations[b'JetPt'])

## Asynchronous calls: submitting multiple queries

We can also submit several queries to ServiceX at once using asynchronous functions. First, we list the datasets we are interested in (each `'scope: dataset_name'` entry below is a placeholder for a real dataset name).

In [None]:
# Placeholder names: substitute the ten datasets you want to query.
dataset_list = [ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name'),
                ServiceXDataset('scope: dataset_name')]

In this example, we have ten datasets to retrieve data from, and we pull the same jet and electron columns from each. First, we define an asynchronous function that builds the query and awaits its result:

In [None]:
async def fetch_data(dataset):
    query = ServiceXDatasetSource(dataset) \
        .Select('lambda e: (e.Jets("AntiKt4EMTopoJets"), e.Electrons("Electrons"))') \
        .Select('lambda ls: (ls[0].Select(lambda jet: jet.e()), \
                             ls[0].Select(lambda jet: jet.eta()), \
                             ls[0].Select(lambda jet: jet.index()), \
                             ls[0].Select(lambda jet: jet.m()), \
                             ls[0].Select(lambda jet: jet.phi()), \
                             ls[0].Select(lambda jet: jet.pt()), \
                             ls[0].Select(lambda jet: jet.px()), \
                             ls[0].Select(lambda jet: jet.py()), \
                             ls[0].Select(lambda jet: jet.pz()), \
                             ls[0].Select(lambda jet: jet.rapidity()), \
                             ls[1].Select(lambda ele: ele.charge()), \
                             ls[1].Select(lambda ele: ele.e()), \
                             ls[1].Select(lambda ele: ele.eta()), \
                             ls[1].Select(lambda ele: ele.index()), \
                             ls[1].Select(lambda ele: ele.m()), \
                             ls[1].Select(lambda ele: ele.nCaloClusters()), \
                             ls[1].Select(lambda ele: ele.nTrackParticles()), \
                             ls[1].Select(lambda ele: ele.phi()), \
                             ls[1].Select(lambda ele: ele.pt()), \
                             ls[1].Select(lambda ele: ele.rapidity()))') \
        .AsAwkwardArray(("JetE", \
                         "JetEta", \
                         "JetIndex", \
                         "JetM", \
                         "JetPhi", \
                         "JetPt", \
                         "JetPx", \
                         "JetPy", \
                         "JetPz", \
                         "JetRapidity", \
                         "EleCharge", \
                         "EleE", \
                         "EleEta", \
                         "EleIndex", \
                         "EleM", \
                         "EleNClusters", \
                         "EleNTrack", \
                         "ElePhi", \
                         "ElePt", \
                         "EleRapidity")) \
        .value_async()

    return await query
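
Because `fetch_data` is a coroutine, several calls can be waiting on ServiceX at the same time, so the total wall time is roughly that of the slowest query rather than the sum of all of them. The sketch below illustrates this with `fake_request`, a hypothetical stand-in that only sleeps and makes no ServiceX calls. Run it as a standalone script: inside Jupyter an event loop is already running, and `asyncio.run` raises an error there.

```python
import asyncio
import time


async def fake_request(name: str, delay: float) -> str:
    """Hypothetical stand-in for a remote query: just waits, then returns."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def sequential(delays):
    # Each await finishes before the next request starts.
    return [await fake_request(f"q{i}", d) for i, d in enumerate(delays)]


async def concurrent(delays):
    # All requests start together; gather returns results in input order.
    return await asyncio.gather(*(fake_request(f"q{i}", d) for i, d in enumerate(delays)))


def timed(coro):
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


delays = [0.2, 0.2, 0.2]
seq_result, seq_time = timed(sequential(delays))
con_result, con_time = timed(concurrent(delays))
print(f"sequential: {seq_time:.2f} s, concurrent: {con_time:.2f} s")
```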

And now we retrieve our data:

In [None]:
# Top-level await works in Jupyter, which already runs an event loop.
# gather() starts all ten queries together and returns results in input order.
data_1, data_2, data_3, data_4, data_5, data_6, data_7, data_8, data_9, data_10 = \
    await asyncio.gather(*(fetch_data(ds) for ds in dataset_list))
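
The top-level `await` above relies on Jupyter's running event loop. In a plain Python script, the same fan-out has to be wrapped in a coroutine and started with `asyncio.run`. The sketch below also passes `return_exceptions=True` to `asyncio.gather`, so a failing request comes back as an exception object in its slot instead of being raised out of `gather`. `flaky_request` is a hypothetical stand-in for `fetch_data`, and the dataset names are placeholders.

```python
import asyncio


async def flaky_request(name: str) -> str:
    """Hypothetical stand-in for fetch_data: fails for one name, succeeds otherwise."""
    await asyncio.sleep(0.01)
    if name == "bad":
        raise RuntimeError(f"transform failed for {name}")
    return f"data for {name}"


async def main(names):
    # Results come back in the same order as `names`; failures are returned, not raised.
    return await asyncio.gather(*(flaky_request(n) for n in names),
                                return_exceptions=True)


if __name__ == "__main__":
    names = ["ds1", "bad", "ds3"]
    results = asyncio.run(main(names))
    for name, result in zip(names, results):
        status = "failed" if isinstance(result, Exception) else "ok"
        print(name, status, result)
```

Checking each result with `isinstance(result, Exception)` lets the successful datasets be used even when one transform fails.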