# RoboSINDy: Project Introduction

**Authors**: Julian Skifstad and Advaith Balaji

This notebook implements the paper "Data-driven discovery of coordinates and governing equations" by Champion et al. on the Franka Panda Arm. The goal is to compare this method with e2c. This work aims to explore and evaluate the performance of these approaches in a robotic planar pushing task.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import torch
import os
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from numpngw import write_apng
from IPython.display import Image
from tqdm.notebook import tqdm
import time

In [None]:
from utils.panda_pushing_env import PandaImageSpacePushingEnv
from utils.visualizers import GIFVisualizer, NotebookVisualizer
from utils.utils import *
from sindy.SINDy import RoboSINDy, SindyDataset, NormalizationTransform, PushingImgSpaceController, img_space_pushing_cost_function
from e2c.E2C import RoboE2C, GloboE2C, RoboE2CLoss
from utils.process_data_ import process_data
from utils.panda_pushing_env import TARGET_POSE_FREE, TARGET_POSE_OBSTACLES, OBSTACLE_HALFDIMS, OBSTACLE_CENTRE, BOX_SIZE

Visualize panda arm environment in gym. Executes 3 random pushes.

In [None]:
%matplotlib inline

# Create the visualizer
fig = plt.figure(figsize=(8,8))
hfig = display(fig, display_id=True)
visualizer = NotebookVisualizer(fig=fig, hfig=hfig)

# Initialize the simulation environment
env = PandaImageSpacePushingEnv(visualizer=visualizer,
                                render_non_push_motions=True,
                                camera_heigh=800,
                                camera_width=800,
                                grayscale=True,
                                done_at_goal=False)
env.reset()
# Perform a sequence of 3 random actions:
states = []
for i in tqdm(range(3)):
    action_i = env.action_space.sample()
    state, reward, done, info = env.step(action_i)
    states.append(state)
    if done:
        break

view_states(states)

plt.close(fig)
env.close()

Collect a dataset npy file that also included the state_derivatives. This will open the npy file in dataset_path and compute the state discrete time derivatives using central difference formula. It will then repackage states, actions, and state derivatives into a new .npy file which we can use as our dataset.

In [None]:
dataset_path = "datasets/collected_data_large_push.npy"
dt = 1/240.0 # time step in pybullet


data_npy = np.load(dataset_path, allow_pickle=True)

samples = []
for item in data_npy:
    
    states = item['states']
    actions = item['actions']
    state_derivatives = []
    for i in range(1, len(states)-1):
        state_derivative = (states[i+1] - states[i-1]) / dt
        state_derivatives.append(state_derivative)
    state_derivatives = np.array(state_derivatives)

    for i, state in enumerate(states[1:-1]):
        sample = {
            'state': state,
            'action': actions[i],
            'state_derivative': state_derivatives[i]
        }
        samples.append(sample)

view_states([sample['state'] for sample in samples[:5]])

# print info about samples
print(f"Number of samples: {len(samples)}")
print(f"Sample state shape: {samples[0]['state'].shape}")
print(f"Sample action shape: {samples[0]['action'].shape}")
print(f"Sample state derivative shape: {samples[0]['state_derivative'].shape}")

In [None]:
# Some training and datset parameters
val_fraction = 0.2
batch_size = 64
num_epochs = 10000

The cell below will construct the Dataset used for RoboSINDy. It reads each sample from the .npy samples we created, normalizes each state and state derivatives, and constructs a dataloader to be used for training.

In [None]:
dataset = SindyDataset(data=samples)

val_size = int(val_fraction * len(dataset))
train_size = len(dataset) - val_size
train_data, val_data = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_data, batch_size=batch_size)
val_loader = DataLoader(val_data, batch_size=batch_size)

tot_train_states = []
tot_train_state_derivatives = []
for i in range(len(train_loader.dataset)):
    s = train_loader.dataset[i]['states']   
    sd = train_loader.dataset[i]['state_derivatives']  
    tot_train_states.append(s.unsqueeze(0))
    tot_train_state_derivatives.append(sd.unsqueeze(0))
tot_train_states = torch.cat(tot_train_states,dim=0)  
tot_train_state_derivatives = torch.cat(tot_train_state_derivatives, dim=0)  # (N,C,H,W)

