In [119]:
import torch.nn as nn
import torch
import numpy as np
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
from sklearn.utils import shuffle
import plotly.graph_objects as go
from sklearn.model_selection import train_test_split
import pandas as pd


In [120]:
layers = {
    'layer1': {'type': 'Conv1d', 'kernel_size': 7, 'stride': 3, 'out_channels': 2},
    'layer2': {'type': 'MaxPool', 'kernel_size': 2, 'stride': 2},
    'layer3': {'type': 'Conv1d', 'kernel_size': 5, 'stride': 2, 'out_channels': 4},
    'layer4': {'type': 'MaxPool', 'kernel_size': 2, 'stride': 2},
    'layer5': {'type': 'Flatten'}
}

def layer_calculation(layers, input_shape, channels=1):
    input_shape = input_shape
    channels = channels
    for i in np.arange(1, len(layers)+1):
        layer = layers[f"layer{i}"]
        if layer['type'] == 'Conv1d':
            kernel_size = layer['kernel_size']
            stride = layer['stride']
            out_channels = layer['out_channels']
            
            output_length = (input_shape - kernel_size) // stride + 1
            output_shape = (out_channels, output_length)
            print(f"Layer {i} ({layers[f'layer{i}']['type']}): input shape ({channels}, {input_shape}), output shape {output_shape}")
            input_shape = output_length
            channels = out_channels
            
        # Add more layer types as needed
        if layer['type'] == 'MaxPool':
            kernel_size = layer['kernel_size']
            stride = layer['stride']
            
            output_length = (input_shape - kernel_size) // stride + 1
            output_shape = (channels, output_length)
            print(f"Layer {i} ({layers[f'layer{i}']['type']}): input shape ({channels}, {input_shape}), output shape ({channels}, {output_length})")
            input_shape = output_length
            
        if layer['type'] == 'Flatten':
            print(f"Layer {i} ({layers[f'layer{i}']['type']}): input shape ({channels}, {input_shape}), output shape {channels * input_shape}")
            input_shape = input_shape
        
layer_calculation(layers, input_shape = 260)
    
    

Layer 1 (Conv1d): input shape (1, 260), output shape (2, 85)
Layer 2 (MaxPool): input shape (2, 85), output shape (2, 42)
Layer 3 (Conv1d): input shape (2, 42), output shape (4, 19)
Layer 4 (MaxPool): input shape (4, 19), output shape (4, 9)
Layer 5 (Flatten): input shape (4, 9), output shape 36


In [121]:
# structure, layers

