TODO: Add early stopping to the training loop, and explain how it works.
TODO: Move the visualization cells up, ahead of the model cells.

In [None]:
# PyTorch Libraries for Neural Network
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split    # Scikit-Lean for preparing the train-test data split

# Pandas for reading data in .csv format 
import pandas as pd
import os

# Plotting tools
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import plotly.graph_objs as go

In [None]:
def data_visualization(data, duration, randomseed=False, output=False):
    """Return three coordinate series from ``data``, optionally cropped to a random window.

    Parameters
    ----------
    data :
        When ``output`` is truthy, an object that supports ``data[:, i]``
        indexing; columns 0, 1 and 2 are used. Otherwise an object indexed by
        column name; the columns ``'hand_orig_rua_x'``, ``'hand_orig_rua_y'``
        and ``'hand_orig_rua_z'`` are read.
    duration : int, float or None
        ``None`` returns every sample. Otherwise one contiguous window of
        ``10 * duration`` samples is returned (the factor 10 is presumably the
        sampling rate -- TODO confirm). A window longer than the data is
        clamped to the full data length.
    randomseed : bool
        If truthy, the window start is drawn from a generator seeded with 42,
        so the same window is returned on every call. Otherwise the start is
        drawn from a freshly, non-deterministically seeded generator.
    output : bool
        Selects positional (truthy) or by-name (falsy) column access.

    Returns
    -------
    tuple
        ``(dim1, dim2, dim3)``, the three (possibly cropped) series.

    Raises
    ------
    ValueError
        If ``duration`` is negative.
    """
    if output:
        dim1, dim2, dim3 = data[:, 0], data[:, 1], data[:, 2]
    else:
        dim1, dim2, dim3 = (
            np.array(data[column])
            for column in ('hand_orig_rua_x', 'hand_orig_rua_y', 'hand_orig_rua_z')
        )

    if duration is not None:
        if duration < 0:
            raise ValueError(f"duration must be non-negative, got {duration!r}")

        total_samples = len(dim1)
        # Clamp so a window longer than the recording returns everything
        # instead of failing inside the random draw.
        reduced_samples = min(int(10 * duration), total_samples)

        # A local generator keeps this helper from re-seeding the global NumPy
        # RNG, which other code (e.g. a shuffled train/test split) may rely on.
        rng = np.random.RandomState(42 if randomseed else None)
        start_index = rng.choice(total_samples - reduced_samples + 1)
        window = slice(start_index, start_index + reduced_samples)
        dim1, dim2, dim3 = dim1[window], dim2[window], dim3[window]

    return dim1, dim2, dim3

def pltshow(x_lable, y_label, title, legend = False):
    """Label the current pyplot figure, optionally draw its legend, and show it.

    Parameters
    ----------
    x_lable : str
        Text for the x axis.
    y_label : str
        Text for the y axis.
    title : str
        Figure title.
    legend : bool
        The legend is drawn only when this compares equal to ``True``.
    """
    for set_text, text in ((plt.xlabel, x_lable), (plt.ylabel, y_label), (plt.title, title)):
        set_text(text)

    wants_legend = (legend == True)
    if wants_legend:
        plt.legend()

    plt.show()
    
def output_visualization(data, duration=None, randomseed=False):
    """Draw a static 3D scatter plot of the three series returned by ``data_visualization``.

    ``data``, ``duration`` and ``randomseed`` are forwarded unchanged to
    ``data_visualization`` (by-name column access, since ``output`` is left at
    its default).
    """
    xs, ys, zs = data_visualization(data, duration, randomseed)

    axes = plt.figure().add_subplot(111, projection='3d')
    axes.scatter(xs, ys, zs, c='b', marker='o')

    # Axis captions first, then the title.
    for set_caption, caption in (
        (axes.set_xlabel, 'X-Dimension'),
        (axes.set_ylabel, 'Y-Dimension'),
        (axes.set_zlabel, 'Z-Dimension'),
    ):
        set_caption(caption)
    axes.set_title('Predicted Values in 3D Space')

    plt.show()