mean_s = tot_train_states .mean(dim=(0,2,3))
std_s = tot_train_states .std( dim=(0,2,3))
mean_sd = tot_train_state_derivatives.mean(dim=(0,2,3))
std_sd = tot_train_state_derivatives.std( dim=(0,2,3))

normalization_constants = {
  'mean_state': mean_s,              
  'std_state':  std_s,               
  'mean_state_derivative': mean_sd,  
  'std_state_derivative': std_sd,    
}

norm_tr = NormalizationTransform(normalization_constants)
train_data.dataset.transform = norm_tr
val_data.dataset.transform = norm_tr

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size)

print(f"Loaded {len(train_loader.dataset)} training samples and {len(val_loader.dataset)} validation samples.\n")

# get one sample and check quality
sample = train_loader.dataset[0]
print(f"Sample state shape: {sample['states'].shape}")
print(f"Sample action shape: {sample['actions'].shape}")
print(f"Sample state derivative shape: {sample['state_derivatives'].shape}")
print(f"Sample state: {sample['states']}")


# Part 1: SINDy Dynamics model with 2 latent dimensions

#### We first train a model that learns the latent dynamics of the planar pushing task with 2 latent variables. The hope is that the 2 latent variables will each capture the dynamics of the object in the $x$ and $y$ direction.

In [None]:
#define the model
model = RoboSINDy(input_dim=32*32*1, batch_size=batch_size)

model.train_model(train_loader, num_epochs=num_epochs, learning_rate=0.001)

#save the model
model_path = 'trained_models/sindy_model_v3_latent2.pt'
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

Load model from .pt file. Skip if you just trained the model!

In [None]:
#load model from state dict

### SET THE PATH TO YOUR MODEL HERE ###
model_path = 'trained_models/sindy_model_v2_latent2.pt'
########################################

sindy_dynamics_model = RoboSINDy(input_dim=32*32*1, batch_size=batch_size, latent_dim=2)
sindy_dynamics_model.load_state_dict(torch.load(model_path))
sindy_dynamics_model.eval()
print(f"Model loaded from {model_path}")

In [None]:
sindy_dynamics_model.xi_coefficients.data = mask_xi_matrix(sindy_dynamics_model.xi_coefficients.data)

print("Xi coefficients:")
print(sindy_dynamics_model.xi_coefficients.data.numpy())

Visualize the robot pushing the block using the image space dynamics inferred by SINDy.

In [None]:
import pybullet as p

# Control on an obstacle free environment
%matplotlib inline

fig = plt.figure(figsize=(8,8))
hfig = display(fig, display_id=True)
visualizer = NotebookVisualizer(fig=fig, hfig=hfig)

start_state = np.array([0.4, 0.0, 0.0])
target_state = np.array([0.7, -0.0, 0.0])

# start_state = np.array([0.4, 0.3, -np.pi/2])
# target_state = np.array([0.4, -0.0, 0.0])

# start_state = np.array([0.3, 0.3, np.pi])
# target_state = np.array([0.0, 0.3, 0.0])
                
# start_state = np.array([0.6, 0.1, -np.pi/4])
# target_state = np.array([0.8, -0.0, 0.0])

############### choose number of trials to run ###################
num_trials = 3
##################################################################

for i in range(num_trials):
    env = PandaImageSpacePushingEnv(visualizer=visualizer, render_non_push_motions=False,  camera_heigh=800, camera_width=800, render_every_n_steps=5, grayscale=True, target_pose_vis=target_state, start_state=start_state)
    env.object_target_pose = env._planar_pose_to_world_pose(target_state)
    state_0 = env.reset()
    controller = PushingImgSpaceController(env, sindy_dynamics_model, img_space_pushing_cost_function, normalization_constants, num_samples=50, horizon=10)

    state = state_0

    num_steps_max = 15

    trajectory = []
    goal_reached = False
    for i in range(num_steps_max):
        action = controller.control(state)
        state, reward, done, _ = env.step(action)

        # check if we have reached the goal
        end_pose = env.get_object_pos_planar()
        trajectory.append(end_pose)
        goal_distance = np.linalg.norm(end_pose[:2]-target_state[:2]) # evaluate only position, not orientation
        goal_reached = goal_distance < BOX_SIZE/2
        if done or goal_reached:
            break

    ########### WRITRE PATH TO SAVE TRAJECTORIES ############
    trajectory_file = "trash.txt"
    write_traj_to_file(trajectory=trajectory, filename=trajectory_file)
    #########################################################

    print(f'GOAL REACHED: {goal_reached}')


