Building your own sweeper
--------------------------------
While pySDC comes with a lot of functionality, its purpose is to be extendable and (in a sense) modular.
Users are able to replace nearly all key functionality with their own approaches.
One particular aspect of this are custom sweepers.
In this example, we will look at sweepers that enable parallelism across the nodes.

A sweeper in pySDC takes values $u_m$ at the nodes and updates them using the SDC iteration rule to get $u_{m+1}$, using the particular type of $Q_\Delta$ (and of course the scheme itself, be it IMEX, implicit, velocity-verlet and so on).
The sweeper is also responsible for computing the residual on a time-step and for getting the final value $u_{\textrm{end}}$ at the right boundary of the interval.
It also provides the mechanism to integrate over the nodes using the $Q$-matrix and it can set an initial guess for each node.

Let's look briefly into the maths of the project we have set our minds to.
Recall that SDC solves fully implicit Runge-Kutta rules that arise from discretizing the integral version of an initial value problem using a quadrature rule:
$$u(t_0+\Delta t) = \int_{t_0}^{t_0 + \Delta t} f(u(t), t) dt + u(t_0) \rightarrow \vec{u} = \Delta t QF(\vec{u}, \vec{t}) + \vec{u}_0.$$
The matrix $Q$ carries the quadrature weights, the vector $u$ contains the solutions at the quadrature nodes $\vec{t}$ (which go from 0 to 1), $F$ is some vector valued operator that evaluates the right hand side at the collocation nodes and $\vec{u}$ carries the initial conditions in every component.

$Q$ is typically dense, which makes solving this directly prohibitively expensive.
SDC alleviates this by introducing iterations and a preconditioner.
The result is
$$(I - \Delta t Q_\Delta F)(\vec{u}^{k+1}) = \vec{u}_0 + \Delta t(Q-Q_\Delta)F(\vec{u}^k),$$
with $Q_\Delta$ the preconditioner and $k$ the iteration number.

Remember that we can solve each iteration by forward substitution because we choose a lower triangular preconditioner.
Looking equation component wise, we sweep through the collocation nodes by perform the following "implicit Euler" steps:
$$u^{k+1}_{m+1} - \Delta t \tilde{q}_{m+1, m+1}f(u^{k+1}_{m+1}) = u_0 + \Delta t \sum^m_{j=1}\tilde{q}_{m+1, j}f(u^{k+1}_j) + \Delta t \sum^M_{j=1}(q_{m+1, j} - \tilde{q}_{m+1, j})f(u^{k}_j),$$
with $m$ the index of the current collocation node and $M$ the number of collocation nodes.
The first part of the right hand side after the initial conditions is the forward subsitituion part which depends on the current iteration at previous collocation nodes.
Everything else on the right hand side depends only on the previous iteration.

This means if we can eliminate that part, we can update the solutions at the nodes in parallel!
This would be nice because it would allow us to parallelize the right hand side evaluations and the "implicit Euler" steps across the collocation nodes, which make for the bulk of computation time in SDC.

