In [2]:
import numpy as np

## Define deviated environments

In [30]:
# Create a mutated environment
from gym.envs.classic_control import CartPoleEnv
from types import SimpleNamespace

class MutatedCartPoleEnv(CartPoleEnv):
    """CartPole environment whose physical parameters come from the constructor.

    Parameters
    ----------
    masscart : float
        Mass of the cart.
    masspole : float
        Mass of the pole.
    length : float
        Half of the pole's length.
    force_mag : float
        Magnitude of the force applied by an action.
    """

    def __init__(self, masscart=1.0, masspole=0.1, length=0.5, force_mag=10.0):
        super().__init__()

        # Minimal stand-in for a gym spec so that `env.spec.id` is available
        # (the visualization helpers read it for the figure title).
        self.spec = SimpleNamespace(
            id=f"MutatedCartPole-{masscart}-{masspole}-{length}-{force_mag}"
        )

        # Fixed constants
        self.gravity = 9.8
        self.tau = 0.02  # seconds between state updates

        # Mutable physical parameters
        self.masscart, self.masspole = masscart, masspole
        self.length = length  # actually half the pole's length
        self.force_mag = force_mag

        # Quantities derived from the parameters above
        self.total_mass = self.masspole + self.masscart
        self.polemass_length = self.masspole * self.length

    def reset_to(self, state):
        """Put the environment into `state` and return it as a float32 array.

        Also clears `steps_beyond_done`, mirroring what a regular reset does
        to that attribute.
        """
        self.steps_beyond_done = None
        self.state = state
        return np.array(self.state, dtype=np.float32)

### Gym visualization helpers

In [4]:
from IPython import display
import matplotlib.pyplot as plt
import time
%matplotlib inline


# Define visual experiment
def init_fig(env):
    """Create a new figure showing the environment's current frame.

    The figure gets an initial title (episode 0, step 0, reward 0.0) and has
    its axes hidden. Returns the image object produced by `plt.imshow`, so
    that later frames can be pushed into it.
    """
    plt.figure()
    plt.title(f"{env.spec.id} | Episode: 0 | Step: 0 | Reward: 0.0")
    plt.axis('off')
    first_frame = env.render(mode='rgb_array')
    return plt.imshow(first_frame)

def update_fig(img, env, episode, step, total_reward, done):
    """Refresh the figure with the environment's current frame and progress.

    `img` is the image object returned by `init_fig`; the remaining arguments
    are only used to build the title.
    """
    plt.title(f"{env.spec.id} | Episode: {episode} | Step: {step} | Reward: {total_reward} | Done: {done}")
    frame = env.render(mode='rgb_array')
    img.set_data(frame)

    # Show the updated figure, then mark the output to be cleared when the next one arrives
    current_figure = plt.gcf()
    display.display(current_figure)
    display.clear_output(wait=True)

def visual_experiment(env, next_action, max_episode_steps=100, max_episodes=3, visualize_in_notebook=True, sleep=0.01):
    """Run and render a few episodes of `env` driven by `next_action`.

    Parameters
    ----------
    env : environment exposing `reset`, `step` and `render`
    next_action : callable mapping an observation to an action
    max_episode_steps : number of steps executed in every episode
    max_episodes : number of episodes to run
    visualize_in_notebook : if True, draw frames into a matplotlib figure via
        `init_fig` / `update_fig`; otherwise call `env.render()` directly
    sleep : pause (seconds) after every step; skipped when not positive

    Every episode runs for exactly `max_episode_steps` steps: the `done` flag
    returned by `env.step` is only displayed, never used to stop early.
    Prints the per-episode rewards with their mean and standard deviation.
    """
    env.reset()

    # The figure is only needed for in-notebook rendering
    img = init_fig(env) if visualize_in_notebook else None

    episode_rewards = []
    for episode in range(1, max_episodes + 1):
        time.sleep(1)  # short pause before every episode
        obs = env.reset()
        total_reward = 0.0

        step = 0
        while step < max_episode_steps:
            step += 1
            obs, reward, done, info = env.step(next_action(obs))
            total_reward += reward

            if not visualize_in_notebook:
                env.render()
            else:
                update_fig(img, env, episode, step, total_reward, done)

            if sleep > 0.0:
                time.sleep(sleep)

        episode_rewards.append(total_reward)

    print(f"Rewards: {episode_rewards}")
    print(f"Mean reward of {len(episode_rewards)} episodes: {np.mean(episode_rewards)}")
    print(f"Std reward of {len(episode_rewards)} episodes: {np.std(episode_rewards)}")