plt.close(fig)

# Part 2: SINDy Dynamics model with 3 latent dimensions

#### We will now train a model that learns the latent dynamics of the planar pushing task with **3** latent variables. This model will likely be more expressive. 

In [None]:
#define the model
model = RoboSINDy(input_dim=32*32*1, batch_size=batch_size, latent_dim=3)

model.train_model(train_loader, num_epochs=num_epochs, learning_rate=0.001)

#save the model
model_path = 'trained_models/sindy_model_v9_latent3.pt'
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

In [None]:
#load model from state dict

### SET THE PATH TO YOUR MODEL HERE ###
model_path = 'trained_models/sindy_model_v4_latent3.pt'
########################################

sindy_dynamics_model = RoboSINDy(input_dim=32*32*1, batch_size=batch_size, latent_dim=3)
sindy_dynamics_model.load_state_dict(torch.load(model_path))
sindy_dynamics_model.eval()
print(f"Model loaded from {model_path}")

In [None]:
sindy_dynamics_model.xi_coefficients.data = mask_xi_matrix(sindy_dynamics_model.xi_coefficients.data)

print("Xi coefficients:")
print(sindy_dynamics_model.xi_coefficients.data.numpy())

In [None]:
import pybullet as p

# Control on an obstacle free environment
%matplotlib inline

fig = plt.figure(figsize=(8,8))
hfig = display(fig, display_id=True)
visualizer = NotebookVisualizer(fig=fig, hfig=hfig)

start_state = np.array([0.4, 0.0, 0.0])
target_state = np.array([0.7, -0.0, 0.0])

# start_state = np.array([0.4, 0.3, -np.pi/2])
# target_state = np.array([0.4, -0.0, 0.0])

# start_state = np.array([0.3, 0.3, np.pi])
# target_state = np.array([0.0, 0.3, 0.0])
                
# start_state = np.array([0.6, 0.1, -np.pi/4])
# target_state = np.array([0.8, -0.0, 0.0])

############### choose number of trials to run ###################
num_trials = 3
##################################################################

for i in range(num_trials):
    env = PandaImageSpacePushingEnv(visualizer=visualizer, render_non_push_motions=False,  camera_heigh=800, camera_width=800, render_every_n_steps=5, grayscale=True, target_pose_vis=target_state, start_state=start_state)
    env.object_target_pose = env._planar_pose_to_world_pose(target_state)
    state_0 = env.reset()
    controller = PushingImgSpaceController(env, sindy_dynamics_model, img_space_pushing_cost_function, normalization_constants, num_samples=50, horizon=10)

    state = state_0

    num_steps_max = 15

    trajectory = []
    goal_reached = False
    for i in range(num_steps_max):
        action = controller.control(state)
        state, reward, done, _ = env.step(action)

        # check if we have reached the goal
        end_pose = env.get_object_pos_planar()
        trajectory.append(end_pose)
        goal_distance = np.linalg.norm(end_pose[:2]-target_state[:2]) # evaluate only position, not orientation
        goal_reached = goal_distance < BOX_SIZE/2
        if done or goal_reached:
            break

    ########### WRITRE PATH TO SAVE TRAJECTORIES ############
    trajectory_file = "trash.txt"
    write_traj_to_file(trajectory=trajectory, filename=trajectory_file)
    #########################################################

    print(f'GOAL REACHED: {goal_reached}')


plt.close(fig)

# Part 3: SINDy Dynamics model with 1 latent dimension

#### We will now train a model that learns the latent dynamics of the planar pushing task with **1** latent variable. I wonder what this will recover?

In [None]:
# TRAIN MODEL WITH LATENT DIMENSION 1

#define the model
model = RoboSINDy(input_dim=32*32*1, batch_size=batch_size, latent_dim=1)

model.train_model(train_loader, num_epochs=num_epochs, learning_rate=0.001)

