In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import sys
sys.path.append("..")
sys.path.append(".")
import torch
import numpy as np
import plotly
import plotly.graph_objs as go
import pickle
from datetime import datetime
plotly.offline.init_notebook_mode(connected=True)

import robust_value_approx.relu_mpc as relu_mpc
import robust_value_approx.adversarial_sample as adversarial_sample
import robust_value_approx.utils as utils
import plotting_utils

# Double Integrator Example

In [None]:
import double_integrator_utils

# Horizon of the tail problem used after the controller's first step;
# the benchmark problem below uses N + 1 steps.
N = 5

# Benchmark problem (horizon N + 1): V provides the reference cost the
# controllers are compared against.
vf = double_integrator_utils.get_value_function(N=N + 1)
V = vf.get_value_function()

# Tail problem (horizon N): V_next is evaluated from the state reached after
# the controller's first step.
vf_next = double_integrator_utils.get_value_function(N=N)
V_next = vf_next.get_value_function()

# Initial states are sampled from the box [-1, 1] in every state dimension.
x0_up = torch.ones(vf.sys.x_dim, dtype=vf.dtype)
x0_lo = -x0_up

# Locations of the stored samples and trained models for this system.
sys_name = 'double_int'
x_samples_file = f'../data/learn_value_function_{sys_name}_x'
v_samples_file = f'../data/learn_value_function_{sys_name}_v'
model_file = f'../data/{sys_name}'

# Vertical Ball Paddle Example

In [None]:
import ball_paddle_utils

# Horizon of the tail problem used after the controller's first step;
# the benchmark problem below uses N + 1 steps.
N = 5

# Benchmark problem (horizon N + 1): V provides the reference cost the
# controllers are compared against.
vf = ball_paddle_utils.get_value_function_vertical(N=N + 1)
V = vf.get_value_function()

# Tail problem (horizon N): V_next is evaluated from the state reached after
# the controller's first step.
vf_next = ball_paddle_utils.get_value_function_vertical(N=N)
V_next = vf_next.get_value_function()

# Box of initial states to sample from; the second component has identical
# lower and upper bounds, so it is fixed at 0.15.
x0_lo = torch.Tensor([1.5, 0.15, -5.0, -1.0]).type(vf.dtype)
x0_up = torch.Tensor([2.0, 0.15, 1.0, 5.0]).type(vf.dtype)

# Locations of the stored samples and trained models for this system.
sys_name = 'ball_paddle_vertical'
x_samples_file = f'../data/learn_value_function_{sys_name}_x'
v_samples_file = f'../data/learn_value_function_{sys_name}_v'
model_file = f'../data/{sys_name}'

# SLIP Goal

In [None]:
import slip_utils

# Horizon of the tail problem used after the controller's first step;
# the benchmark problem below uses N + 1 steps.
N = 3
# Handed to slip_utils.get_value_function and to the trajectory plots
# (presumably the goal state -- confirm against slip_utils).
xf = torch.Tensor([6.0, 1.25, 0.0])

# Benchmark problem (horizon N + 1): V provides the reference cost the
# controllers are compared against.
vf, slip = slip_utils.get_value_function(xf, N=N + 1)
V = vf.get_value_function()

# Tail problem (horizon N): V_next is evaluated from the state reached after
# the controller's first step.
vf_next, slip_next = slip_utils.get_value_function(xf, N=N)
V_next = vf_next.get_value_function()

# Box of initial states to sample from; the first component is fixed at 0.
x0_lo = torch.Tensor([0.0, 0.95, 4.0]).type(vf.dtype)
x0_up = torch.Tensor([0.0, 1.25, 9.0]).type(vf.dtype)

# Locations of the stored samples and trained models for this system.
sys_name = 'slip'
x_samples_file = f'../data/learn_value_function_{sys_name}_x'
v_samples_file = f'../data/learn_value_function_{sys_name}_v'
model_file = f'../data/{sys_name}'

# SLIP Gait

In [None]:
import slip_utils

# Horizon of the tail problem used after the controller's first step;
# the benchmark problem below uses N + 1 steps.
N = 3
# Handed to slip_utils.get_value_function_gait and to the trajectory plots
# (presumably the target gait state -- confirm against slip_utils).
xf = torch.Tensor([0.0, 1.2, 5])

# Benchmark problem (horizon N + 1): V provides the reference cost the
# controllers are compared against.
vf, slip = slip_utils.get_value_function_gait(xf, N=N + 1)
V = vf.get_value_function()

