Skip to content

Commit

Permalink
Update CPU count configuration for parallel operations
Browse files Browse the repository at this point in the history
Changes have been made to improve control over parallel computation in the application. The number of CPUs used by the mdio_to_segy and to_zarr methods can now be controlled via the environment variables MDIO__EXPORT__CPU_COUNT and MDIO__IMPORT__CPU_COUNT, respectively. This allows users to tune performance for their specific hardware setup.
  • Loading branch information
tasansal committed May 21, 2024
1 parent 1571a00 commit a82fe80
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
13 changes: 12 additions & 1 deletion src/mdio/converters/mdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

from __future__ import annotations

import os
from os import path
from tempfile import TemporaryDirectory

import numpy as np
from psutil import cpu_count
from tqdm.dask import TqdmCallback

from mdio import MDIOReader
Expand All @@ -24,6 +26,10 @@
distributed = None


# Worker count for export: defaults to the logical CPU count reported by
# psutil, but can be overridden through the MDIO__EXPORT__CPU_COUNT
# environment variable (e.g. to throttle exports on shared machines).
default_cpus = cpu_count(logical=True)
NUM_CPUS = int(os.environ.get("MDIO__EXPORT__CPU_COUNT", default_cpus))


def mdio_to_segy( # noqa: C901
mdio_path_or_buffer: str,
output_segy_path: str,
Expand Down Expand Up @@ -176,7 +182,12 @@ def mdio_to_segy( # noqa: C901
out_byteorder=out_byteorder,
file_root=tmp_dir.name,
axis=tuple(range(1, samples.ndim)),
).compute()
)

if client is not None:
flat_files = flat_files.compute()
else:
flat_files = flat_files.compute(num_workers=NUM_CPUS)

# If whole blocks are missing, remove them from the list.
missing_mask = flat_files == "missing"
Expand Down
7 changes: 4 additions & 3 deletions src/mdio/segy/blocked_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import multiprocessing as mp
import os
from concurrent.futures import ProcessPoolExecutor
from itertools import repeat

Expand Down Expand Up @@ -35,8 +36,8 @@
ZFPY = None
zfpy = None

# Globals
NUM_CORES = cpu_count(logical=False)
# Worker count for import: defaults to the logical CPU count reported by
# psutil, but can be overridden through the MDIO__IMPORT__CPU_COUNT
# environment variable (e.g. to throttle ingestion on shared machines).
default_cpus = cpu_count(logical=True)
NUM_CPUS = int(os.environ.get("MDIO__IMPORT__CPU_COUNT", default_cpus))


def to_zarr(
Expand Down Expand Up @@ -136,7 +137,7 @@ def to_zarr(
# For Unix async writes with s3fs/fsspec & multiprocessing,
# use 'spawn' instead of default 'fork' to avoid deadlocks
# on cloud stores. Slower but necessary. Default on Windows.
num_workers = min(num_chunks, NUM_CORES)
num_workers = min(num_chunks, NUM_CPUS)
context = mp.get_context("spawn")
executor = ProcessPoolExecutor(max_workers=num_workers, mp_context=context)

Expand Down

0 comments on commit a82fe80

Please sign in to comment.