<span style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">An Exception was encountered at '<a href="#papermill-error-cell">In [4]</a>'.</span>

# Verify the Conv-VAE sensorprocessing

Load a pre-trained model, specified by an experiment/run, that was trained by Train-Conv-VAE.

This notebook runs a number of visualizations that illustrate the performance of the trained encoding. Verification here happens primarily through visual observation.

In [1]:
import sys
# Make the parent directory importable; the project imports below rely on it.
sys.path.append("..")

from exp_run_config import Config
Config.PROJECTNAME = "BerryPicker"

# Add the Conv-VAE code directory named in the configuration to the path
# (presumably the Julian-8897-Conv-VAE-PyTorch checkout — TODO confirm).
# This must happen before the sensorprocessing imports below.
sys.path.append(Config()["conv_vae"]["code_dir"])
# from encoding_conv_vae.conv_vae import latest_json_and_model

from sensorprocessing import sp_conv_vae
from sensorprocessing import sp_helper
from sensorprocessing.sp_helper import get_transform_to_sp, load_picturefile_to_tensor

from demonstration.demonstration import Demonstration

import matplotlib.pyplot as plt
import pathlib
import random

import torch

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# At some point in the development, this hack was necessary for some reason.
# It seems that as of Feb 2025, the code runs on Windows and Linux without it.
#temp = pathlib.PosixPath
#pathlib.PosixPath = pathlib.WindowsPath

***ExpRun**: Loading pointer config file:
	/home/sa641631/.config/BerryPicker/mainsettings.yaml


***ExpRun**: Loading machine-specific config file:
	~/WORK/BerryPicker/cfg/settings.yaml


/apps/anaconda/anaconda-2023.09/lib/python3.11/pathlib.py


Using device: cuda


### Exp-run initialization
Create the exp/run-s that describe the parameters of the training. 
Some of the code here is structured so that the notebook can be automated with papermill.

In [2]:
# *** Initialize the variables with default values
# *** This cell should be tagged as parameters
# *** If papermill is used, some of the values will be overwritten

# Name of the experiment and of the run whose trained model is verified.
experiment = "sensorprocessing_conv_vae"
# run = "sp_vae_256"
run = "sp_vae_128"

# Passed to Config().get_experiment() as creation_style.
# NOTE(review): "exist-ok" presumably reuses an already existing exp/run
# instead of recreating it from scratch — confirm the accepted values.
creation_style = "exist-ok"
# If not None, the experiment path is set to this external directory
# (it must already exist).
external_path = None

In [3]:
# Parameters
# Values recorded for this run; they override the defaults assigned in the
# cell above (presumably injected by papermill — TODO confirm).
epochs = 10
creation_style = "exist-ok"
run = "sp_vae_128"
# NOTE(review): this path contains a literal "~" in the middle
# (".../src/~/WORK/..."); a "~" that is not the first path component is never
# expanded, so the directory is looked up verbatim. Verify how the caller
# builds this value.
external_path = "/lustre/fs1/home/sa641631/WORK/BerryPicker/src/BerryPicker/src/~/WORK/BerryPicker/data_external/_automate"


<span id="papermill-error-cell" style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">Execution using papermill encountered an exception here and stopped:</span>

In [4]:
# Create the necessary exp/run objects.

if external_path:
    # pathlib does not expand "~" by itself; expanduser() resolves a leading one.
    external_path = pathlib.Path(external_path).expanduser()
    # Fail with an explanatory message instead of a bare AssertionError, so an
    # automated run shows which path was wrong.
    assert external_path.exists(), (
        f"external_path does not exist: {external_path} "
        "(note: a '~' that is not the first path component is never expanded)"
    )
    Config().set_experiment_path(external_path)

# The experiment/run we are going to use: the model it specifies is loaded by
# the cells below.
exp = Config().get_experiment(experiment, run, creation_style=creation_style)
print(exp)

AssertionError: 

## Verify the Conv-VAE by visual reconstruction
We can verify a Conv-VAE model visually based on its ability to recover the input image from the encoding. The intuition here would be that information that is lost during the recovery is not present in the encoding, and thus it won't be usable by the algorithms using this encoding either.

In [None]:
# Build the sensor processing object and the matching input transform.
sp = sp_conv_vae.ConvVaeSensorProcessing(exp, device)
transform = get_transform_to_sp(exp)
demos = []
cameras = []
# Load the demonstrations specified in the experiment validation data.
# The run name is unpacked into demo_run so that the notebook-level `run`
# parameter is not overwritten.
for val in exp["validation_data"]:
    demo_run, demo_name, camera = val
    exp_demo = Config().get_experiment("demonstration", demo_run)
    demo = Demonstration(exp_demo, demo_name)
    demos.append(demo)
    cameras.append(camera)

