In [None]:
# importing libraries

import os
from timeit import default_timer as timer

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

In [None]:
# Batch size, checkpoint directory and number of training epochs

BATCH_SIZE = 128
# Keep the trailing slash: checkpoint file names are appended with plain string concatenation.
PATH = 'Models/Final/02_Sequential_10M/'
epochs = 10

# torch.save does not create missing directories, so make sure the checkpoint
# folder exists before any training time is spent.
os.makedirs(PATH, exist_ok=True)

In [None]:
# Read the dataset from CSV and preview its first rows

data = pd.read_csv(filepath_or_buffer='ik_dataset_4.csv')
data.head(n=5)

In [None]:
# Normalising the data
#
# The min/max statistics are learned from the training rows only, so nothing
# about the held-out rows leaks into the preprocessing. train_test_split chooses
# its rows from the number of samples, train_size and random_state alone, so
# calling it here on plain row positions with the same arguments as the
# "Train test split" cell selects exactly the rows that later form data_train.
# Keep the arguments of both calls in sync.

scaler = MinMaxScaler()
train_rows = train_test_split(np.arange(len(data)), train_size=0.8, random_state=42)[0]
scaler.fit(data.iloc[train_rows])

# Every row is scaled with the training-set statistics; held-out rows can
# therefore land slightly outside [0, 1].
data = pd.DataFrame(
    scaler.transform(data),
    columns=['joint_0','joint_1','joint_2','joint_3','joint_4','joint_5','x','y','z'],
)
data.head()

In [None]:
# Train test split: 80% of the rows for training, the remainder for testing (fixed seed)

data_train, data_test = train_test_split(data, random_state=42, train_size=0.8)
(data_train.shape, data_test.shape)

In [None]:
# Device agnostic code: use the GPU when CUDA is available, otherwise the CPU

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

In [None]:
# Converting the data to tensors
# Each column is kept as its own separate 1-D float32 tensor on `device`. The model
# setup cells combine the columns each model needs with torch.stack(..., dim=1).

COLUMN_ORDER = ('x', 'y', 'z', 'joint_0', 'joint_1', 'joint_2', 'joint_3', 'joint_4', 'joint_5')


def _column_tensors(frame):
    """Return one float32 tensor on `device` per column listed in COLUMN_ORDER, in that order."""
    return [
        torch.tensor(frame[name].to_numpy(), dtype=torch.float32, device=device)
        for name in COLUMN_ORDER
    ]


# Train
(x_train, y_train, z_train,
 joint_0_train, joint_1_train, joint_2_train,
 joint_3_train, joint_4_train, joint_5_train) = _column_tensors(data_train)

# Test
(x_test, y_test, z_test,
 joint_0_test, joint_1_test, joint_2_test,
 joint_3_test, joint_4_test, joint_5_test) = _column_tensors(data_test)

x_train.shape, x_test.shape

In [None]:

# Defining the model 

class InverseKinematicsABBIRB140(nn.Module):

    """Fully connected regressor used for the inverse kinematics of the industrial robot ABB IRB 140.

    Nothing in the network is specific to that robot; the input and output widths are free.
    Layout: input -> [Linear -> ReLU -> BatchNorm1d -> Dropout] x 4 -> Linear -> output,
    where every hidden layer has ``hidden_features`` units.

    Args:
        input_features (int): defines the number of features given as input to the model
        output_features (int): defines the number of features the model outputs
        hidden_features (int): width of every hidden layer (default 128)
        dropout (float): drop probability of every Dropout layer (default 0.5, the nn.Dropout default)
    """

    def __init__(self, input_features, output_features, hidden_features=128, dropout=0.5):
        super().__init__()
        # Attribute names and module positions inside each Sequential are kept stable so the
        # state_dict keys (e.g. "layer_stack_4.4.weight") still match previously saved checkpoints.
        self.layer_stack_1 = nn.Sequential(
            *self._hidden_block(input_features, hidden_features, dropout)
        )
        self.layer_stack_2 = nn.Sequential(
            *self._hidden_block(hidden_features, hidden_features, dropout)
        )
        self.layer_stack_3 = nn.Sequential(
            *self._hidden_block(hidden_features, hidden_features, dropout)
        )
        # The last stack also carries the output projection.
        self.layer_stack_4 = nn.Sequential(
            *self._hidden_block(hidden_features, hidden_features, dropout),
            nn.Linear(in_features=hidden_features, out_features=output_features),
        )

    @staticmethod
    def _hidden_block(in_features, out_features, dropout):
        """Build the modules of one hidden block: Linear -> ReLU -> BatchNorm1d -> Dropout."""
        return [
            nn.Linear(in_features=in_features, out_features=out_features),
            nn.ReLU(),
            nn.BatchNorm1d(num_features=out_features),
            nn.Dropout(p=dropout),
        ]

    def forward(self, x):
        """Pass ``x`` through the four stacks in order and return the result of the last one."""
        for stack in (self.layer_stack_1, self.layer_stack_2, self.layer_stack_3, self.layer_stack_4):
            x = stack(x)
        return x
    

