This is a quick demo of the memory leakage observed when using uproot with dask distributed.

In [2]:
import uproot
import distributed

# Report the library versions this memory-leak demo was run with.
for _label, _module in (("uproot", uproot), ("distributed", distributed)):
    print(f"{_label} verion: {_module.__version__}")

uproot verion: 5.3.1
distributed verion: 2024.3.0


In [3]:
import numpy as np
import awkward as ak
import dask_awkward as dak
# from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from distributed import Client, performance_report
import json 
import glob
import os
import tqdm
import time
from itertools import islice
import copy
import dask
from coffea.dataset_tools import (
    max_chunks
)

def divide_chunks(data: dict, SIZE: int):
    """
    Split a dict of ROOT files into consecutive sub-dicts of at most SIZE entries.

    Works as a generator. Key order (and therefore file order) is preserved,
    and the last chunk may hold fewer than SIZE entries. Similar to, but not
    exactly the same as, coffea.dataset_tools' max_files helper.
    """
    ordered_keys = list(data)
    for start in range(0, len(ordered_keys), SIZE):
        yield {key: data[key] for key in ordered_keys[start:start + SIZE]}

If do_regular_restart == False, we observe a memory leak, as recorded in "./withOutRestart_uproot.log".
If do_regular_restart == True, the run continues, but it often gets stuck without any error message. I wrote slightly different code that still "restarts" the client in each iteration of the for loop (the cell after next); that code runs to completion with no memory leakage.

In [None]:
# Experiment 1: process the sample one file at a time on a single-worker
# LocalCluster, optionally restarting the worker after every file.
# With do_regular_restart == False the leak is recorded in ./withOutRestart_uproot.log.
do_regular_restart = False
sample_path = "./input_file.json"
with open(sample_path) as file:
    samples = json.load(file)
samples = max_chunks(samples, 20)
dataset = list(samples.keys())[0]
sample = list(samples.values())[0]
max_file_len = 1
smaller_files = list(divide_chunks(sample["files"], max_file_len))

# Using the Client as a context manager guarantees the cluster it started is
# shut down afterwards; the original left it running, so later cells reported
# "Perhaps you already have a cluster running?".
# NOTE: the dask performance report stops recording memory usage after about
# the first minute, so it is of limited use for tracking the leak.
with Client(n_workers=1, threads_per_worker=1, processes=True, memory_limit='0.7 GiB') as client, \
        performance_report(filename="dask-report.html"):
    for file_chunk in tqdm.tqdm(smaller_files, leave=False):
        # Only the file names are needed, so no deepcopy of the whole sample.
        fnames = list(file_chunk.keys())
        tree_map = {fname: "Events" for fname in fnames}  # avoid shadowing builtin `input`
        events = uproot.dask(tree_map, handler=uproot.XRootDSource)
        muons_pt = events["Muon_pt"]
        nmuons = ak.num(muons_pt, axis=1)
        # Parenthesize each comparison: `&` binds tighter than `>`/`==`, so
        # `muons_pt > 20 & nmuons == 2` parsed as the chained comparison
        # `muons_pt > (20 & nmuons) == 2`, not the intended muon selection.
        muon_selection = (muons_pt > 20) & (nmuons == 2)
        muons_pt = muons_pt[muon_selection]
        dask.compute(muons_pt)
        if do_regular_restart:
            client.restart(wait_for_workers=False)

The script above often got stuck when do_regular_restart == True, so I found a different way to restart the dask client in every loop iteration (code below). This method ran to completion; its log is saved in "./withRestart_uproot.log".

In [None]:
# Experiment 2: same per-file processing, but a fresh single-worker
# Client/LocalCluster is created for every file and closed by the `with` block.
sample_path = "./input_file.json"
with open(sample_path) as file:
    samples = json.load(file)
