# Visual proprioception experiments

Experiments testing whether the robot can recover its own positional parameters from the visual input. This is useful in itself, and it is also a reasonable sanity check of whether the vision system produces a meaningful encoding.

In [1]:
import sys
sys.path.append("..")
from settings import Config

import pathlib
#from pprint import pformat


#import matplotlib.pyplot as plt

import torch
import torch.nn as nn
#import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(1)

from behavior_cloning.demo_to_trainingdata import BCDemonstration
from sensorprocessing import sp_conv_vae



Loading pointer config file: /home/lboloni/.config/BerryPicker/mainsettings.yaml
Loading machine-specific config file: /home/lboloni/Insync/lotzi.boloni@gmail.com/Google Drive/LotziStudy/Code/PackageTracking/BerryPicker/settings/settings-tredy2.yaml


## Creating the training and validation data

FIXME: the "action" in this case is actually the position of the robot.

In [2]:
def load_demonstrations_as_proprioception_training(sp, task):
    """Load all demonstrations of a task as proprioception training tensors.

    Every subdirectory of ``<demos directory>/demos/<task>`` is opened as a
    ``BCDemonstration`` and read with ``read_z_a()``. Row ``i`` of the returned
    ``z`` array becomes an input and row ``i`` of the ``a`` array becomes the
    matching target.

    FIXME: currently returns the position data, can be used differently.

    Args:
        sp: sensor processor handed to ``BCDemonstration``.
        task: name of the task directory under the demos directory.

    Returns:
        A tuple ``(inputs, targets)`` of tensors, each stacked along a new
        first dimension with one entry per row read from the demonstrations.
    """
    demos_dir = pathlib.Path(Config()["demos"]["directory"])
    task_dir = pathlib.Path(demos_dir, "demos", task)

    inputlist = []
    targetlist = []

    # iterdir() yields entries in arbitrary order; sorting keeps the generated
    # (and later cached) tensors reproducible from run to run.
    for demo_dir in sorted(task_dir.iterdir()):
        if not demo_dir.is_dir():
            # Skip stray files; only directories are demonstrations.
            continue
        bcd = BCDemonstration(demo_dir, sensorprocessor=sp)
        print(bcd)
        z, a = bcd.read_z_a()
        print(z.shape)
        print(a.shape)

        # Indexing by the row count of z (rather than zipping) so that a
        # demonstration with fewer rows in a than in z fails loudly.
        for i in range(z.shape[0]):
            inputlist.append(torch.from_numpy(z[i]))
            targetlist.append(torch.from_numpy(a[i]))

    inputs = torch.stack(inputlist)
    targets = torch.stack(targetlist)
    return inputs, targets


In [3]:
# Load the cached proprioception tensors when BOTH cache files are present;
# otherwise regenerate them from the demonstrations and cache them.
proprioception_input_file = pathlib.Path(Config()["explorations"]["proprioception_input_file"])
proprioception_target_file = pathlib.Path(Config()["explorations"]["proprioception_target_file"])
if proprioception_input_file.exists() and proprioception_target_file.exists():
    inputs = torch.load(proprioception_input_file, weights_only=True)
    targets = torch.load(proprioception_target_file, weights_only=True)
else:
    task = "proprioception-uncluttered"
    sp = sp_conv_vae.ConvVaeSensorProcessing()
    inputs, targets = load_demonstrations_as_proprioception_training(sp, task)
    # Make sure the cache directories exist so the expensive encoding above
    # is not lost to a failed save.
    proprioception_input_file.parent.mkdir(parents=True, exist_ok=True)
    proprioception_target_file.parent.mkdir(parents=True, exist_ok=True)
    torch.save(inputs, proprioception_input_file)
    torch.save(targets, proprioception_target_file)

In [4]:

# Separate the training and validation data.
# The samples are shuffled with one shared permutation so that every input
# stays aligned with its target.
rows = torch.randperm(inputs.size(0))
shuffled_inputs = inputs[rows]
shuffled_targets = targets[rows]

# The first 67% of the shuffled samples are used for training, the remainder
# for validation. The training slice starts at 0: starting at 1 would silently
# drop one sample from both sets.
training_size = int( inputs.size(0) * 0.67 )
inputs_training = shuffled_inputs[:training_size]
targets_training = shuffled_targets[:training_size]

inputs_validation = shuffled_inputs[training_size:]
targets_validation = shuffled_targets[training_size:]