# Tail problem (horizon N): V_next is evaluated from the state reached after
# the controller's first step.
vf_next, slip_next = slip_utils.get_value_function_gait(xf, N=N)
V_next = vf_next.get_value_function()

# Box of initial states to sample from; the first component is fixed at 0.
x0_lo = torch.Tensor([0.0, 0.95, 4.0]).type(vf.dtype)
x0_up = torch.Tensor([0.0, 1.25, 9.0]).type(vf.dtype)

# Locations of the stored samples and trained models for this system.
sys_name = 'slipgait'
x_samples_file = f'../data/learn_value_function_{sys_name}_x'
v_samples_file = f'../data/learn_value_function_{sys_name}_v'
model_file = f'../data/{sys_name}'

# Get Controllers

In [None]:
# Build one ReLUMPC controller per trained model; both share the benchmark
# problem `vf` and differ only in the model they wrap.
# NOTE(review): torch.load unpickles the file, so only load model files that
# come from a trusted source.
baseline_model = torch.load(f'{model_file}_baseline_model.pt')
baseline_ctrl = relu_mpc.ReLUMPC(vf, baseline_model)

robust_model = torch.load(f'{model_file}_robust_model.pt')
robust_ctrl = relu_mpc.ReLUMPC(vf, robust_model)
def eval_one_step_ctrl(ctrl, x0_samp):
    """Take one step with `ctrl` from `x0_samp`, then follow `V_next` from the resulting state.

    Relies on the notebook globals `vf`, `vf_next` and `V_next`.

    Args:
        ctrl: controller whose `get_ctrl(x0)` returns `(u0, x1)`.
        x0_samp: initial state handed to the controller
            (presumably a 1-D tensor -- TODO confirm).

    Returns:
        `(value, x_traj, u_traj, x1)`: the controller's first step followed by
        the trajectory recovered from `V_next(x1)`, and the `vf.traj_cost` of
        that trajectory. `(None, None, None, None)` when the controller
        returns no input or no tail trajectory can be recovered.
    """
    failure = (None, None, None, None)

    first_input, x1 = ctrl.get_ctrl(x0_samp)
    if first_input is None:
        return failure

    # V_next returns the value first; the remaining entries are what
    # sol_to_traj needs to rebuild the tail trajectory.
    tail_solution = V_next(x1)
    tail_x, tail_u, _tail_alpha = vf_next.sol_to_traj(x1, *tail_solution[1:])
    if tail_x is None:
        return failure

    # Prepend the first step as a column in front of the tail trajectory.
    x_traj = torch.cat((x0_samp.unsqueeze(0).t(), tail_x), dim=1)
    u_traj = torch.cat((first_input.unsqueeze(0).t(), tail_u), dim=1)
    # The cost skips the x0 column and ignores alpha (assumes no cost on
    # alpha, which holds on all benchmarks).
    value = vf.traj_cost(x_traj[:, 1:], u_traj)
    return (value, x_traj, u_traj, x1)

In [None]:
# Stored validation states; only the commented-out sampling alternative in the
# evaluation loops reads them.
# NOTE(review): torch.load unpickles the file, so only load trusted data.
x_samples_validation = torch.load(f'{x_samples_file}_validation.pt')

# Evaluate their Performance on Average

In [None]:
# Empty accumulators that the evaluation loop extends by one row per sample.
# They live in their own cell so re-running the loop cell adds more samples
# instead of starting over.

# Costs on the model used by the value function: one column, zero rows.
cost_opt = torch.empty((0, 1), dtype=vf.dtype)
cost_baseline = torch.empty((0, 1), dtype=vf.dtype)
cost_robust = torch.empty((0, 1), dtype=vf.dtype)

# Costs on the nonlinear system (only filled when the nonlinear evaluation in
# the loop is enabled).
cost_opt_nl = torch.empty((0, 1), dtype=vf.dtype)
cost_baseline_nl = torch.empty((0, 1), dtype=vf.dtype)
cost_robust_nl = torch.empty((0, 1), dtype=vf.dtype)

# State reached after each controller's first step, one row per sample.
x1_baselines = torch.empty((0, vf.sys.x_dim), dtype=vf.dtype)
x1_robusts = torch.empty((0, vf.sys.x_dim), dtype=vf.dtype)