#save the model
model_path = 'trained_models/sindy_model_v12_latent1.pt'
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

In [None]:
#load model from state dict

### SET THE PATH TO YOUR MODEL HERE ###
model_path = 'trained_models/sindy_model_v4_latent1.pt'
########################################

sindy_dynamics_model = RoboSINDy(input_dim=32*32*1, batch_size=batch_size, latent_dim=1)
sindy_dynamics_model.load_state_dict(torch.load(model_path))
sindy_dynamics_model.eval()
print(f"Model loaded from {model_path}")

In [None]:
sindy_dynamics_model.xi_coefficients.data = mask_xi_matrix(sindy_dynamics_model.xi_coefficients.data)

print("Xi coefficients:")
print(sindy_dynamics_model.xi_coefficients.data.numpy())

In [None]:
import pybullet as p

# Control on an obstacle free environment
%matplotlib inline

fig = plt.figure(figsize=(8,8))
hfig = display(fig, display_id=True)
visualizer = NotebookVisualizer(fig=fig, hfig=hfig)

start_state = np.array([0.4, 0.0, 0.0])
target_state = np.array([0.7, -0.0, 0.0])

# start_state = np.array([0.4, 0.3, -np.pi/2])
# target_state = np.array([0.4, -0.0, 0.0])

# start_state = np.array([0.3, 0.3, np.pi])
# target_state = np.array([0.0, 0.3, 0.0])
                
# start_state = np.array([0.6, 0.1, -np.pi/4])
# target_state = np.array([0.8, -0.0, 0.0])

############### choose number of trials to run ###################
num_trials = 3
##################################################################

for i in range(num_trials):
    env = PandaImageSpacePushingEnv(visualizer=visualizer, render_non_push_motions=False,  camera_heigh=800, camera_width=800, render_every_n_steps=5, grayscale=True, target_pose_vis=target_state, start_state=start_state)
    env.object_target_pose = env._planar_pose_to_world_pose(target_state)
    state_0 = env.reset()
    controller = PushingImgSpaceController(env, sindy_dynamics_model, img_space_pushing_cost_function, normalization_constants, num_samples=50, horizon=10)

    state = state_0

    num_steps_max = 15

    trajectory = []
    goal_reached = False
    for i in range(num_steps_max):
        action = controller.control(state)
        state, reward, done, _ = env.step(action)

        # check if we have reached the goal
        end_pose = env.get_object_pos_planar()
        trajectory.append(end_pose)
        goal_distance = np.linalg.norm(end_pose[:2]-target_state[:2]) # evaluate only position, not orientation
        goal_reached = goal_distance < BOX_SIZE/2
        if done or goal_reached:
            break

    ########### WRITRE PATH TO SAVE TRAJECTORIES ############
    trajectory_file = "trash.txt"
    write_traj_to_file(trajectory=trajectory, filename=trajectory_file)
    #########################################################

    print(f'GOAL REACHED: {goal_reached}')


plt.close(fig)

# Part 4: Large latent dimension

#### We now train a model with a large function library and large latent dimension (8)

In [None]:
# TRAIN MODEL WITH LATENT DIMENSION 8

#define the model
model = RoboSINDy(input_dim=32*32*1, batch_size=batch_size, latent_dim=8)

model.train_model(train_loader, num_epochs=num_epochs, learning_rate=0.001)

#save the model
model_path = 'trained_models/sindy_model_v1_latent8.pt'
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")

In [None]:
#load model from state dict

### SET THE PATH TO YOUR MODEL HERE ###
model_path = 'trained_models/sindy_model_v1_latent8.pt'
########################################

sindy_dynamics_model = RoboSINDy(input_dim=32*32*1, batch_size=batch_size, latent_dim=8)
sindy_dynamics_model.load_state_dict(torch.load(model_path))
sindy_dynamics_model.eval()
print(f"Model loaded from {model_path}")

In [None]:
sindy_dynamics_model.xi_coefficients.data = mask_xi_matrix(sindy_dynamics_model.xi_coefficients.data)

print("Xi coefficients:")
print(sindy_dynamics_model.xi_coefficients.data.numpy())

In [None]:
import pybullet as p

# Control on an obstacle free environment
%matplotlib inline