But this part comes from the preconditioner, so the natural thing to do is select a different preconditiner.
We get rid of the forward subsitution term by using one that is not just lower triangular, but diagonal.
Lucky for us, [someone has looked into this a bit already](https://juser.fz-juelich.de/record/849786/files/Paper.pdf), but generating good diagonal preconditioners is still an active area of research.

We will now write a sweeper that uses MPI to indeed parallelize SDC with diagonal preconditioners over $M$ processes.
For computing the residual or for integrating over the nodes, communication over the nodes is needed. Note that we will not deal with all the details here, no error checking, no ideal memory layout etc. to keep the example short.

Before we start, however, let's make sure MPI jobs can be run. We connect to our ``ipcluster`` and check if all looks good:

In [1]:
import ipyparallel as ipp
rc = ipp.Client()
view = rc[:]

In [2]:
%%px
from mpi4py import MPI

print(MPI.COMM_WORLD.Get_size())

[stdout:0] 4


[stdout:2] 4


[stdout:1] 4


[stdout:3] 4


We then start with the initialization of our new sweeper, nothing fancy here.
Note the cell magic ``%%px``, which we already used above.
This will tell the notebook to run the code in the engine provided by the ``ipcluster`` rather than the kernel you have selected for the notebook.
Remember: If you want to run in the ``pySDC_tutorial`` virtual environement, you have to start the cluster in a shell that has this virtual environment activated, because selecting the kernel in the notebook will have no effect on the cluster.

In [3]:
%%px
import jdc  # required to split the class definition into multiple cells...
from pySDC.core.Sweeper import sweeper

class generic_implicit_MPI(sweeper):

    def __init__(self, params):
        """
        Args:
            params (dict): Parameters for the sweeper
        """

        # call parent's initialization routine
        super().__init__(params)
        
        # set Q_Delta matrix and store MPI rank
        self.QI = self.get_Qdelta_implicit(self.coll, qd_type=self.params.QI)
        self.rank = self.params.comm.Get_rank()

The next thing our sweeper should be able to do is to integrate over the nodes using $Q$. As in the serial case this is just evaluating $Qu$, but now each process has only a single entry of $u$ (i.e. process $m$ has $u_m$). That's a standard use case for ``MPI Reduce``, but each process has to participate in $M$ of those reductions to get all availabe data. Note that there's an easier way to do that if you accept to store not 1 but all $M$ values on each process.

In [5]:
%%px
%%add_to generic_implicit_MPI
def integrate(self):
    """
    Compute dtQF(u).
    """
    # get current level and problem
    lvl = self.level
    prob = lvl.prob
    
    assert self.params.comm.size == self.coll.num_nodes, "This sweeper is only implemented for one collocation node per process."

    me = prob.dtype_u(prob.init, val=0.0)
    for m in range(self.coll.num_nodes):
        if m == self.rank:
            # if it's my turn, add to result the incoming data
            self.params.comm.Reduce(lvl.dt * self.coll.Qmat[m + 1, self.rank + 1] * lvl.f[self.rank + 1],
                                    me, root=m, op=MPI.SUM)
        else:
            # if it's not my turn, contribute to global sum
            self.params.comm.Reduce(lvl.dt * self.coll.Qmat[m + 1, self.rank + 1] * lvl.f[self.rank + 1],
                                    None, root=m, op=MPI.SUM)
    return me

We end up with the $m$th part of the integral over nodes on process $m$. The next step now is to add the actual sweep mechanics. This is actually very simple: build the right-hand side of SDC and apply the preconditioner. Since $Q_\Delta$ is diagonal and the nodes are distributed, there is no loop involved (which indicates that we're going to get speeeeeeeedup).

In [6]:
%%px
%%add_to generic_implicit_MPI
def update_nodes(self):
    # get current level and problem
    lvl = self.level
    prob = lvl.prob

     # start building the right-hand side of for the SDC sweep
    # get Q F(u^k)
    rhs = self.integrate()
    # substract Q_Delta F(u^k)
    rhs -= lvl.dt * self.QI[self.rank + 1, self.rank + 1] * lvl.f[self.rank + 1]
    # add initial value
    rhs += lvl.u[0]

    # implicit solve with prefactor stemming from the diagonal of Qd
    lvl.u[self.rank + 1] = prob.solve_system(rhs, lvl.dt * self.QI[self.rank + 1, self.rank + 1], lvl.u[self.rank + 1],
                                             lvl.time + lvl.dt * self.coll.nodes[self.rank])
    # update function values
    lvl.f[self.rank + 1] = prob.eval_f(lvl.u[self.rank + 1], lvl.time + lvl.dt * self.coll.nodes[self.rank])

    # indicate presence of new values at this level (pySDC internal thing, should always do this in your sweeper)
    lvl.status.updated = True

Now that there's a way how to get from $u^k$ to $u^{k+1}$, we need to do some bookkeeping: compute the residual, compute the end point, define initial guesses. Nothing fancy here, but necessary nonetheless:

In [7]:
%%px
%%add_to generic_implicit_MPI
def compute_end_point(self):
    """
    Compute the solution at the right interval boundary, which may or may not be a collocation node.
    """
    assert self.coll.right_is_node, "No implementation for computing end point if it is not a collocation node yet!"
    
    # get current level
    lvl = self.level
    lvl.uend[:] = self.params.comm.bcast(lvl.u[self.rank + 1], root=self.params.comm.Get_size() - 1)

def compute_residual(self, stage=None):
    """
    Compute the residual.
    
    Args:
        stage (str): Current stage
    """
    # get current level
    lvl = self.level

    # compute the residual for each node
    res = self.integrate()
    res += lvl.u[0] - lvl.u[self.rank + 1]
    
    # use abs function from data type here
    res_norm = abs(res)
    
    # find maximal residual over the nodes
    lvl.status.residual = self.params.comm.allreduce(res_norm, op=MPI.MAX)

    # indicate that the residual has seen the new values
    lvl.status.updated = False

def predict(self):
    """
    This is used before we start iterating to provide an initial guess at all nodes.
    """
    # get current level and problem
    lvl = self.level
    prob = lvl.prob

    # evaluate RHS at left point
    lvl.f[0] = prob.eval_f(lvl.u[0], lvl.time)
    
    # we just implement 'spread' here for simplicity
    assert self.params.initial_guess == 'SPREAD', f"Initial guess \"{self.params.initial_guess}\" not implemented!"
    lvl.u[self.rank + 1] = prob.dtype_u(lvl.u[0])
    lvl.f[self.rank + 1] = prob.eval_f(lvl.u[self.rank + 1], lvl.time + lvl.dt * self.coll.nodes[self.rank])

    lvl.uend = prob.dtype_u(lvl.u[0])

    # indicate that this level is now ready for sweeps
    lvl.status.unlocked = True
    lvl.status.updated = True

Now we have a full new sweeper which allows us to run in parallel over $M$ nodes (since we did not spend much thought on this, note that it HAS to be exactly $M$ processes.. much room for optimization here!). In order to use all this, we have to pass the new sweeper to the controller and include the MPI communicator as parameter for the sweeper.

In [8]:
%%px
from pySDC.implementations.problem_classes.AllenCahn_2D_FD import allencahn_fullyimplicit

# initialize problem parameters
problem_params = dict()
problem_params['nu'] = 2
problem_params['eps'] = 0.04
problem_params['radius'] = 0.25
problem_params['nvars'] = [(128, 128)]
problem_params['newton_maxiter'] = 100
problem_params['newton_tol'] = 1E-08
problem_params['lin_tol'] = 1E-09
problem_params['lin_maxiter'] = 100

# initialize level parameters
level_params = dict()
level_params['restol'] = 1E-07
level_params['dt'] = 1E-03 / 2
level_params['nsweeps'] = 1

# initialize sweeper parameters
sweeper_params = dict()
sweeper_params['quad_type'] = 'RADAU-RIGHT'
sweeper_params['num_nodes'] = 4
sweeper_params['initial_guess'] = 'SPREAD'

# initialize step parameters
step_params = dict()
step_params['maxiter'] = 50

# setup parameters "in time"
t0 = 0
Tend = 0.016

# initialize controller parameters
controller_params = dict()
controller_params['logger_level'] = 30

# fill description dictionary for easy step instantiation
description = dict()
description['problem_class'] = allencahn_fullyimplicit
description['problem_params'] = problem_params
description['level_params'] = level_params
description['step_params'] = step_params

Now we need to put in our new sweeper. Note that we also have to choose a different $Q_\Delta$. We'll use a pre-defined one from the base sweeper class, pass the MPI communicator to the sweeper and put all this into the description of the problem to finally instantiate our controller:

In [9]:
%%px
from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI

sweeper_params['QI'] = ['MIN3']
sweeper_params['comm'] = MPI.COMM_WORLD

description['sweeper_class'] = generic_implicit_MPI
description['sweeper_params'] = sweeper_params

controller = controller_nonMPI(num_procs=1, controller_params=controller_params, description=description)

Looks good so far, so we'll set up the initial conditions and run this:

In [None]:
%%px
prob = controller.MS[0].levels[0].prob
uinit = prob.u_exact(t0)

uend_par, stats_par = controller.run(u0=uinit, t0=t0, Tend=Tend)

OK, no message so far (which is good, logger level is set to silent). All participating processes now have the results and stats, so let's see:

In [10]:
%%px
rank = MPI.COMM_WORLD.Get_rank()

from pySDC.helpers.stats_helper import get_sorted

timing = get_sorted(stats_par, type='timing_run', sortby='time')
print(f'Time to solution on rank {rank}: {timing[0][1]:6.4f} sec.' )

if rank == 0:
    import matplotlib.pylab as plt

    plt.subplot(1, 2, 1)
    plt.title("u(t0)")
    plt.imshow(uinit,extent=[-0.5,0.5,-0.5,0.5])

    plt.subplot(1, 2, 2)
    plt.title("u(Tend)")
    plt.imshow(uend_par,extent=[-0.5,0.5,-0.5,0.5])
    
    plt.show()

[stdout:2] Time to solution on rank 2: 8.0760 sec.


[stdout:1] Time to solution on rank 1: 8.0726 sec.


[stdout:3] Time to solution on rank 3: 8.0801 sec.


[stdout:0] Time to solution on rank 0: 8.0595 sec.


%px:   0%|          | 0/4 [00:00<?, ?tasks/s]

Anyway, this is how you can use a custom sweeper.
And in a very similar manner you can replace everything, from controllers (e.g. for adaptivity or fault tolerance) and the transfer module (e.g. if you need mass matrices) to the step and level classes (e.g. if you need more data available).

We hope you felt like getting a first version of the concept into pySDC was not too difficult and you feel encouraged to perhaps try implementing one of your projects some time.

Of course, pySDC suffers from the same issues as any other code written by someone else.
While object oriented programming allows to avoid copy-pasting lots of code, it can be daunting to sift through myriads of files to find out what why certain details work the way they do.
Please know that you can always write us an email or contact us on GitHub if you need some clarification!