diff --git a/pySDC/core/BaseTransfer.py b/pySDC/core/BaseTransfer.py
index 54d140c0db..32439075fa 100644
--- a/pySDC/core/BaseTransfer.py
+++ b/pySDC/core/BaseTransfer.py
@@ -45,7 +45,6 @@ def __init__(self, fine_level, coarse_level, base_transfer_params, space_transfe
         # set up logger
         self.logger = logging.getLogger('transfer')

-        # just copy by object
         self.fine = fine_level
         self.coarse = coarse_level

diff --git a/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py b/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py
index 7deaa8a511..c79f7202eb 100644
--- a/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py
+++ b/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py
@@ -148,6 +148,13 @@ def predict(self):
         L.status.unlocked = True
         L.status.updated = True

+    def communicate_tau_correction_for_full_interval(self):
+        L = self.level
+        P = L.prob
+        if self.rank < self.comm.size - 1:
+            L.tau[-1] = P.u_init
+        self.comm.Bcast(L.tau[-1], root=self.comm.size - 1)
+

 class generic_implicit_MPI(SweeperMPI, generic_implicit):
     """
@@ -250,6 +257,7 @@ def compute_end_point(self):
             L.uend += L.u[0]

             # add up tau correction of the full interval (last entry)
-            if L.tau[-1] is not None:
+            if L.tau[self.rank] is not None:
+                self.communicate_tau_correction_for_full_interval()
                 L.uend += L.tau[-1]
         return None
diff --git a/pySDC/implementations/sweeper_classes/imex_1st_order_MPI.py b/pySDC/implementations/sweeper_classes/imex_1st_order_MPI.py
index 1711f9f511..e704daaede 100644
--- a/pySDC/implementations/sweeper_classes/imex_1st_order_MPI.py
+++ b/pySDC/implementations/sweeper_classes/imex_1st_order_MPI.py
@@ -107,6 +107,7 @@ def compute_end_point(self):
             L.uend += L.u[0]

             # add up tau correction of the full interval (last entry)
-            if L.tau[-1] is not None:
+            if L.tau[self.rank] is not None:
+                self.communicate_tau_correction_for_full_interval()
                 L.uend += L.tau[-1]
         return None
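
Both sweepers now guard the end-point update with the locally owned tau entry and then call the new helper, which broadcasts the full-interval tau correction from the last rank to all ranks. A minimal standalone sketch of that broadcast pattern (hypothetical example with a plain numpy buffer, not part of the patch):

    # Hypothetical sketch of the pattern behind communicate_tau_correction_for_full_interval()
    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    last = comm.size - 1

    # every rank needs a buffer of matching shape; only the last rank owns the data
    tau_full_interval = np.zeros(4)
    if comm.rank == last:
        tau_full_interval[:] = 3.14  # stand-in for the locally computed L.tau[-1]

    # collective call: afterwards every rank holds the last rank's value
    comm.Bcast(tau_full_interval, root=last)
    assert np.allclose(tau_full_interval, 3.14)

Because Bcast is collective, the non-final ranks first allocate a compatible buffer (P.u_init in the patch) before joining the call.
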
+ """ + + F, G = self.fine, self.coarse + CF, CG = self.comm_fine, self.comm_coarse + SG = G.sweep + PG = G.prob + + # only if the level is unlocked at least by prediction + if not F.status.unlocked: + raise UnlockError('fine level is still locked, cannot use data from there') + + # restrict fine values in space + tmp_u = self.space_transfer.restrict(F.u[CF.rank + 1]) + + # restrict collocation values + G.u[0] = self.space_transfer.restrict(F.u[0]) + recvBuf = [None for _ in range(SG.coll.num_nodes)] + recvBuf[CG.rank] = PG.u_init + for n in range(SG.coll.num_nodes): + CF.Reduce(self.Rcoll[n, CF.rank] * tmp_u, recvBuf[CG.rank], root=n, op=MPI.SUM) + G.u[CG.rank + 1] = recvBuf[CG.rank] + + # re-evaluate f on coarse level + G.f[0] = PG.eval_f(G.u[0], G.time) + G.f[CG.rank + 1] = PG.eval_f(G.u[CG.rank + 1], G.time + G.dt * SG.coll.nodes[CG.rank]) + + # build coarse level tau correction part + tauG = G.sweep.integrate() + + # build fine level tau correction part + tauF = F.sweep.integrate() + + # restrict fine level tau correction part in space + tmp_tau = self.space_transfer.restrict(tauF) + + # restrict fine level tau correction part in collocation + tauFG = tmp_tau.copy() + for n in range(SG.coll.num_nodes): + recvBuf = tauFG if n == CG.rank else None + CF.Reduce(self.Rcoll[n, CF.rank] * tmp_tau, recvBuf, root=n, op=MPI.SUM) + + # build tau correction + G.tau[CG.rank] = tauFG - tauG + + if F.tau[CF.rank] is not None: + tmp_tau = self.space_transfer.restrict(F.tau[CF.rank]) + + # restrict possible tau correction from fine in collocation + recvBuf = [None for _ in range(SG.coll.num_nodes)] + recvBuf[CG.rank] = PG.u_init + for n in range(SG.coll.num_nodes): + CF.Reduce(self.Rcoll[n, CF.rank] * tmp_tau, recvBuf[CG.rank], root=n, op=MPI.SUM) + G.tau[CG.rank] += recvBuf[CG.rank] + else: + pass + + # save u and rhs evaluations for interpolation + G.uold[CG.rank + 1] = PG.dtype_u(G.u[CG.rank + 1]) + G.fold[CG.rank + 1] = PG.dtype_f(G.f[CG.rank + 1]) + + # works as a predictor + G.status.unlocked = True + + return None + + def prolong(self): + """ + Space-time prolongation routine + + This routine applies the spatial prolongation routine to the difference between the computed and the restricted + values on the coarse level and then adds this difference to the fine values as coarse correction. + """ + + # get data for easier access + F, G = self.fine, self.coarse + CF, CG = self.comm_fine, self.comm_coarse + SF = F.sweep + PF = F.prob + + # only of the level is unlocked at least by prediction or restriction + if not G.status.unlocked: + raise UnlockError('coarse level is still locked, cannot use data from there') + + # build coarse correction + + # interpolate values in space first + tmp_u = self.space_transfer.prolong(G.u[CF.rank + 1] - G.uold[CF.rank + 1]) + + # interpolate values in collocation + recvBuf = [None for _ in range(SF.coll.num_nodes)] + recvBuf[CF.rank] = F.u[CF.rank + 1].copy() + for n in range(SF.coll.num_nodes): + + CG.Reduce(self.Pcoll[n, CG.rank] * tmp_u, recvBuf[n], root=n, op=MPI.SUM) + F.u[CF.rank + 1] += recvBuf[CF.rank] + + # re-evaluate f on fine level + F.f[CF.rank + 1] = PF.eval_f(F.u[CF.rank + 1], F.time + F.dt * SF.coll.nodes[CF.rank]) + + return None + + def prolong_f(self): + """ + Space-time prolongation routine w.r.t. the rhs f + + This routine applies the spatial prolongation routine to the difference between the computed and the restricted + values on the coarse level and then adds this difference to the fine values as coarse correction. 
+ """ + + # get data for easier access + F, G = self.fine, self.coarse + CF, CG = self.comm_fine, self.comm_coarse + SF = F.sweep + + # only of the level is unlocked at least by prediction or restriction + if not G.status.unlocked: + raise UnlockError('coarse level is still locked, cannot use data from there') + + # build coarse correction + + # interpolate values in space first + tmp_u = self.space_transfer.prolong(G.u[CF.rank + 1] - G.uold[CF.rank + 1]) + tmp_f = self.space_transfer.prolong(G.f[CF.rank + 1] - G.fold[CF.rank + 1]) + + # interpolate values in collocation + recvBuf_u = [None for _ in range(SF.coll.num_nodes)] + recvBuf_f = [None for _ in range(SF.coll.num_nodes)] + recvBuf_u[CF.rank] = F.u[CF.rank + 1].copy() + recvBuf_f[CF.rank] = F.f[CF.rank + 1].copy() + for n in range(SF.coll.num_nodes): + + CG.Reduce(self.Pcoll[n, CG.rank] * tmp_u, recvBuf_u[CF.rank], root=n, op=MPI.SUM) + CG.Reduce(self.Pcoll[n, CG.rank] * tmp_f, recvBuf_f[CF.rank], root=n, op=MPI.SUM) + + F.u[CF.rank + 1] += recvBuf_u[CF.rank] + F.f[CF.rank + 1] += recvBuf_f[CF.rank] + + return None diff --git a/pySDC/projects/parallelSDC/AllenCahn_parallel.py b/pySDC/projects/parallelSDC/AllenCahn_parallel.py index 002cada4d0..f39f1a8222 100644 --- a/pySDC/projects/parallelSDC/AllenCahn_parallel.py +++ b/pySDC/projects/parallelSDC/AllenCahn_parallel.py @@ -11,7 +11,7 @@ from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit from pySDC.implementations.transfer_classes.TransferMesh_FFT2D import mesh_to_mesh_fft2d from pySDC.playgrounds.Allen_Cahn.AllenCahn_monitor import monitor -from pySDC.projects.parallelSDC.BaseTransfer_MPI import base_transfer_MPI +from pySDC.implementations.transfer_classes.BaseTransferMPI import base_transfer_MPI from pySDC.implementations.sweeper_classes.generic_implicit_MPI import generic_implicit_MPI diff --git a/pySDC/projects/parallelSDC/BaseTransfer_MPI.py b/pySDC/projects/parallelSDC/BaseTransfer_MPI.py deleted file mode 100644 index b5447997d7..0000000000 --- a/pySDC/projects/parallelSDC/BaseTransfer_MPI.py +++ /dev/null @@ -1,235 +0,0 @@ -import logging - -import numpy as np -import scipy.sparse as sp - -from pySDC.core.Errors import UnlockError -from pySDC.helpers.pysdc_helper import FrozenClass - - -# short helper class to add params as attributes -class _Pars(FrozenClass): - def __init__(self, pars): - self.finter = False - for k, v in pars.items(): - setattr(self, k, v) - - self._freeze() - - -class base_transfer_MPI(object): - """ - Standard base_transfer class - - Attributes: - logger: custom logger for sweeper-related logging - params(__Pars): parameter object containing the custom parameters passed by the user - fine (pySDC.Level.level): reference to the fine level - coarse (pySDC.Level.level): reference to the coarse level - """ - - def __init__(self, fine_level, coarse_level, base_transfer_params, space_transfer_class, space_transfer_params): - """ - Initialization routine - - Args: - fine_level (pySDC.Level.level): fine level connected with the base_transfer operations - coarse_level (pySDC.Level.level): coarse level connected with the base_transfer operations - base_transfer_params (dict): parameters for the base_transfer operations - space_transfer_class: class to perform spatial transfer - space_transfer_params (dict): parameters for the space_transfer operations - """ - - self.params = _Pars(base_transfer_params) - - # set up logger - self.logger = logging.getLogger('transfer') - - # just copy by object - self.fine = fine_level - self.coarse 
-
-        fine_grid = self.fine.sweep.coll.nodes
-        coarse_grid = self.coarse.sweep.coll.nodes
-
-        if len(fine_grid) == len(coarse_grid):
-            self.Pcoll = sp.eye(len(fine_grid)).toarray()
-            self.Rcoll = sp.eye(len(fine_grid)).toarray()
-        else:
-            raise NotImplementedError('require no reduction of collocation nodes')
-
-        # set up spatial transfer
-        self.space_transfer = space_transfer_class(
-            fine_prob=self.fine.prob, coarse_prob=self.coarse.prob, params=space_transfer_params
-        )
-
-    @staticmethod
-    def get_transfer_matrix_Q(f_nodes, c_nodes):
-        """
-        Helper routine to quickly define transfer matrices between sets of nodes (fully Lagrangian)
-        Args:
-            f_nodes: fine nodes
-            c_nodes: coarse nodes
-
-        Returns:
-            matrix containing the interpolation weights
-        """
-        nnodes_f = len(f_nodes)
-        nnodes_c = len(c_nodes)
-
-        tmat = np.zeros((nnodes_f, nnodes_c))
-
-        for i in range(nnodes_f):
-            xi = f_nodes[i]
-            for j in range(nnodes_c):
-                den = 1.0
-                num = 1.0
-                for k in range(nnodes_c):
-                    if k == j:
-                        continue
-                    else:
-                        den *= c_nodes[j] - c_nodes[k]
-                        num *= xi - c_nodes[k]
-                tmat[i, j] = num / den
-
-        return tmat
-
-    def restrict(self):
-        """
-        Space-time restriction routine
-
-        The routine applies the spatial restriction operator to teh fine values on the fine nodes, then reevaluates f
-        on the coarse level. This is used for the first part of the FAS correction tau via integration. The second part
-        is the integral over the fine values, restricted to the coarse level. Finally, possible tau corrections on the
-        fine level are restricted as well.
-        """
-
-        # get data for easier access
-        F = self.fine
-        G = self.coarse
-
-        PG = G.prob
-
-        SF = F.sweep
-        SG = G.sweep
-
-        # only if the level is unlocked at least by prediction
-        if not F.status.unlocked:
-            raise UnlockError('fine level is still locked, cannot use data from there')
-
-        # restrict fine values in space
-        G.u[0] = self.space_transfer.restrict(F.u[0])
-        G.u[SG.rank + 1] = self.space_transfer.restrict(F.u[SF.rank + 1])
-
-        # re-evaluate f on coarse level
-        G.f[0] = PG.eval_f(G.u[0], G.time)
-        G.f[SG.rank + 1] = PG.eval_f(G.u[SG.rank + 1], G.time + G.dt * SG.coll.nodes[SG.rank])
-
-        # build coarse level tau correction part
-        tauG = G.sweep.integrate()
-
-        # build fine level tau correction part
-        tauF = F.sweep.integrate()
-
-        # restrict fine level tau correction part in space
-        tauFG = self.space_transfer.restrict(tauF)
-
-        # build tau correction
-        G.tau[SG.rank] = tauFG - tauG
-
-        if F.tau[SF.rank] is not None:
-            # restrict possible tau correction from fine in space
-            G.tau[SG.rank] += self.space_transfer.restrict(F.tau[SF.rank])
-        else:
-            pass
-
-        # save u and rhs evaluations for interpolation
-        G.uold[SG.rank + 1] = PG.dtype_u(G.u[SG.rank + 1])
-        G.fold[SG.rank + 1] = PG.dtype_f(G.f[SG.rank + 1])
-        # G.uold[0] = PG.dtype_u(G.u[0])
-        # G.fold[0] = PG.dtype_f(G.f[0])
-
-        # works as a predictor
-        G.status.unlocked = True
-
-        return None
-
-    def prolong(self):
-        """
-        Space-time prolongation routine
-
-        This routine applies the spatial prolongation routine to the difference between the computed and the restricted
-        values on the coarse level and then adds this difference to the fine values as coarse correction.
- """ - - # get data for easier access - F = self.fine - G = self.coarse - - PF = F.prob - - SF = F.sweep - SG = G.sweep - - # only of the level is unlocked at least by prediction or restriction - if not G.status.unlocked: - raise UnlockError('coarse level is still locked, cannot use data from there') - - # build coarse correction - - # we need to update u0 here for the predictor step, since here the new values for the fine sweep are not - # received from the previous processor but interpolated from the coarse level. - # need to restrict F.u[0] again here, since it might have changed in PFASST - G.uold[0] = self.space_transfer.restrict(F.u[0]) - - # interpolate values in space first - F.u[SF.rank + 1] += self.space_transfer.prolong(G.u[SG.rank + 1] - G.uold[SG.rank + 1]) - - # re-evaluate f on fine level - F.f[0] = PF.eval_f(F.u[0], F.time) - F.f[SF.rank + 1] = PF.eval_f(F.u[SF.rank + 1], F.time + F.dt * SF.coll.nodes[SF.rank]) - - return None - - def prolong_f(self): - """ - Space-time prolongation routine w.r.t. the rhs f - - This routine applies the spatial prolongation routine to the difference between the computed and the restricted - values on the coarse level and then adds this difference to the fine values as coarse correction. - """ - - # get data for easier access - F = self.fine - G = self.coarse - - PG = G.prob - - SF = F.sweep - SG = G.sweep - - # only of the level is unlocked at least by prediction or restriction - if not G.status.unlocked: - raise UnlockError('coarse level is still locked, cannot use data from there') - - # build coarse correction - # need to restrict F.u[0] again here, since it might have changed in PFASST - G.uold[0] = self.space_transfer.restrict(F.u[0]) - G.fold[0] = PG.eval_f(G.uold[0], G.time) - - # interpolate values in space first - tmp_u = [self.space_transfer.prolong(G.u[0] - G.uold[0])] - tmp_f = [self.space_transfer.prolong(G.f[0] - G.fold[0])] - for m in range(1, SG.coll.num_nodes + 1): - tmp_u.append(self.space_transfer.prolong(G.u[m] - G.uold[m])) - tmp_f.append(self.space_transfer.prolong(G.f[m] - G.fold[m])) - - # interpolate values in collocation - F.u[0] += tmp_u[0] - F.f[0] += tmp_f[0] - for n in range(1, SF.coll.num_nodes + 1): - for m in range(1, SG.coll.num_nodes + 1): - F.u[n] += self.Pcoll[n - 1, m - 1] * tmp_u[m] - F.f[n] += self.Pcoll[n - 1, m - 1] * tmp_f[m] - - return None diff --git a/pySDC/tests/test_sweepers/test_MPI_sweeper.py b/pySDC/tests/test_sweepers/test_MPI_sweeper.py index e94234717d..4c1dd43028 100644 --- a/pySDC/tests/test_sweepers/test_MPI_sweeper.py +++ b/pySDC/tests/test_sweepers/test_MPI_sweeper.py @@ -1,7 +1,7 @@ import pytest -def run(use_MPI, num_nodes, quad_type, residual_type, imex, initGuess, useNCCL): +def run(use_MPI, num_nodes, quad_type, residual_type, imex, init_guess, useNCCL, ML): """ Run a single sweep for a problem and compute the solution at the end point with a sweeper as specified. @@ -11,8 +11,9 @@ def run(use_MPI, num_nodes, quad_type, residual_type, imex, initGuess, useNCCL): quad_type (str): Type of nodes residual_type (str): Type of residual computation imex (bool): Use IMEX sweeper or not - initGuess (str): which initial guess should be used + init_guess (str): which initial guess should be used useNCCL (bool): ... 
@@ -25,7 +26,10 @@ def run(use_MPI, num_nodes, quad_type, residual_type, imex, initGuess, useNCCL):
         else:
             from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit as sweeper_class

-        from pySDC.implementations.problem_classes.TestEquation_0D import testequation0d as problem_class
+        if ML:
+            from pySDC.implementations.problem_classes.HeatEquation_ND_FD import heatNd_unforced as problem_class
+        else:
+            from pySDC.implementations.problem_classes.TestEquation_0D import testequation0d as problem_class
     else:
         if use_MPI:
             from pySDC.implementations.sweeper_classes.imex_1st_order_MPI import imex_1st_order_MPI as sweeper_class
@@ -35,12 +39,13 @@ def run(use_MPI, num_nodes, quad_type, residual_type, imex, initGuess, useNCCL):
         from pySDC.implementations.problem_classes.HeatEquation_ND_FD import heatNd_forced as problem_class

     dt = 1e-1
+    description = {}
     sweeper_params = {
         'num_nodes': num_nodes,
         'quad_type': quad_type,
         'QI': 'IEpar',
         'QE': 'PIC',
-        "initial_guess": initGuess,
+        "initial_guess": init_guess,
     }

     problem_params = {}
@@ -51,7 +56,17 @@ def run(use_MPI, num_nodes, quad_type, residual_type, imex, initGuess, useNCCL):
         sweeper_params['comm'] = NCCLComm(MPI.COMM_WORLD)
         problem_params['useGPU'] = True

-    description = {}
+    if ML > 1:
+        from pySDC.implementations.transfer_classes.TransferMesh import mesh_to_mesh
+
+        description['space_transfer_class'] = mesh_to_mesh
+
+        problem_params['nvars'] = [2 ** (ML - i) for i in range(ML)]
+        if use_MPI:
+            from pySDC.implementations.transfer_classes.BaseTransferMPI import base_transfer_MPI
+
+            description['base_transfer_class'] = base_transfer_MPI
+
     description['problem_class'] = problem_class
     description['problem_params'] = problem_params
     description['sweeper_class'] = sweeper_class
@@ -70,20 +85,17 @@ def run(use_MPI, num_nodes, quad_type, residual_type, imex, initGuess, useNCCL):
     return controller.MS[0].levels[0]


-def individual_test(num_nodes, quad_type, residual_type, imex, initGuess, useNCCL, launch=True):
+def individual_test(launch=False, **kwargs):
     """
     Make a test if the result matches between the MPI and non-MPI versions of a sweeper.
     Tests solution at the right end point and the residual.

     Args:
-        num_nodes (int): The number of nodes to use
-        quad_type (str): Type of nodes
-        residual_type (str): Type of residual computation
-        imex (bool): Use IMEX sweeper or not
-        initGuess (str): which initial guess should be used
-        useNCCL (bool): ...
         launch (bool): If yes, it will launch `mpirun` with the required number of processes
     """
+    num_nodes = kwargs['num_nodes']
+    useNCCL = kwargs['useNCCL']
+
     if launch:
         import os
         import subprocess
@@ -93,9 +105,11 @@ def individual_test(num_nodes, quad_type, residual_type, imex, initGuess, useNCC
         my_env['PYTHONPATH'] = '../../..:.'
         my_env['COVERAGE_PROCESS_START'] = 'pyproject.toml'

-        cmd = f"mpirun -np {num_nodes} python {__file__} --test_sweeper {num_nodes} {quad_type} {residual_type} {imex} {initGuess} {useNCCL}".split()
+        cmd = f"mpirun -np {num_nodes} python {__file__}"

-        p = subprocess.Popen(cmd, env=my_env, cwd=".")
+        for key, value in kwargs.items():
+            cmd += f' --{key}={value}'
+        p = subprocess.Popen(cmd.split(), env=my_env, cwd=".")

         p.wait()
         assert p.returncode == 0, 'ERROR: did not get return code 0, got %s with %2i processes' % (
@@ -109,25 +123,17 @@ def individual_test(num_nodes, quad_type, residual_type, imex, initGuess, useNCC
         import numpy as xp

     MPI = run(
+        **kwargs,
         use_MPI=True,
-        num_nodes=int(num_nodes),
-        quad_type=quad_type,
-        residual_type=residual_type,
-        imex=imex,
-        initGuess=initGuess,
-        useNCCL=useNCCL,
     )
     nonMPI = run(
+        **kwargs,
         use_MPI=False,
-        num_nodes=int(num_nodes),
-        quad_type=quad_type,
-        residual_type=residual_type,
-        imex=imex,
-        initGuess=initGuess,
-        useNCCL=False,
     )

-    assert xp.allclose(MPI.uend, nonMPI.uend, atol=1e-14), 'Got different solutions at end point!'
+    assert xp.allclose(
+        MPI.uend, nonMPI.uend, atol=1e-14
+    ), f'Got different solutions at end point! {MPI.uend=} {nonMPI.uend=}'

     assert xp.allclose(MPI.status.residual, nonMPI.status.residual, atol=1e-14), 'Got different residuals!'

@@ -136,8 +142,9 @@ def individual_test(num_nodes, quad_type, residual_type, imex, initGuess, useNCC
 @pytest.mark.parametrize("quad_type", ['GAUSS', 'RADAU-RIGHT'])
 @pytest.mark.parametrize("residual_type", ['last_abs', 'full_rel'])
 @pytest.mark.parametrize("imex", [True, False])
-@pytest.mark.parametrize("initGuess", ['spread', 'copy', 'zero'])
-def test_sweeper(num_nodes, quad_type, residual_type, imex, initGuess, launch=True):
+@pytest.mark.parametrize("init_guess", ['spread', 'copy', 'zero'])
+@pytest.mark.parametrize("ML", [1, 2, 3])
+def test_sweeper(num_nodes, quad_type, residual_type, imex, init_guess, ML, launch=True):
     """
     Make a test if the result matches between the MPI and non-MPI versions of a sweeper.
     Tests solution at the right end point and the residual.
@@ -149,7 +156,16 @@ def test_sweeper(num_nodes, quad_type, residual_type, imex, initGuess, launch=Tr
         imex (bool): Use IMEX sweeper or not
         launch (bool): If yes, it will launch `mpirun` with the required number of processes
     """
-    individual_test(num_nodes, quad_type, residual_type, imex, initGuess, useNCCL=False, launch=launch)
+    individual_test(
+        num_nodes=num_nodes,
+        quad_type=quad_type,
+        residual_type=residual_type,
+        imex=imex,
+        init_guess=init_guess,
+        useNCCL=False,
+        ML=ML,
+        launch=launch,
+    )


 @pytest.mark.cupy
@@ -158,8 +174,8 @@ def test_sweeper(num_nodes, quad_type, residual_type, imex, initGuess, launch=Tr
 @pytest.mark.parametrize("quad_type", ['GAUSS', 'RADAU-RIGHT'])
 @pytest.mark.parametrize("residual_type", ['last_abs', 'full_rel'])
 @pytest.mark.parametrize("imex", [False])
-@pytest.mark.parametrize("initGuess", ['spread', 'copy', 'zero'])
-def test_sweeper_NCCL(num_nodes, quad_type, residual_type, imex, initGuess, launch=True):
+@pytest.mark.parametrize("init_guess", ['spread', 'copy', 'zero'])
+def test_sweeper_NCCL(num_nodes, quad_type, residual_type, imex, init_guess, launch=True):
     """
     Make a test if the result matches between the MPI and non-MPI versions of a sweeper.
     Tests solution at the right end point and the residual.
@@ -171,15 +187,35 @@
         imex (bool): Use IMEX sweeper or not
         launch (bool): If yes, it will launch `mpirun` with the required number of processes
     """
-    individual_test(num_nodes, quad_type, residual_type, imex, initGuess, useNCCL=True, launch=launch)
+    individual_test(
+        num_nodes=num_nodes,
+        quad_type=quad_type,
+        residual_type=residual_type,
+        imex=imex,
+        init_guess=init_guess,
+        useNCCL=True,
+        ML=1,
+        launch=launch,
+    )


 if __name__ == '__main__':
-    import sys
-
-    if '--test_sweeper' in sys.argv:
-        imex = False if sys.argv[-3] == 'False' else True
-        useNCCL = False if sys.argv[-1] == 'False' else True
-        individual_test(
-            sys.argv[-6], sys.argv[-5], sys.argv[-4], imex=imex, initGuess=sys.argv[-2], useNCCL=useNCCL, launch=False
-        )
+    str_to_bool = lambda me: False if me == 'False' else True
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--ML', type=int, help='Number of levels in space')
+    parser.add_argument('--num_nodes', type=int, help='Number of collocation nodes')
+    parser.add_argument('--quad_type', type=str, help='Quadrature rule', choices=['GAUSS', 'RADAU-RIGHT', 'RADAU-LEFT'])
+    parser.add_argument(
+        '--residual_type',
+        type=str,
+        help='Way of computing the residual',
+        choices=['full_rel', 'last_abs', 'full_abs', 'last_rel'],
+    )
+    parser.add_argument('--imex', type=str_to_bool, help='Toggle for IMEX', choices=[True, False])
+    parser.add_argument('--useNCCL', type=str_to_bool, help='Toggle for NCCL communicator', choices=[True, False])
+    parser.add_argument('--init_guess', type=str, help='Initial guess', choices=['spread', 'copy', 'zero'])
+    args = parser.parse_args()
+
+    individual_test(**vars(args))
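
With the argparse-based __main__ above, the command assembled by individual_test for an MPI run now looks roughly like this (one possible parametrization; the flag order follows the kwargs dict, so it is not guaranteed):

    mpirun -np 3 python pySDC/tests/test_sweepers/test_MPI_sweeper.py --num_nodes=3 --quad_type=RADAU-RIGHT --residual_type=full_rel --imex=False --init_guess=spread --useNCCL=False --ML=2
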
diff --git a/pySDC/tests/test_transfer_classes/test_base_transfer_MPI.py b/pySDC/tests/test_transfer_classes/test_base_transfer_MPI.py
new file mode 100644
index 0000000000..8caf7a6a9e
--- /dev/null
+++ b/pySDC/tests/test_transfer_classes/test_base_transfer_MPI.py
@@ -0,0 +1,122 @@
+import pytest
+
+
+def getLevel(nvars, num_nodes, index, useMPI):
+    from pySDC.core.Level import level
+    from pySDC.implementations.problem_classes.HeatEquation_ND_FD import heatNd_unforced
+
+    if useMPI:
+        from pySDC.implementations.sweeper_classes.generic_implicit_MPI import generic_implicit_MPI as sweeper_class
+    else:
+        from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit as sweeper_class
+
+    level_params = {}
+    level_params['problem_class'] = heatNd_unforced
+    level_params['problem_params'] = {'nvars': nvars}
+    level_params['sweeper_class'] = sweeper_class
+    level_params['sweeper_params'] = {'num_nodes': num_nodes, 'quad_type': 'GAUSS', 'do_coll_update': True}
+    level_params['level_params'] = {'dt': 1.0}
+    level_params['level_index'] = index
+
+    L = level(**level_params)
+
+    L.status.time = 0.0
+    L.status.unlocked = True
+    L.u[0] = L.prob.u_exact(t=0)
+    L.sweep.predict()
+    return L
+
+
+def get_base_transfer(nvars, num_nodes, useMPI):
+    from pySDC.implementations.transfer_classes.TransferMesh import mesh_to_mesh
+
+    if useMPI:
+        from pySDC.implementations.transfer_classes.BaseTransferMPI import base_transfer_MPI as transfer_class
+    else:
+        from pySDC.core.BaseTransfer import base_transfer as transfer_class
+
+    params = {}
+    params['fine_level'] = getLevel(nvars[0], num_nodes[0], 0, useMPI)
+    params['coarse_level'] = getLevel(nvars[1], num_nodes[1], 1, useMPI)
+    params['base_transfer_params'] = {}
+    params['space_transfer_class'] = mesh_to_mesh
+    params['space_transfer_params'] = {}
+    return transfer_class(**params)
+
+
+@pytest.mark.mpi4py
+@pytest.mark.parametrize('nvars', [32, 16])
+@pytest.mark.parametrize('num_procs', [2, 3])
+def test_MPI_nonMPI_consistency(num_procs, nvars):
+    import os
+    import subprocess
+
+    my_env = os.environ.copy()
+    my_env['PYTHONPATH'] = '../../..:.'
+    my_env['COVERAGE_PROCESS_START'] = 'pyproject.toml'
+
+    cmd = f"mpirun -np {num_procs} python {__file__} --nvars={nvars}".split()
+
+    p = subprocess.Popen(cmd, env=my_env, cwd=".")
+
+    p.wait()
+    assert p.returncode == 0, 'ERROR: did not get return code 0, got %s with %2i processes' % (
+        p.returncode,
+        num_procs,
+    )
+
+
+def _test_MPI_nonMPI_consistency(nvars):
+    import numpy as np
+    from mpi4py import MPI
+
+    num_nodes = (MPI.COMM_WORLD.size, MPI.COMM_WORLD.size)
+    _nvars = [nvars, nvars // 2]
+    base_transfer_params = {'nvars': _nvars, 'num_nodes': num_nodes}
+
+    T = {useMPI: get_base_transfer(**base_transfer_params, useMPI=useMPI) for useMPI in [True, False]}
+    CF, CG = T[True].comm_fine, T[True].comm_coarse
+
+    def assert_all_equal(operation_name):
+        err = []
+        if not np.allclose(*(T[useMPI].fine.u[CF.rank + 1] for useMPI in [True, False])):
+            err += [f'Difference in u on fine level after {operation_name} on rank {CF.rank}']
+        if not np.allclose(*(T[useMPI].fine.f[CF.rank + 1] for useMPI in [True, False])):
+            err += [f'Difference in f on fine level after {operation_name} on rank {CF.rank}']
+        if not np.allclose(*(T[useMPI].coarse.u[CG.rank + 1] for useMPI in [True, False])):
+            err += [f'Difference in u on coarse level after {operation_name} on rank {CG.rank}']
+        if not np.allclose(*(T[useMPI].coarse.f[CG.rank + 1] for useMPI in [True, False])):
+            err += [f'Difference in f on coarse level after {operation_name} on rank {CG.rank}']
+
+        if any(me is not None for me in T[False].fine.tau):
+            if not np.allclose(*(T[useMPI].fine.tau[CF.rank] for useMPI in [True, False])):
+                err += [f'Difference in tau correction on fine level after {operation_name} on rank {CF.rank}']
+        if any(me is not None for me in T[False].coarse.tau):
+            if not np.allclose(*(T[useMPI].coarse.tau[CG.rank] for useMPI in [True, False])):
+                err += [f'Difference in tau correction on coarse level after {operation_name} on rank {CG.rank}']
+
+        global_err = CF.allgather(err)
+        if any(len(me) > 0 for me in global_err):
+            raise Exception(global_err)
+
+    assert_all_equal('initialization')
+
+    for function in [
+        'restrict',
+        'prolong',
+        'prolong_f',
+    ]:
+        for me in T.values():
+            me.__getattribute__(function)()
+        assert_all_equal(function)
+    print(f'Passed with {nvars=} and {CF.size=}')
+
+
+if __name__ == '__main__':
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--nvars', type=int, nargs=1, help='Number of degrees of freedom in space')
+    args = parser.parse_args()
+
+    _test_MPI_nonMPI_consistency(args.nvars[0])
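
The new consistency check can also be started by hand, outside pytest; its __main__ block only expects the --nvars flag, so an invocation along these lines should work when run from the repository root (assuming mpi4py is available and pySDC is on the PYTHONPATH):

    mpirun -np 3 python pySDC/tests/test_transfer_classes/test_base_transfer_MPI.py --nvars=32
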