fig = plt.figure(figsize=(8,8))
hfig = display(fig, display_id=True)
visualizer = NotebookVisualizer(fig=fig, hfig=hfig)

# start_state = np.array([0.4, 0.0, 0.0])
# target_state = np.array([0.7, -0.0, 0.0])

start_state = np.array([0.4, 0.3, -np.pi/2])
target_state = np.array([0.4, -0.0, 0.0])

# start_state = np.array([0.3, 0.3, np.pi])
# target_state = np.array([0.0, 0.3, 0.0])
                
# start_state = np.array([0.6, 0.1, -np.pi/4])
# target_state = np.array([0.8, -0.0, 0.0])

############### choose number of trials to run ###################
num_trials = 3
##################################################################

for i in range(num_trials):
    env = PandaImageSpacePushingEnv(visualizer=visualizer, render_non_push_motions=False,  camera_heigh=800, camera_width=800, render_every_n_steps=5, grayscale=True, target_pose_vis=target_state, start_state=start_state)
    env.object_target_pose = env._planar_pose_to_world_pose(target_state)
    state_0 = env.reset()
    controller = PushingImgSpaceController(env, sindy_dynamics_model, img_space_pushing_cost_function, normalization_constants, num_samples=50, horizon=10)

    state = state_0

    num_steps_max = 15

    trajectory = []
    goal_reached = False
    for i in range(num_steps_max):
        action = controller.control(state)
        state, reward, done, _ = env.step(action)

        # check if we have reached the goal
        end_pose = env.get_object_pos_planar()
        trajectory.append(end_pose)
        goal_distance = np.linalg.norm(end_pose[:2]-target_state[:2]) # evaluate only position, not orientation
        goal_reached = goal_distance < BOX_SIZE/2
        if done or goal_reached:
            break

    ########### WRITRE PATH TO SAVE TRAJECTORIES ############
    trajectory_file = "trash.txt"
    write_traj_to_file(trajectory=trajectory, filename=trajectory_file)
    #########################################################

    print(f'GOAL REACHED: {goal_reached}')


plt.close(fig)

# Part 5: Training the Locally Linear Model (2 Dimension Latent Space)

#### For comparison, we compare RoboSINDy to another latent dynamics model constrained to be locally linear. This model is based on Embed to Control (E2C) by Watter et. al. (2015). 

#### Here, we train the model on a 2 dimensional latent space.

In [None]:
from e2c.E2C import RoboE2C, RoboE2CLoss
from utils.process_data_ import process_data_multiple_step, process_data

# Train the dynamics model
LATENT_DIM = 2
ACTION_DIM = 3
NUM_CHANNELS = 1

collected_data = np.load('datasets/collected_data_large_push.npy', allow_pickle=True)

model = RoboE2C(latent_dim=LATENT_DIM, action_dim=ACTION_DIM, num_channels=NUM_CHANNELS)

state_loss_fn = nn.MSELoss()
latent_loss_fn = nn.MSELoss()
loss = RoboE2CLoss(state_loss_fn, latent_loss_fn, alpha=0.1)

# Compute normalization constants
train_loader, val_loader, norm_constants = process_data(collected_data, batch_size=batch_size)

optimizer = optim.Adam(model.parameters(), lr=0.0001)
pbar = tqdm(range(30000))

train_losses = []
model.train()
for epoch_i in pbar:
    train_loss_i = 0.

    for batch_idx, batch in enumerate(train_loader):
        states = batch['states']
        actions = batch['actions']
        optimizer.zero_grad()
        loss = loss(model, states, actions)
        loss.backward()
        optimizer.step()
        train_loss_i += loss.item()

    pbar.set_description(f'Latent dim {LATENT_DIM} - Loss: {train_loss_i:.4f}')
    train_losses.append(train_loss_i)

# ---

# plot loss: 
fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(12, 3))
axes = [axes]
axes[0].plot(train_losses, label=f'latent_dim: {LATENT_DIM}')
axes[0].grid()
axes[0].legend()
axes[0].set_title('Train Loss')
axes[0].set_xlabel('Epochs')
axes[0].set_ylabel('Train Loss')
axes[0].set_yscale('log')

# save model:
save_filename = 'e2c_derivative_big_data_2dim.pt'
torch.save(model.state_dict(), save_filename)

