# Authoring Leaf Systems
For instructions on how to run these tutorial notebooks, please see the [index](./index.ipynb).


In [None]:
import numpy as np
from pydrake.common.value import AbstractValue
from pydrake.math import RigidTransform, RotationMatrix
from pydrake.systems.analysis import Simulator
from pydrake.systems.framework import BasicVector, LeafSystem
from pydrake.trajectories import PiecewisePolynomial

The [Modeling Dynamical Systems](./dynamical_systems.ipynb) tutorial gave a
very basic introduction to Drake's Systems framework, including writing a basic
[LeafSystem](https://drake.mit.edu/doxygen_cxx/classdrake_1_1systems_1_1_leaf_system.html).
In this notebook we'll provide a more advanced/complete overview of authoring
those systems.

## Input and Output Ports

Leaf systems can have any number of input and output ports.  There are two basic types of ports: **vector-valued** ports and "**abstract-valued**" ports. Let's start with vector-valued ports, which are the easiest to work with.  The following system declares two vector input ports and two vector output ports.

In [None]:
class MyAdder(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)  # Don't forget to initialize the base class.
        self.DeclareVectorInputPort(name="a", size=2)
        self.DeclareVectorInputPort(name="b", size=2)
        self.DeclareVectorOutputPort(name="sum", size=2, calc=self.CalcSum)
        self.DeclareVectorOutputPort(name="difference",
                                     size=2,
                                     calc=self.CalcDifference)

    def CalcSum(self, context, output):
        # Evaluate the input ports to obtain the 2x1 vectors
        a = self.get_input_port(0).Eval(context)
        b = self.get_input_port(1).Eval(context)

        # Write the sum into the output vector
        output.SetFromVector(a + b)

    def CalcDifference(self, context, output):
        # Evaluate the input ports to obtain the 2x1 vectors
        a = self.get_input_port(0).Eval(context)
        b = self.get_input_port(1).Eval(context)

        # Write the difference into output vector
        output.SetFromVector(a - b)

# Construct an instance of this system and a context
system = MyAdder()
context = system.CreateDefaultContext()

# Fix the input ports to some constant values
system.get_input_port(0).FixValue(context, [3, 4])
system.get_input_port(1).FixValue(context, [1, 2])

# Evaluate the output ports
print(f"sum: {system.get_output_port(0).Eval(context)}")
print(f"difference: {system.get_output_port(1).Eval(context)}")

There are a few things to notice here:
- You can declare as many inputs or outputs as you like (and they need not be the same size).
- All of the inputs can be evaluated from any of the system methods that accept a `Context`.
- For each output port, you define a different callback function that implements the output.

It is also helpful to understand that:
- Input and output ports do not have any timing semantics on their own; output ports are evaluated whenever their output is requested, and [use caching](https://drake.mit.edu/doxygen_cxx/group__cache__design__notes.html) by default to avoid repeated computation.
- You can also make inputs optional, by checking [InputPort::HasValue()](https://drake.mit.edu/doxygen_cxx/classdrake_1_1systems_1_1_input_port.html#a5536b94a4642fa4cf47164437dc66ae8) in the system method implementations.

Keeping track of the order of the ports can be brittle. We can make the code more robust by storing each port's index when it is declared, or by referencing the ports by name.  Here is a modified version of the `MyAdder` example which uses both approaches:

In [None]:
class MyAdder(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)  # Don't forget to initialize the base class.
        self._a_index = self.DeclareVectorInputPort(name="a",
                                                    size=2).get_index()
        self._b_index = self.DeclareVectorInputPort(name="b",
                                                    size=2).get_index()
        self.DeclareVectorOutputPort(name="sum", size=2, calc=self.CalcSum)
        self.DeclareVectorOutputPort(name="difference",
                                     size=2,
                                     calc=self.CalcDifference)

    def CalcSum(self, context, output):
        # Evaluate the input ports to obtain the 2x1 vectors
        a = self.get_input_port(self._a_index).Eval(context)
        b = self.get_input_port(self._b_index).Eval(context)

        # Write the sum into the output vector
        output.SetFromVector(a + b)

    def CalcDifference(self, context, output):
        # Evaluate the input ports to obtain the 2x1 vectors
        a = self.GetInputPort("a").Eval(context)
        b = self.GetInputPort("b").Eval(context)

        # Write the difference into output vector
        output.SetFromVector(a - b)

# Construct an instance of this system and a context
system = MyAdder()
context = system.CreateDefaultContext()

# Fix the input ports to some constant values
system.GetInputPort("a").FixValue(context, [3, 4])
system.GetInputPort("b").FixValue(context, [1, 2])

# Evaluate the output ports
print(f"sum: {system.GetOutputPort('sum').Eval(context)}")
print(f"difference: {system.GetOutputPort('difference').Eval(context)}")

Not all inputs and outputs are best represented as a vector. Drake's systems framework also supports passing structured types across ports, which is implemented in C++ using a technique called "type erasure". From the perspective of the systems framework, the framework classes work with an "abstract" data type, and only the system implementation itself needs to know how to work with the structured type. In practice, if you can overlook a small amount of boilerplate, the usage is straightforward. Here is an example of a `LeafSystem` that uses a `RigidTransform` in its input and output ports:

In [None]:
class RotateAboutZ(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)  # Don't forget to initialize the base class.
        self.DeclareAbstractInputPort(name="in",
                                      model_value=AbstractValue.Make(
                                          RigidTransform()))
        self.DeclareAbstractOutputPort(
            name="out",
            alloc=lambda: AbstractValue.Make(RigidTransform()),
            calc=self.CalcOutput)

    def CalcOutput(self, context, output):
        # Evaluate the input ports to obtain the RigidTransform
        X_1 = self.get_input_port(0).Eval(context)

        X_2 = RigidTransform(RotationMatrix.MakeZRotation(np.pi / 2)) @ X_1

        # Set the output RigidTransform
        output.set_value(X_2)

# Construct an instance of this system and a context
system = RotateAboutZ()
context = system.CreateDefaultContext()

# Fix the input port to a constant value
system.get_input_port(0).FixValue(context, RigidTransform())

# Evaluate the output port
print(f"output: {system.get_output_port(0).Eval(context)}")
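
To make the "abstract value" idea more concrete, here is a conceptual, Drake-free sketch of a type-erased container in plain Python. The `ErasedValue` class and the `forward` function are hypothetical and do not reflect how `AbstractValue` is implemented; the sketch only shows that framework-style code can pass a container around while only the producer and consumer know the concrete type.

```python
class ErasedValue:
    """Hypothetical stand-in for a type-erased value container."""

    def __init__(self, value):
        self._value = value
        self._type = type(value)  # Remember the concrete type.

    def get_value(self):
        return self._value

    def set_value(self, value):
        # Reject values of a different concrete type, as a typed port would.
        if not isinstance(value, self._type):
            raise TypeError(
                f"expected {self._type.__name__}, got {type(value).__name__}")
        self._value = value


def forward(source, destination):
    """'Framework' code: copies a value without knowing its type."""
    destination.set_value(source.get_value())


position_in = ErasedValue((1.0, 2.0, 3.0))  # A position stored as a tuple.
position_out = ErasedValue((0.0, 0.0, 0.0))
forward(position_in, position_out)
print(position_out.get_value())
```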

## State Variables

Systems store their state in the `Context`, and work with state in much the same way as with input and output ports. Again, we can have vector-valued state or abstract-valued state. Abstract state can only be updated in a discrete-time fashion, but vector-valued state can be declared to be either **discrete** or **continuous**.

A discrete state $x_d$ will evolve based on update events, the most common of which is a simple periodic event that defines a difference equation.  We saw an example of this in the [introductory notebook](./dynamical_systems.ipynb).

In [None]:
# Define the system.
class SimpleDiscreteTimeSystem(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)

        state_index = self.DeclareDiscreteState(1)  # One state variable.
        self.DeclareStateOutputPort("y", state_index)  # One output: y=x.
        self.DeclarePeriodicDiscreteUpdateEvent(
            period_sec=1.0,  # One second timestep.
            offset_sec=0.0,  # The first event is at time zero.
            update=self.Update)  # Call the Update method defined below.

    # x[n+1] = x^3[n]
    def Update(self, context, discrete_state):
        x = context.get_discrete_state_vector().GetAtIndex(0)
        xnext = x**3
        discrete_state.get_mutable_vector().SetAtIndex(0, xnext)

# Instantiate the System
system = SimpleDiscreteTimeSystem()
simulator = Simulator(system)
simulator.AdvanceTo(4.0)

You can find details on the implementation and order of these events [here](https://drake.mit.edu/doxygen_cxx/group__discrete__systems.html). It's also possible to define more complicated [events](https://drake.mit.edu/doxygen_cxx/group__events__description.html) for updating the discrete variables. 
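
To see what this difference equation does numerically, the plain-Python sketch below iterates $x[n+1] = x^3[n]$ by hand, without Drake. The initial value 0.9 and the helper name `iterate_cubic_map` are assumptions chosen for illustration; the `Simulator` run above starts from the context's default state instead.

```python
def iterate_cubic_map(x0, num_steps):
    """Return [x[0], ..., x[num_steps]] for x[n+1] = x[n]**3."""
    xs = [x0]
    for _ in range(num_steps):
        xs.append(xs[-1] ** 3)
    return xs


# Hypothetical initial condition with |x| < 1, so the iterates shrink toward 0.
trajectory = iterate_cubic_map(0.9, 4)
for n, x in enumerate(trajectory):
    print(f"x[{n}] = {x:.6f}")
```

Each loop iteration plays the role of one periodic update event: it reads the current state and writes the next one.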

Note that the states of a system are not immediately accessible to other systems in a `Diagram`. For a system to share its state, it must do that using an output port. We provide the `DeclareStateOutputPort()` method to make this common case easy.

The evolution of continuous vector-valued state variables is governed by a differential equation. While it is possible and convenient to declare many different groups of discrete state variables (they may even be updated at different rates by different events), we define at most one continuous state vector for a system, and define its dynamics by overriding the `LeafSystem::DoCalcTimeDerivatives()` method. Here is the simple continuous-time example from the introductory notebook again:

In [None]:
# Define the system.
class SimpleContinuousTimeSystem(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)

        state_index = self.DeclareContinuousState(1)  # One state variable.
        self.DeclareStateOutputPort("y", state_index)  # One output: y=x.

    # xdot(t) = -x(t) + x^3(t)
    def DoCalcTimeDerivatives(self, context, derivatives):
        x = context.get_continuous_state_vector().GetAtIndex(0)
        xdot = -x + x**3
        derivatives.get_mutable_vector().SetAtIndex(0, xdot)

# Instantiate the System
system = SimpleContinuousTimeSystem()
simulator = Simulator(system)
simulator.AdvanceTo(4.0)
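
For intuition about these dynamics, here is a minimal forward-Euler sketch of $\dot{x} = -x + x^3$ in plain Python. This is not how Drake's `Simulator` integrates (it uses its own, more sophisticated integrators); the step size, the initial condition, and the function names are assumptions made for illustration.

```python
def xdot(x):
    """Time derivative for xdot = -x + x^3."""
    return -x + x**3


def euler_integrate(x0, t_final, dt):
    """Integrate from t=0 to t_final with fixed-step forward Euler."""
    x = x0
    num_steps = round(t_final / dt)
    for _ in range(num_steps):
        x = x + dt * xdot(x)
    return x


# Hypothetical initial condition inside (-1, 1), where x decays toward 0.
x_final = euler_integrate(x0=0.9, t_final=4.0, dt=0.001)
print(f"x(4.0) is approximately {x_final:.4f}")
```

The points $x = 0$ and $x = \pm 1$ are equilibria of this equation, so a trajectory started exactly there stays put.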

For abstract-valued state, we have a similar workflow. We declare the state in the constructor, and define update events to update that state.  Discrete update events are restricted to updating *only* the discrete states, so we use "unrestricted" events to update the abstract state. Here is an example which stores a simple `PiecewisePolynomial` trajectory as abstract state:

In [None]:
class AbstractStateSystem(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)

        self._traj_index = self.DeclareAbstractState(
            AbstractValue.Make(PiecewisePolynomial()))
        self.DeclarePeriodicUnrestrictedUpdateEvent(period_sec=1.0,
                                                    offset_sec=0.0,
                                                    update=self.Update)

    # Replace the stored trajectory with a new segment starting at time t.
    def Update(self, context, state):
        t = context.get_time()
        # Write the new trajectory into the mutable state, not the context.
        state.get_mutable_abstract_state(int(self._traj_index)).set_value(
            PiecewisePolynomial.FirstOrderHold(
                [t, t + 1],
                np.array([[-np.pi / 2.0 + 1., -np.pi / 2.0 - 1.], [-2., 2.]])))


system = AbstractStateSystem()
simulator = Simulator(system)
simulator.AdvanceTo(4.0)

## Parameters

Parameters are like states, except that they are constant through the lifetime of a simulation. Parameters in the systems framework are declared and accessed almost identically to state, but they are never updated. Once again, we have vector-valued (declared with `DeclareNumericParameter`) and abstract-valued (via `DeclareAbstractParameter`) parameters.

In [None]:
class SystemWithParameters(LeafSystem):
    def __init__(self):
        LeafSystem.__init__(self)  # Don't forget to initialize the base class.

        self.DeclareNumericParameter(BasicVector([1.2, 3.4]))
        self.DeclareAbstractParameter(
            AbstractValue.Make(
                RigidTransform(RotationMatrix.MakeXRotation(np.pi / 6))))

        # Declare output ports to demonstrate how to access the parameters in
        # system methods.
        self.DeclareVectorOutputPort(name="numeric",
                                     size=2,
                                     calc=self.OutputNumeric)
        self.DeclareAbstractOutputPort(
            name="abstract",
            alloc=lambda: AbstractValue.Make(RigidTransform()),
            calc=self.OutputAbstract)

    def OutputNumeric(self, context, output):
        output.SetFromVector(context.get_numeric_parameter(0).get_value())

    def OutputAbstract(self, context, output):
        output.set_value(context.get_abstract_parameter(0).get_value())

# Construct an instance of this system and a context
system = SystemWithParameters()
context = system.CreateDefaultContext()

# Evaluate the output ports
print(f"numeric: {system.get_output_port(0).Eval(context)}")
print(f"abstract: {system.get_output_port(1).Eval(context)}")

## Supporting Scalar Type Conversion (double, AutoDiff, and Symbolic)

In order to support AutoDiff and Symbolic types throughout the systems framework, LeafSystems can be written to support "scalar type conversion". In Python, adding this support requires a small amount of boilerplate.  Here is a simple example:

In [None]:
from pydrake.systems.framework import LeafSystem_
from pydrake.systems.scalar_conversion import TemplateSystem
from pydrake.autodiffutils import AutoDiffXd
from pydrake.symbolic import Expression

@TemplateSystem.define("RunningCost_")
def RunningCost_(T):

    class Impl(LeafSystem_[T]):

        def _construct(self, converter=None):
            LeafSystem_[T].__init__(self, converter)
            self.DeclareVectorInputPort("state", 2)
            self.DeclareVectorInputPort("command", 1)
            self.DeclareVectorOutputPort("cost", 1, self.CostOutput)

        def _construct_copy(self, other, converter=None):
            Impl._construct(self, converter=converter)

        def CostOutput(self, context, output):
            x = self.get_input_port(0).Eval(context)
            u = self.get_input_port(1).Eval(context)[0]
            Q = np.diag([10, 1])
            output[0] = x.dot(Q.dot(x)) + u**2

    return Impl

RunningCost = RunningCost_[None]  # Default instantiation

The important steps are:
- Add the `@TemplateSystem.define` decorator.
- Derive from `LeafSystem_[T]` instead of simply `LeafSystem`.
- Implement the `_construct` method *instead* of the typical `__init__` method.
- Implement the `_construct_copy` method.
- Add the default instantiation, so that you can continue to refer to the system as e.g. `RunningCost` in addition to using `RunningCost_[float]`.

For further details, you can find the related documentation for scalar conversion in C++ [here](https://drake.mit.edu/doxygen_cxx/group__system__scalar__conversion.html).

In [None]:
# Having done this, we can still use the system in the original way:
system = RunningCost()
context = system.CreateDefaultContext()
system.get_input_port(0).FixValue(context, [1, 2])
system.get_input_port(1).FixValue(context, [3])
print(system.get_output_port().Eval(context))

# But we can also now use autodiff or symbolic versions of the system,
# either by instantiating them directly:
system_ad = RunningCost_[AutoDiffXd]()
system_symbolic = RunningCost_[Expression]()

# or by scalar conversion:
system_ad = system.ToAutoDiffXd()
system_symbolic = system.ToSymbolic()

# We can also convert the time, state, parameters, and (if needed) input ports
# from the original system:
context_symbolic = system_symbolic.CreateDefaultContext()
context_symbolic.SetTimeStateAndParametersFrom(context)
system_symbolic.FixInputPortsFrom(system, context, context_symbolic)
print(system_symbolic.get_output_port().Eval(context_symbolic))
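
As a sanity check on the first value printed above, the cost can be worked by hand. With $Q = \mathrm{diag}(10, 1)$, $x = [1, 2]$, and $u = 3$, the running cost is $x^T Q x + u^2 = 10 + 4 + 9 = 23$. The plain-Python snippet below repeats that arithmetic without Drake; the function name `running_cost` and its `q_diag` argument are hypothetical.

```python
def running_cost(x, u, q_diag=(10.0, 1.0)):
    """Return x^T Q x + u^2 for a diagonal Q given by q_diag."""
    quadratic_state_cost = sum(q * xi * xi for q, xi in zip(q_diag, x))
    return quadratic_state_cost + u**2


cost = running_cost(x=[1.0, 2.0], u=3.0)
print(cost)  # 23.0
```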

## Further reading

[Caching in the systems framework](https://drake.mit.edu/doxygen_cxx/group__cache__design__notes.html)

[Declaring events](https://drake.mit.edu/doxygen_cxx/group__events__description.html)

[Stochastic systems](https://drake.mit.edu/doxygen_cxx/group__stochastic__systems.html)