class CNN(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        
        # layers
        self.conv1 = nn.Conv1d(input_size[0], 2, kernel_size = 7, stride= 3, padding=0, dilation=1, groups=1, bias=True)
        self.maxpool1 = nn.MaxPool1d(kernel_size = 2, stride = 2, padding = 0) # max pooling layer
        self.conv2 = nn.Conv1d(2, 4, kernel_size = 5, stride = 2, padding = 0) # average pooling layer
        self.maxpool2 = nn.MaxPool1d(kernel_size = 2, stride = 2, padding = 0) # max pooling layer
        self.flatten = nn.Flatten() # flatten layer
        self.output_binary = nn.Linear(36, 1) # output layer for binary classification
        
        # activation functions
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()        
        
    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.maxpool1(x)
        x = self.relu(self.conv2(x))
        x = self.maxpool2(x)
        x = self.flatten(x)
        x = self.output_binary(x)
        return x
    
# data loader
class DataLoaderAcc(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]
    
# create model
model = CNN(input_size=(1, 260))

# define criteria
criterion = nn.BCEWithLogitsLoss()

# define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# model
model

CNN(
  (conv1): Conv1d(1, 2, kernel_size=(7,), stride=(3,))
  (maxpool1): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(2, 4, kernel_size=(5,), stride=(2,))
  (maxpool2): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (output_binary): Linear(in_features=36, out_features=1, bias=True)
  (relu): ReLU()
  (sigmoid): Sigmoid()
)

In [122]:
### laod data
root_dir = "/home/elias/2025/sshfs_mounter_2025/data_elias/ECSS_2026/raw_lab"

### get data from csv files
df_list = []
for f in os.listdir(root_dir):
    df_list.append(pd.read_csv(os.path.join(root_dir, f)))
    

# use 20% of dataframes for testing
test_size = int(0.2 * len(df_list))
test_list = []
for _ in range(test_size):
    test_list.append(df_list.pop(np.random.randint(0, len(df_list))))
    
train_data = pd.concat(df_list, ignore_index=True)
test_data = pd.concat(test_list, ignore_index=True)

# shuffle datasets
train_data = shuffle(train_data, random_state=42).reset_index(drop=True)
test_data = shuffle(test_data, random_state=42).reset_index(drop=True)

# data reshaping
y_train = np.reshape(train_data['label'].replace('locomotion', 1).replace('no locomotion', 0).values, (-1, 1))
X_train = np.reshape(train_data.drop(columns=['label']).values, (-1, 1, 260))
y_test = np.reshape(test_data['label'].replace('locomotion', 1).replace('no locomotion', 0).values, (-1, 1))
X_test = np.reshape(test_data.drop(columns=['label']).values, (-1, 1, 260))

# normalize
train_mean = np.mean(X_train)
train_sd = np.std(X_train)

X_train = (X_train - train_mean) / train_sd
X_test = (X_test - train_mean) / train_sd

X_train = X_train.astype(np.float32)
X_test = X_test.astype(np.float32)
y_train = y_train.astype(np.float32)
y_test = y_test.astype(np.float32)

print(f"Train data shape: {X_train.shape}, with mean: {np.mean(X_train):.5f}, and std: {np.std(X_train):.5f}")
print(f"Test data shape: {X_test.shape}, with mean: {np.mean(X_test):.5f}, and std: {np.std(X_test):.5f}")
print(f"Train labels shape: {y_train.shape}, with distribution: {np.unique(y_train, return_counts=True)}")
print(f"Test labels shape: {y_test.shape}, with distribution: {np.unique(y_test, return_counts=True)}")

Train data shape: (31523, 1, 260), with mean: 0.00000, and std: 1.00000
Test data shape: (6857, 1, 260), with mean: -0.00002, and std: 1.07414
Train labels shape: (31523, 1), with distribution: (array([0., 1.], dtype=float32), array([24309,  7214]))
Test labels shape: (6857, 1), with distribution: (array([0., 1.], dtype=float32), array([5143, 1714]))


  y_train = np.reshape(train_data['label'].replace('locomotion', 1).replace('no locomotion', 0).values, (-1, 1))
  y_test = np.reshape(test_data['label'].replace('locomotion', 1).replace('no locomotion', 0).values, (-1, 1))


In [123]:
def add_train_data(): 
    '''
    Adds the field data for walking to the train data
    param: train_data = training data from lab
    param: include_field = boolean to include field data or not
    return: train_data with field data added if include_field is True
    '''
    path = "/home/elias/2025/sshfs_mounter_2025/data_elias/ECSS_2026/raw_field"
    
    files = sorted(os.listdir(path))
    df_l = []
    
    for f in files:
        df = pd.read_csv(os.path.join(path, f)) 
        if len(df) == 0:
            continue
        else:
            df_l.append(df)
        
    field_data = pd.concat(df_l, ignore_index=True)
    
    print('field training examples', len(field_data))
    
    # randomly choose 16000 examples to add to training data
    field_data_train = field_data.sample(n=16000, random_state=42).reset_index(drop=True)
    field_data_test = field_data.sample(n=16000, random_state=22).reset_index(drop=True)
    
    return field_data_train, field_data_test


train_data, field_data_test = add_train_data()

train_data.drop(columns=['speed'], inplace=True)

y_adds = np.reshape(train_data['label'].replace('locomotion', 1).replace('no locomotion', 0).values, (-1, 1)).astype(np.float32)
X_adds = np.reshape(train_data.drop(columns=['label']).values, (-1, 1, 260)).astype(np.float32)

# concatenate train data with adds
X_train = np.concatenate((X_train, X_adds), axis=0)
y_train = np.concatenate((y_train, y_adds), axis=0)

field training examples 286997


  y_adds = np.reshape(train_data['label'].replace('locomotion', 1).replace('no locomotion', 0).values, (-1, 1)).astype(np.float32)


In [124]:
# load data, set batch size, shuffling
dataset = DataLoader(DataLoaderAcc(X_train, y_train), batch_size=32, shuffle=True)

# set model.train() to enable training mode
model.train()

# number of epochs
num_epochs = 20

# training loop
for epoch in range(num_epochs):
    epoch_loss = 0.0
    for inputs, targets in dataset:
        # zero the parameter gradients
        optimizer.zero_grad() # reset gradients
        
        # forward pass
        outputs = model(inputs) # forward propagation
        
        # compute loss
        loss = criterion(outputs, targets) # loss calculation
        
        # backward pass and optimization
        loss.backward() # backpropagation
        optimizer.step() # update weights
        
        # loss accumulation
        epoch_loss += loss.item()
    
    avg_loss = epoch_loss / len(dataset)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

Epoch [1/20], Loss: 0.4803
Epoch [2/20], Loss: 0.3798
Epoch [3/20], Loss: 0.3453
Epoch [4/20], Loss: 0.2166
Epoch [5/20], Loss: 0.1368
Epoch [6/20], Loss: 0.1075
Epoch [7/20], Loss: 0.0853
Epoch [8/20], Loss: 0.0678
Epoch [9/20], Loss: 0.0502
Epoch [10/20], Loss: 0.0402
Epoch [11/20], Loss: 0.0360
Epoch [12/20], Loss: 0.0321
Epoch [13/20], Loss: 0.0308
Epoch [14/20], Loss: 0.0298
Epoch [15/20], Loss: 0.0286
Epoch [16/20], Loss: 0.0279
Epoch [17/20], Loss: 0.0278
Epoch [18/20], Loss: 0.0277
Epoch [19/20], Loss: 0.0273
Epoch [20/20], Loss: 0.0269


In [125]:
X_test = torch.tensor(X_test)
with torch.no_grad():
    predictions = model(X_test) # model inference with
binary_predictions = (torch.sigmoid(predictions) >= 0.5).int()

In [126]:
correct_predictions = (binary_predictions.numpy() == y_test.astype(np.int32)).sum()
accuracy = correct_predictions / y_test.shape[0]
print(f'Train Accuracy: {accuracy:.4f}')

Train Accuracy: 0.9981


In [127]:
y_test = np.array(field_data_test['label'].replace({'locomotion': 1, 'no locomotion': 0}).values).reshape(-1, 1).astype(np.float32)
X_test = np.array(field_data_test.drop(columns=['label', 'speed']).values).reshape(-1, 1, 260).astype(np.float32)

  y_test = np.array(field_data_test['label'].replace({'locomotion': 1, 'no locomotion': 0}).values).reshape(-1, 1).astype(np.float32)


In [128]:
X_test = torch.tensor(X_test)
with torch.no_grad():
    predictions = model(X_test) # model inference with
binary_predictions = (torch.sigmoid(predictions) >= 0.5).int()
correct_predictions = (binary_predictions.numpy() == y_test.astype(np.int32)).sum()
accuracy = correct_predictions / y_test.shape[0]
print(f'Train Accuracy: {accuracy:.4f}')

Train Accuracy: 0.9905