In [None]:
# Load the model

model_path = 'trained_models/e2c_derivative_big_data_2dim.pt'
########################################
LATENT_DIM = 2
ACTION_DIM = 3
NUM_CHANNELS = 1

model = RoboE2C(latent_dim=LATENT_DIM, action_dim=ACTION_DIM, num_channels=NUM_CHANNELS)
# model_path = os.path.join('multi_step_latent_dynamics_model.pt')
model.load_state_dict(torch.load(model_path, weights_only=True))
model.eval()

In [None]:
import pybullet as p
from e2c.E2C import PushingImgSpaceController
# Control on an obstacle free environment
%matplotlib inline

fig = plt.figure(figsize=(8,8))
hfig = display(fig, display_id=True)
visualizer = NotebookVisualizer(fig=fig, hfig=hfig)

start_state = np.array([0.4, 0.0, 0.0])
target_state = np.array([0.7, -0.0, 0.0])

# start_state = np.array([0.4, 0.3, -np.pi/2])
# target_state = np.array([0.4, -0.0, 0.0])

# start_state = np.array([0.3, 0.3, np.pi])
# target_state = np.array([0.0, 0.3, 0.0])
                
# start_state = np.array([0.6, 0.1, -np.pi/4])
# target_state = np.array([0.8, -0.0, 0.0])

############### choose number of trials to run ###################
num_trials = 3
##################################################################

for i in range(num_trials):
    env = PandaImageSpacePushingEnv(visualizer=visualizer, render_non_push_motions=False,  camera_heigh=800, camera_width=800, render_every_n_steps=5, grayscale=True, target_pose_vis=target_state, start_state=start_state)
    env.object_target_pose = env._planar_pose_to_world_pose(target_state)
    state_0 = env.reset()
    controller = PushingImgSpaceController(env, model, img_space_pushing_cost_function, norm_constants, num_samples=50, horizon=10)

    state = state_0

    num_steps_max = 15

    trajectory = []
    goal_reached = False
    for i in range(num_steps_max):
        action = controller.control(state)
        state, reward, done, _ = env.step(action)

        # check if we have reached the goal
        end_pose = env.get_object_pos_planar()
        trajectory.append(end_pose)
        goal_distance = np.linalg.norm(end_pose[:2]-target_state[:2]) # evaluate only position, not orientation
        goal_reached = goal_distance < BOX_SIZE/2
        if done or goal_reached:
            break

    ########### WRITRE PATH TO SAVE TRAJECTORIES ############
    trajectory_file = "trash.txt"
    write_traj_to_file(trajectory=trajectory, filename=trajectory_file)
    #########################################################

    print(f'GOAL REACHED: {goal_reached}')


plt.close(fig)

# Part 6: Training the Locally Linear Model (3 Dimension Latent Space)

#### Here, we train the locally linear model on a 3 dimensional latent space.

In [None]:
from e2c.E2C import RoboE2C, RoboE2CLoss
from utils.process_data_ import process_data_multiple_step, process_data

# Train the dynamics model
LATENT_DIM = 3
ACTION_DIM = 3
NUM_CHANNELS = 1

collected_data = np.load('datasets/collected_data_large_push.npy', allow_pickle=True)

model = RoboE2C(latent_dim=LATENT_DIM, action_dim=ACTION_DIM, num_channels=NUM_CHANNELS)

state_loss_fn = nn.MSELoss()
latent_loss_fn = nn.MSELoss()
loss_fn = RoboE2CLoss(state_loss_fn, latent_loss_fn, alpha=0.1)

# Compute normalization constants
train_loader, val_loader, norm_constants = process_data(collected_data, batch_size=batch_size)

optimizer = optim.Adam(model.parameters(), lr=0.0001)
pbar = tqdm(range(30000))

train_losses = []
model.train()
for epoch_i in pbar:
    train_loss_i = 0.

    for batch_idx, batch in enumerate(train_loader):
        states = batch['states']
        actions = batch['actions']
        optimizer.zero_grad()
        loss = loss_fn(model, states, actions)
        loss.backward()
        optimizer.step()
        train_loss_i += loss.item()

    pbar.set_description(f'Latent dim {LATENT_DIM} - Loss: {train_loss_i:.4f}')
    train_losses.append(train_loss_i)

