This notebook provides examples to go along with the [textbook](http://manipulation.csail.mit.edu/rl.html).  I recommend having both windows open, side-by-side!

In [None]:
import gym
import numpy as np

from pydrake.all import (
    InputPort, EventStatus, OutputPort, PortDataType, RandomGenerator, 
    Simulator,
)

class DrakeGymEnv(gym.Env):

    def __init__(self,
                 system,
                 time_step,
                 reward_callback_or_port,
                 action_port=None,
                 action_space=None,
                 observation_port=None,
                 observation_space=None,
                 render_rgb_port=None):
        self.system = system
        self.time_step = time_step
        self.simulator = Simulator(self.system)
        self.generator = RandomGenerator()

        # Setup rewards
        if isinstance(reward_callback_or_port, OutputPort):
            self.reward = lambda context: reward_callback_or_port(context)
        else:
            assert callable(reward_callback_or_port)
            self.reward = reward_callback_or_port

        # Setup actions (resorting to defaults whenever possible)
        if action_port:
            assert isinstance(action_port, InputPort)
            self.action_port = action_port
        else:
            assert system.num_input_ports() == 1
            self.action_port = system.get_input_port()
        if action_space:
            self.action_space = action_space
            if self.action_port.get_data_type() == PortDataType.kVectorValued:
                assert np.array_equal(self.action_space.shape, [
                                      self.action_port.size()])
        elif self.action_port.get_data_type() == PortDataType.kVectorValued:
            # TODO(russt): Is this helpful, or is it better to force people to 
            # specify a bounded box?
            num_actions = self.action_port.size()
            self.action_space = gym.spaces.Box(
                low=np.full((num_actions), -np.inf),
                high=np.full((num_actions), np.inf))
        else:
            raise ValueError(
                "Could not infer the action space from your action port; please pass in the action_space argument.")

        # Setup observations (resorting to defaults whenever possible)
        if observation_port:
            assert isinstance(observation_port, OutputPort)
            self.observation_port = observation_port
        else:
            if isinstance(reward_callback_or_port, OutputPort):
                assert system.num_output_ports() == 2
                possible_port_indices = [0, 1]
                possible_port_indices.remove(
                    reward_callback_or_port.get_index())
                self.observation_port = system.get_output_port(
                    possible_port_indices[0])
            else:
                assert system.num_output_ports() == 1
                self.observation_port = system.get_output_port()
        if observation_space:
            self.observation_space = observation_space
            if self.observation_port.get_data_type() == \
                PortDataType.kVectorValued:
                assert np.array_equal(self.observation_space.shape, [
                                      self.observation_port.size()])
        elif self.observation_port.get_data_type() == \
                PortDataType.kVectorValued:
            # TODO(russt): Is this helpful, or is it better to force people to 
            # specify a bounded box?
            num_obs = self.observation_port.size()
            self.observation_space = gym.spaces.Box(
                low=np.full((num_obs), -np.inf),
                high=np.full((num_obs), np.inf))
        else:
            raise ValueError(
                "Could not infer the observation space from your observation port; please pass in the observation_space argument.")

        self.metadata['render.modes'] = ['human', 'ascii']

        # (Maybe) setup rendering
        if render_rgb_port:
            assert isinstance(render_rgb_port, OutputPort)
            assert render_rgb_port.get_data_type() == \
                PortDataType.kAbstractValued
            self.metadata['render.modes'].append('rgb_array')
        self.render_rgb_port = render_rgb_port

    def step(self, action):
        context = self.simulator.get_context()
        time = context.get_time()

        self.action_port.FixValue(context, action)
        self.simulator.AdvanceTo(time + self.time_step)

        observation = self.observation_port.Eval(context)
        reward = self.reward(context)
        done = False
        monitor = self.simulator.get_monitor()
        if monitor:
            status = monitor(context)
            done = status == EventStatus.kReachedTermination or \
                status == EventStatus.kFailed
        info = dict()

        return observation, reward, done, info

    def reset(self):
        context = self.simulator.get_mutable_context()
        self.system.SetRandomContext(context, self.generator)
        self.simulator.Initialize()
        # Note: The output port will be evaluated without fixing the input port.
        return self.observation_port.Eval(context)
    
    def render(self, mode='human'):
        if mode == 'human':
            self.system.Publish(self.simulator.get_context())
            return
        elif mode == 'ansi':
            return str(self.simulator.get_context())
        elif mode == 'rgb_array':
            assert self.render_rgb_port, \
                "You must set render_rgb_port in the constructor"
            return np.array(self.rgb_port.Eval(context))
        else:
            super(DrakeGym, self).render(mode=mode)

    def seed(self, seed=None):
        if seed:
            self.generator = RandomGenerator(seed)
        else:
            seed = self.generator()
        # Note: One could call self.action_space.seed(self.generator()) here, 
        # but it appears that is not the standard approach:
        # https://github.com/openai/gym/issues/681
        return [seed]





In [None]:
from manipulation.meshcat_cpp_utils import StartMeshcat

meshcat = StartMeshcat()

In [None]:
from pydrake.all import (
    DiagramBuilder, SceneGraph, MeshcatVisualizerCpp, MeshcatVisualizerParams
)
from pydrake.examples.pendulum import PendulumPlant, PendulumGeometry
from stable_baselines3.common.env_checker import check_env

def PendulumExample():
    builder = DiagramBuilder()
    plant = builder.AddSystem(PendulumPlant())
    scene_graph = builder.AddSystem(SceneGraph())
    PendulumGeometry.AddToBuilder(
        builder, plant.get_state_output_port(), scene_graph)
    MeshcatVisualizerCpp.AddToBuilder(
        builder, scene_graph, meshcat, 
        MeshcatVisualizerParams(publish_period=np.inf))
    
    builder.ExportInput(plant.get_input_port(), "torque")
    builder.ExportOutput(plant.get_state_output_port(), "state")
    diagram = builder.Build()

    def reward(context):
        plant_context = plant.GetMyContextFromRoot(context)
        state = plant_context.get_continuous_state_vector()
        u = plant.get_input_port().Eval(plant_context)[0]
        theta = state[0] % 2*np.pi  # Wrap to 2*pi
        theta_dot = state[1]
        return (theta-np.pi)**2 + 0.1*theta_dot**2 + 0.001*u

    env = DrakeGymEnv(diagram, time_step=0.05, reward_callback_or_port=reward)
    check_env(env)

    max_torque = 3
    env = DrakeGymEnv(diagram, time_step=0.05, reward_callback_or_port=reward, 
                      action_space=gym.spaces.Box(low=np.array([-max_torque]),
                                                  high=np.array([max_torque])))

PendulumExample()


In [None]:
class DrakeMultibodyGym(DrakeGym):
    def __init__(robot_description_file,
                 time_step,
                 multibody_time_step=0.0, viewer='meshcat', X_WCamera=None):
        builder = DiagramBuilder()
        self.plant, self.scene_graph = AddMultibodyPlantSceneGraph(
            builder, time_step=multibody_time_step)
        parser = Parser(self.plant)
        parser.AddRobotFromFile(robot_description_file)
        self.plant.Finalize()

        # TODO(russt): Add a viewer
        # TODO(russt): Add a camera
        builder.ExportInput(self.plant.get_actuation_input_port(), 
                            "actuation")
        builder.ExportOutput(self.plant.get_state_output_port(),
                                "state")

        super().__init__(builder.Build(), time_step)
