In [1]:
import os
import numpy as np 
import pandas as pd
import torch
import torch.nn as nn
from torch.optim import SGD, Adam
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm import tqdm

In [2]:
# reading the two csv files that has the min and max coordinates to use it for normalization if needed
min_matrix = pd.read_csv('./data/min_matrix.csv', names=['x', 'y', 'z'])
max_matrix = pd.read_csv('./data/max_matrix.csv', names=['x', 'y', 'z'])
# reading the csv file that contains tensions of the four cables with 
# row index has the same number of file that corresponds to these tensions 
cable_matrix = pd.read_csv('./data/cable_matrix.csv', names=['t1', 't2', 't3', 't4'])

In [3]:
# defining the class that create the dataset by reading the files of the shapes 
# and tensions that creates these shapes and output a python dictionary 
# that has the tip position and the tensions 
class TensionToTipPos(Dataset):
    def __init__(self, shape_files, root_dir, cable_matrix):
        super(TensionToTipPos, self).__init__()
        self.shape_files = shape_files
        self.root_dir = root_dir
                
        self.cable_matrix = cable_matrix
        # device to put the tensors on to speed up 
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        
    def __len__(self):
        return len(self.cable_matrix)
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        shape_file = self.shape_files[idx]
        tip_position = self.shape_matrix_to_tip_position(shape_file)
        
        t1, t2, t3, t4 = self.cable_matrix.iloc[idx, :]
        tensions = torch.tensor([t1, t2, t3, t4], dtype=torch.float32)
        sample = {'tip_position': tip_position, 'tensions': tensions}
        
        return sample
    
    
    def shape_matrix_to_tip_position(self, shape_file):
        # read shape matrix then read the coordinate of the entry the tip position and convert it to cm
        matrix_dir = os.path.join(self.root_dir, shape_file)
        shape_matrix = pd.read_csv(matrix_dir, header=None).T
        tip_position = shape_matrix.iloc[-1,:]*100
        return torch.tensor(tip_position, dtype=torch.float32)

In [8]:
shape_files_dir = './data/shape/'
shape_files = os.listdir(shape_files_dir)
shape_files.sort()

In [9]:
# create the dataset by the class defined above
TensionToTipPos_ds = TensionToTipPos(shape_files=shape_files, 
                                   root_dir=shape_files_dir, 
                                   cable_matrix=cable_matrix)
TensionToTipPos_dl = DataLoader(TensionToTipPos_ds, batch_size=1,shuffle=True)

In [10]:
# an example of the dataset
data_itr = iter(TensionToTipPos_dl)
sample = next(data_itr)
print(sample)

{'tip_position': tensor([[ 5.0883, -1.5191, -3.2569]]), 'tensions': tensor([[9.9118, 7.8857, 7.5932, 3.7592]])}


In [32]:
# the dataset created from matlab was originally 250K, we split it into 100K and 150K
subDataset_a, subDataset_b = random_split(TensionToTipPos_ds, lengths=[100000, 150000])
# again split the the 100K into 80K for training and 20K for testing
TensionToTipPos_ds_tr, TensionToTipPos_ds_test = random_split(subDataset_a, lengths=[80000, 20000])
TensionToTipPos_dl_tr = DataLoader(TensionToTipPos_ds_tr, batch_size=1024, shuffle=True)
TensionToTipPos_dl_test = DataLoader(TensionToTipPos_ds_test, batch_size=1, shuffle=True)