## Define variables and objective function

In [5]:
# Search-space limits: one [lower, upper] pair per mutated environment parameter
masscart = [0.001, 2.0]
force_mag = [0.001, 20.0]

# Row i of `bounds` is the [lower, upper] pair of variable i (masscart first, then force_mag)
bounds = np.array([masscart, force_mag])

def scale_x(x, bounds):
    """Map normalized values in [0, 1] onto the range of each variable.

    `bounds` is a 2-D array whose column 0 holds the lower limits and whose
    column 1 holds the upper limits, one row per variable.
    """
    lower = bounds[:, 0]
    upper = bounds[:, 1]
    return lower + x * (upper - lower)

def normalize_x(x_scaled, bounds):
    """Inverse of `scale_x`: map values from each variable's range back to [0, 1].

    `bounds` is a 2-D array whose column 0 holds the lower limits and whose
    column 1 holds the upper limits, one row per variable.
    """
    lower, upper = bounds[:, 0], bounds[:, 1]
    return (x_scaled - lower) / (upper - lower)

In [6]:
# Reference point of the objective function: the default parameter values,
# given in the original (not normalized) domain.
x_original = [1.0, 10.0]  # [default masscart, default force_mag]

# Euclidean (l-2) distance to the default parameters; the input x is in the original domain
def min_deviation(x):
    """Return the l-2 distance between `x` and `x_original`.

    `x` must be given in the original (unscaled) domain, like `x_original`;
    use `scaled_min_deviation` for inputs normalized to [0, 1].
    """
    offset = x - x_original
    return np.sqrt(np.sum(np.square(offset)))

# Objective on normalized inputs: x in [0, 1] is first mapped back to the original domain
def scaled_min_deviation(x):
    """Evaluate `min_deviation` at the point that normalized `x` maps to under `bounds`."""
    x_in_original_domain = scale_x(x, bounds)
    return min_deviation(x_in_original_domain)

## Define RL Agent

In [7]:
# Load a previously saved PPO agent from the file "best_model".
# NOTE(review): the path is presumably resolved relative to the notebook's
# working directory — confirm.
from stable_baselines3 import PPO

# TODO: PPO seems to be non-deterministic
# NOTE(review): Stable-Baselines3 `predict` defaults to `deterministic=False`,
# which is a likely cause of this — confirm against the library documentation.
model_ppo = PPO.load("best_model")

In [8]:
from stable_baselines3 import DQN

# Load a previously saved DQN agent from the file "best_dqn"; this is the agent
# used by the determinism check and the robustness functions further below.
model_dqn = DQN.load("best_dqn")

#### IMPORTANT: the agent is often non-deterministic

In [21]:
def _record_rollout(seed, n_steps=50, deterministic=False):
    """Run the DQN agent on a freshly seeded default environment.

    Parameters
    ----------
    seed : seed passed to `env.seed` before the environment is reset
    n_steps : number of steps to execute (the `done` flag is ignored)
    deterministic : forwarded to `model_dqn.predict`; the default `False`
        matches calling `predict(obs)` with no extra argument

    Returns
    -------
    (observations, actions) : two lists of length `n_steps`; entry i holds the
        observation seen before step i and the action chosen for it.
    """
    env = MutatedCartPoleEnv()
    env.seed(seed)

    observations, actions = [], []
    try:
        obs = env.reset()
        for _ in range(n_steps):
            observations.append(obs)
            action = model_dqn.predict(obs, deterministic=deterministic)[0]
            actions.append(action)
            obs, _reward, _done, _info = env.step(action)
    finally:
        # release the environment even if a step fails
        env.close()
    return observations, actions


# Two rollouts from the same environment seed. Any difference between them
# therefore comes from the agent's action selection, not from the environment.
# NOTE(review): `predict` is called with `deterministic=False` (the library
# default); passing `deterministic=True` presumably removes the divergence —
# confirm against the Stable-Baselines3 documentation.
r1, a1 = _record_rollout(19279)
r2, a2 = _record_rollout(19279)

