Skip to content

Commit

Permalink
fixup: use ThreadPool in flux-pstree.py
Browse files Browse the repository at this point in the history
  • Loading branch information
grondo committed Jan 6, 2022
1 parent ce2be5f commit 25bdb10
Showing 1 changed file with 45 additions and 20 deletions.
65 changes: 45 additions & 20 deletions src/cmd/flux-pstree.py
Expand Up @@ -11,6 +11,7 @@
import sys
import logging
import argparse
import concurrent.futures

import flux
import flux.uri
Expand Down Expand Up @@ -110,6 +111,30 @@ def format_prefix(self, job):
return ""


def process_entry(entry, formatter, filters, level, max_level, combine):

job = JobInfo(entry).get_instance_info()

# pylint: disable=comparison-with-callable
parent = job.uri and job.state_single == "R"

label = formatter.format(job, parent)
prefix = formatter.format_prefix(job)

if not parent:
return Tree(label, prefix)
return load_tree(
label,
formatter,
prefix=prefix,
uri=str(job.uri),
filters=filters,
level=level + 1,
max_level=max_level,
combine_children=combine,
)


# pylint: disable=too-many-locals
def load_tree(
label,
Expand Down Expand Up @@ -147,29 +172,29 @@ def load_tree(
except (OSError, FileNotFoundError):
return tree

# Since the executor cannot be used recursively, start one per
# loop iteration. This is very wasteful but greatly speeds up
# execution with even a moderate number of jobs using the ssh:
# connector. At some point this should be replaced by a global
# thread pool that can work with recursive execution.
#
executor = concurrent.futures.ThreadPoolExecutor()
futures = []
for entry in jobs:
job = JobInfo(entry).get_instance_info()

# pylint: disable=comparison-with-callable
parent = job.uri and job.state_single == "R"

label = formatter.format(job, parent)
prefix = formatter.format_prefix(job)

if parent:
subtree = load_tree(
label,
futures.append(
executor.submit(
process_entry,
entry,
formatter,
prefix=prefix,
uri=str(job.uri),
filters=orig_filters,
level=level + 1,
max_level=max_level,
combine_children=combine_children,
orig_filters,
level,
max_level,
combine_children,
)
tree.append_tree(subtree)
else:
tree.append(label, prefix)
)

for future in futures:
tree.append_tree(future.result())

return tree

Expand Down

0 comments on commit 25bdb10

Please sign in to comment.