In [None]:
# Define the training step
def train_step(model: torch.nn.Module,
               batch: tuple,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')):
    """Run one optimisation step on a single batch and return its loss.

    Args:
        model: network to train; it is switched to training mode.
        batch: ``(inputs, targets)`` pair, e.g. as yielded by a DataLoader.
        loss_fn: callable mapping ``(predictions, targets)`` to a scalar loss tensor.
        optimizer: optimizer whose gradients are reset and which is stepped once.
        device: device the batch is moved to before the forward pass. Defaults to CUDA
            when available, otherwise CPU (the same default ``test_step`` uses), so the
            function no longer depends on a notebook global existing at definition time.

    Returns:
        float: the loss of this batch as a Python number.
    """
    inputs, targets = batch
    inputs, targets = inputs.to(device), targets.to(device)

    model.train()
    predictions = model(inputs)
    loss = loss_fn(predictions, targets)

    # Clear gradients left over from the previous step before back-propagating
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss.item()

In [None]:
# Function to print the training time

def print_train_time(start:float, end:float, device:torch.device=None):
    """Print how long training took on ``device`` and return that duration.

    Args:
        start: timestamp taken before training.
        end: timestamp taken after training.
        device: only used in the printed message.

    Returns:
        float: ``end - start``.
    """
    elapsed = end - start
    message = "Train time on device {} : {:.4f} seconds".format(device, elapsed)
    print(message)
    return elapsed

In [None]:
# Define the testing step

def test_step(model: torch.nn.Module,
              batch: tuple,
              loss_fn: torch.nn.Module,
              device: torch.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')):
    
    xtest, ytest = batch
    xtest, ytest = xtest.to(device), ytest.to(device)
    
    model.eval()
    with torch.inference_mode():
        ypreds = model(xtest)
        loss = loss_fn(ypreds, ytest)
    
    return loss.item()

In [None]:
# Setting up model 0: inputs (x, y, z) -> target joint_0

NAME_0 = 'model_0.pth'
MODEL_SAVE_PATH_0 = f'{PATH}{NAME_0}'

# The input columns are stacked side by side (dim=1); the target gets a trailing axis
train_dataset = TensorDataset(
    torch.stack((x_train, y_train, z_train), dim=1),
    joint_0_train.unsqueeze(dim=1),
)
test_dataset = TensorDataset(
    torch.stack((x_test, y_test, z_test), dim=1),
    joint_0_test.unsqueeze(dim=1),
)

# Only the training batches are shuffled
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Seed immediately before constructing the model
torch.manual_seed(42)
model_0 = InverseKinematicsABBIRB140(input_features=3, output_features=1).to(device)

loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model_0.parameters(), lr=1e-3)

In [None]:
# Training model 0
# Note: the measured time spans the per-epoch evaluation passes as well.

torch.manual_seed(42)
train_time_start_model_0 = timer()

for epoch in range(epochs):
    # The loss sums restart every epoch; after the loop they hold the sums of the final
    # epoch, which the cell building model_0_dict divides by the number of batches.
    train_loss = 0
    with tqdm(enumerate(train_dataloader), total=len(train_dataloader)) as progress:
        # Constant within an epoch, so set once instead of on every batch
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Training)')
        # Training loop
        for i, batch in progress:
            batch_loss = train_step(model=model_0, batch=batch, loss_fn=loss_fn, optimizer=optimizer)
            train_loss += batch_loss
            # refresh=False: let tqdm redraw on its own schedule instead of forcing a redraw per batch
            progress.set_postfix({
                'Train Batch loss': batch_loss,
                'Train loss': train_loss/(i+1)
            }, refresh=False)

    test_loss = 0
    with tqdm(enumerate(test_dataloader), total=len(test_dataloader)) as progress:
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Testing) ')
        # Testing loop
        for i, batch in progress:
            batch_loss = test_step(model=model_0, batch=batch, loss_fn=loss_fn)
            test_loss += batch_loss
            progress.set_postfix({
                'Test batch loss': batch_loss,
                'Test loss': test_loss /(i+1)
            }, refresh=False)

    print('---------------------------------------------------------------------------------------------------------------------------------------------------------')