In [22]:
# Index of the first step at which the observations of the two rollouts differ.
# Evaluates to None (instead of raising IndexError) when the rollouts are identical.
obs_diff_rows = np.where(np.array(r1) != np.array(r2))[0]
obs_diff_rows[0] if obs_diff_rows.size else None

13

In [23]:
# Index of the first step at which the actions of the two rollouts differ.
# Evaluates to None (instead of raising IndexError) when the rollouts are identical.
action_diff_rows = np.where(np.array(a1) != np.array(a2))[0]
action_diff_rows[0] if action_diff_rows.size else None

12

## Define STL robustness estimation functions

In [24]:
def random_sample(env, next_action, max_episode_steps=100, max_episodes=3, seed=None):
    """Roll out `max_episodes` episodes and record rewards and observations.

    Parameters
    ----------
    env : environment exposing `observation_space` (with `low` / `high`),
        `seed`, `reset` and `step`
    next_action : callable mapping an observation to an action
    max_episode_steps : maximum number of steps per episode
    max_episodes : number of episodes to run
    seed : if not None, passed to `env.seed` once before the first episode

    Returns
    -------
    (rewards, records) : `rewards` is an array with one total reward per
        episode; `records` stacks, per episode, the initial observation plus
        one observation per step (`max_episode_steps + 1` rows).

    The `done` flag returned by `env.step` is ignored. An episode only stops
    early when an observation leaves `env.observation_space`: that observation
    is clipped to the space and repeated to fill the remaining rows, so every
    episode record has the same length.
    """
    obs_space = env.observation_space

    if seed is not None:
        env.seed(seed)

    all_rewards, all_records = [], []
    for _ in range(max_episodes):
        obs = env.reset()
        trajectory = [obs]
        episode_return = 0.0

        for t in range(max_episode_steps):
            obs, reward, _done, _info = env.step(next_action(obs))
            episode_return += reward

            outside = np.any(obs < obs_space.low) or np.any(obs > obs_space.high)
            if not outside:
                trajectory.append(obs)
                continue

            # Out of the observation space: clip, record, pad the rest of the
            # episode with the clipped observation, and stop this episode.
            obs = np.clip(obs, obs_space.low, obs_space.high)
            trajectory.append(obs)
            n_missing = max_episode_steps - t - 1
            padding = np.full((n_missing, len(obs)), obs)
            trajectory = np.append(trajectory, padding, axis=0)
            break

        all_rewards.append(episode_return)
        all_records.append(trajectory)

    return np.array(all_rewards), np.array(all_records)

In [25]:
# Compute robustness value of the STL formula:
#   G ( pos > -2.4 & pos < 2.4 & angle > -12 degree & angle < 12 degree )
import signal_tl as stl

# Signals referenced by the formula
pos = stl.Predicate('pos')
angle = stl.Predicate('angle')

# Safety limits: +/- 2.4 for the position, +/- 12 degrees (converted to radians) for the angle
pos_threshold = 2.4
angle_threshold = 12 * 2 * np.pi / 360

# Conjunction of the four bound checks that must hold at every time step
safe_region = (
    (pos > -pos_threshold)
    & (pos < pos_threshold)
    & (angle > -angle_threshold)
    & (angle < angle_threshold)
)
phi = stl.Always(safe_region)

def compute_STL_robustness(records):
    """Compute the robustness of `phi` for every recorded episode.

    `records` is indexed as [episode, time step, observation component];
    component 0 is used as the "pos" signal and component 2 as the "angle"
    signal. Time stamps are the integer step indices.

    Returns an array with one value per episode: the robustness signal
    evaluated at time 0.
    """
    timestamps = np.arange(records.shape[1])

    def _episode_robustness(episode):
        trace = {
            "pos": stl.Signal(episode[:, 0], timestamps),
            "angle": stl.Signal(episode[:, 2], timestamps),
        }
        return stl.compute_robustness(phi, trace).at(0)

    return np.array([_episode_robustness(episode) for episode in records])

