In [1]:
import torch
from torch.distributions import Uniform
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
import matplotlib as mpl
import seaborn as sns 
import gpytorch 

import ipywidgets
from ipywidgets import interact
import IPython
# If in your browser the figures are not nicely vizualized, change the following line.
rcParams['font.size'] = 20
rcParams['figure.figsize'] = (20, 8)

import warnings
warnings.filterwarnings('ignore')

layout = ipywidgets.Layout(width='auto', height='40px')
%load_ext autoreload
%autoreload 2

from rllib.environment import SystemEnvironment
from rllib.environment.systems.inverted_pendulum import InvertedPendulum
from rllib.agent import MPCAgent
from rllib.util.rollout import rollout_agent

# MPC: Planning with a known model for a fixed horizon

In [2]:
from rllib.value_function.abstract_value_function import AbstractValueFunction
import torch 
import torch.nn as nn 
from torch.distributions import MultivariateNormal
from rllib.model import AbstractModel
from rllib.reward.utilities import tolerance


class StateTransform(nn.Module):
    """Transform pendulum states to cos, sin, angular_velocity."""

    extra_dim = 1

    def forward(self, states_):
        """Transform state before applying function approximation."""
        angle, angular_velocity = torch.split(states_, 1, dim=-1)
        states_ = torch.cat(
            (torch.cos(angle), torch.sin(angle), angular_velocity), dim=-1
        )
        return states_

    def inverse(self, states_):
        """Inverse transformation of states."""
        cos, sin, angular_velocity = torch.split(states_, 1, dim=-1)
        angle = torch.atan2(sin, cos)
        states_ = torch.cat((angle, angular_velocity), dim=-1)
        return states_


def large_state_termination(state, action, next_state=None):
    """Termination condition for environment."""
    if not isinstance(state, torch.Tensor):
        state = torch.tensor(state)
    if not isinstance(action, torch.Tensor):
        action = torch.tensor(action)

    done = torch.any(torch.abs(state) > 200, dim=-1) | torch.any(
        torch.abs(action) > 200, dim=-1
    )
    return (
        torch.zeros(*done.shape, 2)
        .scatter_(dim=-1, index=(~done).long().unsqueeze(-1), value=-float("inf"))
        .squeeze(-1)
    )


class PendulumSparseReward(AbstractModel):
    """Reward for Inverted Pendulum."""

    def __init__(self, action_cost=0, radius=0.05):
        super().__init__(dim_state=(2,), dim_action=(1,), model_kind="rewards", dim_reward=(1,))
        self.action_cost = action_cost
        self.reward_offset = 0
        self.radius = 0.05

    def forward(self, state, action, next_state):
        """See `abstract_reward.forward'."""
        if not isinstance(state, torch.Tensor):
            state = torch.tensor(state, dtype=torch.get_default_dtype())
        if not isinstance(action, torch.Tensor):
            action = torch.tensor(action, dtype=torch.get_default_dtype())

        cos_angle = torch.cos(state[..., 0])
        velocity = state[..., 1]

        angle_tolerance = tolerance(cos_angle, lower=1 - self.radius, upper=1.0, margin=0.1)
        velocity_tolerance = tolerance(velocity, lower=-0.5, upper=0.5, margin=0.5)
        state_cost = angle_tolerance * velocity_tolerance

        action_tolerance = tolerance(action[..., 0], lower=-0.1, upper=0.1, margin=0.1)
        action_cost = self.action_cost * (action_tolerance - 1)

        cost = state_cost + action_cost

        return cost.unsqueeze(-1), torch.zeros(1)


class PendulumDenseReward(AbstractModel):
    """Reward for Inverted Pendulum."""

    def __init__(self, action_cost=0.0):
        super().__init__(dim_state=(2,), dim_action=(1,), model_kind="rewards", dim_reward=(1,))
        self.action_cost = action_cost
        self.reward_offset = 0

    def forward(self, state, action, next_state):
        """See `abstract_reward.forward'."""
        if not isinstance(state, torch.Tensor):
            state = torch.tensor(state, dtype=torch.get_default_dtype())
        if not isinstance(action, torch.Tensor):
            action = torch.tensor(action, dtype=torch.get_default_dtype())

        cos_angle = 1 - torch.cos(state[..., 0])
        state_cost = cos_angle ** 2
        action_cost = self.action_cost * (action ** 2).sum(-1)

        return -(action_cost + state_cost).unsqueeze(-1), torch.tensor(0.0)