# ---

# plot loss: 
fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(12, 3))
axes = [axes]
axes[0].plot(train_losses, label=f'latent_dim: {LATENT_DIM}')
axes[0].grid()
axes[0].legend()
axes[0].set_title('Train Loss')
axes[0].set_xlabel('Epochs')
axes[0].set_ylabel('Train Loss')
axes[0].set_yscale('log')

# save model:
save_filename = 'robo_e2c_3dim_vn.pt'
torch.save(model.state_dict(), save_filename)

In [None]:
# Load the model

model_path = 'trained_models/robo_e2c_3dim_v2.pt'
########################################
LATENT_DIM = 3
ACTION_DIM = 3
NUM_CHANNELS = 1

model = RoboE2C(latent_dim=LATENT_DIM, action_dim=ACTION_DIM, num_channels=NUM_CHANNELS)
# model_path = os.path.join('multi_step_latent_dynamics_model.pt')
model.load_state_dict(torch.load(model_path, weights_only=True))
model.eval()

collected_data = np.load('datasets/collected_data_large_push.npy', allow_pickle=True)
train_loader, val_loader, norm_constants = process_data(collected_data, batch_size=batch_size)


In [None]:
import pybullet as p
from e2c.E2C import PushingImgSpaceController
# Control on an obstacle free environment
%matplotlib inline

fig = plt.figure(figsize=(8,8))
hfig = display(fig, display_id=True)
visualizer = NotebookVisualizer(fig=fig, hfig=hfig)

start_state = np.array([0.4, 0.3, -np.pi/2.0])
target_state = np.array([0.4, -0.0, 0.0])

############### choose number of trials to run ###################
num_trials = 4
##################################################################

for i in range(num_trials):
    env = PandaImageSpacePushingEnv(visualizer=visualizer, render_non_push_motions=False,  camera_heigh=800, camera_width=800, render_every_n_steps=5, grayscale=True, target_pose_vis=target_state, start_state=start_state)
    env.object_target_pose = env._planar_pose_to_world_pose(target_state)
    state_0 = env.reset()
    controller = PushingImgSpaceController(env, model, img_space_pushing_cost_function, norm_constants, num_samples=50, horizon=10)

    state = state_0

    num_steps_max = 15

    trajectory = []
    goal_reached = False
    for i in range(num_steps_max):
        action = controller.control(state)
        state, reward, done, _ = env.step(action)

        # check if we have reached the goal
        end_pose = env.get_object_pos_planar()
        trajectory.append(end_pose)
        goal_distance = np.linalg.norm(end_pose[:2]-target_state[:2]) # evaluate only position, not orientation
        goal_reached = goal_distance < BOX_SIZE/1.5
        if done or goal_reached:
            break

    ########### WRITRE PATH TO SAVE TRAJECTORIES ############
    trajectory_file = "roboE2C_3dim_horizontal.txt"
    write_traj_to_file(trajectory=trajectory, filename=trajectory_file)
    #########################################################

    print(f'GOAL REACHED: {goal_reached}')


plt.close(fig)

# Part 7: Training the Globally Linear Model (3 Dimension Latent Space)

#### We follow inspiration from the E2C paper and present a globally linear latent space model. We train the globally linear model on a 3 dimensional latent space.

In [None]:
from e2c.E2C import GloboE2C, RoboE2CLoss
from utils.process_data_ import process_data

# Train the dynamics model
LATENT_DIM = 3
ACTION_DIM = 3
NUM_CHANNELS = 1

collected_data = np.load('datasets/collected_data_large_push.npy', allow_pickle=True)

model = GloboE2C(latent_dim=LATENT_DIM, action_dim=ACTION_DIM, num_channels=NUM_CHANNELS)

state_loss_fn = nn.MSELoss()
latent_loss_fn = nn.MSELoss()
loss_fn = RoboE2CLoss(state_loss_fn, latent_loss_fn, alpha=0.1)

# Compute normalization constants
train_loader, val_loader, norm_constants = process_data(collected_data, batch_size=batch_size)

optimizer = optim.Adam(model.parameters(), lr=0.0001)
pbar = tqdm(range(35000))

