Merge pull request #1764 from devitocodes/generalize-mpi-modes
compiler: Add machinery for custom memory allocators and MPI
FabioLuporini committed Oct 2, 2021
2 parents 74e7859 + e116845 commit e663ffe
Showing 9 changed files with 152 additions and 72 deletions.
3 changes: 2 additions & 1 deletion devito/core/arm.py
@@ -12,13 +12,14 @@ class ArmAdvOperator(Cpu64AdvOperator):
     @timed_pass(name='specializing.IET')
     def _specialize_iet(cls, graph, **kwargs):
         options = kwargs['options']
+        language = kwargs['language']
         platform = kwargs['platform']
         sregistry = kwargs['sregistry']
 
         # Distributed-memory parallelism
         optimize_halospots(graph)
         if options['mpi']:
-            mpiize(graph, mode=options['mpi'], sregistry=sregistry)
+            mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)
 
         # Lower IncrDimensions so that blocks of arbitrary shape may be used
         relax_incr_dimensions(graph)
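
The same one-line change recurs in each backend below: _specialize_iet now extracts `language` from `kwargs` and threads it through to `mpiize`. A minimal sketch of where that kwarg originates (the grid, equation, and option values are illustrative only; actually exercising the MPI pass assumes a Devito install with MPI support):

    from devito import Grid, TimeFunction, Eq, Operator, configuration

    configuration['mpi'] = True  # enable the distributed-memory passes

    grid = Grid(shape=(16, 16))
    u = TimeFunction(name='u', grid=grid, space_order=2)

    # `language` travels in the **kwargs consumed by _specialize_iet and,
    # as of this commit, reaches mpiize and the halo-exchange machinery too
    op = Operator(Eq(u.forward, u.laplace), language='openmp')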
9 changes: 6 additions & 3 deletions devito/core/cpu.py
@@ -126,12 +126,13 @@ class Cpu64NoopOperator(Cpu64OperatorMixin, CoreOperator):
     @timed_pass(name='specializing.IET')
     def _specialize_iet(cls, graph, **kwargs):
         options = kwargs['options']
+        language = kwargs['language']
         platform = kwargs['platform']
         sregistry = kwargs['sregistry']
 
         # Distributed-memory parallelism
         if options['mpi']:
-            mpiize(graph, mode=options['mpi'], sregistry=sregistry)
+            mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)
 
         # Shared-memory parallelism
         if options['openmp']:
@@ -192,6 +193,7 @@ def _specialize_clusters(cls, clusters, **kwargs):
     @timed_pass(name='specializing.IET')
     def _specialize_iet(cls, graph, **kwargs):
         options = kwargs['options']
+        language = kwargs['language']
         platform = kwargs['platform']
         sregistry = kwargs['sregistry']
 
@@ -201,7 +203,7 @@ def _specialize_iet(cls, graph, **kwargs):
         # Distributed-memory parallelism
         optimize_halospots(graph)
         if options['mpi']:
-            mpiize(graph, mode=options['mpi'], sregistry=sregistry)
+            mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)
 
         # Lower IncrDimensions so that blocks of arbitrary shape may be used
         relax_incr_dimensions(graph)
@@ -330,7 +332,8 @@ def _make_iet_passes_mapper(cls, **kwargs):
             'blocking': partial(relax_incr_dimensions),
             'parallel': parizer.make_parallel,
             'openmp': parizer.make_parallel,
-            'mpi': partial(mpiize, mode=options['mpi'], sregistry=sregistry),
+            'mpi': partial(mpiize, mode=options['mpi'], language=kwargs['language'],
+                           sregistry=sregistry),
             'linearize': partial(linearize, sregistry=sregistry),
             'simd': partial(parizer.make_simd),
             'prodders': hoist_prodders,
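
In the Custom operator's mapper above, the binding happens once, when the mapper is built: functools.partial freezes `mode`, `language`, and `sregistry`, so each pass is later invoked with just the graph. A toy illustration of that pattern (stand-in names, not the actual Devito pass):

    from functools import partial

    def mpiize(graph, mode=None, language=None, sregistry=None):
        # stand-in for the real Devito pass of the same name
        print('mode=%s, language=%s' % (mode, language))

    passes_mapper = {'mpi': partial(mpiize, mode='basic', language='openmp',
                                    sregistry=None)}
    passes_mapper['mpi']('<graph>')  # prints: mode=basic, language=openmp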
10 changes: 7 additions & 3 deletions devito/core/gpu.py
@@ -114,12 +114,13 @@ class DeviceNoopOperator(DeviceOperatorMixin, CoreOperator):
     @timed_pass(name='specializing.IET')
     def _specialize_iet(cls, graph, **kwargs):
         options = kwargs['options']
+        language = kwargs['language']
         platform = kwargs['platform']
         sregistry = kwargs['sregistry']
 
         # Distributed-memory parallelism
         if options['mpi']:
-            mpiize(graph, mode=options['mpi'], sregistry=sregistry)
+            mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)
 
         # GPU parallelism
         parizer = cls._Target.Parizer(sregistry, options, platform)
@@ -181,13 +182,14 @@ def _specialize_clusters(cls, clusters, **kwargs):
     @timed_pass(name='specializing.IET')
     def _specialize_iet(cls, graph, **kwargs):
         options = kwargs['options']
+        language = kwargs['language']
         platform = kwargs['platform']
         sregistry = kwargs['sregistry']
 
         # Distributed-memory parallelism
         optimize_halospots(graph)
         if options['mpi']:
-            mpiize(graph, mode=options['mpi'], sregistry=sregistry)
+            mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)
 
         # GPU parallelism
         parizer = cls._Target.Parizer(sregistry, options, platform)
@@ -263,6 +265,7 @@ def callback(f):
     @classmethod
     def _make_iet_passes_mapper(cls, **kwargs):
         options = kwargs['options']
+        language = kwargs['language']
         platform = kwargs['platform']
         sregistry = kwargs['sregistry']
 
@@ -273,7 +276,8 @@ def _make_iet_passes_mapper(cls, **kwargs):
             'optcomms': partial(optimize_halospots),
             'parallel': parizer.make_parallel,
             'orchestrate': partial(orchestrator.process),
-            'mpi': partial(mpiize, mode=options['mpi'], sregistry=sregistry),
+            'mpi': partial(mpiize, mode=options['mpi'], language=language,
+                           sregistry=sregistry),
             'linearize': partial(linearize, sregistry=sregistry),
             'prodders': partial(hoist_prodders),
             'init': parizer.initialize
44 changes: 30 additions & 14 deletions devito/data/allocators.py
@@ -298,7 +298,7 @@ def _alloc_C_libcall(self, size, ctype):
             return None, None
         else:
             # Convert it back to a void * - this is
-            # _very_ important when later # passing it to numa_free
+            # _very_ important when later passing it to `numa_free`
             c_pointer = ctypes.c_void_p(c_pointer)
             return c_pointer, (c_pointer, c_bytesize)
 
@@ -375,19 +375,37 @@ def alloc(self, shape, dtype):
 ALLOC_NUMA_ANY = NumaAllocator('any')
 ALLOC_NUMA_LOCAL = NumaAllocator('local')
 
+custom_allocators = {}
+"""User-defined allocators."""
+
+
+def register_allocator(name, allocator):
+    """
+    Register a custom MemoryAllocator.
+    """
+    if not isinstance(name, str):
+        raise TypeError("name must be a str, not `%s`" % type(name))
+    if name in custom_allocators:
+        raise ValueError("A MemoryAllocator for `%s` already exists" % name)
+    if not isinstance(allocator, MemoryAllocator):
+        raise TypeError("Expected a MemoryAllocator, not `%s`" % type(allocator))
+
+    custom_allocators[name] = allocator
+
 
 def infer_knl_mode():
     path = os.path.join('/sys', 'bus', 'node', 'devices', 'node1')
     return 'flat' if os.path.exists(path) else 'cache'
 
 
-def default_allocator():
+def default_allocator(name=None):
     """
-    Return a suitable MemoryAllocator for the architecture on which the process
-    is running. Possible allocators are: ::
+    Return a MemoryAllocator for the underlying architecture.
+
     * ALLOC_GUARD: Only used in so-called "develop mode", to trigger SIGSEGV as
                    soon as OOB accesses are performed.
     * ALLOC_FLAT: Align memory to page boundaries using the posix function
-                  ``posix_memalign``
+                  `posix_memalign`.
     * ALLOC_NUMA_LOCAL: Allocate memory in the "closest" NUMA node. This only
                         makes sense on a NUMA architecture. Falls back to
                         allocation in an arbitrary NUMA node if there isn't
@@ -397,16 +415,14 @@ def default_allocator():
                         Falls back to DRAM if there isn't enough space.
     * ALLOC_KNL_DRAM: On a Knights Landing platform, allocate memory in DRAM.
 
-    The default allocator is chosen based on the following algorithm: ::
-
-        * If running in DEVELOP mode (env var DEVITO_DEVELOP), return ALLOC_FLAT;
-        * If ``libnuma`` is not available on the system, return ALLOC_FLAT (though
-          it typically is available, at least on relatively recent Linux distributions);
-        * If on a Knights Landing platform (codename ``knl``, see ``print_defaults()``)
-          return ALLOC_KNL_MCDRAM;
-        * If on a multi-socket Intel Xeon platform, return ALLOC_NUMA_LOCAL;
-        * In all other cases, return ALLOC_FLAT.
+    Custom allocators may be added with `register_allocator`.
     """
+    if name is not None:
+        try:
+            return custom_allocators[name]
+        except KeyError:
+            pass
+
     if configuration['develop-mode']:
         return ALLOC_GUARD
     elif NumaAllocator.available():
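
Taken together, the two new hooks compose as follows. A minimal sketch, where the 'my-flat' key is made up for illustration:

    from devito.data.allocators import (ALLOC_FLAT, default_allocator,
                                        register_allocator)

    # Register an existing MemoryAllocator instance under a custom key...
    register_allocator('my-flat', ALLOC_FLAT)

    # ...then fetch it by name; unknown names fall through to the
    # architecture-based default
    assert default_allocator('my-flat') is ALLOC_FLAT
    assert default_allocator('no-such-key') is default_allocator()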
21 changes: 14 additions & 7 deletions devito/mpi/routines.py
@@ -29,9 +29,10 @@ class HaloExchangeBuilder(object):
     Build IET-based routines to implement MPI halo exchange.
     """
 
-    def __new__(cls, mode, sregistry, **generators):
+    def __new__(cls, mode, language, sregistry, **generators):
         obj = object.__new__(mpi_registry[mode])
 
+        obj._language = language
         obj._sregistry = sregistry
 
         # Unique name generators
@@ -530,7 +531,7 @@ class OverlapHaloExchangeBuilder(DiagHaloExchangeBuilder):
     def _make_msg(self, f, hse, key):
         # Only retain the halos required by the Diag scheme
         halos = sorted(i for i in hse.halos if isinstance(i.dim, tuple))
-        return MPIMsg('msg%d' % key, f, halos)
+        return MPIMsg('msg%d' % key, f, halos, self._language)
 
     def _make_all(self, f, hse, msg):
         df = AliasFunction(name=self.sregistry.make_name(prefix='a'),
@@ -703,7 +704,7 @@ def _make_region(self, hs, key):
     def _make_msg(self, f, hse, key):
         # Only retain the halos required by the Diag scheme
         halos = sorted(i for i in hse.halos if isinstance(i.dim, tuple))
-        return MPIMsgEnriched('msg%d' % key, f, halos)
+        return MPIMsgEnriched('msg%d' % key, f, halos, self._language)
 
     def _make_all(self, f, hse, msg):
         df = AliasFunction(name=self.sregistry.make_name(prefix='a'),
@@ -1050,15 +1051,16 @@ class MPIMsg(CompositeObject):
         (_C_field_rsend, c_mpirequest_p),
     ]
 
-    def __init__(self, name, target, halos):
+    def __init__(self, name, target, halos, language=None):
         self._target = target
         self._halos = halos
+        self._language = language
 
-        super(MPIMsg, self).__init__(name, 'msg', self.fields)
+        super().__init__(name, 'msg', self.fields)
 
         # Required for buffer allocation/deallocation before/after jumping/returning
         # to/from C-land
-        self._allocator = default_allocator()
+        self._allocator = default_allocator(language)
         self._memfree_args = []
 
     def __del__(self):
@@ -1083,6 +1085,10 @@ def target(self):
     @property
     def halos(self):
         return self._halos
 
+    @property
+    def language(self):
+        return self._language
+
     @property
     def npeers(self):
         return len(self._halos)
@@ -1119,6 +1125,7 @@ def _arg_apply(self, *args, **kwargs):
 
     # Pickling support
     _pickle_args = ['name', 'target', 'halos']
+    _pickle_kwargs = CompositeObject._pickle_kwargs + ['language']
 
 
 class MPIMsgEnriched(MPIMsg):
@@ -1136,7 +1143,7 @@ class MPIMsgEnriched(MPIMsg):
     ]
 
     def _arg_defaults(self, alias=None):
-        super(MPIMsgEnriched, self)._arg_defaults(alias)
+        super()._arg_defaults(alias)
 
         function = alias or self.function
         neighborhood = function.grid.distributor.neighborhood
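
The net effect, reading the diff above: every MPIMsg now resolves its buffer allocator via `default_allocator(language)`, so registering a custom allocator under a language key redirects MPI buffer allocation for operators built with that language, while unregistered keys leave behaviour unchanged. A quick sanity check of the fallback (assumes nothing has been registered under 'C'):

    from devito.data.allocators import default_allocator

    # MPIMsg.__init__ calls default_allocator(language); with no custom
    # allocator registered under that key, the pre-existing
    # architecture-based choice applies
    assert default_allocator('C') is default_allocator()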
