In this Colab, we compare reward models trained with IRL to hand-written reward functions in a simple "point mass" environment.

In each case, we have an original reward model $r_o$ and a target $r_t$. We seek the reward model $r$ that is equivalent to $r_o$ and has minimal distance to the target $r_t$. Specifically, $r$ ranges over the set of reward models that are positive affine transformations and potential-shaped versions of $r_o$:
  $$r(s,a,s') = \lambda(r_o(s,a,s') + \gamma \phi(s') - \phi(s)) + c,$$
where $\lambda > 0$ is a rescaling constant, $c \in \mathbb{R}$ a shift constant and $\phi(s)$ a potential function.

We solve the optimization problem:
  $$\min_{\lambda, c, \phi} D(r, r_t),$$
where the distance metric $D$ is:
  $$D(r_A, r_B) = \mathbb{E}_{s,a,s'} \left[(r_A(s,a,s') - r_B(s, a, s'))^2\right]$$

Note that this distance metric depends on a distribution over (state, action, next state) transitions. In this notebook, we choose a simple distribution that samples $s$ and $a$ randomly, and then computes $s'$ from the (deterministic) dynamics.

# Setup: Imports, Environment Creation and Loading Models
---

In [0]:
import collections
import functools
import itertools
import math
import os

import gym
from stable_baselines.common import vec_env
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import scipy as sp
import tensorflow as tf
import uuid
import xarray

from imitation.policies import base
from imitation.rewards import reward_net
from imitation.util import rollout

from evaluating_rewards import serialize
from evaluating_rewards.envs import point_mass
from evaluating_rewards.experiments import comparisons
from evaluating_rewards.experiments import datasets
from evaluating_rewards.experiments import point_mass_analysis
from evaluating_rewards.experiments import visualize

In [0]:
# Config
# Directory holding the saved reward models.
MODEL_PATH = os.path.expanduser("~/output/train_adversarial")
# Directory that results (figures) are written to.
OUT_PATH = os.path.expanduser("~/output/comparison")
# Gym ID of the environment that every model below is evaluated in.
env_name = 'evaluating_rewards/PointMassLineFixedHorizon-v0'

# Hand-written reward models; each is loaded as `evaluating_rewards/<name>`.
HARDCODED_MODEL_TYPES = [
    'Zero-v0',
    'PointMassGroundTruth-v0',
    'PointMassSparseReward-v0',
    'PointMassSparseRewardNoCtrl-v0',
    'PointMassDenseReward-v0',
    'PointMassDenseRewardNoCtrl-v0',
]

In [0]:
def find_model_paths(env_name):
  """Locates the saved reward network of every training run for `env_name`.

  Runs are looked up under `MODEL_PATH/<env_name with '/' replaced by '_'>`;
  each subdirectory there is treated as one run.

  Args:
    env_name: Gym environment ID, e.g. 'evaluating_rewards/Foo-v0'.

  Returns:
    A dict mapping each run's directory name to
    `<run dir>/checkpoints/final/discrim/reward_net`, inserted in sorted
    name order. The checkpoint path itself is not checked for existence.

  Raises:
    OSError: if the environment's directory cannot be listed (e.g. missing).
  """
  root_dir = os.path.join(MODEL_PATH, env_name.replace('/', '_'))
  checkpoint_parts = ('checkpoints', 'final', 'discrim', 'reward_net')

  model_paths = {}
  # os.listdir() yields entries in arbitrary order and also returns plain
  # files (logs, .DS_Store, ...). Sort so results are reproducible across
  # runs, and skip anything that is not a run directory.
  for name in sorted(os.listdir(root_dir)):
    run_dir = os.path.join(root_dir, name)
    if not os.path.isdir(run_dir):
      continue
    model_paths[name] = os.path.join(run_dir, *checkpoint_parts)
  return model_paths

def load_trained_models(venv, model_paths):
  """Loads the shaped and unshaped variants of each trained reward model.

  Args:
    venv: environment object forwarded to `serialize.load_reward`.
    model_paths: mapping from model name to checkpoint path.

  Returns:
    An OrderedDict with two entries per model, keyed `<name>_shaped` and
    `<name>_unshaped` (in that order), following the iteration order of
    `model_paths`.
  """
  loaded = collections.OrderedDict()

  for model_name, checkpoint in model_paths.items():
    # Both variants are loaded from the same checkpoint; only the reward
    # type string passed to the loader differs.
    shaped = serialize.load_reward(
        'imitation/BasicShapedRewardNet_shaped', checkpoint, venv)
    loaded[f'{model_name}_shaped'] = shaped
    unshaped = serialize.load_reward(
        'imitation/BasicShapedRewardNet_unshaped', checkpoint, venv)
    loaded[f'{model_name}_unshaped'] = unshaped

  return loaded

def load_hardcoded_models(venv, model_types):
  """Loads the hand-written reward models named in `model_types`.

  Args:
    venv: environment object forwarded to `serialize.load_reward`.
    model_types: iterable of model names; each is loaded as
      `evaluating_rewards/<name>` with no checkpoint path (None).

  Returns:
    An OrderedDict mapping each name to its loaded reward model, in the
    order the names were given.
  """
  return collections.OrderedDict(
      (kind, serialize.load_reward(f'evaluating_rewards/{kind}', None, venv))
      for kind in model_types
  )

In [0]:
# Environment creation and model loading
env = gym.make(env_name)
# The vectorized wrapper is built around the single `env` instance above.
venv = vec_env.DummyVecEnv([lambda: env])

# All reward models live in one dedicated graph/session pair; later cells
# re-enter `sess` (and `graph`) before evaluating them.
graph = tf.Graph()
sess = tf.Session(graph=graph)
with graph.as_default(), sess.as_default():
  reward_models = load_hardcoded_models(venv, HARDCODED_MODEL_TYPES)
  model_paths = find_model_paths(env_name)
  reward_models.update(load_trained_models(venv, model_paths))

# Reward model plots
-----



In [0]:
def visualize_all(env, env_name, reward_models, goal=None, density=11):
  """Evaluates every reward model and saves plots of the resulting rewards.

  Figures are written under `OUT_PATH/<env_name with '/' replaced by '_'>`,
  in the subdirectories `byacc`, `byvel` and `bypos` (one per z-axis choice).
  Evaluation runs inside the notebook-global session `sess`.

  Args:
    env: environment passed to the evaluation helper.
    env_name: Gym environment ID; used only to build the output directory.
    reward_models: mapping of model name to reward model to evaluate.
    goal: goal passed to evaluation and plotting; defaults to
      `np.array([0.0])` when None.
    density: forwarded to `evaluate_multiple_reward_models` (presumably the
      grid resolution -- confirm against that helper).
  """
  if goal is None:
    goal = np.array([0.0])

  with sess.as_default():
    # Forward the caller's `density`. A literal 11 used to be passed here,
    # which silently ignored the argument. The helper's second return value
    # is not needed.
    rewards, _ = point_mass_analysis.evaluate_multiple_reward_models(
        env, reward_models, goal=goal, density=density)

  # (output subdirectory, z-axis) for each family of plots.
  plot_specs = (
      ('byacc', 'acceleration'),
      ('byvel', 'velocity'),
      ('bypos', 'position'),
  )
  # Build every figure first, then save, matching the original ordering.
  figures = [
      (subdir,
       point_mass_analysis.plot_multiple_rewards(rewards, goal, zaxis=zaxis))
      for subdir, zaxis in plot_specs
  ]

  save_path = os.path.join(OUT_PATH, env_name.replace('/', '_'))
  for subdir, figs in figures:
    visualize.save_figs(os.path.join(save_path, subdir), figs.items())

visualize_all(env, env_name, reward_models)

# Potential Matching
-----



In [0]:
# Three sources of transitions, used for matching and for the
# state-density plots later in the notebook.
# NOTE(review): the exact sampling behaviour of `rollout_generator` and
# `random_generator` lives in `evaluating_rewards.experiments.datasets`;
# the descriptions below are inferred from names -- confirm there.

# Policy built from the environment's observation/action spaces, and a
# dataset generated from it (presumably via environment rollouts).
random_policy = base.RandomPolicy(env.observation_space, env.action_space)
random_policy_dataset = datasets.rollout_generator(env, random_policy)
# Hand-written point-mass policy and its dataset.
hardcoded_policy = point_mass.PointMassPolicy(env)
hardcoded_dataset = datasets.rollout_generator(env, hardcoded_policy)
# Policy-free dataset; this is the default `dataset` of `match_pipeline`.
random_model_dataset = datasets.random_generator(env)

In [0]:
def match_pipeline(env, env_name, original, target, name,
                   dataset=random_model_dataset, goal=None, **kwargs):
  """Runs `point_mass_analysis.match_pipeline` and saves its figure.

  The helper is called inside the notebook-global `sess` and `graph`. The
  figure stored under the result's "fig" key is saved, labelled `name`,
  in `OUT_PATH/<env_name with '/' replaced by '_'>/bypos`.

  Args:
    env: environment passed to the helper.
    env_name: Gym environment ID; used only to build the output directory.
    original: reward model to be matched to `target`.
    target: reward model to match against.
    name: label under which the figure is saved.
    dataset: dataset passed to the helper. The default is bound once, when
      this function is defined.
    goal: accepted but currently unused by this wrapper.
    **kwargs: forwarded unchanged to the helper.

  Returns:
    The helper's result mapping.
  """
  with sess.as_default(), graph.as_default():
    result = point_mass_analysis.match_pipeline(
        env, original, target, dataset, **kwargs)

  figure_dir = os.path.join(OUT_PATH, env_name.replace("/", "_"), "bypos")
  visualize.save_figs(figure_dir, {name: result["fig"]}.items())

  return result

In [0]:
# Match every loaded non-hardcoded (IRL) reward model against every
# hand-written target except the all-zero model.
hardcoded_models = set(HARDCODED_MODEL_TYPES) - {'Zero-v0'}
irl_models = set(reward_models) - set(HARDCODED_MODEL_TYPES)

# fit_models[target name][IRL model name] -> result of match_pipeline.
fit_models = {target_model: {} for target_model in hardcoded_models}
# Both collections are sets, so the iteration (and print) order is arbitrary.
for target_model, irl_model in itertools.product(hardcoded_models, irl_models):
  print(f"Matching {irl_model} to {target_model}")

  res = match_pipeline(
      env,
      env_name,
      reward_models[irl_model],
      reward_models[target_model],
      name=f"{irl_model}_vs_{target_model}",
  )
  fit_models[target_model][irl_model] = res

In [0]:
# Print the constant baseline for a matched model against the ground-truth
# reward on the random model-based dataset.
# NOTE(review): `sparse_vs_gt` is not defined in any visible cell, so this
# raises NameError when the notebook is run top to bottom. Presumably it was
# the result of matching 'PointMassSparseReward-v0' to
# 'PointMassGroundTruth-v0' via `match_pipeline` -- confirm and restore the
# defining cell, or switch to an entry of `fit_models`.
with sess.as_default():
  print(comparisons.constant_baseline(sparse_vs_gt['match'], reward_models['PointMassGroundTruth-v0'], dataset=random_model_dataset))

# State Distributions
-----




In [0]:
# One state-density figure per dataset, each in its own matplotlib figure.
# The second argument to `plot_state_density` is 2**16 for every plot
# (presumably the number of samples drawn -- confirm against the helper).
for _dataset, _title in (
    (random_model_dataset, 'Random Model-Based'),
    (random_policy_dataset, 'Random Policy'),
    (hardcoded_dataset, 'Hardcoded'),
):
  fig = plt.figure()
  point_mass_analysis.plot_state_density(_dataset, 2**16)
  plt.title(_title)