In [33]:
# the class that holds the forward kinematics architecture
class FK_Net(nn.Module):
    def __init__(self):
        super(FK_Net, self).__init__()
        self.fc1 = nn.Sequential(nn.Linear(4, 16),
                                 nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(16,16),
                                 nn.ReLU())
        self.fc3 = nn.Sequential(nn.Linear(16,3))
        
    def forward(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

In [34]:
# the function that is used to train the model
def train_model(train_dl, model):
    model.train()
    # define the loss function, we will use predefined pytorch MSE Loss function
    criterion = nn.MSELoss()
    # define the optimizater
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    # enumerate epochs
    for epoch in range(20):
        loss_arr = []
        # enumerate mini batches
        for i, sample_batch in tqdm(enumerate(train_dl)):
            # clear the gradients
            optimizer.zero_grad()
            # compute the model output
            yhat = model(sample_batch['tensions'].to(device))
            # calculate loss
            loss = criterion(yhat, sample_batch['tip_position'].to(device))
            loss_arr.append(loss.item())
            # credit assignment
            loss.backward()
            # update model weights
            optimizer.step()
        if epoch % 10 == 0 or epoch == 0:
            print('Epoch: {}, mean loss: {}'.format(epoch, np.mean(loss_arr)))

In [35]:
# initiate the model and put it on the gpu
model = FK_Net().cuda()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') 
# train the model
train_model(TensionToTipPos_dl_tr, model)

79it [34:20, 26.08s/it]
0it [00:00, ?it/s]

Epoch: 0, mean loss: 9.406562436985064


79it [29:36, 22.48s/it]
79it [29:48, 22.63s/it]
79it [33:49, 25.69s/it]
79it [30:51, 23.44s/it]
79it [30:28, 23.15s/it]
79it [45:55, 34.88s/it] 
79it [30:00, 22.79s/it]
79it [29:38, 22.51s/it]
79it [30:05, 22.85s/it]
79it [29:15, 22.22s/it]
0it [00:00, ?it/s]

Epoch: 10, mean loss: 1.384048087687432


79it [29:20, 22.29s/it]
79it [29:13, 22.20s/it]
79it [29:11, 22.18s/it]
79it [29:16, 22.24s/it]
79it [29:18, 22.26s/it]
79it [29:18, 22.26s/it]
79it [29:27, 22.37s/it]
79it [31:12, 23.70s/it]
79it [30:36, 23.25s/it]


In [36]:
# Specify a path to save the model's weights
PATH = "FCN_FK_DrBerkeCode.pt"

# Save the model into the path
torch.save(model, PATH)

In [12]:
# the class that holds the inverse kinematics architecture
class IK_Net(nn.Module):
    def __init__(self):
        super(IK_Net, self).__init__()
        self.fc1 = nn.Sequential(nn.Linear(3, 16),
                                 nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(16,16),
                                 nn.ReLU())
        self.fc3 = nn.Sequential(nn.Linear(16,4))
        
    def forward(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

In [13]:
# train the model
def train_model_IK(train_dl, model):
    model.train()
    # define the optimization
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    # enumerate epochs
    for epoch in range(20):
        loss_arr = []
        # enumerate mini batches
        for i, sample_batch in tqdm(enumerate(train_dl)):
            # clear the gradients
            optimizer.zero_grad()
            # compute the model output by inserting the tip position as the input
            yhat = model(sample_batch['tip_position'].to(device))
            # calculate loss between the model output and the original tensions of the cable 
            loss = criterion(yhat, sample_batch['tensions'].to(device))
            loss_arr.append(loss.item())
            # credit assignment
            loss.backward()
            # update model weights
            optimizer.step()
        if epoch % 10 == 0 or epoch == 0:
            print('Epoch: {}, mean loss: {}'.format(epoch, np.mean(loss_arr)))

In [14]:
# initiate the inverse model and put it on the gpu
model = IK_Net().cuda()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') 
# train the model
train_model_IK(TensionToTipPos_dl_tr, model)

79it [39:59, 30.37s/it] 
0it [00:00, ?it/s]

Epoch: 0, mean loss: 21.580687969545775


79it [28:58, 22.00s/it]
79it [28:56, 21.98s/it]
79it [28:57, 22.00s/it]
79it [28:56, 21.98s/it]
79it [28:56, 21.98s/it]
79it [28:58, 22.00s/it]
79it [29:00, 22.03s/it]
79it [28:55, 21.97s/it]
79it [28:59, 22.01s/it]
79it [28:57, 21.99s/it]
0it [00:00, ?it/s]

Epoch: 10, mean loss: 4.7789053192621544


79it [28:58, 22.01s/it]
79it [28:59, 22.02s/it]
79it [28:55, 21.96s/it]
79it [28:55, 21.97s/it]
79it [28:56, 21.97s/it]
79it [29:01, 22.04s/it]
79it [29:03, 22.07s/it]
79it [29:06, 22.10s/it]
79it [29:08, 22.13s/it]


In [15]:
# Specify a path to save the inverse model's weights
PATH = "FCN_IK_DrBerkeCode.pt"

# Save the inverse model into the path 
torch.save(model_FK, PATH)

# Testing Part

In [17]:
# test the model
def test_model_IK(test_dl, model):
    model.eval()
    abs_error = torch.zeros([len(test_dl),4]).to('cuda')
    for i, sample_batch in tqdm(enumerate(test_dl)):
        # compute the model output
        yhat = model(sample_batch['tip_position'].to('cuda'))
        # retrieve numpy array
        yhat = yhat.detach()
        targets = sample_batch['tensions'].to('cuda')
        abs_error[i,:] = torch.absolute(targets - yhat)
    print(i+1)
    return abs_error

In [18]:
# test the model IK
abs_error_accumulated = test_model_IK(TensionToTipPos_dl_test, model)

20000it [09:35, 34.76it/s]

20000





In [21]:
abs_error_accumulated.mean(0)

tensor([1.6662, 1.6887, 1.6410, 1.6588], device='cuda:0')

In [22]:
abs_error_accumulated.std(0)

tensor([1.2758, 1.2878, 1.2948, 1.2942], device='cuda:0')

In [39]:
# test the model
def test_model_FK(test_dl, model):
    model.eval()
    abs_error = torch.zeros([len(test_dl),3]).to('cuda')
    for i, sample_batch in tqdm(enumerate(test_dl)):
        # compute the model output
        yhat = model(sample_batch['tensions'].to('cuda'))
        # retrieve numpy array
        yhat = yhat.detach()
        targets = sample_batch['tip_position'].to('cuda')
        abs_error[i,:] = torch.absolute(targets - yhat)
    print(i+1)
    return abs_error

In [40]:
# test the model 
model_FK = FK_Net().cuda()
abs_error_cumulated = test_model_FK(TensionToTipPos_dl_test, model_FK)

20000it [09:08, 36.49it/s]

20000





In [41]:
abs_error_accumulated.mean(0)

tensor([3.0876, 3.4107, 3.2158], device='cuda:0')

In [42]:
abs_error_accumulated.std(0)

tensor([2.7049, 2.3013, 2.2238], device='cuda:0')

In [43]:
# test the Iverse and forward models 
# first by inserting the tip position to infer the tensions by inverse model
# then the output of the IK model inout to forward model 
# and compare the output of the FK model with truth value
def test_model_FK_IK(test_dl, model_FK, model_IK):
    model.eval()
    abs_error = torch.zeros([len(test_dl),3]).to('cuda')
    for i, sample_batch in tqdm(enumerate(test_dl)):
        # compute the inverse model output
        IK_output = model_IK(sample_batch['tip_position'].to('cuda'))
        # compute the forward model output
        FK_output = model_FK(IK_output)
        # retrieve numpy array
        FK_output = FK_output.detach()
        targets = sample_batch['tip_position'].to('cuda')
        abs_error[i,:] = torch.absolute(targets - FK_output)
    # calculate mse
    print(i+1)
    return abs_error

In [45]:
model_FK = FK_Net().cuda()
model_IK = IK_Net().cuda()
abs_error_accumulated = test_model_FK_IK(TensionToTipPos_dl_test, model_FK, model_IK)

20000it [08:05, 41.17it/s]

20000





In [46]:
abs_error_accumulated.mean(0)

tensor([4.1383, 3.3612, 3.3428], device='cuda:0')

In [47]:
abs_error_accumulated.std(0)

tensor([2.5764, 2.2733, 2.1268], device='cuda:0')