def interactive_output_visualization(data, duration=None, randomseed=False, output=False, output_data = None):
    """Show an interactive (plotly) 3D scatter of one or two point sets.

    Parameters
    ----------
    data :
        First point set, drawn in blue as "Input Points". Passed to
        ``data_visualization`` with the same ``output`` flag, so it must
        support ``data[:, i]`` indexing when ``output`` is truthy and by-name
        column access otherwise.
    duration : int or None
        Forwarded to ``data_visualization`` to crop to a random window;
        ``None`` plots everything.
    randomseed : bool
        Forwarded to ``data_visualization``.
    output : bool
        If truthy, ``output_data`` is drawn as a second, red trace
        ("Output Points").
    output_data :
        Second point set; required when ``output`` is truthy. Assumed to be
        row-aligned with ``data`` (row i of both describes the same sample)
        -- TODO confirm against the caller.

    Raises
    ------
    ValueError
        If ``output`` is truthy and ``output_data`` is ``None``.
    """
    if output:
        if output_data is None:
            raise ValueError("output_data must be provided when output=True")

        if duration is not None:
            # Both traces must show the SAME rows. Cropping each array through
            # data_visualization separately would draw two independent random
            # windows, so the window is drawn once on a row-index array and
            # then applied to both arrays.
            row_ids = np.arange(min(len(data), len(output_data)))
            window, _, _ = data_visualization(
                np.column_stack((row_ids, row_ids, row_ids)), duration, randomseed, True
            )
            data, output_data = data[window], output_data[window]
            duration = None  # already cropped; the calls below must not crop again

    dim1, dim2, dim3 = data_visualization(data, duration, randomseed, output)
    traces = [
        go.Scatter3d(x=dim1, y=dim2, z=dim3, mode='markers',
                     marker=dict(size=3, color='blue', opacity=0.8),
                     name="Input Points")
    ]
    title = 'Values in 3D Space'

    if output:
        out1, out2, out3 = data_visualization(output_data, duration, randomseed, output)
        traces.append(
            go.Scatter3d(x=out1, y=out2, z=out3, mode='markers',
                         marker=dict(size=3, color='red', opacity=0.8),
                         name='Output Points')
        )
        title = 'Input and Output Points in 3D Space'

    layout = go.Layout(title=title, scene=dict(xaxis=dict(title='X-Dimension'),
                                               yaxis=dict(title='Y-Dimension'),
                                               zaxis=dict(title='Z-Dimension')))

    fig = go.Figure(data=traces, layout=layout)
    fig.show()

def correlationPlot(data):
    """Scatter ``sw_pres`` against ``hand_orig_rua_y`` with a degree-1 least-squares fit.

    The first sample of both columns is dropped before fitting. The mean
    squared error between the fitted line and the y values is shown in the
    plot title.
    """
    x_col = "sw_pres"            # Smart Watch Pressure
    y_col = "hand_orig_rua_y"    # Y Hand Position

    pressure = np.array(data[x_col])[1:]
    y_position = np.array(data[y_col])[1:]

    # np.polyfit returns the coefficients highest degree first: (slope, intercept).
    fit = np.polyfit(pressure, y_position, deg=1)
    fitted = fit[0] * pressure + fit[1]
    mse = np.mean((y_position - fitted)**2)

    plt.scatter(pressure, y_position, c='b', marker='.')
    plt.plot(pressure, fitted, c='r', label='Linear Fit')
    pltshow("Smart Watch Pressure", "Y Hand Positon", f"Correlation between {x_col} & {y_col}\nMSE: {mse: .3f}", legend=True)

def accDistPlot(data, column_name = "sw_lacc_y"):
    """Plot one column of ``data`` as a series and as a histogram with a normal curve.

    Two figures are shown in turn:

    1. the column's values against their 1-based sample index;
    2. a density histogram of the values, overlaid with the normal density
       that has the column's mean and standard deviation, drawn over
       mean +/- 3 standard deviations.
    """
    series = data[column_name]

    # Figure 1: values in sample order.
    sample_index = list(range(1, len(series) + 1))
    plt.plot(sample_index, series, color= 'b')
    pltshow("Time", "Acceleration", "Distribution of Acceleration over Time")

    # Figure 2: histogram plus fitted normal density.
    mu = np.mean(series)
    sigma = np.std(series)
    plt.hist(series, bins='auto', density=True, alpha=0.7, color='b')

    grid = np.linspace(mu - 3 * sigma, mu + 3 * sigma, 100)
    peak = 1 / (sigma * np.sqrt(2 * np.pi))
    z_score = (grid - mu) / sigma
    plt.plot(grid, peak * np.exp(-0.5 * z_score ** 2), color='r')
    pltshow('Acceleration', 'Density','Distribution of Acceleration Over Time')

In [None]:
path = '/Users/affanbinusman/Dropbox (ASU)/IRL-Lab/P&G/hackathon_data'     # Update path where your data is located

# Get a list of all CSV files in the folder.
# os.listdir returns names in arbitrary order, so sort them to make the row
# order of the concatenated DataFrame reproducible between runs and machines.
csv_files = sorted(file for file in os.listdir(path) if file.endswith('.csv'))
if not csv_files:
    # Fail with a clear message instead of pd.concat's "No objects to concatenate".
    raise FileNotFoundError(f"No .csv files found in {path!r}")

