In [None]:
import pydrake

# 构建动态系统

`pydrake` 的每个系统都继承自 `pydrake.systems.framework.LeafSystem`, 但是一般来说动力学系统由状态空间的形式表示, 其中 `x` 表示状态向量, `u` 表示输入向量, `y` 表示输出向量. 为了方便系统的实现, `pydrake` 提供了另一个子类 `pydrake.systems.primitives.SymbolicVectorSystem`.

一个连续非线性系统, 由下列状态空间方程描述:

$$
\begin{aligned}
    \dot{x} =& f(t,x,u), \\ 
    y =& g(t,x,u).
\end{aligned}
$$

[pydrake 符号计算引擎](https://drake.mit.edu/pydrake/pydrake.symbolic.html#pydrake.symbolic.Expression)可以表示各种形式的 $f()$ 和 $g()$ 函数, 比如

$$
\begin{aligned}
    \dot{x} =& -x+x^3, \\ 
    y =& x.
\end{aligned}
$$

这个系统没有输入, 有一维连续状态变量和一个输出, 用 `pydrake` 实现如下

In [None]:
from pydrake.symbolic import Variable
from pydrake.systems.primitives import SymbolicVectorSystem

# 定义新的符号变量
x = Variable('x')

# 定义系统
continuous_vector_system = SymbolicVectorSystem(state=[x], dynamics=[-x + x**3], output=[x])

`continuous_vector_system` 是 `pydrake` 的 `System` 类的实例. 注意 `state` 参数是 `symbolic::Variable` 的向量 (python list 可以自动类型转换), `dynamics` 和 `output` 参数是 `symbolic::Expression` 的向量. 

和连续系统相似, 离散系统的实现也很简单, 离散系统形如

$$
\begin{aligned}
    x[n+1] =& f(n,x,u),\\ 
    y[n] =& g(n,x,u).
\end{aligned}
$$

比如要实现这样的离散系统: 

$$
\begin{aligned}
    x[n+1] =& x^3[n],\\ 
    y[n] =& x[n].
\end{aligned}
$$


In [None]:
from pydrake.symbolic import Variable
from pydrake.systems.primitives import SymbolicVectorSystem

# 定义符号变量
x = Variable('x')

# 定义系统, 注意新增的 time_period 参数
discrete_vector_system = SymbolicVectorSystem(state=[x], dynamics=[x**3], output=[x], time_period=1.0)

尽管 `pydrake.systems.primitives.SymbolicVectorSystem` 使用起来简单, 容易上手, 但 pydrake 支持创建各种各样的系统, 像什么多输入多输出、混合离散连续、带约束的系统, 甚至是随机系统, 这些复杂的系统可以继承 `pydrake.systems.framework.LeafSystem`

In [None]:
from pydrake.systems.framework import LeafSystem
from pydrake.all import Context, ContinuousState, DiscreteValues  # For type hint.


class SimpleContinuousTimeSystem(LeafSystem):
    '''
    定义一个连续系统
    '''

    def __init__(self):
        super().__init__()
        state_index = self.DeclareContinuousState(1)  # 一个状态变量
        self.DeclareStateOutputPort('y', state_index)  # 一个输出: y=x.

    # xdot(t) = -x(t) + x^3(t)
    def DoCalcTimeDerivatives(self, context: Context, derivatives: ContinuousState) -> None:
        x = context.get_continuous_state_vector().GetAtIndex(0)
        xdot = -x + x**3
        derivatives.get_mutable_vector().SetAtIndex(0, xdot)


# 实例化系统
continuous_system = SimpleContinuousTimeSystem()

class SimpleDiscreteTimeSystem(LeafSystem):
    '''
    定义一个离散系统
    '''

    def __init__(self):
        super().__init__()
        state_index = self.DeclareDiscreteState(1)  # 一个状态变量
        self.DeclareStateOutputPort('y', state_index)
        self.DeclarePeriodicDiscreteUpdateEvent(
            period_sec=1.0,  # 一秒钟的时间步长
            offset_sec=0.0,  # 0时刻第一次事件
            update=self.Update
        )
    def Update(self, context: Context, discrete_state: DiscreteValues):
        x = context.get_discrete_state_vector().GetAtIndex(0)
        xnext = x ** 3
        discrete_state.get_mutable_vector().SetAtIndex(0, xnext)

# 实例化系统
discrete_system = SimpleDiscreteTimeSystem()

这两个用 `LeafSystem` 实现的系统和之前 `SymbolicVectorSystem` 实现的系统是相同的系统, 而且通过重载 `LeafSystem` 的方法, 可以定制更加复杂的系统, 且不受 `pydrake.symbolic` 限制. 但是以这种方式声明 `LeafSystem` 并不支持 `Drake` 的 `autodiff` 和 `pydrake.symbolic` 符号工具, 要做到这一点, 我们需要再添加几行代码来支持[模板](https://drake.mit.edu/pydrake/pydrake.systems.scalar_conversion.html?highlight=templatesystem#pydrake.systems.scalar_conversion.TemplateSystem). 

'drake' 已经提供了[一些实现好了的工具类和方法](https://drake.mit.edu/doxygen_cxx/group__systems.html), 他们都是 `LeafSystem` 的子类, 比如 [pydrake.systems.primitives.LinearSystem](https://drake.mit.edu/pydrake/pydrake.systems.primitives.html?highlight=linearsystem#pydrake.systems.primitives.LinearSystem) 和 [pydrake.systems.primitives.Linearize()](https://drake.mit.edu/pydrake/pydrake.systems.primitives.html?highlight=linearize#pydrake.systems.primitives.Linearize). 一般来说, 机器人动力学, 执行器和传感器仿真所需的类都已经实现好了. 

关于 `LeafSystem` 的更多高级用法, 可以看[custom_test.py](https://github.com/RobotLocomotion/drake/blob/master/bindings/pydrake/systems/test/custom_test.py)文件, 提供了许多测试样例. 

# 仿真

动态系统模型创建好后就是仿真了, 仿真通过 `pydrake.framework.analysis.Simulator` 类实现, 它提供了一系列数值积分的解决方案, 支持可变步长积分、刚性求解器和事件检测. 

查看系统的数据可以使用 `pydrake.framework.primitives.VectorLogSink` 系统来查看. 

以下是代码的实现

In [None]:
from pydrake.all import DiagramBuilder, LogVectorOutput, Simulator, VectorLogSink
import matplotlib.pyplot as plt

# 创建一个diagram, 里面有我们创建的动态系统
builder = DiagramBuilder()
system: SimpleDiscreteTimeSystem = builder.AddSystem(SimpleDiscreteTimeSystem())
logger: VectorLogSink = LogVectorOutput(system.get_output_port(0), builder)
diagram: pydrake.systems.framework.System = builder.Build()

# 创建一个仿真器
simulator = Simulator(diagram)

# 设置初始条件,  x(0)
state = simulator.get_mutable_context().get_mutable_discrete_state_vector()
state.SetFromVector([0.9])

# 仿真 10s
simulator.AdvanceTo(10)

# 画图
log = logger.FindLog(simulator.get_context())
plt.figure()
plt.stem(log.sample_times(), log.data().transpose(), use_line_collection=True)
plt.xlabel('n')
plt.ylabel('y[n]')


连续系统仿真也类似

In [None]:
# 创建一个diagram, 里面有我们创建的动态系统
builder = DiagramBuilder()
system: SimpleContinuousTimeSystem = builder.AddSystem(SimpleContinuousTimeSystem())
logger: VectorLogSink = LogVectorOutput(system.get_output_port(0), builder)
diagram: pydrake.systems.framework.System = builder.Build()

# 创建一个仿真器
simulator = Simulator(diagram)

# 设置初始条件,  x(0)
state = simulator.get_mutable_context().get_mutable_continuous_state_vector()
state.SetFromVector([0.5])

# 仿真 10s
simulator.AdvanceTo(10)

# 画图
log = logger.FindLog(simulator.get_context())
plt.figure()
plt.plot(log.sample_times(), log.data().transpose())
plt.xlabel('t')
plt.ylabel('y[t]')

之前用 `SymbolicVectorSystem` 创建的系统也可以

In [None]:
# 创建一个diagram, 里面有我们创建的动态系统
builder = DiagramBuilder()
continuous_vector_system = SymbolicVectorSystem(state=[x], dynamics=[-x + x**3], output=[x])
# 如果不加上一行, 第二次运行报错 `RuntimeError: C++ object must be owned by pybind11 when attempting to release to C++`, 原因未知, github issue也没有解决 TODO
system: SymbolicVectorSystem = builder.AddSystem(continuous_vector_system)  
logger: VectorLogSink = LogVectorOutput(system.get_output_port(0), builder)
diagram: pydrake.systems.framework.System = builder.Build()

# 创建一个仿真器
simulator = Simulator(diagram)

# 设置初始条件,  x(0)
state = simulator.get_mutable_context().get_mutable_continuous_state_vector()
state.SetFromVector([0.9])

# 仿真 10s
simulator.AdvanceTo(10)

# 画图
log = logger.FindLog(simulator.get_context())
plt.figure()
plt.plot(log.sample_times(), log.data().transpose())
plt.xlabel('t')
plt.ylabel('y[t]')

In [None]:
builder = DiagramBuilder()
discrete_vector_system = SymbolicVectorSystem(state=[x], dynamics=[x**3], output=[x], time_period=1.0)
system: SymbolicVectorSystem = builder.AddSystem(discrete_vector_system)
logger = LogVectorOutput(system.get_output_port(0), builder)
diagram = builder.Build()

simulator = Simulator(diagram)

state = simulator.get_mutable_context().get_mutable_discrete_state_vector()
state.SetFromVector([0.8])

simulator.AdvanceTo(10)

log = logger.FindLog(simulator.get_context())
plt.figure()
plt.stem(log.sample_times(), log.data().transpose(), use_line_collection=True)
plt.xlabel('n')
plt.ylabel('y[n]')

仿真器的运行速度比系统实际运行的速度快很多, 让仿真器减速可以使用 `Simulator` 的 `set_target_realtime_rate()` 方法. 比如, 机器人的仿真动画, 或者设计多进程实时控制系统. 

# 系统的 `Context`

上面的代码多次出现 `context` 的实例, 比如 `get_mutable_context()`. `context` 是 `drake` 框架中的一个核心概念, 它包含系统的所有动态信息,  包括时间、状态、输入和系统参数等, 系统的 `context` 囊括了仿真所需的一切信息, 给定 `context`, 系统的所有行为都是可复现的. 

系统可以创建 `context` 的实例, 比如 `CreateDefaultContext()` 方法. 在上面的仿真的例子中, 仿真器创建了一个 `context`, 通过这个 `context` 可以设置系统的初始条件(状态). 

`context` 只有当所有输入端口 (`port`) 都已连接的时候才会被完全定义. 对于没有直接连接到另一个系统输出的输入端口, 请考虑使用端口的 `FixValue` 方法. 

# 系统组合: `Diagram` 和 `DiagramBuilder`

`drake` 真正强大的地方在于可以将许多较小的系统组合成更复杂的系统(和 `MATLAB` 很像). 我们使用 `DiagramBuilder` 类来 `AddSystem()` 和 `Connect()` 输入输出端口, 或者将它们作为 `Diagram` 的输入输出, 然后调用 `Build()` 生成新的 `Diagram` 实例, 它同时也是整个大框架中的另一个系统, 和之前提到的系统一样可以进行仿真和分析. 

在下面的示例中, 我们连接了三个子系统: 一个被控对象 (plant), 一个控制器 (controller) 和一个记录器 (logger), 并将控制器的输入作为整个 `Diagram` 的输入: 

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pydot
from IPython.display import SVG, display
from pydrake.examples import PendulumPlant
from pydrake.systems.analysis import Simulator
from pydrake.systems.controllers import PidController
from pydrake.systems.framework import DiagramBuilder
from pydrake.systems.primitives import LogVectorOutput
from pydrake.all import System, MultibodyPlant

builder = DiagramBuilder()

# 添加控制对象 -- 倒立摆
pendulum: PendulumPlant = builder.AddSystem(PendulumPlant())
pendulum.set_name('pendulum')

# 添加 PID 控制器
controller: PidController = builder.AddSystem(PidController(kp=[10.0], ki=[1.0], kd=[1.0]))
controller.set_name('controller')

# 连接 PID 控制器和被控对象(自行脑补 simulink 的操作)
builder.Connect(pendulum.get_state_output_port(), controller.get_input_port_estimated_state())
builder.Connect(controller.get_output_port_control(), pendulum.get_input_port())

# 将控制器的给定输入作为整个 Diagram 的输入
builder.ExportInput(controller.get_input_port_desired_state())

# logger 记录倒立摆的状态
logger = LogVectorOutput(pendulum.get_state_output_port(), builder)
logger.set_name("logger")

diagram = builder.Build()
diagram.set_name('diagram')

display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString(max_depth=2))[0].create_svg()))

和其他系统一样, `Diagram` 也有 `Context`. 比如 `Diagram.GetSubSystemContext()` 和 `Diagram.GetMutableSubsystemContext()`. (`mutable` 表示数据有写访问权限, 否则是常量 (`const`)). 尽管在 C++ 中强制使用 const 属性, 但 Python 没有, pydrake 使用`mutable` 只是为了强调数据可更改这一事实, 并与 C++ 代码保持一致. 

最后, 对 PID 控制系统仿真并可视化输出. 

In [None]:
# 创建一个仿真器
simulator = Simulator(diagram)
context = simulator.get_mutable_context()

# 倒立摆的期望姿态
desired_angle = np.pi / 2

# 设置倒立摆的初始状态
pendulum_context: Context = diagram.GetMutableSubsystemContext(pendulum, context)
pendulum_context.get_mutable_continuous_state_vector().SetFromVector([desired_angle + 0.1, 0.2])  # theta, theta_dot

# diagram 的给定输入
diagram.get_input_port(0).FixValue(context, [desired_angle, 0.])

# 由于我们可能会多次运行此单元格才会加这句话, 通常不用加
logger.FindMutableLog(context).Clear()

# 仿真10秒
simulator.AdvanceTo(10)

# Plotting
log = logger.FindLog(simulator.get_context())
t = log.sample_times()
plt.figure()
# Plot theta.
plt.plot(t, log.data()[0,:],'.-')
# Draw a line for the desired angle.
plt.plot([t[0], t[-1]], [desired_angle, desired_angle], 'g' )
plt.xlabel('time (seconds)')
plt.ylabel('theta (rad)')
plt.title('PID Control of the Pendulum')
