# ARTIFICIAL NEURAL NETWORK 
### NOTE: USE IN GOOGLE COLAB

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
from torch.autograd import Variable
import torch.utils.data as Data
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error
torch.manual_seed(1) 

In [None]:
# Set up Google Drive: mount it at /content/drive so the notebook can use
# paths below "/content/drive/My Drive/..." for the training data and models.
# NOTE: `google.colab` is only importable inside a Google Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def get_dataset(filepath, right_arm=False):
    """Load the three CSV parts of an arm recording into a TensorDataset.

    Reads ``filepath + ".csv"``, ``filepath + "1.csv"`` and
    ``filepath + "2.csv"`` and stacks them into one frame with a fresh index.
    The four joint-angle columns of the selected arm (prefix "R" when
    ``right_arm`` is true, otherwise "L") are the labels; every other column
    is an input feature.

    Rows whose feature values occur more than once are treated as corrupted
    and *all* of their occurrences are removed, together with their labels.

    Returns a ``TensorDataset`` of (features, labels) tensors.
    """
    parts = [pd.read_csv(filepath + suffix + ".csv") for suffix in ("", "1", "2")]
    frame = pd.concat(parts, ignore_index=True)

    side = "R" if right_arm else "L"
    joints = ("ShoulderPitch", "ShoulderRoll", "ElbowYaw", "ElbowRoll")
    angle_names = ["{}{}".format(side, joint) for joint in joints]

    feature_names = [name for name in frame.columns if name not in angle_names]
    # keep=False discards every copy of a repeated feature row, not just the extras
    features = frame[feature_names].drop_duplicates(keep=False)
    # pick the label rows that survived the filtering above
    targets = frame.loc[features.index, angle_names]

    return Data.TensorDataset(torch.Tensor(features.values),
                              torch.Tensor(targets.values))

In [None]:
def get_net(net_version=None):
    """Build the fully connected regression network (6 inputs, 4 outputs).

    ``net_version == 0`` selects the deep variant: 6 -> 200 -> 100, seven
    further 100 -> 100 layers, then 100 -> 50 -> 4.  Any other value
    (including the default ``None``) selects the shallow variant
    6 -> 200 -> 100 -> 4.  Every linear layer except the last one is
    followed by a LeakyReLU.
    """
    if net_version == 0:
        widths = [6, 200] + [100] * 8 + [50, 4]
    else:
        widths = [6, 200, 100, 4]

    modules = []
    for fan_in, fan_out in zip(widths[:-1], widths[1:]):
        modules.append(nn.Linear(fan_in, fan_out))
        modules.append(nn.LeakyReLU())
    modules.pop()  # the output layer is linear: drop the trailing activation
    return nn.Sequential(*modules)

In [None]:
def train_ann(filepath, net=None, batch_size=10000, epoch=300, learning_rate=0.01, decay=0, net_version=0):
    """Train the joint-angle network with Adam + MSE and plot the loss curve.

    Parameters:
        filepath:      base path of the CSV data, passed to ``get_dataset``.
        net:           network to train; when ``None`` one is created with
                       ``get_net(net_version)``.
        batch_size:    mini-batch size for the DataLoader.
        epoch:         number of passes over the dataset.
        learning_rate: Adam learning rate.
        decay:         Adam weight decay.
        net_version:   only used when ``net`` is ``None``.

    Side effects:
        * Whenever an epoch beats the best summed per-output mean absolute
          error so far, the weights are saved to "ann_epoch<N>" in the
          current directory.
        * Every 100th epoch the best checkpoint so far is copied to the
          Google Drive model path.
        * A training curve (loss per iteration) is shown at the end.

    Raises:
        ValueError: if the dataset contains no samples.
    """
    dataset = get_dataset(filepath)
    if len(dataset) == 0:
        raise ValueError("No training samples found for '{}'".format(filepath))
    loader = Data.DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True, num_workers=2)

    if net is None:
        net = get_net(net_version)

    # To enable GPU usage: pick the device once instead of re-checking per batch.
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    net.to(device)
    if use_cuda:
        print('CUDA is available!  Training on GPU ...')

    optimizer = optim.Adam(net.parameters(), lr=learning_rate, weight_decay=decay)
    loss_func = nn.MSELoss()

    n_outputs = dataset.tensors[1].shape[1]
    iters, losses = [], []
    n = 0
    best_result = np.inf
    best_path = None
    # A separate loop variable keeps the `epoch` parameter intact.
    for epoch_idx in range(epoch):
        error = np.zeros(n_outputs)
        n_batches = 0
        for data, labels in loader:
            data = data.to(device)
            labels = labels.to(device)

            prediction = net(data)
            loss = loss_func(prediction, labels)
            optimizer.zero_grad()   # clear gradients left from the previous step
            loss.backward()         # backpropagation, compute gradients
            optimizer.step()        # apply gradients

            iters.append(n)
            # MSELoss already returns the mean over the batch, so the value is
            # logged as-is (dividing by batch_size again would shrink it).
            losses.append(loss.item())
            n += 1
            n_batches += 1

            # Per-output mean absolute error of this batch, computed on-device
            # with a single transfer back to the host.
            batch_mae = (prediction.detach() - labels).abs().mean(dim=0)
            error += batch_mae.cpu().numpy()

        # Average over the number of batches (not the last batch index, which
        # is one too small and zero when the data fits in a single batch).
        print("Epoch: {}, Error: {}".format(epoch_idx, error / n_batches))

        if error.sum() < best_result:
            best_result = error.sum()
            best_path = "ann_epoch{}".format(epoch_idx)
            print("Best result at Epoch {}. Saving model parameters.".format(epoch_idx))
            torch.save(net.state_dict(), best_path)

        if (epoch_idx + 1) % 100 == 0 and best_path is not None:
            # Save the best model to Google Drive every 100th epoch.  The
            # checkpoint is copied without loading it into `net`, so the
            # network being trained is not rolled back to older weights.
            model_path = "/content/drive/My Drive/Thesis/left_arm_ANN_model"
            torch.save(torch.load(best_path), model_path)

    # plotting
    plt.title("Training Curve")
    plt.plot(iters, losses, label="Train")
    plt.xlabel("Iterations")
    plt.ylabel("Loss")
    plt.show()

In [None]:
# Train on the left-arm recordings stored in Google Drive.
left_arm_filepath = "/content/drive/My Drive/Thesis/left_arm_data"
# get_net() without arguments builds the shallow (6 -> 200 -> 100 -> 4) variant.
net = get_net(net_version=None)
train_ann(filepath=left_arm_filepath, net=net)