In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, random_split
from torchvision import transforms
import numpy as np
from tqdm import tqdm, trange
import matplotlib.pyplot as plt
from matplotlib import cm
import scipy.io as scio
import os

# 0 Check GPU

In [None]:
print("Torch cuda is available: ", torch.cuda.is_available())

# 1 Data processing
Before running this notebook, the Matlab script in `./data/adataprocessing.m` should be executed first.

`./data/adataprocessing.m` will transform the simulation results in `./data/raw_data/` into `./data/raw_dataset.mat`, which will be used in this notebook.

## 1.1 Data parameters

In [None]:
input_size = 2
output_size = 1
batch_size = 64
sequence_length = 100
train_dataset_rate = 0.8
validation_dataset_rate = 0.1
model_name = "model_20240525_only_force_case3_hyperpara_search"

## 1.2 Load data

In [None]:
data=scio.loadmat('./data/raw_dataset.mat')

current_data = data['current_data']
force_data = data['force_data']
voltage_data = data['voltage_data']
print("raw current shape: ",current_data.shape)
print("raw voltage shape: ",voltage_data.shape)
print("raw force shape: ",force_data.shape)

## 1.3 Transform data

In [None]:
split_data_len = int(current_data.shape[1]/sequence_length)*sequence_length
sample_len = current_data.shape[1]
print("For one sample,",split_data_len,'/',sample_len, "time steps are used.")
current_tensor = torch.tensor(current_data[:,current_data.shape[1]-split_data_len : current_data.shape[1]].reshape(-1,))
voltage_tensor = torch.tensor(voltage_data[:,current_data.shape[1]-split_data_len : current_data.shape[1]].reshape(-1,))
force_tensor = torch.tensor(force_data[:,current_data.shape[1]-split_data_len : current_data.shape[1]].reshape(-1,))

print("current_tensor size: ",current_tensor.shape)
print("voltage_tensor size: ",voltage_tensor.shape)
print("force_tensor size: ",force_tensor.shape)

x_raw_data = torch.stack((current_tensor, voltage_tensor), dim=1).float().reshape(-1, sequence_length, input_size)
print("\nx_raw_data size: ", x_raw_data.shape)
y_raw_data = force_tensor.float().reshape(-1, sequence_length, output_size).mean(dim=1)
print("y_raw_data size: ", y_raw_data.shape)

## 1.4 Normalization

In [None]:
x_mean = x_raw_data.reshape(-1,input_size).mean(dim=0)
x_std = x_raw_data.reshape(-1,input_size).std(dim=0)
y_mean = y_raw_data.reshape(-1,output_size).mean(dim=0)
y_std = y_raw_data.reshape(-1,output_size).std(dim=0)

print("For raw data:")
print("\tmean of x: ",x_mean)
print("\tstd of x: ",x_std)
print("\tmean of y: ",y_mean)
print("\tstd of y: ",y_std)


x_norm_data = ((x_raw_data.reshape(-1,input_size)-x_mean)/x_std).reshape(-1, sequence_length, input_size)
y_norm_data = ((y_raw_data.reshape(-1,output_size)-y_mean)/y_std).reshape(-1, output_size)
print("\nx_norm_data size: ", x_norm_data.shape)
print("y_norm_data size: ", y_norm_data.shape)

print("For normalized data:")
print("\tmean of x: ",x_norm_data.reshape(-1,input_size).mean(dim=0))
print("\tstd of x: ",x_norm_data.reshape(-1,input_size).std(dim=0))
print("\tmean of y: ",y_norm_data.reshape(-1,output_size).mean(dim=0))
print("\tstd of y: ",y_norm_data.reshape(-1,output_size).std(dim=0))

norm_dataset = TensorDataset(x_norm_data, y_norm_data)

## 1.5 Split data

In [None]:
train_size = int(train_dataset_rate * x_raw_data.shape[0])
test_size = x_raw_data.shape[0] - train_size
train_valid_dataset, test_data = random_split(norm_dataset, [train_size, test_size])

val_size = int(validation_dataset_rate * len(train_valid_dataset))
train_size = len(train_valid_dataset) - val_size
train_data, val_data = random_split(train_valid_dataset, [train_size, val_size])

print("train_dataset size: ",len(train_data))
print("validation_dataset size: ",len(val_data))
print("test_dataset size: ",len(test_data))

## 1.6 Save data sets

In [None]:
dir = "./models/" + model_name + '/running_vars/'
if not os.path.exists(dir):
    os.makedirs(dir)
torch.save(train_data,dir + "train_data.pt")
torch.save(val_data,dir + "validation_data.pt")
torch.save(test_data,dir + "test_data.pt")

In [None]:
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data,  shuffle=False)

In [None]:
# Check data shape
for inputs, labels in train_loader:
    print(inputs.shape)
    print(inputs.dtype)
    print(labels.shape)
    print(labels.dtype)
    break