train_time_end_model_0 = timer()
train_time_model_0 = print_train_time(start=train_time_start_model_0, end=train_time_end_model_0, device=device)

In [None]:
# Saving model_0 and its training time, loss values

torch.save(model_0.state_dict(), MODEL_SAVE_PATH_0)

# train_loss / test_loss are the summed batch losses of the last epoch;
# dividing by the number of batches gives the mean batch loss.
model_0_dict = {
    'Model name': 'Model 0',
    'Training time': train_time_model_0,
    'Train loss': train_loss / len(train_dataloader),
    'Test loss': test_loss / len(test_dataloader),
}

In [None]:
# Setting up model 1: inputs (x, y, z, joint_0) -> target joint_1

NAME_1 = 'model_1.pth'
MODEL_SAVE_PATH_1 = f'{PATH}{NAME_1}'

# The input columns are stacked side by side (dim=1); the target gets a trailing axis
train_dataset = TensorDataset(
    torch.stack((x_train, y_train, z_train, joint_0_train), dim=1),
    joint_1_train.unsqueeze(dim=1),
)
test_dataset = TensorDataset(
    torch.stack((x_test, y_test, z_test, joint_0_test), dim=1),
    joint_1_test.unsqueeze(dim=1),
)

# Only the training batches are shuffled
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Seed immediately before constructing the model
torch.manual_seed(42)
model_1 = InverseKinematicsABBIRB140(input_features=4, output_features=1).to(device)

loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model_1.parameters(), lr=1e-3)

In [None]:
# Training model 1
# Note: the measured time spans the per-epoch evaluation passes as well.

torch.manual_seed(42)
train_time_start_model_1 = timer()

for epoch in range(epochs):
    # The loss sums restart every epoch; after the loop they hold the sums of the final
    # epoch, which the cell building model_1_dict divides by the number of batches.
    train_loss = 0
    with tqdm(enumerate(train_dataloader), total=len(train_dataloader)) as progress:
        # Constant within an epoch, so set once instead of on every batch
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Training)')
        # Training loop
        for i, batch in progress:
            batch_loss = train_step(model=model_1, batch=batch, loss_fn=loss_fn, optimizer=optimizer)
            train_loss += batch_loss
            # refresh=False: let tqdm redraw on its own schedule instead of forcing a redraw per batch
            progress.set_postfix({
                'Train Batch loss': batch_loss,
                'Train loss': train_loss/(i+1)
            }, refresh=False)

    test_loss = 0
    with tqdm(enumerate(test_dataloader), total=len(test_dataloader)) as progress:
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Testing) ')
        # Testing loop
        for i, batch in progress:
            batch_loss = test_step(model=model_1, batch=batch, loss_fn=loss_fn)
            test_loss += batch_loss
            progress.set_postfix({
                'Test batch loss': batch_loss,
                'Test loss': test_loss /(i+1)
            }, refresh=False)

    print('---------------------------------------------------------------------------------------------------------------------------------------------------------')

train_time_end_model_1 = timer()
train_time_model_1 = print_train_time(start=train_time_start_model_1, end=train_time_end_model_1, device=device)

In [None]:
# Saving model_1 and its training time, loss values

torch.save(model_1.state_dict(), MODEL_SAVE_PATH_1)

# train_loss / test_loss are the summed batch losses of the last epoch;
# dividing by the number of batches gives the mean batch loss.
model_1_dict = {
    'Model name': 'Model 1',
    'Training time': train_time_model_1,
    'Train loss': train_loss / len(train_dataloader),
    'Test loss': test_loss / len(test_dataloader),
}