# Read each file into its own DataFrame
data_list = [pd.read_csv(os.path.join(path, csv_file)) for csv_file in csv_files]

# Concatenate the data from all files into a single DataFrame
data = pd.concat(data_list, ignore_index=True) # Use the `ignore_index=True` parameter to reset the index of the concatenated DataFrame

In [None]:
# Loads a SINGLE recording (the alphabetically first CSV in the folder).
# Running this cell replaces any `data` built by an earlier cell.
path = '/Users/affanbinusman/Dropbox (ASU)/IRL-Lab/P&G/hackathon_data'     # Update path where your data is located

# Get a list of all CSV files in the folder.
# os.listdir returns names in arbitrary order; sorting makes "the first file"
# the same file on every run.
csv_files = sorted(file for file in os.listdir(path) if file.endswith('.csv'))
if not csv_files:
    # Fail with a clear message instead of an IndexError on csv_files[0].
    raise FileNotFoundError(f"No .csv files found in {path!r}")
csv_file = csv_files[0]

# Read the selected file
file_path = os.path.join(path, csv_file)
df = pd.read_csv(file_path)
data_list = [df]

# Keep the same DataFrame construction as the multi-file cell
data = pd.concat(data_list, ignore_index=True) # Use the `ignore_index=True` parameter to reset the index of the concatenated DataFrame

In [None]:
def hamilton_product(a: np.ndarray, b: np.ndarray):
    """
    Hamilton product for two quaternions or a Vec4 and a Quaternion.

    Components are read from the LAST axis, so each argument may be a single
    quaternion of shape (4,), a column of quaternions of shape (N, 4), or any
    array-like with further leading batch dimensions. The two arguments are
    combined with NumPy broadcasting, so a single quaternion can be multiplied
    with a whole column.

    :param a: quaternion or vec4 in order [w,x,y,z]
    :param b: quaternion in order [w,x,y,z]
    :return: float64 array holding the product a * b in order [w,x,y,z], with
             the broadcast batch shape of the inputs followed by a last axis of 4
    """
    a = np.asarray(a)
    b = np.asarray(b)

    # Ellipsis indexing handles one quaternion and batched input the same way.
    aw, ax, ay, az = (a[..., i] for i in range(4))
    bw, bx, by, bz = (b[..., i] for i in range(4))

    h_p = np.stack([
        aw * bw - ax * bx - ay * by - az * bz,
        aw * bx + ax * bw + ay * bz - az * by,
        aw * by - ax * bz + ay * bw + az * bx,
        aw * bz + ax * by - ay * bx + az * bw
    ], axis=-1)
    return h_p.astype(np.float64)
    

def quat_invert(q: np.ndarray):
    """
    estimates the inverse rotation.

    The inverse is the conjugate divided by the squared norm. Components are
    read from the last axis, so ``q`` may be one quaternion of shape (4,), a
    column of shape (N, 4), or any array-like with further leading batch
    dimensions. No check is made for a zero quaternion; dividing by its zero
    norm follows NumPy's usual floating-point rules.

    :param q: input quaternion(s) in order [w,x,y,z]
    :return: inverse quaternion(s) as a float64 array of the same shape
    """
    q = np.asarray(q, dtype=np.float64)
    q_s = q * np.array([1, -1, -1, -1], dtype=np.float64)  # the conjugate of the quaternion
    # keepdims lets the per-quaternion squared norm broadcast against q_s for
    # both a single quaternion and batched input.
    squared_norm = np.sum(np.square(q), axis=-1, keepdims=True)
    return q_s / squared_norm

In [None]:
# relative watch orientation in global
# Computed as inverse(sw_rot_fwd) * sw_rot using the quaternion helpers.
# NOTE(review): `sw_rot_fwd` and `sw_rot` are not assigned in any cell visible
# in this notebook, so this line raises NameError on a fresh kernel unless they
# are defined beforehand. Presumably both are [w, x, y, z] quaternion arrays
# taken from `data` -- TODO confirm and add the cell that builds them.
sw_quat_raw = hamilton_product(quat_invert(sw_rot_fwd), sw_rot)


In [None]:
# Explore the loaded recording: 3D hand-position scatters (static and
# interactive), the pressure / Y-position fit, and the default channel plots.
duration = 40

# Each viewer is shown twice: first the full data (duration=None), then one
# random window of `duration`.
for window in (None, duration):
    output_visualization(data, duration=window, randomseed=False)