samples = max_chunks(samples, 20)
dataset = list(samples.keys())[0]
sample = list(samples.values())[0]
# NOTE: no performance_report here; it stops recording memory usage after
# about the first minute, so it is of limited use for tracking the leak.
max_file_len = 1
smaller_files = list(divide_chunks(sample["files"], max_file_len))
for file_chunk in tqdm.tqdm(smaller_files, leave=False):
    with Client(n_workers=1, threads_per_worker=1, processes=True, memory_limit='0.7 GiB') as client:
        # Only the file names are needed, so no deepcopy of the whole sample.
        fnames = list(file_chunk.keys())
        tree_map = {fname: "Events" for fname in fnames}  # avoid shadowing builtin `input`
        events = uproot.dask(tree_map)
        muons_pt = events["Muon_pt"]
        nmuons = ak.num(muons_pt, axis=1)
        # Parenthesize each comparison: `&` binds tighter than `>`/`==`, so
        # the unparenthesized form parsed as `muons_pt > (20 & nmuons) == 2`.
        muon_selection = (muons_pt > 20) & (nmuons == 2)
        muons_pt = muons_pt[muon_selection]
        dask.compute(muons_pt)

At Lindsey's request, this cell tries to reproduce the memory leak using local ROOT files, to rule out a possible issue with XRootD.

In [6]:
import glob
import os

# Reproduce the leak with local (EOS-mounted) ROOT files instead of XRootD.
# NOTE(review): the Run2018A directory is "2550000" while the others are
# "250000"/"280000" -- confirm this is the intended directory.
local_root_dirs = [
    "/eos/purdue/store/data/Run2018A/SingleMuon/NANOAOD/UL2018_MiniAODv2_NanoAODv9-v2/2550000",
    "/eos/purdue/store/data/Run2018B/SingleMuon/NANOAOD/UL2018_MiniAODv2_NanoAODv9-v2/250000/",
    "/eos/purdue/store/data/Run2018C/SingleMuon/NANOAOD/UL2018_MiniAODv2_NanoAODv9-v2/250000/",
    "/eos/purdue/store/data/Run2018D/SingleMuon/NANOAOD/UL2018_MiniAODv2_NanoAODv9-v1/280000/",
]
fnames = []
for local_root_path in local_root_dirs:
    # os.path.join avoids the "//" produced by "dir/" + "/*.root".
    fnames += glob.glob(os.path.join(local_root_path, "*.root"))
print(f"len(fnames): {len(fnames)}")

# The context manager shuts the cluster down when the loop finishes.
with Client(n_workers=1, threads_per_worker=1, processes=True, memory_limit='0.7 GiB') as client:
    for fname in tqdm.tqdm(fnames):
        tree_map = {fname: "Events"}  # avoid shadowing builtin `input`
        events = uproot.dask(tree_map)
        muons_pt = events["Muon_pt"]
        nmuons = ak.num(muons_pt, axis=1)
        # Parenthesize each comparison: `&` binds tighter than `>`/`==`, so
        # the unparenthesized form parsed as `muons_pt > (20 & nmuons) == 2`.
        muon_selection = (muons_pt > 20) & (nmuons == 2)
        muons_pt = muons_pt[muon_selection]
        dask.compute(muons_pt)

len(fnames): 300


Perhaps you already have a cluster running?
Hosting the HTTP server on port 45403 instead
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.


In [None]:
# Run the leak test as a standalone script (memleak_uproot_test.py) through an
# IPython shell command, i.e. in a separate Python process from this kernel.
# The commented lines below load an alternative input (input_file_Big.json);
# they are currently unused.
# sample_path = "./input_file_Big.json"
# with open(sample_path) as file:
#     samples = json.loads(file.read())
# samples  = max_chunks(samples, 20)
# samples
! python memleak_uproot_test.py

In [2]:
# Show the working directory; relative paths used above (e.g. ./input_file.json) resolve against it.
! pwd

/depot/cms/private/users/yun79/valerie/fork/dask_mem_leak_test/uproot_version