# Choose n random pictures from the first validation demonstration and store
# them both in processable and in displayable form.
# Steps are drawn with replacement, so the same step may appear twice.
n = 6
demo = demos[0]
camera = cameras[0]
images_processable = []
images_display = []
for _ in range(n):
    rnd = random.randint(0, demo.metadata["maxsteps"] - 1)
    image_processable, image_display = demo.get_image(rnd, device=device, camera=camera, transform=transform)
    images_processable.append(image_processable)
    images_display.append(image_display)


In [None]:
# Verify that the sensor processing object can process each loaded image.
# The images used here are the ones loaded from the demonstration: the
# file-based check is turned off, because most demonstrations will be in video
# so there are no independent picture files.
transform = get_transform_to_sp(exp)
for image in images_processable:
    # Disabled file-based variant, kept for reference:
    # sensor_readings, _ = load_picturefile_to_tensor(imagefile, transform, device)
    output = sp.process(image)
    print(output)

In [None]:
# Verify whether we can process the images loaded from the demonstration:
# pass every image through sp.process and print the value it returns.
for image in images_processable:
    z = sp.process(image)
    print(f"The encoding is\n {z}")

# Visualize the VAE reconstruction

In [None]:
def visualize_VAE(sp, exp, device, image, axoriginal, axreconstr):
    """Draw an input image and its model reconstruction on two axes.

    Args:
        sp: sensor processing object; its ``model`` is called on ``image``.
        exp: not used by this function.
        device: not used by this function.
        image: batched input; only its first element is displayed
            (assumed to be channel-first — TODO confirm).
        axoriginal: axes that receive the input picture, titled "Original".
        axreconstr: axes that receive the reconstruction, titled "Reconstruct".

    Returns:
        The three values returned by ``sp.model(image)``, unchanged.
    """
    def _to_picture(batch):
        # First element of the batch, moved to the CPU, with the leading axis
        # moved to the end, as a numpy array.
        return batch[0].cpu().permute(1, 2, 0).detach().numpy()

    reconstruction, mean, log_variance = sp.model(image)
    panels = (
        (axoriginal, _to_picture(image), "Original"),
        (axreconstr, _to_picture(reconstruction), "Reconstruct"),
    )
    for axis, picture, title in panels:
        axis.imshow(picture)
        axis.set_title(title)
    return reconstruction, mean, log_variance

In [None]:
# Show the originals (top row) and their reconstructions (bottom row).
# visualize_VAE reaches inside the sensorprocessing object and calls its model.
fig, axs = plt.subplots(2, n, figsize=(10, 5))
row_original, row_reconstructed = axs[0], axs[1]
for i in range(n):
    output, mu, logvar = visualize_VAE(
        sp, exp, device, images_processable[i],
        row_original[i], row_reconstructed[i],
    )
    print(f"Pictures{i}\nmu={mu}\nlogvar={logvar}")

# Save the figure into the data directory of the exp/run.
fig_reconstructed_path = pathlib.Path(exp.data_dir()) / "orig_reconstr.pdf"
fig.savefig(fig_reconstructed_path, bbox_inches='tight')


## Reconstruction from noisy latent encoding


In [None]:
# Three rows per picture: the original, its reconstruction, and a
# reconstruction decoded from a perturbed latent vector.
noise_std = 0.001  # standard deviation of the Gaussian noise added to z
fig, axs = plt.subplots(3, n, figsize=(10, 5))
for i in range(n):
    output, mu, logvar = visualize_VAE(sp, exp, device, images_processable[i], axs[0,i], axs[1,i])
    # This samples a new z from mu with its logvar ...
    z2 = sp.model.reparameterize(mu, logvar)
    # ... and adds random noise to every component of the encoding.
    z2 = z2 + noise_std * torch.randn_like(z2)
    # Decode the perturbed vector. Decoding `mu` here would leave z2 unused,
    # and the row titled "Noised" would not show a noised encoding.
    output2 = sp.model.decode(z2)
    output_for_pic2 = output2[0].cpu().permute(1, 2, 0).detach().numpy()
    axs[2,i].imshow(output_for_pic2)
    axs[2,i].set_title("Noised")

# Save the figure into the data directory of the exp/run.
fig_reconstruction_from_noisy_path = pathlib.Path(exp.data_dir(), "reconstr_from_noisy.pdf")
fig.savefig(fig_reconstruction_from_noisy_path, bbox_inches='tight')


# Generating random samples from the model

In [None]:

# Draw 25 samples by calling sp.model.sample and show them in a 5x5 grid,
# filled row by row.
samples = sp.model.sample(num_samples = 25, current_device=device)
fig, axs = plt.subplots(5, 5, figsize=(10, 10))
for idx, ax in enumerate(axs.flat):
    ax.imshow(samples[idx].cpu().permute(1, 2, 0).detach().numpy())

# Save the figure into the data directory of the exp/run.
fig_random_samples = pathlib.Path(exp.data_dir()) / "random_samples.pdf"
fig.savefig(fig_random_samples, bbox_inches='tight')