In [None]:
# Setting up model 2: inputs (x, y, z, joint_0, joint_1) -> target joint_2

NAME_2 = 'model_2.pth'
MODEL_SAVE_PATH_2 = f'{PATH}{NAME_2}'

# The input columns are stacked side by side (dim=1); the target gets a trailing axis
train_dataset = TensorDataset(
    torch.stack((x_train, y_train, z_train, joint_0_train, joint_1_train), dim=1),
    joint_2_train.unsqueeze(dim=1),
)
test_dataset = TensorDataset(
    torch.stack((x_test, y_test, z_test, joint_0_test, joint_1_test), dim=1),
    joint_2_test.unsqueeze(dim=1),
)

# Only the training batches are shuffled
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Seed immediately before constructing the model
torch.manual_seed(42)
model_2 = InverseKinematicsABBIRB140(input_features=5, output_features=1).to(device)

loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model_2.parameters(), lr=1e-3)

In [None]:
# Training model 2
# Note: the measured time spans the per-epoch evaluation passes as well.

torch.manual_seed(42)
train_time_start_model_2 = timer()

for epoch in range(epochs):
    # The loss sums restart every epoch; after the loop they hold the sums of the final
    # epoch, which the cell building model_2_dict divides by the number of batches.
    train_loss = 0
    with tqdm(enumerate(train_dataloader), total=len(train_dataloader)) as progress:
        # Constant within an epoch, so set once instead of on every batch
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Training)')
        # Training loop
        for i, batch in progress:
            batch_loss = train_step(model=model_2, batch=batch, loss_fn=loss_fn, optimizer=optimizer)
            train_loss += batch_loss
            # refresh=False: let tqdm redraw on its own schedule instead of forcing a redraw per batch
            progress.set_postfix({
                'Train Batch loss': batch_loss,
                'Train loss': train_loss/(i+1)
            }, refresh=False)

    test_loss = 0
    with tqdm(enumerate(test_dataloader), total=len(test_dataloader)) as progress:
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Testing) ')
        # Testing loop
        for i, batch in progress:
            batch_loss = test_step(model=model_2, batch=batch, loss_fn=loss_fn)
            test_loss += batch_loss
            progress.set_postfix({
                'Test batch loss': batch_loss,
                'Test loss': test_loss /(i+1)
            }, refresh=False)

    print('---------------------------------------------------------------------------------------------------------------------------------------------------------')

train_time_end_model_2 = timer()
train_time_model_2 = print_train_time(start=train_time_start_model_2, end=train_time_end_model_2, device=device)

In [None]:
# Saving model_2 and its training time, loss values

torch.save(model_2.state_dict(), MODEL_SAVE_PATH_2)

# train_loss / test_loss are the summed batch losses of the last epoch;
# dividing by the number of batches gives the mean batch loss.
model_2_dict = {
    'Model name': 'Model 2',
    'Training time': train_time_model_2,
    'Train loss': train_loss / len(train_dataloader),
    'Test loss': test_loss / len(test_dataloader),
}

In [None]:
# Setting up model 3: inputs (x, y, z, joint_0, joint_1, joint_2) -> target joint_3

NAME_3 = 'model_3.pth'
MODEL_SAVE_PATH_3 = f'{PATH}{NAME_3}'

# The input columns are stacked side by side (dim=1); the target gets a trailing axis
train_dataset = TensorDataset(
    torch.stack((x_train, y_train, z_train, joint_0_train, joint_1_train, joint_2_train), dim=1),
    joint_3_train.unsqueeze(dim=1),
)
test_dataset = TensorDataset(
    torch.stack((x_test, y_test, z_test, joint_0_test, joint_1_test, joint_2_test), dim=1),
    joint_3_test.unsqueeze(dim=1),
)

# Only the training batches are shuffled
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Seed immediately before constructing the model
torch.manual_seed(42)
model_3 = InverseKinematicsABBIRB140(input_features=6, output_features=1).to(device)

loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model_3.parameters(), lr=1e-3)

In [None]:
# Training model 3
# Note: the measured time spans the per-epoch evaluation passes as well.

torch.manual_seed(42)
train_time_start_model_3 = timer()