In [None]:
# Evaluate the optimal solution and both controllers on random initial states
# and append the results to the accumulators created in the previous cell.
num_samples = 100
# sample_down = 10
# num_samples = int(x_samples_validation.shape[0] / sample_down)
for i in range(num_samples):
    # Draw an initial state uniformly from the box [x0_lo, x0_up].
    x0_samp = torch.rand(vf.sys.x_dim) * (x0_up - x0_lo) + x0_lo
#     x0_samp = x_samples_validation[i*sample_down,:]

    # Reference solution of the benchmark problem from x0.
    optimal_value, opt_s, opt_alpha = V(x0_samp)
    if optimal_value is None:
        print("opt bad")
        continue
    (x_traj_opt, u_traj_opt, alpha_traj_opt) = vf.sol_to_traj(x0_samp, opt_s, opt_alpha)

#     x_traj_opt_nl = slip_utils.slip_nonlinear_traj(slip, x0_samp, u_traj_opt)
#     if x_traj_opt_nl is None:
#         print("nl opt bad")
#         continue
#     optimal_nl_value = vf.traj_cost(x_traj_opt_nl[:,1:], u_traj_opt)

    (baseline_value, baseline_x_traj, baseline_u_traj, x1_baseline) = eval_one_step_ctrl(baseline_ctrl, x0_samp)
    if baseline_value is None:
        print("baseline bad")
        continue

#     baseline_x_traj_nl = slip_utils.slip_nonlinear_traj(slip, x0_samp, baseline_u_traj)
#     if baseline_x_traj_nl is None:
#         print("nl baseline bad")
#         continue
#     baseline_nl_value = vf.traj_cost(baseline_x_traj_nl[:,1:], baseline_u_traj)

    (robust_value, robust_x_traj, robust_u_traj, x1_robust) = eval_one_step_ctrl(robust_ctrl, x0_samp)
    if robust_value is None:
        print("robust bad")
        continue

#     robust_x_traj_nl = slip_utils.slip_nonlinear_traj(slip, x0_samp, robust_u_traj)
#     if robust_x_traj_nl is None:
#         print("nl robust bad")
#         continue
#     robust_nl_value = vf.traj_cost(robust_x_traj_nl[:,1:], robust_u_traj)

    # Record the sample only after every evaluation succeeded, so that row k
    # of x1_baselines, x1_robusts and the cost tensors always refers to the
    # same x0. (Appending x1_baseline before the robust check left an
    # unmatched row whenever the robust controller failed.)
    x1_baselines = torch.cat((x1_baselines, x1_baseline.unsqueeze(0)), axis=0)
    x1_robusts = torch.cat((x1_robusts, x1_robust.unsqueeze(0)), axis=0)

    cost_opt = torch.cat((cost_opt, torch.Tensor([[optimal_value]]).type(vf.dtype)), 0)
    cost_baseline = torch.cat((cost_baseline, torch.Tensor([[baseline_value.item()]]).type(vf.dtype)), 0)
    cost_robust = torch.cat((cost_robust, torch.Tensor([[robust_value.item()]]).type(vf.dtype)), 0)

#     cost_opt_nl = torch.cat((cost_opt_nl, torch.Tensor([[optimal_nl_value]]).type(vf.dtype)), 0)
#     cost_baseline_nl = torch.cat((cost_baseline_nl, torch.Tensor([[baseline_nl_value.item()]]).type(vf.dtype)), 0)
#     cost_robust_nl = torch.cat((cost_robust_nl, torch.Tensor([[robust_nl_value.item()]]).type(vf.dtype)), 0)

    utils.update_progress((i + 1) / num_samples)

In [None]:
# Overlaid histograms of one component of the state reached after each
# controller's first step.
state_i = 1  # index of the state component to plot
nbin = 100
fig = go.Figure(data=[
    go.Histogram(
        x=x1_baselines[:, state_i],
        name="baseline",
        nbinsx=nbin,
        bingroup=1,
        marker_color=plotting_utils.BASELINE_COLOR,
    ),
    go.Histogram(
        x=x1_robusts[:, state_i],
        name="robust",
        nbinsx=nbin,
        bingroup=1,
        marker_color=plotting_utils.ROBUST_COLOR,
    ),
])
# Draw both traces on top of each other, semi-transparent so both stay visible.
fig.update_layout(barmode='overlay')
fig.update_traces(opacity=0.75)
fig.show()

In [None]:
# Performance figure for the optimal, baseline and robust costs collected by
# the evaluation loop.
fig = plotting_utils.control_perf(
    cost_opt,
    cost_baseline,
    cost_robust,
    nbin=100,
    bartop=120,
    clamp_val=10000,
)
fig.show()