In [26]:
def build_STL_robustness(model, max_episode_steps=200, max_episodes=10, seed=12345, deterministic=True):
    """Build a function mapping environment parameters to worst-case STL robustness.

    Parameters
    ----------
    model : agent exposing `predict(obs, deterministic=...)` whose first
        return value is the action
    max_episode_steps : maximum number of steps per sampled episode
    max_episodes : number of episodes sampled per evaluation
    seed : environment seed, applied on every evaluation so that the same
        initial states are sampled each time
    deterministic : forwarded to `model.predict`. Defaults to True so that,
        together with the fixed seed, the returned function is meant to give
        the same value for the same `x`. Pass False to let the agent sample
        its actions, which is what `predict` does when the argument is omitted.

    Returns
    -------
    f : callable taking `x = [masscart, force_mag]` in the original
        (unscaled) domain and returning the minimum robustness value over the
        sampled episodes.
    """
    def policy(obs):
        return model.predict(obs, deterministic=deterministic)[0]

    def f(x):
        masscart = x[0]
        force_mag = x[1]
        env = MutatedCartPoleEnv(masscart=masscart, force_mag=force_mag)

        try:
            # fix the seed so that we can fix the initial states for sampling
            _, records = random_sample(
                env,
                policy,
                max_episode_steps=max_episode_steps,
                max_episodes=max_episodes,
                seed=seed
            )
        finally:
            # always release the environment, even if sampling fails
            env.close()

        robs = compute_STL_robustness(records)
        return robs.min()

    return f

In [27]:
# Environment seed shared by all robustness evaluations
seed = 19279

# Constraint function; its input x is in the original (unscaled) domain.
# Uses the DQN agent, at most 200 steps per episode and 10 episodes per evaluation.
STL_robustness = build_STL_robustness(
    model_dqn,
    max_episode_steps=200,
    max_episodes=10,
    seed=seed,
)

# Constraint on normalized inputs: x in [0, 1] is first mapped back to the original domain
def scaled_STL_robustness(x):
    """Evaluate `STL_robustness` at the point that normalized `x` maps to under `bounds`."""
    x_in_original_domain = scale_x(x, bounds)
    return STL_robustness(x_in_original_domain)

In [28]:
def visualize_deviated_env(x, model, seed=1234, deterministic=True):
    """Render one 200-step episode of `model` in the environment described by `x`.

    Parameters
    ----------
    x : `[masscart, force_mag]` in the original (unscaled) domain
    model : agent exposing `predict(obs, deterministic=...)` whose first
        return value is the action
    seed : environment seed, so that the initial state is fixed (default 1234)
    deterministic : forwarded to `model.predict`; defaults to True so the
        agent does not sample its actions while being visualized
    """
    masscart = x[0]
    force_mag = x[1]
    env = MutatedCartPoleEnv(masscart=masscart, force_mag=force_mag)

    try:
        env.seed(seed)  # fix the seed so that we can fix the initial states for sampling
        visual_experiment(
            env,
            lambda obs: model.predict(obs, deterministic=deterministic)[0],
            max_episode_steps=200,
            max_episodes=1,
        )
    finally:
        # always release the environment, even if rendering fails
        env.close()

## Use CMA-ES

In [29]:
import cma

### CMA-ES for minimizing the deviation s.t. the STL is violated (robustness value <= 0)

In [None]:
# Start the search from the default parameter values, expressed in normalized coordinates
x0 = normalize_x(x_original, bounds)
sigma0 = 0.2

# Objective: deviation from the default parameters.
# Constraint: the worst-case STL robustness (the wrapper expects a list of constraint values).
cfun = cma.ConstrainedFitnessAL(
    scaled_min_deviation,
    lambda x: [scaled_STL_robustness(x)],
    find_feasible_first=True,
)

# All variables are searched in the normalized box [0, 1]
x, es = cma.fmin2(
    cfun,
    x0,
    sigma0,
    {'bounds': [0.0, 1.0], 'tolstagnation': 0},
    callback=cfun.update,
)

In [None]:
# Report the optimizer's favorite solution in the original parameter domain
x = es.result.xfavorite  # the original x-value may be meaningless
solution = scale_x(x, bounds)
print("Solution:", solution)
worst_case_robustness = scaled_STL_robustness(x)
print("Worst-case robustness value:", worst_case_robustness)  # show constraint violation values