for window in (None, duration):
    interactive_output_visualization(data, duration=window, randomseed=False)

correlationPlot(data)

accDistPlot(data)

In [None]:
# Plot the series and histogram of several smart-watch channels, printing each
# column name above its figures.
# 'sw_lacc_x' was listed twice in the original copy-pasted calls; it is plotted
# once here.
channel_names = (
    'sw_rotvec_w',
    'sw_gyro_x',
    'sw_lvel_x',
    'sw_lacc_x',
    'sw_pres',
    'sw_grav_x',
)
for channel_name in channel_names:
    print(channel_name)
    accDistPlot(data, channel_name)

Importing libraries 

Model Architecture & Data Loader

In [None]:
class MyDataset(Dataset):
    """
    Data Loader:
        Wraps an already-loaded pandas DataFrame and splits it into model
        inputs (features) and targets (labels) by column position.

    Parameters
    ----------
    data : pandas.DataFrame
        Table with one sample per row.
    feature_cols : slice or list of int
        Positional column selection for the inputs. Defaults to columns 27..43,
        which the original author describes as all smart watch inputs that
        contribute towards the outputs.
    label_cols : slice or list of int
        Positional column selection for the targets. Defaults to columns 4..6,
        which the original author describes as the X, Y, Z coordinates of the
        hand.

    Note: positional slices that run past the last column are silently
    truncated by ``DataFrame.iloc``, so check ``features.shape`` if the CSV
    layout changes.
    """

    def __init__ (self, data, feature_cols=slice(27, 44), label_cols=slice(4, 7)):
        self.data = data
        self.features = self.data.iloc[:, feature_cols].values     # NumPy array, one row per sample
        self.labels = self.data.iloc[:, label_cols].values         # NumPy array, one row per sample

    def __len__ (self):
        """Number of samples (rows of the wrapped DataFrame)."""
        return len(self.data)

    def __getitem__ (self, idx):
        """Return ``(features, label)`` for row ``idx`` as float32 tensors."""
        features = torch.tensor(self.features[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return features, label
    
class MyModel(nn.Module):
    """
    Neural Network / Model:
        A small fully connected network: one linear layer from the inputs to a
        hidden layer, a LeakyReLU activation, and one linear layer from the
        hidden layer to the outputs.
        The layer dimensions are: input_size - hidden_size - output_size
    """

    def __init__ (self, input_size, hidden_size, output_size):
        super().__init__()

        # Attribute names are kept as-is because they determine the
        # state_dict keys (fc1.*, fc2.*).
        self.fc1 = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.fc2 = nn.Linear(in_features=hidden_size, out_features=output_size)
        self.relu = nn.LeakyReLU()

    def forward(self, x):
        """Map a batch of inputs to predictions: fc1 -> LeakyReLU -> fc2."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

In [None]:
# We encourage you to play around with these parameters to see what works
# better for this model architecture and dataset.

# --- Optimisation ---
num_epochs = 100
batch_size = 500
learning_rate = 5e-4

# --- Data split ---
# Fraction of the data held out for evaluation. Range 0-1; raise or lower it to
# change the size of the test set.
test_data_size = 0.2

# --- Model ---
# Width of the middle (hidden) layer of the network. You may also change the
# architecture itself by adding more layers to the model class.
hidden_size = 256

# --- Loss ---
# Try out different loss functions and optimizers to see what works with
# different tasks.
criterion = nn.SmoothL1Loss()

In [None]:
# Data loading calls to class and functions
dataset = MyDataset(data)

# Random train / test split; only the training loader reshuffles each epoch.
train_dataset, test_dataset = train_test_split(dataset, test_size=test_data_size, shuffle=True)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Initializing the model with inputs: layer widths follow the dataset columns
input_size = len(dataset.features[0])
output_size = len(dataset.labels[0])

model = MyModel(input_size, hidden_size, output_size)
# `criterion` is deliberately NOT re-created here. It is defined in the
# hyper-parameter cell; re-assigning it in this cell silently discarded any
# loss function chosen there.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)        # For more information, visit: https://pytorch.org/docs/stable/nn.html#loss-functions, https://pytorch.org/docs/stable/optim.html

# If you have GPU resource available, the training process will be faster!
# It's okay if you dont have a GPU. The code works without one as well.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
# For visually observing the losses.
# "training" / "evaluation" hold one mean loss per epoch. The remaining keys
# hold the labels and model outputs of the most recent epoch only
# ("*_t" = training set, no suffix = test set).
losses = {"training" : [],
          "evaluation" : [],
          "labels" : [],
          "outputs" : [],
          "labels_t" : [],
          "outputs_t" : []
          }

# Training & Evaluation Process
#
# Early stopping: after every epoch the mean evaluation loss is compared with
# the best value seen so far. An improvement resets a counter; otherwise the
# counter is incremented. Once the loss has failed to improve for `patience`
# consecutive epochs, training stops instead of running all `num_epochs`.
patience = 50
best_loss = float('inf')
num_epochs_without_improvement = 0

for epoch in range(num_epochs):

    all_outputs_training = []
    all_labels_training = []

    # ---- Training pass ----
    model.train(True)
    running_loss = 0.0                          # Accumulates the batch losses of this epoch
    for inputs, labels in train_loader:
        inputs = inputs.to(device)              # Transfers data to GPU/CPU
        labels = labels.to(device)

        optimizer.zero_grad()                   # Clears gradients left over from the previous step
        outputs = model(inputs)                 # Calculates the output
        loss = criterion(outputs, labels)       # Finds loss as per the criteria defined
        loss.backward()                         # Back propagation of loss
        optimizer.step()                        # Updates parameters based on gradients computed during back propagation

        running_loss += loss.item()

        # .cpu() is required before .numpy(): converting a CUDA tensor directly
        # raises a TypeError, which made this cell fail whenever a GPU was used.
        all_outputs_training.append(outputs.detach().cpu().numpy())
        all_labels_training.append(labels.detach().cpu().numpy())

    training_loss = running_loss/len(train_loader)

    # ---- Evaluation pass ----
    # Evaluating the model that has been trained (so far). The steps mirror the
    # training pass, minus everything that updates the model: no gradients are
    # tracked and the optimizer is not called.
    model.eval()
    total_loss = 0.0
    all_outputs = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            all_outputs.append(outputs.cpu().numpy())  # Append outputs to the list
            all_labels.append(labels.cpu().numpy())    # Append labels to the list

    mean_loss = total_loss / len(test_loader)

    # Record and report this epoch BEFORE the early-stopping check, so the
    # epoch that triggers the stop still appears in the history and the plots.
    losses["training"].append(training_loss)
    losses["evaluation"].append(mean_loss)
    losses["outputs"] = np.concatenate(all_outputs)
    losses["labels"] = np.concatenate(all_labels)

    losses["outputs_t"] = np.concatenate(all_outputs_training)
    losses["labels_t"] = np.concatenate(all_labels_training)

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {training_loss}, Smooth L1 Loss: {mean_loss}")

    # ---- Early stopping ----
    if mean_loss < best_loss:
        best_loss = mean_loss
        num_epochs_without_improvement = 0
    else:
        num_epochs_without_improvement += 1
        if num_epochs_without_improvement >= patience:
            print("Early stopping triggered. Stopping training.")
            break

In [None]:
# Training and evaluation loss per epoch, on a shared axis.
epochs_total = range(1, 1 + len(losses["evaluation"]))

plt.subplots(1, figsize=(20,5))
plt.plot(epochs_total, losses["training"], label = "Training Loss")
plt.plot(epochs_total, losses["evaluation"], label = "Evaluation Loss")
# legend=True is needed for the two labels above to be drawn; without it the
# curves cannot be told apart.
pltshow('Epochs', 'Loss', 'Losses over Epochs', legend=True)

TODO: Add a 3D plot that shows the model outputs together with the original (ground-truth) values.

In [None]:
# Shapes of the stored labels / outputs: test set first, then training set.
print(*(losses[key].shape for key in ("labels", "outputs", "labels_t", "outputs_t")))


In [None]:
# Display the model outputs on the training set, as stored by the training
# loop for the most recent epoch. Being the last expression of the cell, the
# array is rendered as the cell's output.
losses["outputs_t"]

In [None]:
# A notebook cell renders only its LAST expression, so a bare
# `losses["labels_t"]` on an earlier line was silently discarded. Returning
# both arrays as one tuple shows the training labels and the test outputs.
losses["labels_t"], losses["outputs"]

In [None]:
# Test set: labels (blue, "Input Points") against model outputs (red,
# "Output Points"), cropped to a random window selected by duration=2.
interactive_output_visualization(losses["labels"], duration=2, randomseed=False, output=True, output_data=losses["outputs"])
print()

# Training set: all labels against all model outputs from the most recent epoch.
interactive_output_visualization(losses["labels_t"], duration=None, randomseed=False, output=True, output_data=losses["outputs_t"])

Visualizations

Correlation plot: smart-watch pressure vs. Y hand position
3D visualization of the ground truth
Distribution of acceleration