In [None]:
# Both numbers are the mean gap to the optimal cost (controller cost minus
# optimal cost), not the mean raw cost.
print(f"Robust mean: {torch.mean(cost_robust - cost_opt).item()}")
print(f"Baseline mean: {torch.mean(cost_baseline - cost_opt).item()}")

In [None]:
# Same figure for the costs on the nonlinear system. The *_nl tensors are only
# filled when the nonlinear evaluation in the loop is enabled or when results
# are loaded from disk.
fig = plotting_utils.control_perf(
    cost_opt_nl,
    cost_baseline_nl,
    cost_robust_nl,
    nbin=100,
    bartop=120,
    clamp_val=100000,
)
fig.show()

In [None]:
# Mean gap to the optimal cost on the nonlinear system (controller cost minus
# optimal cost).
print(f"Nonlinear Robust sub-opt mean: {torch.mean(cost_robust_nl - cost_opt_nl).item()}")
print(f"Nonlinear Baseline sub-opt mean: {torch.mean(cost_baseline_nl - cost_opt_nl).item()}")

In [None]:
# Mean raw cost of each controller and of the optimal solution on the
# nonlinear system.
print(f"Nonlinear Robust mean: {torch.mean(cost_robust_nl).item()}")
print(f"Nonlinear Baseline mean: {torch.mean(cost_baseline_nl).item()}")
print(f"Nonlinear Optimal mean: {torch.mean(cost_opt_nl).item()}")

# Saving Results

In [None]:
# Save the collected costs to a timestamped pickle file in the working directory.
now = datetime.now()
results_path = "control_perf_" + sys_name + "_" + now.strftime("%m%d%Y%H%M%S") + ".p"
# The context manager closes (and flushes) the file even if pickling fails.
with open(results_path, "wb") as results_file:
    pickle.dump([cost_opt, cost_baseline, cost_robust], results_file)

In [None]:
# Save the nonlinear-system costs to a timestamped pickle file in the working directory.
now = datetime.now()
results_nl_path = "control_perf_nl_" + sys_name + "_" + now.strftime("%m%d%Y%H%M%S") + ".p"
# The context manager closes (and flushes) the file even if pickling fails.
with open(results_nl_path, "wb") as results_nl_file:
    pickle.dump([cost_opt_nl, cost_baseline_nl, cost_robust_nl], results_nl_file)

# Loading Results

In [None]:
# Restore previously saved costs; this overwrites the in-memory accumulators.
# NOTE(review): pickle.load can execute arbitrary code -- only load result
# files produced by this notebook or another trusted source.
with open("control_perf_slipgait_01302020161353.p", "rb") as results_file:
    [cost_opt, cost_baseline, cost_robust] = pickle.load(results_file)

In [None]:
# Restore previously saved nonlinear-system costs; this overwrites the
# in-memory accumulators.
# NOTE(review): pickle.load can execute arbitrary code -- only load result
# files produced by this notebook or another trusted source.
with open("control_perf_nl_slipgait_01302020161353.p", "rb") as results_nl_file:
    [cost_opt_nl, cost_baseline_nl, cost_robust_nl] = pickle.load(results_nl_file)

# SLIP Gait Plotting

In [None]:
# Search random initial states for cases where the robust controller achieves
# a lower cost than the baseline, and print the trajectories of those cases.
num_samples = 100
# sample_down = 100
# num_samples = int(x_samples_validation.shape[0] / sample_down)
for i in range(num_samples):
    # Draw an initial state uniformly from the box [x0_lo, x0_up].
    x0_samp = torch.rand(vf.sys.x_dim) * (x0_up - x0_lo) + x0_lo
#     x0_samp = x_samples_validation[i*sample_down,:]

    optimal_value, opt_s, opt_alpha = V(x0_samp)
    if optimal_value is None:
        print("opt bad")
        continue
    (x_traj_opt, u_traj_opt, alpha_traj_opt) = vf.sol_to_traj(x0_samp, opt_s, opt_alpha)

    # eval_one_step_ctrl returns four values (value, x_traj, u_traj, x1);
    # unpacking into three names raises ValueError.
    (baseline_value, baseline_x_traj, baseline_u_traj, x1_baseline) = eval_one_step_ctrl(baseline_ctrl, x0_samp)
    if baseline_value is None:
        print("baseline bad")
        continue

    (robust_value, robust_x_traj, robust_u_traj, x1_robust) = eval_one_step_ctrl(robust_ctrl, x0_samp)
    if robust_value is None:
        print("robust bad")
        continue

    if robust_value < baseline_value:
        print(x0_samp)
        print("Robust: " + str(robust_value))
        print(robust_u_traj)
        print(robust_x_traj)
        print("Baseline: " + str(baseline_value))
        print(baseline_u_traj)
        print(baseline_x_traj)
        print("Optimal:" + str(optimal_value))
        print(u_traj_opt)
        print(x_traj_opt)
        print("---")

    # Progress indicator: index of the sample just processed.
    print(i)

