Skip to content

Commit

Permalink
Add docs on tqdm.contrib.concurrent.thread_map
Browse files Browse the repository at this point in the history
  • Loading branch information
jennydaman committed Dec 9, 2023
1 parent 9661a6f commit 10b064e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 31 deletions.
46 changes: 19 additions & 27 deletions examples/pl-replace/replace.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
#!/usr/bin/env python
"""
A short yet complicated *ChRIS* plugin example which uses
`chris_plugin.PathMapper`, `concurrent.futures.ThreadPoolExecutor`,
and `tqdm.tqdm`.
`chris_plugin.PathMapper` and `tqdm.contrib.concurrent.thread_map`
to process multiple inputs in parallel while showing a progress bar.
"""

from pathlib import Path
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from chris_plugin import chris_plugin, PathMapper
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import time
import random
import logging
from tqdm import tqdm
from tqdm.contrib.concurrent import thread_map
from tqdm.contrib.logging import logging_redirect_tqdm

# configure logging output to show time and thread name
Expand Down Expand Up @@ -56,15 +55,13 @@ def process_file(self, input_file: Path, output_file: Path):
The program's status is printed before processing and upon completion.
"""
with logging_redirect_tqdm():
logger.debug('Started "%s"', input_file)
with input_file.open("r") as i:
with output_file.open("w") as o:
for line in i:
o.write(line.replace(self.find, self.replace))
if self.slow:
time.sleep(random.random())
logger.debug('Finished "%s"', input_file)
logger.debug('Started "%s"', input_file)
with input_file.open("r") as input, output_file.open("w") as output:
for line in input:
output.write(line.replace(self.find, self.replace))
if self.slow:
time.sleep(random.random())
logger.debug('Finished "%s"', input_file)


@chris_plugin(
Expand All @@ -74,23 +71,18 @@ def process_file(self, input_file: Path, output_file: Path):
min_cpu_limit="2000m",
)
def main(options, inputdir: Path, outputdir: Path):

mapper = PathMapper.file_mapper(inputdir, outputdir, glob=options.inputPathFilter)
r = Replacer(find=options.find, replace=options.replace, slow=options.slow)

# create a progress bar with the total being the number of input files to process
with tqdm(desc="Processing", total=mapper.count()) as bar:

# a wrapper function which calls the processing function and updates the process bar
def process_and_progress(i, o):
r.process_file(i, o)
bar.update()

# create a thread pool with the specified number of workers
with ThreadPoolExecutor(max_workers=options.threads) as pool:
logger.debug(f"Using %d threads", options.threads)
# call the function on every input/output path pair
results = pool.map(lambda t: process_and_progress(*t), mapper)
logger.debug(f"Using %d threads", options.threads)
with logging_redirect_tqdm():
results = thread_map(
lambda t: r.process_file(*t),
mapper,
max_workers=options.threads,
total=mapper.count(),
maxinterval=0.1,
)

# if any job failed, an exception will be raised when it's iterated over
for _ in results:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name="chris_plugin",
version="0.3.1",
version="0.3.2",
packages=find_packages(where="src"),
package_dir={"": "src", "chris_plugin": "src/chris_plugin"},
url="https://github.com/FNNDSC/chris_plugin",
Expand Down
10 changes: 7 additions & 3 deletions src/chris_plugin/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,8 @@ def do_something(input_file: Path, output_file: Path) -> None:
pass
```
Hint: `` gets the number of CPUs available
to a containerized process (which can be limited by, for instance,
`docker run --cpuset-cpus 0-3`)
Hint: `os.sched_getaffinity(0)` gets the number of visible CPUs
(which can be limited by, for instance, `docker run --cpuset-cpus 0-3`)
Add a progress bar with [tqdm](https://github.com/tqdm/tqdm):
Expand All @@ -156,6 +155,11 @@ def do_something(input_file: Path, output_file: Path) -> None:
for input_file, output_path in bar:
do_something(input_file, output_path)
```
For some examples on how to use `tqdm`, `PathMapper`, and a thread or process pool, see these examples:
- https://github.com/FNNDSC/chris_plugin/blob/master/examples/pl-replace/replace.py
- https://github.com/FNNDSC/pl-mni2common/blob/47fa7c1e742ff7af0feb83cf5fb632db1f08ece6/mni2common.py#L52-L57
"""

input_dir: Path
Expand Down

0 comments on commit 10b064e

Please sign in to comment.