compiler: Add machinery for custom memory allocators and MPI #1764

Merged
merged 7 commits on Oct 2, 2021
3 changes: 2 additions & 1 deletion devito/core/arm.py

@@ -12,13 +12,14 @@ class ArmAdvOperator(Cpu64AdvOperator):
@timed_pass(name='specializing.IET')
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
language = kwargs['language']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

# Distributed-memory parallelism
optimize_halospots(graph)
if options['mpi']:
mpiize(graph, mode=options['mpi'], sregistry=sregistry)
mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)

# Lower IncrDimensions so that blocks of arbitrary shape may be used
relax_incr_dimensions(graph)
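For context, the `language` value threaded through these hunks originates at Operator construction time. A minimal, hedged usage sketch (the grid and equation are illustrative only; assumes a recent Devito exposing the `language` kwarg):

from devito import Grid, TimeFunction, Eq, Operator

# With DEVITO_MPI=1 and an mpirun launch, options['mpi'] is set and the
# mpiize pass now also receives language='openmp' from this kwarg.
grid = Grid(shape=(64, 64))
u = TimeFunction(name='u', grid=grid, space_order=2)
op = Operator(Eq(u.forward, u.dx + 1), language='openmp')
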
9 changes: 6 additions & 3 deletions devito/core/cpu.py

@@ -126,12 +126,13 @@ class Cpu64NoopOperator(Cpu64OperatorMixin, CoreOperator):
@timed_pass(name='specializing.IET')
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
language = kwargs['language']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

# Distributed-memory parallelism
if options['mpi']:
mpiize(graph, mode=options['mpi'], sregistry=sregistry)
mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)

# Shared-memory parallelism
if options['openmp']:
@@ -192,6 +193,7 @@ def _specialize_clusters(cls, clusters, **kwargs):
@timed_pass(name='specializing.IET')
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
language = kwargs['language']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

@@ -201,7 +203,7 @@ def _specialize_iet(cls, graph, **kwargs):
# Distributed-memory parallelism
optimize_halospots(graph)
if options['mpi']:
mpiize(graph, mode=options['mpi'], sregistry=sregistry)
mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)

# Lower IncrDimensions so that blocks of arbitrary shape may be used
relax_incr_dimensions(graph)
@@ -330,7 +332,8 @@ def _make_iet_passes_mapper(cls, **kwargs):
'blocking': partial(relax_incr_dimensions),
'parallel': parizer.make_parallel,
'openmp': parizer.make_parallel,
'mpi': partial(mpiize, mode=options['mpi'], sregistry=sregistry),
'mpi': partial(mpiize, mode=options['mpi'], language=kwargs['language'],
sregistry=sregistry),
'linearize': partial(linearize, sregistry=sregistry),
'simd': partial(parizer.make_simd),
'prodders': hoist_prodders,
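The mapper entries are partials, so downstream callers fire the MPI pass with just the graph. A hedged, self-contained sketch of the pattern; `mpiize` below is a stand-in for the real pass, which takes the same keyword arguments:

from functools import partial

def mpiize(graph, mode=None, language=None, sregistry=None):
    # Stand-in: the real pass lowers the IET for distributed-memory MPI
    print(f"mpiize: mode={mode}, language={language}")

passes_mapper = {
    'mpi': partial(mpiize, mode='basic', language='openmp', sregistry=None),
}

passes_mapper['mpi']('<graph>')  # prints: mpiize: mode=basic, language=openmp
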
10 changes: 7 additions & 3 deletions devito/core/gpu.py

@@ -114,12 +114,13 @@ class DeviceNoopOperator(DeviceOperatorMixin, CoreOperator):
@timed_pass(name='specializing.IET')
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
language = kwargs['language']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

# Distributed-memory parallelism
if options['mpi']:
mpiize(graph, mode=options['mpi'], sregistry=sregistry)
mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)

# GPU parallelism
parizer = cls._Target.Parizer(sregistry, options, platform)
@@ -181,13 +182,14 @@ def _specialize_clusters(cls, clusters, **kwargs):
@timed_pass(name='specializing.IET')
def _specialize_iet(cls, graph, **kwargs):
options = kwargs['options']
language = kwargs['language']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

# Distributed-memory parallelism
optimize_halospots(graph)
if options['mpi']:
mpiize(graph, mode=options['mpi'], sregistry=sregistry)
mpiize(graph, mode=options['mpi'], language=language, sregistry=sregistry)

# GPU parallelism
parizer = cls._Target.Parizer(sregistry, options, platform)
@@ -263,6 +265,7 @@ def callback(f):
@classmethod
def _make_iet_passes_mapper(cls, **kwargs):
options = kwargs['options']
language = kwargs['language']
platform = kwargs['platform']
sregistry = kwargs['sregistry']

@@ -273,7 +276,8 @@ def _make_iet_passes_mapper(cls, **kwargs):
'optcomms': partial(optimize_halospots),
'parallel': parizer.make_parallel,
'orchestrate': partial(orchestrator.process),
'mpi': partial(mpiize, mode=options['mpi'], sregistry=sregistry),
'mpi': partial(mpiize, mode=options['mpi'], language=language,
sregistry=sregistry),
'linearize': partial(linearize, sregistry=sregistry),
'prodders': partial(hoist_prodders),
'init': parizer.initialize
44 changes: 30 additions & 14 deletions devito/data/allocators.py

@@ -298,7 +298,7 @@ def _alloc_C_libcall(self, size, ctype):
return None, None
else:
# Convert it back to a void * - this is
# _very_ important when later # passing it to numa_free
# _very_ important when later passing it to `numa_free`
c_pointer = ctypes.c_void_p(c_pointer)
return c_pointer, (c_pointer, c_bytesize)
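The comment fixed above deserves a concrete illustration. A hedged, self-contained sketch of the int-to-void* round trip (plain libc via ctypes, not Devito code): when a foreign function's restype is c_void_p, ctypes hands back a bare Python int, which must be re-wrapped before being passed to a deallocator.

import ctypes
import ctypes.util

libc = ctypes.CDLL(ctypes.util.find_library('c'))
libc.malloc.restype = ctypes.c_void_p  # return value arrives as a plain int
raw = libc.malloc(64)
ptr = ctypes.c_void_p(raw)             # re-wrap as void* ...
libc.free(ptr)                         # ... so free()/numa_free() get a real pointer
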

@@ -375,19 +375,37 @@ def alloc(self, shape, dtype):
ALLOC_NUMA_ANY = NumaAllocator('any')
ALLOC_NUMA_LOCAL = NumaAllocator('local')

custom_allocators = {}
"""User-defined allocators."""


def register_allocator(name, allocator):
"""
Register a custom MemoryAllocator.
"""
if not isinstance(name, str):
raise TypeError("name must be a str, not `%s`" % type(name))
if name in custom_allocators:
raise ValueError("A MemoryAllocator for `%s` already exists" % name)
if not isinstance(allocator, MemoryAllocator):
raise TypeError("Expected a MemoryAllocator, not `%s`" % type(allocator))

custom_allocators[name] = allocator
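
A hedged usage sketch of the new registry. The subclass below is a placeholder, not part of this patch; the built-in allocators show what `_alloc_C_libcall` and `free` must actually do:

class MyAllocator(MemoryAllocator):

    def _alloc_C_libcall(self, size, ctype):
        # Platform-specific allocation; must return (c_pointer, free_args),
        # mirroring the built-in allocators above
        raise NotImplementedError

    def free(self, c_pointer):
        # Matching deallocation for whatever _alloc_C_libcall produced
        raise NotImplementedError

register_allocator('my-alloc', MyAllocator())
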


def infer_knl_mode():
path = os.path.join('/sys', 'bus', 'node', 'devices', 'node1')
return 'flat' if os.path.exists(path) else 'cache'


def default_allocator():
def default_allocator(name=None):
"""
Return a suitable MemoryAllocator for the architecture on which the process
is running. Possible allocators are: ::
Return a MemoryAllocator for the underlying architecture.

* ALLOC_GUARD: Only used in so-called "develop mode", to trigger SIGSEGV as
soon as OOB accesses are performed.

Contributor review comment on the line above: develop-mode or develop_mode just for 'grepping' homogeneity
* ALLOC_FLAT: Align memory to page boundaries using the posix function
``posix_memalign``
`posix_memalign`.
* ALLOC_NUMA_LOCAL: Allocate memory in the "closest" NUMA node. This only
makes sense on a NUMA architecture. Falls back to
allocation in an arbitrary NUMA node if there isn't
@@ -397,16 +415,14 @@ def default_allocator():
Falls back to DRAM if there isn't enough space.
* ALLOC_KNL_DRAM: On a Knights Landing platform, allocate memory in DRAM.

The default allocator is chosen based on the following algorithm: ::

* If running in DEVELOP mode (env var DEVITO_DEVELOP), return ALLOC_FLAT;
* If ``libnuma`` is not available on the system, return ALLOC_FLAT (though
it typically is available, at least on relatively recent Linux distributions);
* If on a Knights Landing platform (codename ``knl``, see ``print_defaults()``)
return ALLOC_KNL_MCDRAM;
* If on a multi-socket Intel Xeon platform, return ALLOC_NUMA_LOCAL;
* In all other cases, return ALLOC_FLAT.
Custom allocators may be added with `register_allocator`.
"""
if name is not None:
try:
return custom_allocators[name]
except KeyError:
pass

if configuration['develop-mode']:
return ALLOC_GUARD
elif NumaAllocator.available():
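Note that the name lookup is deliberately non-fatal: an unknown name falls through to the platform-based selection. A hedged sketch of the three paths, assuming 'my-alloc' was registered as in the earlier example:

default_allocator('my-alloc')  # the custom allocator
default_allocator('no-such')   # KeyError swallowed, platform-based choice
default_allocator()            # platform-based choice (ALLOC_GUARD in
                               # develop-mode, else the NUMA/KNL/FLAT logic)
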
21 changes: 14 additions & 7 deletions devito/mpi/routines.py

@@ -29,9 +29,10 @@ class HaloExchangeBuilder(object):
Build IET-based routines to implement MPI halo exchange.
"""

def __new__(cls, mode, sregistry, **generators):
def __new__(cls, mode, language, sregistry, **generators):
obj = object.__new__(mpi_registry[mode])

obj._language = language
obj._sregistry = sregistry

# Unique name generators
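
A hedged construction sketch for the new signature (the mode name 'basic' is one of the keys assumed to live in mpi_registry; sregistry is the Operator's SymbolRegistry):

# The concrete subclass is picked from mpi_registry[mode]; `language` is
# stored on the instance and later handed to each MPIMsg (see below), whose
# buffers are then allocated with default_allocator(language)
heb = HaloExchangeBuilder('basic', 'openmp', sregistry)
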
@@ -530,7 +531,7 @@ class OverlapHaloExchangeBuilder(DiagHaloExchangeBuilder):
def _make_msg(self, f, hse, key):
# Only retain the halos required by the Diag scheme
halos = sorted(i for i in hse.halos if isinstance(i.dim, tuple))
return MPIMsg('msg%d' % key, f, halos)
return MPIMsg('msg%d' % key, f, halos, self._language)

def _make_all(self, f, hse, msg):
df = AliasFunction(name=self.sregistry.make_name(prefix='a'),
@@ -703,7 +704,7 @@ def _make_region(self, hs, key):
def _make_msg(self, f, hse, key):
# Only retain the halos required by the Diag scheme
halos = sorted(i for i in hse.halos if isinstance(i.dim, tuple))
return MPIMsgEnriched('msg%d' % key, f, halos)
return MPIMsgEnriched('msg%d' % key, f, halos, self._language)

def _make_all(self, f, hse, msg):
df = AliasFunction(name=self.sregistry.make_name(prefix='a'),
@@ -1050,15 +1051,16 @@ class MPIMsg(CompositeObject):
(_C_field_rsend, c_mpirequest_p),
]

def __init__(self, name, target, halos):
def __init__(self, name, target, halos, language=None):
self._target = target
self._halos = halos
self._language = language

super(MPIMsg, self).__init__(name, 'msg', self.fields)
super().__init__(name, 'msg', self.fields)

# Required for buffer allocation/deallocation before/after jumping/returning
# to/from C-land
self._allocator = default_allocator()
self._allocator = default_allocator(language)
self._memfree_args = []

def __del__(self):
@@ -1083,6 +1085,10 @@ def target(self):
def halos(self):
return self._halos

@property
def language(self):
return self._language

@property
def npeers(self):
return len(self._halos)
@@ -1119,6 +1125,7 @@ def _arg_apply(self, *args, **kwargs):

# Pickling support
_pickle_args = ['name', 'target', 'halos']
_pickle_kwargs = CompositeObject._pickle_kwargs + ['language']
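
Since `language` is appended to _pickle_kwargs, it survives a pickle round trip; a minimal hedged sketch, where `msg` is any existing MPIMsg instance:

import pickle

new_msg = pickle.loads(pickle.dumps(msg))
assert new_msg.language == msg.language  # same allocator choice after rebuild
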


class MPIMsgEnriched(MPIMsg):
@@ -1136,7 +1143,7 @@ class MPIMsgEnriched(MPIMsg):
]

def _arg_defaults(self, alias=None):
super(MPIMsgEnriched, self)._arg_defaults(alias)
super()._arg_defaults(alias)

function = alias or self.function
neighborhood = function.grid.distributor.neighborhood