Skip to content

Commit

Permalink
Namespace and multiprocessing (#299)
Browse files Browse the repository at this point in the history
- Clean-up namespace by setting dir to __all__
- Move multiprocessing stuff to new private module _multiprocessing.
  • Loading branch information
prisae committed Aug 24, 2022
1 parent e1eb433 commit 50d29de
Show file tree
Hide file tree
Showing 27 changed files with 336 additions and 196 deletions.
145 changes: 145 additions & 0 deletions emg3d/_multiprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""
Helper routines to call functions with multiprocessing/concurrent.futures.
"""
# Copyright 2018-2022 The emsig community.
#
# This file is part of emg3d.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from concurrent.futures import ProcessPoolExecutor

try:
import tqdm
import tqdm.contrib.concurrent
except ImportError:
tqdm = None

from emg3d import io, solver


def process_map(fn, *iterables, max_workers, **kwargs):
    """Dispatch processes in parallel or not, using tqdm or not.

    :class:`emg3d.simulations.Simulation` uses the function
    ``tqdm.contrib.concurrent.process_map`` to run jobs asynchronously.
    However, ``tqdm`` is a soft dependency. In case it is not installed we use
    the class ``concurrent.futures.ProcessPoolExecutor`` directly, from the
    standard library, and imitate the behaviour of process_map (basically a
    ``ProcessPoolExecutor.map``, returned as a list, and wrapped in a context
    manager). If max_workers is smaller than two then we avoid parallel
    execution.

    Parameters
    ----------
    fn : callable
        Function that is mapped over the items of ``iterables``.

    *iterables : iterable
        One or more iterables; their items are passed in lockstep as
        positional arguments to ``fn``.

    max_workers : int
        Number of worker processes. Values smaller than two result in a
        sequential execution in the current process.

    **kwargs
        Passed on to ``tqdm``; ignored if ``tqdm`` is not installed.

    Returns
    -------
    out : list
        The results of ``fn``, in the order of the inputs.

    """
    parallel = max_workers > 1

    # Without tqdm: plain standard library, kwargs are silently ignored.
    if tqdm is None:
        if parallel:
            with ProcessPoolExecutor(max_workers=max_workers) as ex:
                return list(ex.map(fn, *iterables))
        return list(map(fn, *iterables))

    # Parallel with tqdm.
    if parallel:
        return tqdm.contrib.concurrent.process_map(
            fn, *iterables, max_workers=max_workers, **kwargs)

    # Sequential with tqdm.
    from operator import length_hint

    jobs = map(fn, *iterables)

    # Respect a user-provided ``total``; otherwise estimate it without
    # requiring ``__len__``, so that generators work as well (this is also
    # how tqdm's own process_map obtains its default total).
    kwargs.setdefault('total', length_hint(iterables[0]))

    return list(tqdm.auto.tqdm(iterable=jobs, **kwargs))


def solve(inp):
    """Thin wrapper of `solve` or `solve_source` for a `process_map`.

    Used within a Simulation to call the solver in parallel. The solver is
    always called with ``return_info=True`` and ``always_return=True``,
    independent of the provided solver options, so both the ``efield`` and the
    ``info_dict`` are obtained.

    Parameters
    ----------
    inp : dict, str
        If dict, two formats are recognized:

        - Has keys [model, sfield, efield, solver_opts]:
          Forwarded to `solve`.
        - Has keys [model, grid, source, frequency, efield, solver_opts]:
          Forwarded to `solve_source`.

        Consult the corresponding function for details on the input
        parameters.

        Alternatively the path to the h5-file can be provided as a string
        (file-based computation); the input dict is then taken from the
        ``'data'`` entry of the loaded file.

        The ``model`` is interpolated to the grid of the source field (first
        format) or to the provided grid (second format). Hence, the model
        can be on a different grid (for source and frequency dependent
        gridding).

    Returns
    -------
    efield : Field
        Resulting electric field, as returned from :func:`emg3d.solver.solve`
        or :func:`emg3d.solver.solve_source`.

    info_dict : dict
        Resulting info dictionary, as returned from :func:`emg3d.solver.solve`
        or :func:`emg3d.solver.solve_source`.

    In the file-based case the two results are saved to a file named like the
    input file but with ``_out`` appended to its stem, and that file name is
    returned twice instead.

    """

    # File-based computation: derive the output name, then load the input.
    out_file = None
    if isinstance(inp, str):
        parts = inp.rsplit('.', 1)
        out_file = parts[0] + '_out.' + parts[1]
        inp = io.load(inp, verb=0)['data']

    if 'sfield' in inp:
        # Keys [model, sfield, efield, solver_opts] => solver.solve.
        solver_input = {**inp['solver_opts'], 'sfield': inp['sfield']}
        # The source field defines the computational grid.
        inp['grid'] = inp['sfield'].grid
        compute = solver.solve
    else:
        # Keys [model, grid, source, frequency, efield, solver_opts]
        # => solver.solve_source.
        solver_input = {
            **inp['solver_opts'],
            'source': inp['source'],
            'frequency': inp['frequency'],
        }
        compute = solver.solve_source

    # Common parameters; the model is interpolated to the computational grid.
    solver_input.update(
        model=inp['model'].interpolate_to_grid(inp['grid']),
        efield=inp['efield'],
        return_info=True,
        always_return=True,
    )

    efield, info = compute(**solver_input)

    # In-memory computation returns the results directly.
    if out_file is None:
        return efield, info

    # File-based computation stores the results and returns the file name.
    io.save(out_file, efield=efield, info=info, verb=0)
    return out_file, out_file
10 changes: 10 additions & 0 deletions emg3d/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@
# Numba-settings
_numba_setting = {'nogil': True, 'fastmath': True, 'cache': True}

__all__ = [
'amat_x', 'gauss_seidel', 'gauss_seidel_x', 'gauss_seidel_y',
'gauss_seidel_z', 'blocks_to_amat', 'solve', 'restrict',
'restrict_weights',
]


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


# LinearOperator to compute A x
@nb.njit(**_numba_setting)
Expand Down
4 changes: 4 additions & 0 deletions emg3d/electrodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
]


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


# BASE ELECTRODE TYPES
class Wire:
"""A wire consists of an arbitrary number of electrodes.
Expand Down
4 changes: 4 additions & 0 deletions emg3d/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
__all__ = ['Field', 'get_source_field', 'get_receiver', 'get_magnetic_field']


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


@utils._known_class
class Field:
r"""A Field contains the x-, y-, and z- directed electromagnetic fields.
Expand Down
4 changes: 4 additions & 0 deletions emg3d/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
__all__ = ['save', 'load']


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


def save(fname, **kwargs):
"""Save simulations, surveys, meshes, models, fields, and more to disk.
Expand Down
4 changes: 4 additions & 0 deletions emg3d/maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
'ellipse_indices']


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