In [5]:
# Define the MLP regression model
class MLPRegression(nn.Module):
    """Fully connected regression network with two hidden ReLU layers.

    Architecture: Linear(input_size, hidden_size) -> ReLU ->
    Linear(hidden_size, hidden_size) -> ReLU -> Linear(hidden_size, output_size).
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # The layers live in a single nn.Sequential stored as ``self.model``,
        # so parameter names in the state dict are ``model.<index>.<name>``.
        layers = [
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the prediction."""
        prediction = self.model(x)
        return prediction


In [6]:

# Build the regression network together with its loss and optimizer.
# The input and output widths are read off the second dimension of the
# training tensors; the hidden width is chosen by hand.
hidden_size = 64
input_size = inputs_training.shape[1]
output_size = targets_training.shape[1]

print(input_size, output_size, sep="\n")

model = MLPRegression(input_size, hidden_size, output_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)


128
6


In [7]:


# Wrap the training and validation tensors into batched loaders.
batch_size = 32

# Training batches are reshuffled; validation batches keep a fixed order.
train_dataset = TensorDataset(inputs_training, targets_training)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

test_dataset = TensorDataset(inputs_validation, targets_validation)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)



In [8]:
def train_and_save_proprioception_model(modelfile, num_epochs=20, log_every=10,
                                        net=None, loader_train=None,
                                        loader_test=None, loss_fn=None, opt=None):
    """Train the proprioception model, evaluate it and save its weights.

    Called with only ``modelfile`` it behaves as before: it trains the
    notebook-level ``model`` on ``train_loader`` with ``criterion`` and
    ``optimizer``, evaluates on ``test_loader`` and writes the state dict.
    The optional arguments allow alternative models to be investigated without
    touching the notebook globals.

    Args:
        modelfile: path the trained state dict is written to.
        num_epochs: number of passes over the training loader (was 800).
        log_every: print the mean training loss every this many epochs;
            a falsy value disables the progress output.
        net: model to train; defaults to the notebook-level ``model``.
        loader_train: training loader; defaults to ``train_loader``.
        loader_test: evaluation loader; defaults to ``test_loader``.
        loss_fn: loss function; defaults to ``criterion``.
        opt: optimizer over the parameters of ``net``; defaults to
            ``optimizer``. Must be given together with ``net``.

    Returns:
        The test loss, i.e. the mean of the per-batch losses on the
        evaluation loader.

    Raises:
        ValueError: if exactly one of ``net`` and ``opt`` is supplied, because
            the default optimizer is bound to the default model's parameters.
    """
    if (net is None) != (opt is None):
        raise ValueError("net and opt must be supplied together")

    # Fall back to the notebook-level objects for everything not supplied.
    net = model if net is None else net
    opt = optimizer if opt is None else opt
    loss_fn = criterion if loss_fn is None else loss_fn
    loader_train = train_loader if loader_train is None else loader_train
    loader_test = test_loader if loader_test is None else loader_test

    # Training loop
    for epoch in range(num_epochs):
        net.train()
        total_loss = 0
        for batch_X, batch_y in loader_train:
            # Forward pass
            predictions = net(batch_X)
            loss = loss_fn(predictions, batch_y)

            # Backward pass and optimization
            opt.zero_grad()
            loss.backward()
            opt.step()

            total_loss += loss.item()

        if log_every and (epoch + 1) % log_every == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss / len(loader_train):.4f}')

    # Evaluate the model
    net.eval()
    test_loss = 0
    with torch.no_grad():
        for batch_X, batch_y in loader_test:
            predictions = net(batch_X)
            loss = loss_fn(predictions, batch_y)
            test_loss += loss.item()

    test_loss /= len(loader_test)
    print(f'Test Loss: {test_loss:.4f}')
    torch.save(net.state_dict(), modelfile)
    return test_loss

In [10]:
# Reuse the trained weights when they are cached on disk; otherwise train the
# model, which also writes the weights to modelfile.
modelfile = pathlib.Path(Config()["explorations"]["proprioception_mlp_model_file"])
if modelfile.exists():
    # weights_only=True restricts unpickling to tensors and plain containers,
    # consistent with how the cached input/target tensors are loaded.
    model.load_state_dict(torch.load(modelfile, weights_only=True))
else:
    train_and_save_proprioception_model(modelfile)

  model.load_state_dict(torch.load(modelfile))


In [14]:
# Checking if the reloaded model works: evaluate it on the validation loader.
# This cell only evaluates; it does not write the model file.
model.eval()
test_loss = 0
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        predictions = model(batch_X)
        loss = criterion(predictions, batch_y)
        test_loss += loss.item()

# Mean of the per-batch losses.
test_loss /= len(test_loader)
print(f'Test Loss: {test_loss:.4f}')

Test Loss: 632.5965