train_losses = []
model.train()
for epoch_i in pbar:
    train_loss_i = 0.

    for batch_idx, batch in enumerate(train_loader):
        states = batch['states']
        actions = batch['actions']
        optimizer.zero_grad()
        loss = loss_fn(model, states, actions)
        loss.backward()
        optimizer.step()
        train_loss_i += loss.item()

    pbar.set_description(f'Latent dim {LATENT_DIM} - Loss: {train_loss_i:.4f}')
    train_losses.append(train_loss_i)

# ---

# plot loss: 
fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(12, 3))
axes = [axes]
axes[0].plot(train_losses, label=f'latent_dim: {LATENT_DIM}')
axes[0].grid()
axes[0].legend()
axes[0].set_title('Train Loss')
axes[0].set_xlabel('Epochs')
axes[0].set_ylabel('Train Loss')
axes[0].set_yscale('log')

# save model:
save_filename = 'global_e2c_3dim_v2.pt'
torch.save(model.state_dict(), save_filename)

In [None]:
# Load the model
from e2c.E2C import GloboE2C, RoboE2CLoss
from utils.process_data_ import process_data
model_path = 'trained_models/global_e2c_3dim_v2.pt'
########################################
LATENT_DIM = 3
ACTION_DIM = 3
NUM_CHANNELS = 1

model = GloboE2C(latent_dim=LATENT_DIM, action_dim=ACTION_DIM, num_channels=NUM_CHANNELS)
# model_path = os.path.join('multi_step_latent_dynamics_model.pt')
model.load_state_dict(torch.load(model_path, weights_only=True))
model.eval()

collected_data = np.load('datasets/collected_data_large_push.npy', allow_pickle=True)
train_loader, val_loader, norm_constants = process_data(collected_data, batch_size=batch_size)


In [None]:
import pybullet as p
from e2c.E2C import PushingImgSpaceController
# Control on an obstacle free environment
%matplotlib inline

fig = plt.figure(figsize=(8,8))
hfig = display(fig, display_id=True)
visualizer = NotebookVisualizer(fig=fig, hfig=hfig)
start_state = np.array([0.4, 0.3, -np.pi/2.0])
target_state = np.array([0.4, -0.0, 0.0])
# start_state = np.array([0.4, 0.0, 0.0])
# target_state = np.array([0.7, -0.0, 0.0])

# start_state = np.array([0.4, 0.3, -np.pi/2])
# target_state = np.array([0.4, -0.0, 0.0])

# start_state = np.array([0.3, 0.3, np.pi])
# target_state = np.array([0.0, 0.3, 0.0])
                
# start_state = np.array([0.6, 0.1, -np.pi/4])
# target_state = np.array([0.8, -0.0, 0.0])

############### choose number of trials to run ###################
num_trials = 1
##################################################################

for i in range(num_trials):
    env = PandaImageSpacePushingEnv(visualizer=visualizer, render_non_push_motions=False,  camera_heigh=800, camera_width=800, render_every_n_steps=5, grayscale=True, target_pose_vis=target_state, start_state=start_state)
    env.object_target_pose = env._planar_pose_to_world_pose(target_state)
    state_0 = env.reset()
    controller = PushingImgSpaceController(env, model, img_space_pushing_cost_function, norm_constants, num_samples=50, horizon=10)

    state = state_0

    num_steps_max = 15

    trajectory = []
    goal_reached = False
    for i in range(num_steps_max):
        action = controller.control(state)
        state, reward, done, _ = env.step(action)

        # check if we have reached the goal
        end_pose = env.get_object_pos_planar()
        trajectory.append(end_pose)
        goal_distance = np.linalg.norm(end_pose[:2]-target_state[:2]) # evaluate only position, not orientation
        goal_reached = goal_distance < BOX_SIZE/2
        if done or goal_reached:
            break

    ########### WRITRE PATH TO SAVE TRAJECTORIES ############
    trajectory_file = "globoE2C_3dim.txt"
    write_traj_to_file(trajectory=trajectory, filename=trajectory_file)
    #########################################################

    print(f'GOAL REACHED: {goal_reached}')


plt.close(fig)

In [None]:
# Print the global coefficients
print(model.A)
print(model.B)
print(model.b)
