In [1]:
import json
import os
from torch.utils.data import Dataset, random_split, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
import torch.optim as optim
import re

  from .autonotebook import tqdm as notebook_tqdm


## Config

In [2]:
# train params
train_size = 0.8
val_size = 0.1
test_size = 0.1
batch_size = 16
learning_rate = 0.001
epochs = 2

choosen_joints = ["RAnkle_x"]

# labels
LABELS = {
    "unknown": 0,
    "gear2" : 1,
    "gear3" : 2,
    "gear4" : 3,
}

# file config
files = ["09", "14_cut", "15_cut", "17_cut", "18_cut", "22_cut", "25", "38", "40", "43", "44", "54_cut"]
file_name = "labeled_cycles_"
path = "../cycle_splits"

TODO create cycle plot?!

In [3]:
# half ChatPGT generated
class CustomDataset(Dataset):
    
    def __init__(self, 
                 files, 
                 path, 
                 file_name="labeled_cycles_", 
                 choosen_joints=["RAnkle_x"],
                 label_dict = {
                    "unknown": 0,
                    "gear2" : 1,
                    "gear3" : 2,
                    "gear4" : 3,},
                 transform=None,
                 target_transform=None, 
                 padding_value=0.0):
        
        self.label_dict = label_dict
        self.choosen_joints = choosen_joints
        self.padding_value = padding_value
        self.data, self.labels = self.__load_data(files, file_name, path)
        self.transform = transform
        self.target_transform = target_transform

    def __load_data(self, files, file_name, path):
        data = []
        labels = []
        longest_cycle = 0  # Track longest cycle length

        for file in files:
            file_path = os.path.join(path, file_name + file + ".json")
            with open(file_path, 'r') as f:
                data_json = json.load(f)

            for cycle in data_json.values():
                # Extract joint data as (num_joints, time_steps)
                cycle_data = [torch.tensor(cycle[joint], dtype=torch.float32) for joint in self.choosen_joints]

                # Stack into a (num_joints, time_steps) tensor
                cycle_tensor = torch.stack(cycle_data)  # Shape: (num_joints, time_steps)
                longest_cycle = max(longest_cycle, cycle_tensor.shape[1])  # Update max length

                data.append(cycle_tensor)
                labels.append(cycle["Label"])

        # Pad all cycles to match the longest cycle length
        padded_data = []
        for cycle in data:
            num_joints, time_steps = cycle.shape

            # Pad the time_steps dimension
            pad_length = longest_cycle - time_steps
            padded_cycle = torch.nn.functional.pad(cycle, (0, pad_length), value=self.padding_value)  # Pad last dim

            padded_data.append(padded_cycle)

        # Stack all padded cycles into a final tensor
        padded_data = torch.stack(padded_data)  # Shape: (num_cycles, num_joints, max_time)

        return padded_data, labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        
        label = self.label_dict[self.labels[idx]]
        item = self.data[idx]
        
        if self.transform:
            item = self.transform(item)

        if self.target_transform:
            label = self.target_transform(label)

        return item, label


In [4]:
# create custom dataset
dataset = CustomDataset(files, path, file_name, choosen_joints)

# split data into train, val and test set
generator1 = torch.Generator().manual_seed(42)
train_split = int(dataset.__len__() * 0.8)
val_split = int(dataset.__len__() * 0.1)
test_split = int(dataset.__len__() - train_split - val_split)

print(f"Train size = {train_split}, Val size = {val_split}, Test size =  {test_split}")

train_data, val_data, test_data = random_split(dataset, [train_split, val_split, test_split], generator=generator1)

# create dataloader
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

Train size = 121, Val size = 15, Test size =  16


In [5]:
class Net(nn.Module):

    def __init__(self, input_channels, hidden1, hidden2, output):
        super(Net, self).__init__()
        
        self.linear1 = nn.Linear(input_channels, hidden1)
        self.activation = nn.ReLU()
        self.linear2 = nn.Linear(hidden1, hidden2)
        self.activation2 = nn.ReLU()
        self.output_layer = nn.Linear(hidden2, output)

    def forward(self, input):
        
        out = self.activation(self.linear1(input))
        out = self.activation2(self.linear2(out))
        out = self.output_layer(out)
        
        return out 

In [6]:
input_channels = len(dataset[0][0][0]) # get first entry, then item and then first joint
hidden1 = 128
hidden2 = 64
output = len(set(dataset.labels))

net = Net(input_channels=input_channels, hidden1=hidden1, hidden2=hidden2, output=output)
print(net)

Net(
  (linear1): Linear(in_features=97, out_features=128, bias=True)
  (activation): ReLU()
  (linear2): Linear(in_features=128, out_features=64, bias=True)
  (activation2): ReLU()
  (output_layer): Linear(in_features=64, out_features=3, bias=True)
)


## Training the Neural Net

In [7]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

TODO
    - add validation
    - add accuracy
    - add other metrics
    - add plots
    - add epoch and batch loss?!

In [8]:
for epoch in range(epochs):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        outputs = outputs.squeeze(1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        print(labels)
        print(torch.argmax(outputs, dim=1))

        # print statistics
        running_loss += loss.item()
        print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / batch_size:.3f}')
        running_loss = 0.0

print('Finished Training')

tensor([2, 2, 0, 2, 2, 2, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2])
tensor([2, 2, 2, 2, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 2, 2])
[1,     1] loss: 0.058
tensor([1, 1, 0, 2, 1, 2, 2, 2, 1, 2, 0, 2, 2, 2, 2, 2])
tensor([2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
[1,     2] loss: 0.086
tensor([2, 2, 2, 1, 0, 2, 2, 1, 1, 2, 2, 2, 2, 2, 2, 0])
tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
[1,     3] loss: 0.034
tensor([2, 0, 2, 1, 2, 2, 2, 2, 0, 2, 1, 2, 2, 1, 2, 2])
tensor([2, 2, 2, 2, 2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2])
[1,     4] loss: 0.055
tensor([2, 2, 2, 0, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
tensor([1, 2, 2, 0, 2, 1, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2])
[1,     5] loss: 0.015
tensor([2, 2, 2, 1, 1, 1, 2, 1, 0, 2, 1, 0, 2, 2, 2, 2])
tensor([2, 2, 2, 1, 2, 1, 2, 2, 2, 2, 1, 0, 2, 2, 0, 2])
[1,     6] loss: 0.040
tensor([2, 2, 2, 1, 2, 0, 2, 0, 2, 2, 2, 2, 1, 1, 0, 2])
tensor([2, 2, 2, 1, 2, 2, 2, 0, 2, 2, 2, 2, 0, 1, 2, 0])
[1,     7] loss: 0.034
tensor([2, 2, 1, 0, 2, 2, 2, 2, 2])
tenso