# CNN SUBNET
This notebook shows how to use a CNN as a subnet in the encoder of a subspace system identification model. The CNN is used to process image inputs. The model is trained on a dataset of images and forces. The model is then tested on a test dataset to predict the forces. The model is evaluated using the NRMS metric.

### Importing the required libraries

In [1]:
!pip install git+https://github.com/GerbenBeintema/deepSI@master

Collecting git+https://github.com/GerbenBeintema/deepSI@master
  Cloning https://github.com/GerbenBeintema/deepSI (to revision master) to /tmp/pip-req-build-sfh3c_46
  Running command git clone --filter=blob:none --quiet https://github.com/GerbenBeintema/deepSI /tmp/pip-req-build-sfh3c_46
  Resolved https://github.com/GerbenBeintema/deepSI to commit 28c96c174fa2e1c83aeb26091d67785d468a4bee
  Preparing metadata (setup.py) ... [?25ldone
[0m

In [2]:
import deepSI
from deepSI import System_data, System_data_list
import numpy as np
from matplotlib import pyplot as plt
import torch
import cv2
from torch import optim, nn
from tqdm.auto import tqdm
import matplotlib
from torch.utils.data import Dataset, DataLoader
matplotlib.rcParams['mathtext.fontset'] = 'stix'
matplotlib.rcParams['font.family'] = 'STIXGeneral'
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
linewidth = 3.320
reducesize = 0.95
fig_fontsize = (10*19/28)*reducesize
plt.rcParams.update({'font.size': fig_fontsize})
dpi = 200
pad = 0.4

# Download the FFmpeg binaries
The FFmpeg binaries are required to extract the frames from the video files. The following code downloads the FFmpeg binaries and extracts them to a specified directory.

In [3]:
archive_path = 'ffmpeg-master-latest-linux64-gpl.tar.xz'
output_dir = 'ffmpeg/'  # You can use an existing or new directory

# Extract the archive
!tar -xf {archive_path} -C {output_dir}

In [4]:
import os

# Example directory containing the FFmpeg binaries
ffmpeg_dir = 'ffmpeg/ffmpeg-master-latest-linux64-gpl/bin'

# Add the FFmpeg directory to the PATH environment variable
os.environ['PATH'] = ffmpeg_dir + os.pathsep + os.environ['PATH']

# Data Preparation
The data is prepared by extracting the frames from the video files. The frames are then stored in a NumPy array. The forces are stored in a separate NumPy array.

In [5]:
# Load the npz file
adi = 12
systemsdir = 'systems_force/'
filen = 'combined_output_20_3.npz'
data = np.load(filen, allow_pickle=True)

# Arrays 'frames' and 'forces' within your npz file
frames = data['frames']
forces = data['forces']

# Determine the size of each set
total_size = len(frames)
train_size = int(total_size * 0.6)  # 60% of the data for training
val_size = int(total_size * 0.20)  # 20% of the data for validation
test_size = total_size - train_size - val_size  # Remaining 20% for testing

# Function to resize and reshape frames with progress bar
def resize_and_reshape_frames(frames, batch_size, new_height, new_width):
    num_frames = frames.shape[0]
    resized_and_reshaped_frames = np.zeros((num_frames, frames.shape[3], new_height, new_width), dtype=np.uint8)
    for start in tqdm(range(0, num_frames, batch_size), desc="Resizing frames"):
        end = start + batch_size
        batch_frames = frames[start:end]
        for i in range(batch_frames.shape[0]):
            resized_frame = cv2.resize(batch_frames[i], (new_width, new_height), interpolation=cv2.INTER_LINEAR)
            resized_and_reshaped_frames[start + i] = resized_frame.transpose(2, 0, 1)
    return resized_and_reshaped_frames

# Resize and reshape parameters
new_height = frames.shape[1] // 2
new_width = frames.shape[2] // 2
batch_size = 30

# Resize and reshape all frames
frames_resized_reshaped = resize_and_reshape_frames(frames, batch_size, new_height, new_width)

Resizing frames:   0%|          | 0/400 [00:00<?, ?it/s]

# Split the data
The data is split into training, validation, and testing sets. The training set is used to train the model, the validation set is used to validate the model during training, and the testing set is used to evaluate the model after training.

In [6]:
# Split the data
del frames
frames_train = frames_resized_reshaped[:train_size]
frames_val = frames_resized_reshaped[train_size:train_size + val_size]
frames_test = frames_resized_reshaped[train_size + val_size:]
n_channels, height, width = frames_train.shape[1], frames_train.shape[2], frames_train.shape[3]


In [7]:
forces_train = forces[:train_size, 2]
forces_val = forces[train_size:train_size + val_size, 2]
forces_test = forces[train_size + val_size:, 2]
del forces
# Initialize the SS_encoder_CNN_video system
#sys_vbss = deepSI.fit_systems.SS_encoder_CNN_video(na=adi, nb=adi)
n_channels, height, width = frames_train.shape[1], frames_train.shape[2], frames_train.shape[3]
#sys_vbss.init_nets(nu=1, ny=(n_channels, height, width))

In [8]:
# Create System_data instances with resized and reshaped frames
system_data = System_data(y=forces_train, u=frames_train)
system_data_val = System_data(y=forces_val, u=frames_val)
system_data_test = System_data(y=forces_test, u=frames_test)
del forces_train, frames_train, forces_val, frames_val, forces_test, frames_test

# Define the CNN encoder
The CNN encoder is defined as a subclass of the `nn.Module` class in PyTorch. The CNN encoder takes the past input and output data as input and returns the initial state of the system.

In [1]:
from deepSI.utils.torch_nets import CNN_encoder


#Psi(upast, ypast) where upast is an image and ypast an vector
#nb, nu, na, ny, nx
class CNN_encoder_image_input(nn.Module):
    def __init__(self, nb, nu, na, ny, nx, n_nodes_per_layer=64, n_hidden_layers=2, activation=nn.Tanh, features_ups_factor=1.33):
        super(CNN_encoder_image_input, self).__init__()
        self.net = CNN_encoder(na, ny, nb, nu, nx, \
                               n_nodes_per_layer=n_nodes_per_layer, \
                               n_hidden_layers=n_hidden_layers, \
                                activation=activation, \
                                features_ups_factor=features_ups_factor)

    def forward(self, upast, ypast):
        return self.net(ypast, upast)

#nx, nu
class CNN_encoder_f_image(nn.Module):
    def __init__(self, nx, nu, n_nodes_per_layer=64, n_hidden_layers=2, activation=nn.Tanh, features_ups_factor=1.33):
        super(CNN_encoder_f_image, self).__init__()
        self.net = CNN_encoder(1, nx, 1, nu, nx, 
                               n_nodes_per_layer=n_nodes_per_layer, 
                               n_hidden_layers=n_hidden_layers, 
                               activation=activation, 
                               features_ups_factor=features_ups_factor)

    def forward(self, x, u):
        # Ensure x and u are tensors
        if isinstance(x, list) or isinstance(x, np.ndarray):
            x = torch.tensor(x, dtype=torch.float32)
        if isinstance(u, list) or isinstance(u, np.ndarray):
            u = torch.tensor(u, dtype=torch.float32)
        
        # Ensure the correct shape for x and u
        if len(x.shape) == 1:
            x = x.unsqueeze(0)
        if len(u.shape) == 1:
            u = u.unsqueeze(0)
        
        # x.shape = (Nbatch, nx)
        # u.shape = (Nbatch, C, H, W)
        return self.net(x[:, None], u[:, None])

    
#nx, ny, nu=-1
class CNN_encoder_h_image(nn.Module):
    def __init__(self, nx, ny, nu, n_nodes_per_layer=64, n_hidden_layers=2, activation=nn.Tanh, features_ups_factor=1.33):
        super(CNN_encoder_h_image, self).__init__()
        self.net = CNN_encoder(1, nx, 1, nu, ny, \
                               n_nodes_per_layer=n_nodes_per_layer, \
                               n_hidden_layers=n_hidden_layers, \
                                activation=activation, \
                                features_ups_factor=features_ups_factor)

    def forward(self, x, u):
        #x.shape = (Nbatch, nx)
        #u.shape = (Nbatch, C, H, W)

        #self.net 
        # x -> upast = (Nbatch, nb, nu)
        # u -> ypast = (Nbatch, na, C, H, W)
        return self.net(x[:,None], u[:,None])



NameError: name 'nn' is not defined

# Define the SS_encoder_CNN_video system
The `SS_encoder_CNN_video` system is defined as a subclass of the `SS_encoder_general` class in deepSI. The `SS_encoder_CNN_video` system uses the CNN encoder defined above to process image inputs.

In [10]:
# from deepSI.utils import CNN_chained_upscales, CNN_encoder
from deepSI.fit_systems import *

class SS_encoder_CNN_video_input(SS_encoder_general):
    """The subspace encoder convolutonal neural network with image inputs

    Notes
    -----
    The subspace encoder

    """
    def __init__(self, nx=10, na=20, nb=20, feedthrough=True, e_net=CNN_encoder_image_input, \
                 f_net=CNN_encoder_f_image, h_net=CNN_encoder_h_image, \
                                            e_net_kwargs={}, f_net_kwargs={}, h_net_kwargs={}):
        super(SS_encoder_CNN_video_input, self).__init__(nx=nx,na=na,nb=nb, feedthrough=feedthrough, \
            e_net=e_net,               f_net=f_net,                h_net=h_net, \
            e_net_kwargs=e_net_kwargs, f_net_kwargs=f_net_kwargs,  h_net_kwargs=h_net_kwargs)

model = SS_encoder_CNN_video_input(nx=5, na=12, nb=12)

# nu = (3,25,30)
# ny = 1
# N = 200
# u_seq = np.random.randn(N,*nu)
# y_seq = np.random.randn(N, ny)
#sys_data = System_data(u_seq, y_seq)

print(system_data)
model.init_model(nu=(3,135,240), ny=1)


System_data of length: 7200 nu=(3, 135, 240) ny=None normed=False dt=None


# Patching the methods
The `SS_encoder_general` class is patched to include the `apply_experiment` and `loss` methods. The `apply_experiment` method is used to apply the system to the data, while the `loss` method is used to calculate the loss during training. The patched methods are defined below.

In [11]:
from deepSI.fit_systems.fit_system import Tictoctimer
from deepSI.systems import *
import time
import itertools
from copy import deepcopy
import warnings
# Get the System_torch class from deepSI
System_torch = deepSI.fit_systems.fit_system.System_torch

In [12]:
def patched_apply_experiment(self, sys_data, save_state=False, init_state=True):
    if isinstance(sys_data, (tuple, list, System_data_list)):
        return System_data_list([self.apply_experiment(sd, save_state=save_state, init_state=init_state) for sd in sys_data])
    
    sys_data_norm = self.norm.transform(sys_data)  # Normalize the system data

    dt_old = self.dt
    if sys_data.dt is not None:
        self.dt = sys_data.dt  # Update the system's dt if available

    U, Y = sys_data_norm.u, []
    if not init_state:
        k0 = 0
    elif sys_data_norm.y is not None:
        k0 = self.init_state(sys_data_norm)
        Y.extend(sys_data_norm.y[:k0])
    else:
        k0, _ = 0, self.reset_state()

    if save_state:
        X = [self.get_state()] * k0

    for u in U[k0:]:
        if save_state:
            X.append(self.get_state())
        Y.append(self.measure_act(u))  # Measure and advance state

    if dt_old is not None:
        self.dt = dt_old

    # Ensure that U, Y, and X are properly shaped
    U = np.array(U, dtype=object)
    Y = np.array(Y, dtype=object)
    X = np.array(X, dtype=object) if save_state else None

    return self.norm.inverse_transform(System_data(u=U, y=Y, x=X, normed=True, cheat_n=k0, dt=sys_data.dt))

# Define the patched version of the loss method
def patched_loss(self, uhist, yhist, ufuture, yfuture, loss_nf_cutoff=None, **Loss_kwargs):
    x = self.encoder(uhist, yhist)  # Initialize Nbatch number of states
    errors = []
    for y, u in zip(torch.transpose(yfuture, 0, 1), torch.transpose(ufuture, 0, 1)):  # Iterate over time
        y_pred = self.hn(x, u) if self.feedthrough else self.hn(x)
        
        # Ensure that y and y_pred have the same shape
        if y.dim() != y_pred.dim():
            y_pred = y_pred.view(-1) if y.dim() == 1 else y_pred.view(-1, 1)
            y = y.view(-1) if y_pred.dim() == 1 else y.view(-1, 1)

        error = nn.functional.mse_loss(y, y_pred)
        errors.append(error)  # Calculate error after taking n-steps
        
        if loss_nf_cutoff is not None and error.item() > loss_nf_cutoff:
            print(len(errors), end=' ')
            break
        
        x = self.fn(x, u)  # Advance state

    return torch.mean(torch.stack(errors))

# Monkey patch the methods in the respective classes
SS_encoder_general.apply_experiment = patched_apply_experiment  # Replace with the actual class name
SS_encoder_general.loss = patched_loss  # Replace with the actual class name



In [13]:
def patched_RMS_System_data(self, real, multi_average=True):
    y, yhat = real.y[self.cheat_n:], self.y[self.cheat_n:]
    error = np.sqrt(np.mean((y - yhat) ** 2, axis=0))
    if multi_average:
        return np.mean(error)
    return error

def patched_RMS_System_data_list(self, real, multi_average=True):
    rms_values = [sd.RMS(sdo, multi_average=multi_average) for sd, sdo in zip(self.sdl, real.sdl)]
    weights = np.array([len(sd) for sd in self.sdl])
    return np.sum(weights * rms_values) / np.sum(weights)

# Patch the RMS method in System_data
System_data.RMS = patched_RMS_System_data

# Patch the RMS method in System_data_list
System_data_list.RMS = patched_RMS_System_data_list

In [14]:
def patched_measure_act_multi(self, actions):
    feedthrough = self.feedthrough if hasattr(self, 'feedthrough') else False
    outputs = []
    for action in actions:
        if not isinstance(action, torch.Tensor):
            action = torch.tensor(action)
        if not isinstance(self.state, torch.Tensor):
            self.state = torch.tensor(self.state)
        
        if feedthrough:
            y_predict = self.hn(self.state, action)
        else:
            y_predict = self.hn(self.state)
        
        outputs.append(y_predict.detach().numpy())
    return outputs

# Patch the measure_act_multi method in the System class
System.measure_act_multi = patched_measure_act_multi

# Training the model
The model is trained using the training data. The model is trained for a specified number of epochs. The loss is calculated using the mean squared error (MSE) loss function. The model is evaluated using the NRMS metric.

In [None]:
def get_base_results(load=True, timeout=10000,n_channels=n_channels, height=height, width=width):
    train, val, test = system_data, system_data_val, system_data_test
    print(train)
    if load:
        sys_vbss_s = deepSI.load_system(systemsdir+'sse-cnn-base-force-best')
        sys_vbss_s._dt = None
        sys_vbss_s.feedthrough=True
        sys_vbss_s.norm.u0 = np.mean(train.u, axis=(0, 2, 3))[:, None, None]
        sys_vbss_s.norm.ustd = np.std(train.u, axis=(0, 2, 3))[:, None, None]
        ##Normalize forces by computing mean and standard deviation over samples
        sys_vbss_s.norm.y0 = np.mean(train.y, axis=0)
        sys_vbss_s.norm.ystd = np.std(train.y, axis=0)
        print(sys_vbss_s.norm)
        sys_vbss_best = deepSI.load_system(systemsdir+'sse-cnn-base-force-last')
        sys_vbss_best._dt = None
        sys_vbss_t = sys_vbss_best.apply_experiment(test).NRMS(test)
    else:
        
        sys_vbss_s = SS_encoder_CNN_video_input(nx=8, na=12, nb=12)
        sys_vbss_s.init_model(nu=(3,135,240), ny=1)
        sys_vbss_s.feedthrough=True
        ##Normalize the frames by computing mean and standard deviation over samples, height, and width
        sys_vbss_s.norm.u0 = np.mean(train.u, axis=(0, 2, 3))[:, None, None]
        sys_vbss_s.norm.ustd = np.std(train.u, axis=(0, 2, 3))[:, None, None]
        ##Normalize forces by computing mean and standard deviation over samples
        sys_vbss_s.norm.y0 = np.mean(train.y, axis=0)
        sys_vbss_s.norm.ystd = np.std(train.y, axis=0)
        print(sys_vbss_s.norm)
        ## n_channels, height, width = frames_train.shape[1], frames_train.shape[2], frames_train.shape[3]
        sys_vbss_s.fit(system_data, val_sys_data=system_data_val, cuda=True, 
                       epochs=round(1500/(2*adi)), timeout=timeout, 
                       batch_size=32, 
                       validation_measure='sim-NRMS_sys_norm', 
                       loss_kwargs={'online_construct': True, 'nf':12},
                       auto_fit_norm=False,
                       #optimizer_kwargs={'lr':5e-4}
                      )
        sys_vbss_s.save_system('sse-cnn-base-force-best')
        sys_vbss_t = sys_vbss_s.apply_experiment(test).NRMS(test)
        sys_vbss_s.checkpoint_load_system('_last')
        sys_vbss_s.save_system('sse-cnn-base-force-last')
    return sys_vbss_s, sys_vbss_t

sys_vbss = get_base_results(load=False)

System_data of length: 7200 nu=(3, 135, 240) ny=None normed=False dt=None
System_data_norm: (u0=[[[179.06065963]]

 [[178.54673909]]

 [[174.84719291]]], ustd=[[[81.04010374]]

 [[81.95123325]]

 [[88.00169806]]], y0=-2.81599406957767, ystd=2.198542996611473)
Model already initilized (init_model_done=True), skipping initilizing of the model, the norm and the creation of the optimizer
N_training_samples = 7179, batch_size = 32, N_batch_updates_per_epoch = 224
Initial Validation sim-NRMS_sys_norm= 0.97310185
Starting indefinite training until 10000 seconds have passed due to provided timeout


0it [00:00, ?it/s]

########## New lowest validation loss achieved ########### sim-NRMS_sys_norm = 0.08216279000043869
Epoch    1, sqrt loss  0.3984, Val sim-NRMS_sys_norm 0.08216, Time Loss: 12.6%, data: 39.5%, val: 47.8%,  1.2 batches/sec
Epoch    2, sqrt loss 0.08218, Val sim-NRMS_sys_norm 0.08755, Time Loss: 11.7%, data: 41.6%, val: 46.7%,  1.2 batches/sec
########## New lowest validation loss achieved ########### sim-NRMS_sys_norm = 0.05564379692077637
Epoch    3, sqrt loss 0.06614, Val sim-NRMS_sys_norm 0.05564, Time Loss: 11.1%, data: 42.9%, val: 45.9%,  1.2 batches/sec
########## New lowest validation loss achieved ########### sim-NRMS_sys_norm = 0.04400921240448952
Epoch    4, sqrt loss 0.06055, Val sim-NRMS_sys_norm 0.04401, Time Loss: 10.8%, data: 42.1%, val: 47.0%,  1.2 batches/sec
Epoch    5, sqrt loss 0.05664, Val sim-NRMS_sys_norm 0.05439, Time Loss: 10.8%, data: 42.5%, val: 46.7%,  1.2 batches/sec
Epoch    6, sqrt loss 0.05145, Val sim-NRMS_sys_norm 0.05865, Time Loss: 10.7%, data: 42.1%, 

# Hyperparameter search $n_x$
The hyperparameter search is performed to find the optimal value of $n_x$. The model is trained for different values of $n_x$ and the NRMS metric is evaluated for each value. The results are plotted to determine the optimal value of $n_x$.

In [None]:
del sys_vbss
def get_repro_systems(nxlist, n_repeat=7, load=True, timeout=600):
    train, val, test = system_data, system_data_val, system_data_test
    
    if load:
        return [[deepSI.load_system(systemsdir+f'reprod-enc-{nx}-{i}-last2')  for i in range(n_repeat)] for nx in nxlist]
    
    for nx in nxlist:
        for i in range(n_repeat):
            sys_vbss_s = SS_encoder_CNN_video_input(nx=nx, na=12, nb=12)
            sys_vbss_s.init_model(nu=(3,135,240), ny=1)
            sys_vbss_s.feedthrough=True
            ##Normalize the frames by computing mean and standard deviation over samples, height, and width
            sys_vbss_s.norm.u0 = np.mean(train.u, axis=(0, 2, 3))[:, None, None]
            sys_vbss_s.norm.ustd = np.std(train.u, axis=(0, 2, 3))[:, None, None]
            ##Normalize forces by computing mean and standard deviation over samples
            sys_vbss_s.norm.y0 = np.mean(train.y, axis=0)
            sys_vbss_s.norm.ystd = np.std(train.y, axis=0)
            print(sys_vbss_s.norm)
            ## n_channels, height, width = frames_train.shape[1], frames_train.shape[2], frames_train.shape[3]
            sys_vbss_s.fit(system_data, val_sys_data=system_data_val, cuda=True, 
                           epochs=round(1500/(2*adi)), timeout=timeout, 
                           batch_size=64, 
                           validation_measure='sim-NRMS_sys_norm', 
                           loss_kwargs={'online_construct': True, 'nf':5},
                           auto_fit_norm=False,
                           #optimizer_kwargs={'lr':5e-4}
                          )        
            sys_vbss_s.save_system(f'reprod-enc-{nx}-{i}-best2')
            sys_vbss_s.checkpoint_load_system('_last')
            sys_vbss_s.save_system(f'reprod-enc-{nx}-{i}-last2')
            del sys_vbss_s
    return [[deepSI.load_system(systemsdir+f'reprod-enc-{nx}-{i}-last2')  for i in range(n_repeat)] for nx in nxlist]

In [None]:
outrepo = get_repro_systems([4,5,8,20],n_repeat=3, load=True, timeout=1600)
from matplotlib import cm

# Adjust the figure height dynamically, reduce the height factor to make figures less tall
plt.figure(figsize=(linewidth, 0.8 * len(outrepo)), dpi=dpi)  
cmap = cm.get_cmap('tab10')

for i, outi in enumerate(outrepo):
    col = cmap(i / max(1, len(outrepo) - 1))  # Safeguard division by zero
    ax = plt.subplot(len(outrepo), 1, i + 1)  # Adjust subplot index dynamically

    # Optionally hide x-axis labels except for the last subplot
    if i < len(outrepo) - 1:
        ax.xaxis.set_ticklabels([])

    for k, outij in enumerate(outi):
        plt.semilogy(outij.batch_id, outij.Loss_val, 'o', alpha=0.3, c=col, markersize=0.5,
                     label=f'$n_x={outij.nx}$' if k == 0 else None)
    plt.legend(loc='upper right')
    plt.xlim(-100, 1700)
    plt.ylim(0.01, 1)
    plt.grid()

    if i == len(outrepo) // 2:  # Center label for the middle plot if multiple
        plt.ylabel('Test error (NRMS)')

plt.xlabel('Optimization time (seconds)')
plt.tight_layout(pad=pad)
plt.savefig('figures_force/nx-repro.pdf')
plt.savefig('figures_force/nx-repro.jpg', dpi=600)
plt.show()
del outrepo


# Hyperparameter search $n_f$

In [None]:
def get_nf_systems(nflist,base_epochs=20,load=True,last=True):
    train, val, test = system_data, system_data_val, system_data_test
    
    if load:
        return [deepSI.load_system(systemsdir+f'enc-nf-{nf}-last') for nf in nflist]
    
    for nf in nflist:
        sys_vbss_s = SS_encoder_CNN_video_input(nx=8, na=12, nb=12)
        sys_vbss_s.init_model(nu=(3,135,240), ny=1)
        sys_vbss_s.feedthrough=True
        ##Normalize the frames by computing mean and standard deviation over samples, height, and width
        sys_vbss_s.norm.u0 = np.mean(train.u, axis=(0, 2, 3))[:, None, None]
        sys_vbss_s.norm.ustd = np.std(train.u, axis=(0, 2, 3))[:, None, None]
        ##Normalize forces by computing mean and standard deviation over samples
        sys_vbss_s.norm.y0 = np.mean(train.y, axis=0)
        sys_vbss_s.norm.ystd = np.std(train.y, axis=0)
        print(sys_vbss_s.norm)
        ## n_channels, height, width = frames_train.shape[1], frames_train.shape[2], frames_train.shape[3]
        sys_vbss_s.fit(system_data, val_sys_data=system_data_val, cuda=True, 
                       epochs=base_epochs,  
                       batch_size=64, 
                       validation_measure='sim-NRMS_sys_norm', 
                       loss_kwargs={'online_construct': True, 'nf':nf},
                       auto_fit_norm=False,
                       #optimizer_kwargs={'lr':5e-4}
                      )    
        sys_vbss_s.save_system(f'enc-nf-{nf}-best')
        sys_vbss_s.checkpoint_load_system('_last')
        sys_vbss_s.save_system(f'enc-nf-{nf}-last')
    if last:
        return [deepSI.load_system(systemsdir+f'enc-nf-{nf}-last') for nf in nflist]
    else:
        return [deepSI.load_system(systemsdir+f'enc-nf-{nf}-best') for nf in nflist]

In [None]:
from matplotlib import cm
nflist = [4,5,7,12,20]
outnf = get_nf_systems(nflist,load=True)
# torch.save(outnf,'outnf')


cmap = cm.get_cmap('viridis')

for z,k in enumerate(['time','batch_id']):
    plt.figure(figsize=(linewidth,1.5),dpi=dpi)
    for (i,outi),nf in zip(enumerate(outnf),nflist):
        x = i/(len(outnf)-1)
        time = []
        loss_val = []
        plt.semilogy(outi.__getattribute__(k),outi.Loss_val,c=cmap(x),label='T = '+str(nf))
    plt.ylabel('Test Error (NRMS)')
    if z==0:
        plt.xlabel('Optimization time (seconds)')
    else:
        plt.xlabel('Batch updates completed')
        plt.xlim(0,2500)
    plt.legend(loc='upper right')
    plt.grid()
    plt.tight_layout(pad=pad)
    plt.savefig(f'figures_force/nf-influ-{k}.pdf')
    plt.show()
del outnf

# Hyperparameter search $n_a$ and $n_b$

In [None]:
def get_na_nb_systems(nalist,load=True,timeout=1200):
    train, val, test = system_data, system_data_val, system_data_test
    
    if load:
        return [deepSI.load_system(systemsdir+f'enc-na-{na}-last') for na in nalist]
    
    for na in nalist:
        sys_vbss_s = SS_encoder_CNN_video_input(nx=8, na=na, nb=na)
        sys_vbss_s.init_model(nu=(3,135,240), ny=1)
        sys_vbss_s.feedthrough=True
        ##Normalize the frames by computing mean and standard deviation over samples, height, and width
        sys_vbss_s.norm.u0 = np.mean(train.u, axis=(0, 2, 3))[:, None, None]
        sys_vbss_s.norm.ustd = np.std(train.u, axis=(0, 2, 3))[:, None, None]
        ##Normalize forces by computing mean and standard deviation over samples
        sys_vbss_s.norm.y0 = np.mean(train.y, axis=0)
        sys_vbss_s.norm.ystd = np.std(train.y, axis=0)
        print(sys_vbss_s.norm)
        ## n_channels, height, width = frames_train.shape[1], frames_train.shape[2], frames_train.shape[3]
        sys_vbss_s.fit(system_data, val_sys_data=system_data_val, cuda=True, 
                       timeout=timeout, 
                       batch_size=64, 
                       validation_measure='sim-NRMS_sys_norm', 
                       loss_kwargs={'online_construct': True, 'nf':12},
                       auto_fit_norm=False,
                       #optimizer_kwargs={'lr':5e-4}
                      )   
        sys_vbss_s.save_system(f'enc-na-{na}-best')
        sys_vbss_s.checkpoint_load_system('_last')
        sys_vbss_s.save_system(f'enc-na-{na}-last')
    return [deepSI.load_system(systemsdir+f'enc-na-{na}-last') for na in nalist]

In [None]:
nalist = [6,8,10,12,15,18,20]
outna = get_na_nb_systems(nalist,load=True)
# torch.save(outna,'outna')

from matplotlib import cm
plt.figure(figsize=(linewidth,1.5),dpi=dpi)
cmap = cm.get_cmap('viridis')
# nalist = [1,2,3,4,5,10,20]


for (i,outi),na in zip(enumerate(outna),nalist):
    x = i/(len(outna)-1)
    time = []
    loss_val = []
    plt.semilogy(outi.time,outi.Loss_val,c=cmap(x),label=f'$n_a = n_b = {str(na)}$',linewidth=1)
plt.ylabel('Test error (NRMS)')
plt.xlabel('Optimization time (seconds)')
plt.legend(loc='upper right',ncol=2)
plt.grid()
plt.tight_layout(pad=pad)
plt.savefig(f'figures_force/nanb-fig.pdf')
plt.show()
del outna

# Model evaluation
The model is evaluated using the test data. The NRMS metric is calculated to evaluate the model's performance.

In [None]:
sys_vbss_s = deepSI.load_system(os.path.join(systemsdir, "sse-cnn-base-force-best"))
sys_vbss_s.feedthrough=True
##Normalize the frames by computing mean and standard deviation over samples, height, and width
sys_vbss_s.norm.u0 = np.mean(system_data.u, axis=(0, 2, 3))[:, None, None]
sys_vbss_s.norm.ustd = np.std(system_data.u, axis=(0, 2, 3))[:, None, None]
##Normalize forces by computing mean and standard deviation over samples
sys_vbss_s.norm.y0 = np.mean(system_data.y, axis=0)
sys_vbss_s.norm.ystd = np.std(system_data.y, axis=0)
tested_nrms = sys_vbss_s.apply_experiment(system_data_test)

In [None]:
system_data_test_y = system_data_test.y  # Replace with actual data
tested_nrms_y = tested_nrms.y  # Replace with actual data

# Plot the data
plt.plot(system_data_test_y, label='Measured')
plt.plot(tested_nrms_y, label='Simulated Force')
plt.plot([a - b for a, b in zip(system_data_test_y, tested_nrms_y)], label='Residual')
print(sys_vbss_s.apply_experiment(system_data_test).NRMS(system_data_test))
# Label axes
plt.ylabel('Force (N)')
plt.xlabel('Time (s)')

# Adjust x-ticks and labels
#x_ticks = range(0, 200, 20)
#x_tick_labels = [str(x/20) for x in x_ticks]
#plt.xticks(ticks=x_ticks, labels=x_tick_labels)

# Add legend
plt.legend()
plt.grid()
# Show and save the plot
plt.savefig("testplots.eps")
plt.show()