for epoch in range(epochs):
    # The loss sums restart every epoch; after the loop they hold the sums of the final
    # epoch, which the cell building model_3_dict divides by the number of batches.
    train_loss = 0
    with tqdm(enumerate(train_dataloader), total=len(train_dataloader)) as progress:
        # Constant within an epoch, so set once instead of on every batch
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Training)')
        # Training loop
        for i, batch in progress:
            batch_loss = train_step(model=model_3, batch=batch, loss_fn=loss_fn, optimizer=optimizer)
            train_loss += batch_loss
            # refresh=False: let tqdm redraw on its own schedule instead of forcing a redraw per batch
            progress.set_postfix({
                'Train Batch loss': batch_loss,
                'Train loss': train_loss/(i+1)
            }, refresh=False)

    test_loss = 0
    with tqdm(enumerate(test_dataloader), total=len(test_dataloader)) as progress:
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Testing) ')
        # Testing loop
        for i, batch in progress:
            batch_loss = test_step(model=model_3, batch=batch, loss_fn=loss_fn)
            test_loss += batch_loss
            progress.set_postfix({
                'Test batch loss': batch_loss,
                'Test loss': test_loss /(i+1)
            }, refresh=False)

    print('---------------------------------------------------------------------------------------------------------------------------------------------------------')

train_time_end_model_3 = timer()
train_time_model_3 = print_train_time(start=train_time_start_model_3, end=train_time_end_model_3, device=device)

In [None]:
# Saving model_3 and its training time, loss values

torch.save(model_3.state_dict(), MODEL_SAVE_PATH_3)

# train_loss / test_loss are the summed batch losses of the last epoch;
# dividing by the number of batches gives the mean batch loss.
model_3_dict = {
    'Model name': 'Model 3',
    'Training time': train_time_model_3,
    'Train loss': train_loss / len(train_dataloader),
    'Test loss': test_loss / len(test_dataloader),
}

In [None]:
# Setting up model 4: inputs (x, y, z, joint_0 .. joint_3) -> target joint_4

NAME_4 = 'model_4.pth'
MODEL_SAVE_PATH_4 = f'{PATH}{NAME_4}'

# The input columns are stacked side by side (dim=1); the target gets a trailing axis
train_dataset = TensorDataset(
    torch.stack((x_train, y_train, z_train, joint_0_train, joint_1_train, joint_2_train, joint_3_train), dim=1),
    joint_4_train.unsqueeze(dim=1),
)
test_dataset = TensorDataset(
    torch.stack((x_test, y_test, z_test, joint_0_test, joint_1_test, joint_2_test, joint_3_test), dim=1),
    joint_4_test.unsqueeze(dim=1),
)

# Only the training batches are shuffled
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Seed immediately before constructing the model
torch.manual_seed(42)
model_4 = InverseKinematicsABBIRB140(input_features=7, output_features=1).to(device)

loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model_4.parameters(), lr=1e-3)

In [None]:
# Training model 4
# Note: the measured time spans the per-epoch evaluation passes as well.

torch.manual_seed(42)
train_time_start_model_4 = timer()

for epoch in range(epochs):
    # The loss sums restart every epoch; after the loop they hold the sums of the final
    # epoch, which the cell building model_4_dict divides by the number of batches.
    train_loss = 0
    with tqdm(enumerate(train_dataloader), total=len(train_dataloader)) as progress:
        # Constant within an epoch, so set once instead of on every batch
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Training)')
        # Training loop
        for i, batch in progress:
            batch_loss = train_step(model=model_4, batch=batch, loss_fn=loss_fn, optimizer=optimizer)
            train_loss += batch_loss
            # refresh=False: let tqdm redraw on its own schedule instead of forcing a redraw per batch
            progress.set_postfix({
                'Train Batch loss': batch_loss,
                'Train loss': train_loss/(i+1)
            }, refresh=False)

    test_loss = 0
    with tqdm(enumerate(test_dataloader), total=len(test_dataloader)) as progress:
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Testing) ')
        # Testing loop
        for i, batch in progress:
            batch_loss = test_step(model=model_4, batch=batch, loss_fn=loss_fn)
            test_loss += batch_loss
            progress.set_postfix({
                'Test batch loss': batch_loss,
                'Test loss': test_loss /(i+1)
            }, refresh=False)

    print('---------------------------------------------------------------------------------------------------------------------------------------------------------')