# 2 LSTM

## 2.1 Define model

In [None]:
class LSTMEncoder(nn.Module):
    def __init__(self, input_size=2, hidden_size=50, num_layers=1):
        super(LSTMEncoder, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
    
    def forward(self, x):
        outputs, (hidden, cell) = self.lstm(x)
        return hidden, cell

class LSTMDecoder(nn.Module):
    def __init__(self, input_size=2, hidden_size=50, num_layers=1, output_size=50):
        super(LSTMDecoder, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x, hidden, cell):
        output, (hidden, cell) = self.lstm(x, (hidden, cell))
        output = hidden[-1, :,:]
        output = self.fc(output)
        return output

class LSTMAutoencoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMAutoencoder, self).__init__()
        self.encoder = LSTMEncoder(input_size, hidden_size, num_layers)
        self.decoder = LSTMDecoder(input_size, hidden_size, num_layers, output_size)
    
    def forward(self, x):
        hidden, cell = self.encoder(x)
        decoded = self.decoder(x, hidden, cell)
        return decoded


## 2.2 Model parameters

### 2.2.1 Hyper-parameter searching

In [None]:
hidden_size_search = list(range(80,161,24))
num_layers_search = list(range(1,4,1))
learning_rate_search = np.linspace(0.001, 0.003, 2)
search_loss = np.zeros((len(hidden_size_search),
                        len(num_layers_search),
                        len(learning_rate_search)))
num_loops = len(hidden_size_search)*len(num_layers_search)*len(learning_rate_search)
with tqdm(total=num_loops) as pbar:
    for hi, hidden_size in enumerate(hidden_size_search):
        for ni, num_layers in enumerate(num_layers_search):
            for li, learning_rate in enumerate(learning_rate_search):
                model = LSTMAutoencoder(input_size, hidden_size, num_layers, output_size)
                loss_function = nn.MSELoss()
                if torch.cuda.is_available():
                    model = model.cuda()
                    loss_function = loss_function.cuda()

                optimizer = optim.Adam(model.parameters(), lr=learning_rate)

                num_epochs = 10
                for epoch in range(num_epochs):
                    model.train()
                    for inputs, targets in train_loader:
                        if torch.cuda.is_available():
                            inputs = inputs.cuda()
                            targets = targets.cuda()
                        optimizer.zero_grad()
                        output = model(inputs)
                        loss = loss_function(output, targets) 
                        loss.backward()
                        optimizer.step()
                model.eval()
                test_loss = 0.0
                with torch.no_grad(): 
                    for inputs, targets in test_loader:
                        if torch.cuda.is_available():
                            inputs = inputs.cuda()
                            targets = targets.cuda()
                        output = model(inputs)
                        loss = loss_function(output, targets) 
                        test_loss += loss.item() * inputs.size(0)
                search_loss[hi,ni,li] = test_loss / len(test_loader.dataset)
                pbar.update(1)

In [None]:
min_loss = np.min(np.nan_to_num(search_loss,nan=np.sum(np.nan_to_num(search_loss,nan=1))))
print(f"Minimum loss: {min_loss}")

min_index_flat = np.argmin(search_loss)
min_index_3d = np.unravel_index(min_index_flat, search_loss.shape)
print(f"Index of the minimum loss: {min_index_3d}")

print(f"Best parameters:\
      \n\thidden state = {hidden_size_search[min_index_3d[0]]}\
      \n\thidden layers = {num_layers_search[min_index_3d[1]]}\
      \n\tlearning rate = { learning_rate_search[min_index_3d[2]]}")

In [None]:
param1 = hidden_size_search
param2 = num_layers_search
results = search_loss[:,:,min_index_3d[2]].T

fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(111, projection='3d')

x = np.arange(len(param1))
y = np.arange(len(param2))
x, y = np.meshgrid(x, y)

ax.plot_wireframe(x, y, results, color='b')
ax.scatter(min_index_3d[0], min_index_3d[1], min_loss, color='r', s=50, label=f'Min Value')


ax.set_xticks(np.arange(len(param1)))
ax.set_xticklabels(hidden_size_search)
ax.set_yticks(np.arange(len(param2)))
ax.set_yticklabels(num_layers_search)
ax.set_xlabel('Hidden States size')
ax.set_ylabel('Hidden Layers size')
ax.set_zlabel('Loss')
ax.set_title(f'Loss Surface for Different Hidden States and Layers \n(Learning Rate = {learning_rate_search[min_index_3d[2]]})')
# plt.subplots_adjust(left=0.0, right=1, top=0.9, bottom=0.1)
ax.legend()

dir = "./models/"+ model_name +"/results/"
if not os.path.exists(dir):
    os.makedirs(dir)
plt.savefig(dir+"hyper_parameter.png")
plt.show()

### 2.2.2 Choose best parameters