In [None]:
# Evaluate performance on nonlinear system
# Pick one initial state (alternatives kept below) and plot the optimal
# trajectory from it.
# x0_samp = torch.rand(vf.sys.x_dim) * (x0_up - x0_lo) + x0_lo
# x0_samp = torch.Tensor([0.0000, 1.0869, 6.6037]).type(vf.dtype)
# x0_samp = torch.Tensor([0.0000, 1.2500, 7.3988]).type(vf.dtype)
# x0_samp = torch.Tensor([0.0000, 1.1193, 5.2765]).type(vf.dtype)
# x0_samp = torch.Tensor([0.0000, 0.9500, 4.0000]).type(vf.dtype)
# x0_samp = torch.Tensor([0.0000, 1.2109, 7.8834]).type(vf.dtype)
# x0_samp = torch.Tensor([0.0000, 1.2112, 7.9895]).type(vf.dtype)
# x0_samp = torch.Tensor([0.0000, 1.2113, 4.0963]).type(vf.dtype)
x0_samp = torch.Tensor([0.0000, 1.1845, 5.1934]).type(vf.dtype)

optimal_value, opt_s, opt_alpha = V(x0_samp)
if optimal_value is None:
    # No solution from this x0: report it and skip the plot instead of
    # passing the failed solution on to sol_to_traj.
    print("opt bad")
else:
    (x_traj_opt, u_traj_opt, alpha_traj_opt) = vf.sol_to_traj(x0_samp, opt_s, opt_alpha)
    print(u_traj_opt)

    # The last column of both trajectories is dropped for plotting.
    fig = plotting_utils.slip_traj(slip, x_traj_opt[:,:-1], u_traj_opt[:,:-1], xf)
    fig.show()

In [None]:
# Plot the trajectory produced by the baseline controller from x0_samp.
# eval_one_step_ctrl returns four values (value, x_traj, u_traj, x1);
# unpacking into three names raises ValueError.
(baseline_value, baseline_x_traj, baseline_u_traj, x1_baseline) = eval_one_step_ctrl(baseline_ctrl, x0_samp)
if baseline_value is None:
    # The trajectories are None on failure, so there is nothing to plot.
    print("baseline bad")
else:
    print(baseline_u_traj)
    # The last column of both trajectories is dropped for plotting.
    fig = plotting_utils.slip_traj(slip, baseline_x_traj[:,:-1], baseline_u_traj[:,:-1], xf)
    fig.show()

In [None]:
# Plot the trajectory produced by the robust controller from x0_samp.
# eval_one_step_ctrl returns four values (value, x_traj, u_traj, x1);
# unpacking into three names raises ValueError.
(robust_value, robust_x_traj, robust_u_traj, x1_robust) = eval_one_step_ctrl(robust_ctrl, x0_samp)
if robust_value is None:
    # The trajectories are None on failure, so there is nothing to plot.
    print("robust bad")
else:
    print(robust_u_traj)
    # The last column of both trajectories is dropped for plotting.
    fig = plotting_utils.slip_traj(slip, robust_x_traj[:,:-1], robust_u_traj[:,:-1], xf)
    fig.show()

In [None]:
# Write the current figure to a PNG in the working directory.
# NOTE(review): `fig` is whichever figure was assigned most recently; when the
# cells run top to bottom that is the robust-controller figure, not the
# baseline one -- confirm the intended figure before saving under this name.
fig.write_image("slip_baseline.png")

# Loading Old Data

In [None]:
# Restore costs from an earlier double-integrator run; this overwrites the
# in-memory accumulators.
# NOTE(review): pickle.load can execute arbitrary code -- only load result
# files produced by this notebook or another trusted source.
with open("control_perf_double_int_01252020175702.p", "rb") as old_results_file:
    [cost_opt, cost_baseline, cost_robust] = pickle.load(old_results_file)