class PendulumModel(AbstractModel):
    """Pendulum Model.

    Torch implementation of a pendulum model using euler forwards integration.
    """

    def __init__(
        self, mass, length, friction, step_size=1 / 80, noise: MultivariateNormal = None
    ):
        super().__init__(dim_state=(2,), dim_action=(1,))
        self.mass = mass
        self.length = length
        self.friction = friction
        self.step_size = step_size
        self.noise = noise

    def forward(self, state, action,next_state):
        """Get next-state distribution."""
        # Physical dynamics
        action = torch.clamp(action, -1.0, 1.0)
        mass = self.mass
        gravity = 9.81
        length = self.length
        friction = self.friction
        inertia = mass * length ** 2
        dt = self.step_size

        angle, angular_velocity = torch.split(state, 1, dim=-1)
        for _ in range(1):
            x_ddot = (
                (gravity / length) * torch.sin(angle)
                + action * (1 / inertia)
                - (friction / inertia) * angular_velocity
            )

            angle = angle + dt * angular_velocity
            angular_velocity = angular_velocity + dt * x_ddot

        next_state = torch.cat((angle, angular_velocity), dim=-1)

        if self.noise is None:
            return next_state, torch.zeros(1)
        else:
            return next_state + self.noise.mean, self.noise.covariance_matrix



        
        
class PendulumReturnToGo(AbstractValueFunction):
    def __init__(self):
        super().__init__(dim_state = 2, dim_action = 2)
    
    def forward(self, x):
        angle, velocity = x[..., 0], x[..., 1]
        cos_angle = torch.cos(angle)
        out = torch.zeros_like(angle)
        
        idx = cos_angle < np.cos(np.pi / 6)
        not_idx = cos_angle >= np.cos(np.pi / 6)
        
        out = 400 * cos_angle
        return 400 * cos_angle.unsqueeze(-1) 

In [4]:
initial_distribution = Uniform(torch.tensor([3.14- 0.001, -0.01]), torch.tensor([3.14 + 0.001, +0.01]))
sparse_reward_model = PendulumSparseReward(action_cost=0.1)
dense_reward_model = PendulumDenseReward(action_cost=0.1)
environment = SystemEnvironment(
        InvertedPendulum(mass=0.3, length=0.5, friction=0.005, step_size=1 / 80),
        reward=dense_reward_model,
        initial_state=initial_distribution.sample,
    )
dynamical_model =  PendulumModel(mass=0.3, length=0.5, friction=0.005, step_size=1 / 80)

def mpc(sparse_reward, return_to_go, solver, horizon, initial_position):
    if sparse_reward:
        environment.reward = sparse_reward_model
    else:
        environment.reward = dense_reward_model
       
    initial_distribution = Uniform(torch.tensor([initial_position - 0.001, -0.01]), torch.tensor([initial_position + 0.001, +0.01]))
    environment.initial_state = initial_distribution.sample
    environment.reset()
    if return_to_go:
        terminal_reward = PendulumReturnToGo()
    else:
        terminal_reward = None 
    agent = MPCAgent.default(
            environment=environment,
            mpc_solver_name=solver,
            dynamical_model=dynamical_model,
            reward_model=environment.reward,
            exploration_episodes=0,
            horizon=horizon,
            terminal_reward=terminal_reward,
        )
    try:

        rollout_agent(agent=agent, environment=environment, max_steps=400, num_episodes=1, render=True)
    except KeyboardInterrupt:
        environment.close() 

interact(
    mpc, 
    action_cost=ipywidgets.FloatSlider(value=0.1, min=0, max=0.5, continuous_update=False),
    sparse_reward=False, 
    return_to_go=False,
    solver=["CEMShooting", "MPPIShooting", "RandomShooting"], 
    horizon=ipywidgets.IntSlider(value=30, min=5, max=50, continuous_update=False), 
    initial_position = ipywidgets.FloatSlider(value=np.pi, min=0, max=np.pi, continuous_update=False)    
);


interactive(children=(Checkbox(value=False, description='sparse_reward'), Checkbox(value=False, description='r…

## Demo Guide:

#### some explanation
This demo shows running the model based reinforcement learning method. It means we know the accurate model of the environment.Now the task is to get the optimal policy.
- sparse_reward: Using the sparse reward:  A tolerance function is returns 1 if x is in [lower, upper]. If it is outside, it decays exponentially according to a margin. In this pendulum model the low was set to be:
$lower=0.95, upper=1.0$ for the cos value of angle (top is 0 angle). and 
$lower-0.5, upper=0.5$ fpr the velocity.
- solver:[CEMShooting(Cross-entropy method)](https://arxiv.org/abs/1907.02057), [Model-Predictive Path Integral(MPPIShooting)](https://ieeexplore.ieee.org/document/7487277), RandomShooting. Different MPC splver.
- hoizon: the look ahead horizon of MPC method
- initial position: the starting angle of the pendulum model 

#### play around
- Try small horizon and see if the algorithm work or not.
- Try small horizon with return_to_go setting to True
- Use the sparse reward and compare the difference.