In [4]:
import numpy as np
import torch
import pickle
import os
from sklearn.model_selection import train_test_split
import pandas as pd
from torch.optim.lr_scheduler import ReduceLROnPlateau

In [5]:
path = r"Y:\Datasets\Fyzio"
exercise = "Wide squat"


In [6]:
with open(os.path.join(path,f"X_Class_train_w_augmentation",exercise + ".pkl"),"rb") as f:
    x = np.array(pickle.load(f))
    print(x.shape)



(1136, 200, 20)


In [7]:
print("---- DATA DIAGNOSTICS ----")
print("Shape:", x.shape)
print("Dtype:", x.dtype)
print("NaN:", np.isnan(x).any())
print("Inf:", np.isinf(x).any())

---- DATA DIAGNOSTICS ----
Shape: (1136, 200, 20)
Dtype: float64
NaN: False
Inf: False


In [8]:
with open(os.path.join(path,"Y_Class_train_w_augmentation",exercise + ".pkl"),"rb") as f:
    y = np.array(pickle.load(f))
    print(y.shape)


(1136, 3)


In [9]:

x_norm = np.zeros_like(x)
means = []
stds = []
print(x.shape)
for ch in range(x.shape[2]):
    mean = x[:, :, ch].mean()
    std = x[:, :, ch].std()
    x_norm[:, :, ch] = (x[:, :, ch] - mean) / std
    means.append(mean)
    stds.append(std)

(1136, 200, 20)


In [10]:

y_norm_t = torch.from_numpy(y).float()
x_norm_t = torch.from_numpy(x_norm).float()
x_norm_t = x_norm_t.permute(0,2,1)
x_train,x_test,y_train,y_test = train_test_split(x_norm_t,y_norm_t,test_size=0.3,random_state=42)
unique, counts = torch.unique(y_train, return_counts=True)
for u, c in zip(unique, counts):
    print(f"Class {u.item()}: {c.item()} samples")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
print(torch.__version__)
print(torch.version.cuda)
print(torch.backends.cudnn.version())
print(x_train.shape)


Class 0.0: 569 samples
Class 1.0: 1816 samples
cuda
2.7.1+cu128
12.8
90701
torch.Size([795, 20, 200])


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F