train_time_end_model_4 = timer()
train_time_model_4 = print_train_time(start=train_time_start_model_4, end=train_time_end_model_4, device=device)

In [None]:
# Saving model_4 and its training time, loss values

torch.save(model_4.state_dict(), MODEL_SAVE_PATH_4)

# train_loss / test_loss are the summed batch losses of the last epoch;
# dividing by the number of batches gives the mean batch loss.
model_4_dict = {
    'Model name': 'Model 4',
    'Training time': train_time_model_4,
    'Train loss': train_loss / len(train_dataloader),
    'Test loss': test_loss / len(test_dataloader),
}

In [None]:
# Setting up model 5: inputs (x, y, z, joint_0 .. joint_4) -> target joint_5

NAME_5 = 'model_5.pth'
MODEL_SAVE_PATH_5 = f'{PATH}{NAME_5}'

# The input columns are stacked side by side (dim=1); the target gets a trailing axis
train_dataset = TensorDataset(
    torch.stack((x_train, y_train, z_train, joint_0_train, joint_1_train, joint_2_train, joint_3_train, joint_4_train), dim=1),
    joint_5_train.unsqueeze(dim=1),
)
test_dataset = TensorDataset(
    torch.stack((x_test, y_test, z_test, joint_0_test, joint_1_test, joint_2_test, joint_3_test, joint_4_test), dim=1),
    joint_5_test.unsqueeze(dim=1),
)

# Only the training batches are shuffled
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Seed immediately before constructing the model
torch.manual_seed(42)
model_5 = InverseKinematicsABBIRB140(input_features=8, output_features=1).to(device)

loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model_5.parameters(), lr=1e-3)

In [None]:
# Training model 5
# Note: the measured time spans the per-epoch evaluation passes as well.

torch.manual_seed(42)
train_time_start_model_5 = timer()

for epoch in range(epochs):
    # The loss sums restart every epoch; after the loop they hold the sums of the final
    # epoch, which the cell building model_5_dict divides by the number of batches.
    train_loss = 0
    with tqdm(enumerate(train_dataloader), total=len(train_dataloader)) as progress:
        # Constant within an epoch, so set once instead of on every batch
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Training)')
        # Training loop
        for i, batch in progress:
            batch_loss = train_step(model=model_5, batch=batch, loss_fn=loss_fn, optimizer=optimizer)
            train_loss += batch_loss
            # refresh=False: let tqdm redraw on its own schedule instead of forcing a redraw per batch
            progress.set_postfix({
                'Train Batch loss': batch_loss,
                'Train loss': train_loss/(i+1)
            }, refresh=False)

    test_loss = 0
    with tqdm(enumerate(test_dataloader), total=len(test_dataloader)) as progress:
        progress.set_description(f'Epoch [{epoch+1}/{epochs}] (Testing) ')
        # Testing loop
        for i, batch in progress:
            batch_loss = test_step(model=model_5, batch=batch, loss_fn=loss_fn)
            test_loss += batch_loss
            progress.set_postfix({
                'Test batch loss': batch_loss,
                'Test loss': test_loss /(i+1)
            }, refresh=False)

    print('---------------------------------------------------------------------------------------------------------------------------------------------------------')

train_time_end_model_5 = timer()
train_time_model_5 = print_train_time(start=train_time_start_model_5, end=train_time_end_model_5, device=device)

In [None]:
# Saving model_5 and its training time, loss values

torch.save(model_5.state_dict(), MODEL_SAVE_PATH_5)

# train_loss / test_loss are the summed batch losses of the last epoch;
# dividing by the number of batches gives the mean batch loss.
model_5_dict = {
    'Model name': 'Model 5',
    'Training time': train_time_model_5,
    'Train loss': train_loss / len(train_dataloader),
    'Test loss': test_loss / len(test_dataloader),
}

In [None]:
# Comparing the training time and loss of the six models: one row per model

compare_models = pd.DataFrame(
    data=[
        model_0_dict,
        model_1_dict,
        model_2_dict,
        model_3_dict,
        model_4_dict,
        model_5_dict,
    ]
)
compare_models