class BaseMap:
"""Maps variable `x` to computational variable `σ` (conductivity).
Expand Down
4 changes: 4 additions & 0 deletions emg3d/meshes.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
'check_mesh', 'estimate_gridding_opts']


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


class BaseMesh:
"""Minimal TensorMesh for internal multigrid computation.
Expand Down
4 changes: 4 additions & 0 deletions emg3d/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
__all__ = ['Model', 'VolumeModel', 'expand_grid_model']


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


# MODEL
@utils._known_class
class Model:
Expand Down
13 changes: 9 additions & 4 deletions emg3d/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@

import numpy as np

from emg3d import fields, io, maps, meshes, models, solver, surveys, utils
from emg3d import fields, io, maps, meshes, models, surveys, utils

__all__ = ['Simulation', ]


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


@utils._known_class
class Simulation:
"""Create a simulation for a given survey on a given model.
Expand Down Expand Up @@ -794,9 +798,10 @@ def collect_efield_inputs(inp):
self.survey.add_noise(**kwargs)

def _compute(self, fn, description, srcfreq=None):
"""Use utils._process_map to call solver._solve asynchronously."""
return utils._process_map(
solver._solve,
"""Use process_map to call solver.solve asynchronously."""
from emg3d import _multiprocessing as _mp
return _mp.process_map(
_mp.solve,
list(map(fn, self._srcfreq if srcfreq is None else srcfreq)),
max_workers=self.max_workers,
**{'desc': description, **self._tqdm_opts},
Expand Down
89 changes: 4 additions & 85 deletions emg3d/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
'RegularGridProlongator']


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


# MAIN USER-FACING FUNCTIONS
def solve(model, sfield, sslsolver=True, semicoarsening=True,
linerelaxation=True, verb=0, **kwargs):
Expand Down Expand Up @@ -458,91 +462,6 @@ def solve_source(model, source, frequency, **kwargs):
return solve(model, sfield, **kwargs)


def _solve(inp):
    """Thin wrapper of `solve` or `solve_source` for a `process_map`.

    Used within a Simulation to call the solver in parallel. The solver is
    always called with ``return_info=True`` and ``always_return=True``,
    independent of the provided solver options, so both the ``efield`` and the
    ``info_dict`` are obtained.

    Parameters
    ----------
    inp : dict, str
        If dict, two formats are recognized:

        - Has keys [model, sfield, efield, solver_opts]:
          Forwarded to `solve`.
        - Has keys [model, grid, source, frequency, efield, solver_opts]:
          Forwarded to `solve_source`.

        Consult the corresponding function for details on the input
        parameters.

        Alternatively the path to the h5-file can be provided as a string
        (file-based computation); the input dict is then taken from the
        ``'data'`` entry of the loaded file.

        The ``model`` is interpolated to the grid of the source field (first
        format) or to the provided grid (second format). Hence, the model
        can be on a different grid (for source and frequency dependent
        gridding).

    Returns
    -------
    efield : Field
        Resulting electric field, as returned from :func:`emg3d.solver.solve`
        or :func:`emg3d.solver.solve_source`.

    info_dict : dict
        Resulting info dictionary, as returned from :func:`emg3d.solver.solve`
        or :func:`emg3d.solver.solve_source`.

    In the file-based case the two results are saved to a file named like the
    input file but with ``_out`` appended to its stem, and that file name is
    returned twice instead.

    """

    # File-based computation: derive the output name, then load the input.
    out_file = None
    if isinstance(inp, str):
        from emg3d import io
        pieces = inp.rsplit('.', 1)
        out_file = pieces[0] + '_out.' + pieces[1]
        inp = io.load(inp, verb=0)['data']

    # The presence of a source field decides which solver function is used.
    with_sfield = 'sfield' in inp

    # Start from the user-provided solver options.
    solver_input = {**inp['solver_opts']}

    if with_sfield:
        # Keys [model, sfield, efield, solver_opts] => solve.
        solver_input['sfield'] = inp['sfield']
        inp['grid'] = inp['sfield'].grid
        fct = solve
    else:
        # Keys [model, grid, source, frequency, efield, solver_opts]
        # => solve_source.
        solver_input['source'] = inp['source']
        solver_input['frequency'] = inp['frequency']
        fct = solve_source

    # Interpolate the model to the computational grid (if different) and add
    # the parameters shared by both formats.
    solver_input['model'] = inp['model'].interpolate_to_grid(inp['grid'])
    solver_input['efield'] = inp['efield']
    solver_input['return_info'] = True
    solver_input['always_return'] = True

    efield, info = fct(**solver_input)

    # In-memory computation returns the results directly.
    if out_file is None:
        return efield, info

    # File-based computation stores the results and returns the file name.
    io.save(out_file, efield=efield, info=info, verb=0)
    return out_file, out_file


# SOLVERS
def multigrid(model, sfield, efield, var, **kwargs):
"""Multigrid solver for three-dimensional electromagnetic diffusion.
Expand Down
4 changes: 4 additions & 0 deletions emg3d/surveys.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
'txrx_lists_to_dict', 'frequencies_to_dict']


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


@utils._known_class
class Survey:
"""Create a survey containing sources, receivers, and data.
Expand Down
5 changes: 4 additions & 1 deletion emg3d/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@

from emg3d import utils


__all__ = ['Fourier', ]


def __dir__():
    """Limit ``dir()`` of this module to the names listed in ``__all__``."""
    return __all__


@utils._requires('empymod')
class Fourier:
r"""Time-domain CSEM computation.
Expand Down
Loading

0 comments on commit 50d29de

Please sign in to comment.