class ClassCNN(nn.Module):
    def __init__(self,in_channels=20,out_channels=3):
        super(ClassCNN,self).__init__()
        self.conv1 = nn.Conv1d(in_channels, 64, kernel_size=3, padding=1)
        self.normalization1=nn.BatchNorm1d(64)
        self.pool1= nn.MaxPool1d(2)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.normalization2=nn.BatchNorm1d(128)
        self.pool2= nn.MaxPool1d(2)
        self.conv3 = nn.Conv1d(128, 64, kernel_size=3, padding=1)
        self.normalization3=nn.BatchNorm1d(64)
        self.pool3= nn.MaxPool1d(2)

        self.gap = nn.AdaptiveAvgPool1d(1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(64, 8)
        self.dropout1 = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(8, out_channels)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.normalization1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.normalization2(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = F.relu(x)
        x = self.normalization3(x)
        x = self.pool3(x)
        x = self.gap(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = F.relu(x)
        # x = self.fc2(x)
        # x = F.relu(x)
        x = self.dropout1(x)
        out = self.fc2(x)
        return out


In [12]:
from torch.utils.data import Dataset

def add_noise(x, noise_factor=0.001):
    return x + noise_factor * torch.randn_like(x)

def time_shift(x, shift_max=10):
    shift = int(torch.randint(-shift_max, shift_max+1, (1,)))
    return torch.roll(x, shifts=shift)

def scale(x, factor_range=(0.9, 1.1)):
    factor = torch.empty(1).uniform_(*factor_range)
    return x * factor

def drop(x):
    x = x.clone()
    n_channels = x.shape[0]
    ch = torch.randint(0, n_channels,(1,)).item()
    x[ch, :] = 0.0
    return x

def random_augment(x):
    if torch.rand(1) < 0.5:
        x = add_noise(x)
    if torch.rand(1) < 0.4:
        x = time_shift(x)
    if torch.rand(1) < 0.3:
        x = scale(x)
    if torch.rand(1) < 0.4:
        x = drop(x)
    return x


class AugmentedSignalDataset(Dataset):
    def __init__(self, X, Y, augment=True):
        self.X = X
        self.Y = Y
        self.augment = augment

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        x = self.X[idx]
        y = self.Y[idx]
        if self.augment:
            x = random_augment(x)
        return x,y

In [26]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

net = ClassCNN(in_channels=20, out_channels=3).to(device)

pos_weights = torch.tensor([19/3, 19/8, 19/4], dtype=torch.float32).to(device)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weights)

batch_size = 32
optimizer = optim.Adam(net.parameters(), lr=5e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

X_train_tensor = x_train.detach().clone()
Y_train_tensor = y_train.detach().clone().float()
X_val_tensor = x_test.detach().clone()
Y_val_tensor = y_test.detach().clone().float()

train_dataset = AugmentedSignalDataset(X_train_tensor, Y_train_tensor, augment=False)
val_dataset = AugmentedSignalDataset(X_val_tensor, Y_val_tensor, augment=False)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, pin_memory=True)

epochs = 100
for epoch in range(epochs):
    net.train()
    train_loss = 0.0
    correct = 0
    total = 0

    for batch_X, batch_y in train_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        outputs = net(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        preds = (torch.sigmoid(outputs) > 0.5).float()
        correct += (preds == batch_y).all(dim=1).sum().item()
        total += batch_y.size(0)

    train_loss /= len(train_loader)
    train_acc = correct / total

    net.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            val_outputs = net(batch_X)
            loss = criterion(val_outputs, batch_y)
            val_loss += loss.item()

            preds = (torch.sigmoid(val_outputs) > 0.5).float()
            correct += (preds == batch_y).all(dim=1).sum().item()
            total += batch_y.size(0)

    val_loss /= len(val_loader)
    val_acc = correct / total

    scheduler.step(val_loss)

    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")


Epoch 1/100, Train Loss: 2.2024, Train Acc: 0.2931, Val Loss: 2.1405, Val Acc: 0.3812
Epoch 2/100, Train Loss: 2.0791, Train Acc: 0.3447, Val Loss: 1.9519, Val Acc: 0.5015
Epoch 3/100, Train Loss: 1.9846, Train Acc: 0.3887, Val Loss: 1.8561, Val Acc: 0.5103
Epoch 4/100, Train Loss: 1.9434, Train Acc: 0.3899, Val Loss: 1.7860, Val Acc: 0.5367
Epoch 5/100, Train Loss: 1.9107, Train Acc: 0.4390, Val Loss: 1.7367, Val Acc: 0.5543
Epoch 6/100, Train Loss: 1.8206, Train Acc: 0.4491, Val Loss: 1.6812, Val Acc: 0.5630
Epoch 7/100, Train Loss: 1.8173, Train Acc: 0.4453, Val Loss: 1.6406, Val Acc: 0.5630
Epoch 8/100, Train Loss: 1.7708, Train Acc: 0.4553, Val Loss: 1.6142, Val Acc: 0.5660
Epoch 9/100, Train Loss: 1.7686, Train Acc: 0.4226, Val Loss: 1.5602, Val Acc: 0.5748
Epoch 10/100, Train Loss: 1.7288, Train Acc: 0.4403, Val Loss: 1.5371, Val Acc: 0.5836
Epoch 11/100, Train Loss: 1.7055, Train Acc: 0.4503, Val Loss: 1.5093, Val Acc: 0.5748
Epoch 12/100, Train Loss: 1.6875, Train Acc: 0.4717,