In [None]:
min_loss = np.min(search_loss)
min_index_flat = np.argmin(search_loss)
min_index_3d = np.unravel_index(min_index_flat, search_loss.shape)

hidden_size = hidden_size_search[min_index_3d[0]]
num_layers = num_layers_search[min_index_3d[1]]
learning_rate = learning_rate_search[min_index_3d[2]]

model = LSTMAutoencoder(input_size, hidden_size, num_layers, output_size)
loss_function = nn.MSELoss()
if torch.cuda.is_available():
    model = model.cuda()
    loss_function = loss_function.cuda()

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
dir = "./models/"+ model_name +"/results/"
if not os.path.exists(dir):
    os.makedirs(dir)
torch.save(hidden_size,dir + "hidden_size.pt")
torch.save(num_layers,dir + "num_layers.pt")
torch.save(learning_rate,dir + "learning_rate.pt")


## 2.3 Training and validation

In [None]:
num_epochs = 50
train_loss_epoch = np.zeros(num_epochs)
val_loss_epoch = np.zeros(num_epochs)
dir = "./models/" + model_name + '/epochs/'
if not os.path.exists(dir):
    os.makedirs(dir)
for epoch in trange(num_epochs):

    # Training phase
    model.train()
    train_loss = 0
    for inputs, targets in train_loader:
        if torch.cuda.is_available():
            inputs = inputs.cuda()
            targets = targets.cuda()
        optimizer.zero_grad()
        # print("input shape:", inputs.shape)
        # print("target shape:", targets.shape)
        output = model(inputs)
        # print("output shape:", output.shape)
        loss = loss_function(output, targets) 
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * inputs.size(0)
    train_loss_epoch[epoch] = train_loss/len(train_loader)
    
    
    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad(): 
        for inputs, targets in val_loader:
            if torch.cuda.is_available():
                inputs = inputs.cuda()
                targets = targets.cuda()
            output = model(inputs)
            loss = loss_function(output, targets) 
            val_loss += loss.item() * inputs.size(0)
        val_loss_epoch[epoch] = val_loss/len(val_loader)
    
    torch.save(model.state_dict(), dir + "model_"+str(epoch+1)+".pth")
    
plt.plot(train_loss_epoch)
plt.plot(val_loss_epoch)
plt.legend(["training loss","validation loss"])
plt.xlabel("Epoch")

plt.ylabel("Loss")
dir = "./models/"+ model_name +"/results/"
if not os.path.exists(dir):
    os.makedirs(dir)
plt.savefig(dir+"loss.png")
plt.show()

In [None]:
dir = "./models/"+ model_name +"/results/"
if not os.path.exists(dir):
    os.makedirs(dir)
torch.save(train_loss_epoch,dir + "training_loss.pt")
torch.save(val_loss_epoch,dir + "validation_loss.pt")

## 2.4 Test

In [None]:
# Load model
selected_model = num_epochs #the last model
dir = "./models/" + model_name + '/epochs/' + "model_"+str(num_epochs) +".pth"
model = LSTMAutoencoder(input_size, hidden_size, num_layers, output_size)
if torch.cuda.is_available():
    model = model.cuda()
model.load_state_dict(torch.load(dir))

In [None]:
# Test phase
model.eval()
test_loss = 0.0
test_output = torch.tensor([])
test_target = torch.tensor([])
if torch.cuda.is_available():
    test_output = test_output.cuda()
    test_target = test_target.cuda()
with torch.no_grad(): 
    for inputs, targets in test_loader:
        if torch.cuda.is_available():
            inputs = inputs.cuda()
            targets = targets.cuda()
        output = model(inputs)
        test_output = torch.cat((test_output, output))
        test_target = torch.cat((test_target, targets))
        loss = loss_function(output, targets) 
        test_loss += loss.item() * inputs.size(0)
if torch.cuda.is_available():
    test_output = test_output.cpu()
    test_target = test_target.cpu()
mse = test_loss / len(test_loader.dataset)
rmse = mse ** 0.5
print("MSE = ",mse)
print("RMSE = ",rmse)

mse_tensor = torch.tensor([mse,rmse])
dir = "./models/"+ model_name +"/results/"
if not os.path.exists(dir):
    os.makedirs(dir)
torch.save(mse_tensor, dir + "mse_rmse.pt")

In [None]:
plot_num = 200
plt.plot(test_target[0:plot_num,:])
plt.plot(test_output[0:plot_num,:])
plt.legend(["True external force","Estimated external force"],loc="upper right")
dir = "./models/"+ model_name +"/results/"
if not os.path.exists(dir):
    os.makedirs(dir)
plt.savefig(dir+"test.png")
plt.show()

In [None]:
dir = "./models/"+ model_name +"/results/"
if not os.path.exists(dir):
    os.makedirs(dir)
torch.save(test_target,dir + "test_target.pt")
torch.save(test_output,dir + "test_output.pt")