In [None]:
# plt.rc('font', size=14) 

es.plot()

# Enlarge the figure produced by the plot call above.
fig = plt.gcf()
fig.set_figwidth(18)
fig.set_figheight(18)

# A matplotlib Figure has no `title` / `xaxis` / `yaxis` attributes; those
# belong to the individual Axes, so the font sizes are set on every subplot.
for ax in fig.axes:
    ax.title.set_fontsize(14)
    ax.xaxis.label.set_fontsize(14)
    ax.yaxis.label.set_fontsize(14)

In [None]:
cfun.al.loggers.plot()

# Enlarge the figure produced by the plot call above.
fig = plt.gcf()
fig.set_figwidth(18)
fig.set_figheight(18)

# A matplotlib Figure has no `title` / `xaxis` / `yaxis` attributes; those
# belong to the individual Axes, so the font sizes are set on every subplot.
for ax in fig.axes:
    ax.title.set_fontsize(14)
    ax.xaxis.label.set_fontsize(14)
    ax.yaxis.label.set_fontsize(14)

In [None]:
# Display the record kept in `cfun.best_feas` — presumably the best feasible
# solution seen so far (its 'x' entry is read in the next cell); confirm
# against the pycma documentation.
cfun.best_feas.info

In [None]:
# Report the 'x' entry of the best-feasible record in the original parameter domain
x = cfun.best_feas.info['x']
solution = scale_x(x, bounds)
print("Solution:", solution)
worst_case_robustness = scaled_STL_robustness(x)
print("Worst-case robustness value:", worst_case_robustness)  # show constraint violation values

In [None]:
# Ask the constrained-fitness wrapper for a feasible point and count how many
# additional optimizer iterations that took.
c = es.countiter
x = cfun.find_feasible(es)
extra_iterations = es.countiter - c
print("find_feasible took {} iterations".format(extra_iterations))

solution = scale_x(x, bounds)
print("Solution:", solution)
worst_case_robustness = scaled_STL_robustness(x)
print("Worst-case robustness value:", worst_case_robustness)  # show constraint violation values

### Use CMA-ES to minimize the STL robustness over environment deviations

In [None]:
# Start the search from the default parameter values, expressed in normalized coordinates
x0 = normalize_x(x_original, bounds)
sigma0 = 0.2

# Unconstrained search: directly minimize the worst-case STL robustness
# over the normalized box [0, 1]
x, es = cma.fmin2(
    scaled_STL_robustness,
    x0,
    sigma0,
    {'bounds': [0.0, 1.0]},
)

In [None]:
# Report the solution returned by the optimizer in the original parameter domain
solution = scale_x(x, bounds)
print("Solution:", solution)
worst_case_robustness = scaled_STL_robustness(x)
print("Worst-case robustness value:", worst_case_robustness)  # show constraint violation values

### Use CMA-ES to minimize the STL robustness over initial states

In [35]:
# Range of the initial set: one [low, high] row per dimension (4 rows of [-0.05, 0.05])
init_range = np.repeat([[-0.05, 0.05]], 4, axis=0)
init_range

array([[-0.05,  0.05],
       [-0.05,  0.05],
       [-0.05,  0.05],
       [-0.05,  0.05]])

In [38]:
# define the objective function, i.e., the robustness value of one run
# the input x is in the original domain
def robustness_of_one_run(x):
    

array([-0.04, -0.03, -0.02, -0.01])

## Use SLSQP

In [None]:
from scipy import optimize

# The decision variables here are normalized: `x0` comes from `normalize_x`
# and every variable is bounded to [0, 1]. The objective and the constraint
# must therefore be the *scaled* wrappers, which map x back to the original
# domain before evaluating; the unscaled functions would interpret the
# normalized values as raw masscart / force_mag values.
res = optimize.minimize(
    scaled_min_deviation,
    x0,
    method='SLSQP',
    bounds=np.repeat([[0.0, 1.0]], len(x0), axis=0),
    constraints=[
        # SciPy 'ineq' constraints require fun(x) >= 0, hence the negation:
        # -robustness >= 0  <=>  robustness <= 0 (the STL formula is violated)
        { 'type': 'ineq', 'fun': lambda x: -scaled_STL_robustness(x) }
    ]
)
print(res)