In [None]:
%pylab inline
from ipyparallel import Client, error
cluster=Client(profile="mpi")
view=cluster[:]
view.block=True

try:
    from openmdao.utils.notebook_utils import notebook_mode
except ImportError:
    !python -m pip install openmdao[notebooks]

```{note}
This feature requires MPI and may not run on Colab.
```

# Distributed Components

When a computation uses large input arrays, you may want to split it across multiple processes, with each process operating on a subset of the input values. This may be done purely for performance, or it may be necessary because the entire input will not fit in the memory of a single machine. Either way, OpenMDAO supports this through distributed components. A distributed component is a component that operates on distributed variables, and a variable is distributed if each process holds only part of the whole variable.

We’ve already seen that by using [src_indices](connect-with-src-indices) we can connect an input to only a subset of an output variable. By giving different values for src_indices in each MPI process, we can distribute computations on a distributed output across the processes.
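
As a rough illustration of how a single array can be divided among processes, the sketch below computes a size and an offset for each rank, then derives the index range that rank would use as its `src_indices`. The helper `even_partition` is hypothetical and written only for this illustration. It assumes that any remainder goes to the lowest-numbered ranks, which matches the 8/7 split mentioned in the example component's comments. It does not use MPI or OpenMDAO.

```python
def even_partition(num_procs, total_size):
    """Hypothetical helper: split total_size entries as evenly as possible.

    Returns (sizes, offsets) with one entry per process. Any remainder is
    handed to the lowest-numbered ranks, one extra entry each (an assumption
    made for this sketch).
    """
    base, leftover = divmod(total_size, num_procs)
    sizes = [base + 1 if rank < leftover else base for rank in range(num_procs)]
    offsets = [sum(sizes[:rank]) for rank in range(num_procs)]
    return sizes, offsets


sizes, offsets = even_partition(4, 15)
for rank, (mysize, start) in enumerate(zip(sizes, offsets)):
    # Each rank would use this index range as its src_indices.
    src_indices = list(range(start, start + mysize))
    print(f"rank {rank}: size={mysize}, src_indices={src_indices}")
```

With four processes and 15 entries, the first three ranks receive four entries each and the last rank receives three.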

You tell the framework that a variable is distributed by passing `distributed=True` when you declare it with `add_input` or `add_output`.


## Component Options

In [None]:
import openmdao.api as om

om.show_options_table("openmdao.core.component.Component")

```{note}
If a Component is distributed then all of its outputs are distributed.
```

## Distributed Component Example

The following example shows how to create a distributed component, *DistribComp*, that distributes its computation evenly across the available processes. A second component, *Summer*, sums the values from the distributed component into a scalar output value.

These components can be found in the OpenMDAO test suite:

In [None]:
%%px 

import numpy as np

from openmdao.utils.array_utils import evenly_distrib_idxs
import openmdao.api as om

class DistribComp(om.ExplicitComponent):
    """Simple Distributed Component."""

    def initialize(self):
        self.options.declare('size', types=int, default=1,
                             desc="Size of input and output vectors.")

    def setup(self):
        comm = self.comm
        rank = comm.rank

        size = self.options['size']

        # if comm.size is 2 and size is 15, this results in
        # 8 entries for proc 0 and 7 entries for proc 1
        sizes, offsets = evenly_distrib_idxs(comm.size, size)
        mysize = sizes[rank]
        start = offsets[rank]
        end = start + mysize

        self.add_input('invec', np.ones(mysize, float), distributed=True)
        self.add_output('outvec', np.ones(mysize, float), distributed=True)

    def compute(self, inputs, outputs):
        if self.comm.rank == 0:
            outputs['outvec'] = inputs['invec'] * 2.0
        else:
            outputs['outvec'] = inputs['invec'] * -3.0

```{note}
In this example, *src_indices* are specified explicitly when `indep.x` is connected to the component's input. This is not really necessary here, because it replicates the default behavior. If no *src_indices* are specified, OpenMDAO assumes an offset equal to the sum of the sizes on all lower ranks and a range equal to the local size (the size is given by the usual arguments to `add_input`).
```

In [None]:
%%px

class Summer(om.ExplicitComponent):
    """Sums an input array."""

    def initialize(self):
        self.options.declare('size', types=int, default=1, desc="Size of input vector.")

    def setup(self):
        self.add_input('invec', np.ones(self.options['size'], float))

        self.add_output('sum', 0.0, shape=1)

    def compute(self, inputs, outputs):
        outputs['sum'] = np.sum(inputs['invec'])

This example is run with four processes and a size of 15:

In [None]:
%%px 

import numpy as np
import openmdao.api as om

size = 15

model = om.Group()
prob = om.Problem(model)

# Distributed component "C2" requires an IndepVarComp to supply inputs.
indep = model.add_subsystem("indep", om.IndepVarComp())
indep.add_output('x', np.zeros(size), distributed=True)
model.add_subsystem("C2", DistribComp(size=size))

model.add_subsystem("C3", Summer(size=size))

comm = prob.comm
rank = comm.rank
sizes, offsets = evenly_distrib_idxs(comm.size, size)
mysize = sizes[rank]
start = offsets[rank]
end = start + mysize

model.connect('indep.x', 'C2.invec', src_indices=np.arange(start, end, dtype=int))
model.connect('C2.outvec', 'C3.invec', src_indices=om.slicer[:])

prob.setup()

In [None]:
%%px

prob.set_val('indep.x', np.ones(size))
prob.run_model()

print(prob.get_val('C2.invec', get_remote=True))

In [None]:
%%px

print(prob.get_val('C2.outvec'))

In [None]:
%%px

print(prob.get_val('C3.sum'))

In [None]:
%%px

from openmdao.utils.assert_utils import assert_near_equal

assert_near_equal(prob.get_val('C3.sum'), -25.)
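
The expected value of -25 can be reproduced by hand. The sketch below is a serial emulation of the four-process run, with no MPI or OpenMDAO involved. It assumes the 15 entries are split into chunks of 4, 4, 4 and 3, applies the same per-rank factors as `DistribComp.compute`, then concatenates the chunks the way the full `C2.outvec` is delivered to the non-distributed `C3.invec`. The names `chunk_sizes`, `outvec_chunks` and `full_outvec` are illustrative only.

```python
# Serial emulation of the four-process run (no MPI involved).
chunk_sizes = [4, 4, 4, 3]   # assumed even split of 15 entries over 4 ranks
x = [1.0] * 15               # the value set on indep.x

outvec_chunks = []
start = 0
for rank, mysize in enumerate(chunk_sizes):
    local_in = x[start:start + mysize]
    # Rank 0 doubles its chunk; every other rank multiplies by -3.
    factor = 2.0 if rank == 0 else -3.0
    outvec_chunks.append([factor * v for v in local_in])
    start += mysize

# The summing component sees the concatenation of all chunks.
full_outvec = [v for chunk in outvec_chunks for v in chunk]
total = sum(full_outvec)
print(total)
```

Rank 0 contributes 4 × 2 = 8 and the remaining 11 entries contribute 11 × (-3) = -33, giving 8 - 33 = -25.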

```{note}
In this example, we introduce a new component called an [IndepVarComp](indepvarcomp.ipynb). If you used OpenMDAO prior to version 3.2, then you are familiar with this component. It is used to define an independent variable.

You usually do not have to define these because OpenMDAO creates and uses them automatically for all unconnected inputs in your model. However, when we define a distributed input, we often use *src_indices* to determine which part of that input each process sees. For some sets of indices, the full size of the corresponding independent variable cannot be determined easily, so the *IndepVarComp* cannot be created automatically. For unconnected inputs on a distributed component, you must therefore create one manually, as we did in this example.
```

## Distributed Component with Derivatives

Derivatives can be computed for distributed components as shown in the following variation on the example. Also, in this version, we have taken advantage of the automatic determination of *src_indices*.

In [None]:
%%px 

class DistribCompDerivs(om.ExplicitComponent):
    """Simple Distributed Component with Derivatives."""

    def initialize(self):
        self.options.declare('size', types=int, default=1,
                             desc="Size of input and output vectors.")

    def setup(self):
        comm = self.comm
        rank = comm.rank

        size = self.options['size']

        # if comm.size is 2 and size is 15, this results in
        # 8 entries for proc 0 and 7 entries for proc 1
        sizes, _ = evenly_distrib_idxs(comm.size, size)
        self.mysize = mysize = sizes[rank]

        # don't set src_indices on the input, just use default behavior
        self.add_input('invec', np.ones(mysize, float), distributed=True)
        self.add_output('outvec', np.ones(mysize, float), distributed=True)

    def setup_partials(self):
        # declare partial derivatives (diagonal of mysize)
        self.declare_partials('outvec', 'invec',
                              rows=np.arange(0, self.mysize),
                              cols=np.arange(0, self.mysize))

    def compute(self, inputs, outputs):
        if self.comm.rank == 0:
            outputs['outvec'] = inputs['invec'] * 2.0
        else:
            outputs['outvec'] = inputs['invec'] * -3.0

    def compute_partials(self, inputs, J):
        # get mysize from the input vector for this process
        mysize = inputs['invec'].size

        if self.comm.rank == 0:
            J['outvec', 'invec'] = np.ones((mysize,)) * 2.0
        else:
            J['outvec', 'invec'] = np.ones((mysize,)) * -3.0
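
The `rows` and `cols` arguments in `setup_partials` describe a sparse local Jacobian: entry `k` of the value array is placed at position `(rows[k], cols[k])`. Because both index arrays are `0..mysize-1`, the result is diagonal. The sketch below expands such a declaration into a dense matrix to make that structure visible. The helper `dense_from_coo` is hypothetical and not part of OpenMDAO, and the values of `mysize` and `rank` are chosen arbitrarily for illustration.

```python
def dense_from_coo(rows, cols, values, shape):
    """Hypothetical helper: expand (row, col, value) triplets into a dense matrix."""
    dense = [[0.0] * shape[1] for _ in range(shape[0])]
    for r, c, v in zip(rows, cols, values):
        dense[r][c] = v
    return dense


mysize = 3   # local size on this rank (illustrative value)
rank = 1     # any rank other than 0 uses the factor -3

rows = list(range(mysize))
cols = list(range(mysize))
factor = 2.0 if rank == 0 else -3.0
values = [factor] * mysize   # same shape as the array assigned to J in compute_partials

local_jac = dense_from_coo(rows, cols, values, (mysize, mysize))
for row in local_jac:
    print(row)
```

Only `mysize` values are stored per rank, rather than a full `mysize` by `mysize` block.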

In [None]:
%%px

class SummerDerivs(om.ExplicitComponent):
    """Sums an input array."""

    def initialize(self):
        self.options.declare('size', types=int, default=1,
                             desc="Size of input and output vectors.")

    def setup(self):
        self.add_input('invec', np.ones(self.options['size'], float))

        self.add_output('sum', 0.0, shape=1)

    def setup_partials(self):
        # the derivative is constant
        self.declare_partials('sum', 'invec', val=1.)

    def compute(self, inputs, outputs):
        outputs['sum'] = np.sum(inputs['invec'])

This example is again run with four processes and a `size` of 15. We can use [assert_check_partials](../working_with_derivatives/unit_testing_partials) to verify that the partial derivatives are calculated correctly.

In [None]:
%%px

import numpy as np
import openmdao.api as om
from openmdao.utils.assert_utils import assert_check_partials, assert_near_equal

size = 15

model = om.Group()

# Distributed component "C2" requires an IndepVarComp to supply inputs.
indep = model.add_subsystem("indep", om.IndepVarComp())
indep.add_output('x', np.zeros(size), distributed=True)
model.add_subsystem("C2", DistribCompDerivs(size=size))
model.add_subsystem("C3", SummerDerivs(size=size))

model.connect('indep.x', 'C2.invec')
model.connect('C2.outvec', 'C3.invec', src_indices=om.slicer[:])

prob = om.Problem(model)
prob.setup()

In [None]:
%%px

prob.set_val('indep.x', np.ones(size))
prob.run_model()

print(prob.get_val('C2.invec', get_remote=True))

In [None]:
%%px

print(prob.get_val('C2.outvec'))

In [None]:
%%px

print(prob.get_val('C3.sum'))

In [None]:
%%px

assert_check_partials(prob.check_partials())
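
Conceptually, checking partials means comparing the analytic derivatives against a numerical approximation. The sketch below illustrates that idea for one rank's local computation using a simple forward finite difference. It is a standalone illustration and makes no claim about how `check_partials` is implemented. The functions `compute` and `fd_jacobian`, the step size and the tolerance are all assumptions made for this example.

```python
def compute(invec, rank):
    """Local computation on one rank, mirroring DistribCompDerivs.compute."""
    factor = 2.0 if rank == 0 else -3.0
    return [factor * v for v in invec]


def fd_jacobian(func, x, step=1e-6):
    """Hypothetical helper: forward-difference approximation of d func / d x."""
    base = func(x)
    jac = [[0.0] * len(x) for _ in base]
    for j in range(len(x)):
        perturbed = list(x)
        perturbed[j] += step
        out = func(perturbed)
        for i in range(len(base)):
            jac[i][j] = (out[i] - base[i]) / step
    return jac


rank = 0
x = [1.0, 1.0, 1.0, 1.0]
approx = fd_jacobian(lambda v: compute(v, rank), x)

# Analytic local Jacobian: a diagonal filled with the rank's factor.
analytic_diag = [2.0] * len(x)
max_err = max(
    abs(approx[i][j] - (analytic_diag[i] if i == j else 0.0))
    for i in range(len(x))
    for j in range(len(x))
)
print(max_err < 1e-6)
```

Since the computation is linear, the finite-difference estimate agrees with the analytic diagonal up to floating-point round-off.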

In [None]:
%%px

J = prob.compute_totals(of=['C2.outvec'], wrt=['indep.x'])
print(J[('C2.outvec', 'indep.x')])

In [None]:
%%px

actual = J[('C2.outvec', 'indep.x')]
expected = np.eye(15) * np.append(2 * np.ones(4), -3 * np.ones(11))
expected = np.append(expected, np.zeros((15, 45)), axis=1)
assert_near_equal(actual, expected)