In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam

# Seed the torch and NumPy global random number generators with a fixed value.
torch.manual_seed(42)
np.random.seed(42)

# Use CUDA when torch reports it as available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# ---------------- Dataset ----------------
class EMGDataset(Dataset):
    """Dataset backed by two ``.npy`` files: samples and per-class label rows.

    The label file is reduced to integer class indices by taking the argmax
    along axis 1 (presumably one-hot rows -- confirm against the data files).
    Samples stay as a NumPy array and are converted to tensors on access.
    """

    def __init__(self, x_path, y_path):
        """Load the sample array from ``x_path`` and the labels from ``y_path``."""
        self.X = np.load(x_path)
        label_rows = np.load(y_path)
        self.y = label_rows.argmax(axis=1)

    def __len__(self):
        """Return the number of samples (length of the first axis of ``X``)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` as a float32 tensor and an int64 tensor."""
        sample = torch.tensor(self.X[idx], dtype=torch.float32)
        label = torch.tensor(self.y[idx], dtype=torch.long)
        return sample, label

# ---------------- CNN Encoder ----------------
class CNNEncoder(nn.Module):
    """1-D convolutional encoder producing L2-normalised embeddings.

    Four ``Conv1d`` layers (kernel size 10) with ReLU activations, one max
    pool after the second convolution and a global average pool at the end,
    followed by a linear projection to ``emb_dim``.
    """

    def __init__(self, emb_dim=128):
        super().__init__()
        # First stage: 6 input channels -> 64 feature maps, then downsample by 3.
        low_level = [
            nn.Conv1d(6, 64, kernel_size=10),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=10),
            nn.ReLU(),
            nn.MaxPool1d(3),
        ]
        # Second stage: widen to 256 feature maps, then average over length.
        high_level = [
            nn.Conv1d(64, 256, kernel_size=10),
            nn.ReLU(),
            nn.Conv1d(256, 256, kernel_size=10),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        ]
        self.net = nn.Sequential(*low_level, *high_level)
        # Projects the 256 pooled features to the embedding size.
        self.fc = nn.Linear(256, emb_dim)

    def forward(self, x):
        """Encode ``x`` of shape (B, seq_len, 6) into unit-norm (B, emb_dim)."""
        channels_first = x.transpose(1, 2)  # (B, 6, seq_len) as Conv1d expects
        pooled = self.net(channels_first).squeeze(-1)  # (B, 256)
        projected = self.fc(pooled)
        return F.normalize(projected, dim=1)

# ---------------- Full Model ----------------
class FullModel(nn.Module):
    """Encoder plus a linear classification head over the embedding."""

    def __init__(self, emb_dim=128, num_classes=62):
        super().__init__()
        self.encoder = CNNEncoder(emb_dim)
        self.classifier = nn.Linear(emb_dim, num_classes)

    def forward(self, x):
        """Return ``(embedding, logits)`` for the batch ``x``."""
        embedding = self.encoder(x)
        return embedding, self.classifier(embedding)

# ---------------- Margin Embedding Loss ----------------
class MarginEmbeddingLoss(nn.Module):
    """Pairwise contrastive loss computed over every ordered pair in a batch.

    Same-label pairs are penalised by their squared Euclidean distance;
    different-label pairs are penalised by the squared amount by which their
    distance falls short of ``margin``. The total is averaged over the number
    of off-diagonal pairs.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, z, y):
        """Return the scalar loss for embeddings ``z`` (B, D) and labels ``y`` (B,)."""
        labels = y.unsqueeze(1)
        same_label = labels == labels.T
        pair_dist = torch.cdist(z, z, p=2)

        # Every sample matches its own label, so remove the diagonal from the
        # positive mask to avoid counting self-pairs.
        positives = same_label.float() - torch.eye(len(z), device=z.device)
        negatives = (~same_label).float()

        pull_term = (positives * pair_dist.pow(2)).sum()
        push_term = (negatives * F.relu(self.margin - pair_dist).pow(2)).sum()

        # The epsilon keeps the division defined for a single-sample batch.
        pair_count = positives.sum() + negatives.sum() + 1e-8
        return (pull_term + push_term) / pair_count

# ---------------- Early Stopping ----------------
class EarlyStopping:
    """Track the best accuracy seen so far and signal when to stop training.

    A snapshot of the model's ``state_dict`` is kept for the best accuracy so
    the weights can be restored afterwards.
    """

    def __init__(self, patience=10):
        """Create a stopper that tolerates ``patience`` non-improving steps."""
        self.patience = patience
        self.best_acc = -1
        self.counter = 0
        self.best_state = None

    def step(self, acc, model):
        """Record ``acc`` and return True when training should stop.

        A strictly higher accuracy resets the counter and snapshots the model;
        anything else increments the counter, and True is returned once the
        counter reaches ``patience``.
        """
        if acc > self.best_acc:
            self.best_acc = acc
            # ``copy=True`` forces a fresh tensor. A plain ``.cpu()`` returns
            # the very same tensor when the model already lives on the CPU,
            # so the snapshot would alias the live parameters and silently
            # follow every later optimizer update.
            self.best_state = {
                name: tensor.detach().to("cpu", copy=True)
                for name, tensor in model.state_dict().items()
            }
            self.counter = 0
            return False

        self.counter += 1
        return self.counter >= self.patience

    def restore(self, model):
        """Load the best snapshot into ``model``; do nothing if none was taken."""
        if self.best_state is None:
            return
        model.load_state_dict(self.best_state)

# ---------------- Training & Evaluation ----------------
def train_and_eval(model, train_loader, test_loader, margin_loss, alpha=1.0, epochs=50, patience=10):
    """Train ``model`` with cross-entropy plus a weighted margin loss.

    After every epoch the accuracy on ``test_loader`` is printed and passed to
    an ``EarlyStopping`` tracker. When training ends (epoch budget exhausted or
    early stop) the best snapshot is loaded back into ``model``.

    Args:
        model: module returning ``(embedding, logits)`` for a batch.
        train_loader: iterable of ``(x, y)`` training batches.
        test_loader: iterable of ``(x, y)`` batches used for the accuracy.
        margin_loss: callable ``(embedding, labels) -> scalar loss``.
        alpha: weight applied to the margin loss term.
        epochs: maximum number of epochs.
        patience: non-improving epochs tolerated before stopping.

    Returns:
        The highest per-epoch accuracy observed on ``test_loader``.
    """
    criterion = nn.CrossEntropyLoss()
    opt = Adam(model.parameters(), lr=5e-4)
    stopper = EarlyStopping(patience)

    for epoch in range(epochs):
        # ---- optimisation pass over the training batches ----
        model.train()
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            embeddings, logits = model(inputs)
            combined = criterion(logits, targets) + alpha * margin_loss(embeddings, targets)
            opt.zero_grad()
            combined.backward()
            opt.step()

        # ---- accuracy on the evaluation batches (no gradients) ----
        model.eval()
        hits = 0
        seen = 0
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                logits = model(inputs)[1]
                hits += (logits.argmax(dim=1) == targets).sum().item()
                seen += targets.size(0)

        acc = hits / seen
        print(f"Epoch {epoch+1:02d} | Test Acc: {acc:.4f}")

        should_stop = stopper.step(acc, model)
        if should_stop:
            print("Early stopping triggered")
            break

    stopper.restore(model)
    return stopper.best_acc

# ---------------- Main Loop ----------------
# Root directory holding the per-split Train/ and Test/ .npy files.
BASE = "models/Data/Data/62_classes/UserDependenet"
# A single loss instance is shared by every split.
margin_loss = MarginEmbeddingLoss(margin=1.0)
# Best accuracy returned by train_and_eval for each split.
all_accs = []

# Iterate over splits numbered 1 through 10.
for split in range(1, 11):
    print(f"\n========== SPLIT {split} ==========")
    train_ds = EMGDataset(f"{BASE}/Train/X_train_{split}.npy", f"{BASE}/Train/y_train_{split}.npy")
    test_ds = EMGDataset(f"{BASE}/Test/X_test_{split}.npy", f"{BASE}/Test/y_test_{split}.npy")

    train_loader = DataLoader(train_ds, batch_size=128, shuffle=True)
    test_loader = DataLoader(test_ds, batch_size=128, shuffle=False)

    # A freshly initialised model is trained for every split.
    model = FullModel(emb_dim=128).to(device)

    # NOTE(review): train_and_eval picks its best epoch on this same test
    # loader, so the accuracy reported here is the maximum over epochs.
    acc = train_and_eval(model, train_loader, test_loader, margin_loss, alpha=1.0, epochs=50, patience=10)
    print(f"Best Split Accuracy: {acc:.4f}")
    all_accs.append(acc)

    # Drop the model and release cached CUDA memory before the next split.
    del model
    torch.cuda.empty_cache()

print("FINAL RESULT ")
# Mean of the per-split best accuracies.
print("Average Accuracy (Margin + CE):", np.mean(all_accs))

Using device: cuda

Epoch 01 | Test Acc: 0.0556
Epoch 02 | Test Acc: 0.1742
Epoch 03 | Test Acc: 0.3444
Epoch 04 | Test Acc: 0.5129
Epoch 05 | Test Acc: 0.5702
Epoch 06 | Test Acc: 0.6210
Epoch 07 | Test Acc: 0.6685
Epoch 08 | Test Acc: 0.6887
Epoch 09 | Test Acc: 0.7024
Epoch 10 | Test Acc: 0.7306
Epoch 11 | Test Acc: 0.7202
Epoch 12 | Test Acc: 0.7194
Epoch 13 | Test Acc: 0.7194
Epoch 14 | Test Acc: 0.7516
Epoch 15 | Test Acc: 0.7315
Epoch 16 | Test Acc: 0.7371
Epoch 17 | Test Acc: 0.7484
Epoch 18 | Test Acc: 0.7048
Epoch 19 | Test Acc: 0.7315
Epoch 20 | Test Acc: 0.7153
Epoch 21 | Test Acc: 0.7444
Epoch 22 | Test Acc: 0.7435
Epoch 23 | Test Acc: 0.7032
Epoch 24 | Test Acc: 0.7548
Epoch 25 | Test Acc: 0.7298
Epoch 26 | Test Acc: 0.7548
Epoch 27 | Test Acc: 0.7540
Epoch 28 | Test Acc: 0.7500
Epoch 29 | Test Acc: 0.7637
Epoch 30 | Test Acc: 0.7435
Epoch 31 | Test Acc: 0.7516
Epoch 32 | Test Acc: 0.7484
Epoch 33 | Test Acc: 0.7435
Epoch 34 | Test Acc: 0.7282
Epoch 35 | Test Acc: 0.7605


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam

# Seed the torch and NumPy global random number generators with a fixed value.
torch.manual_seed(42)
np.random.seed(42)

# Use CUDA when torch reports it as available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

class EMGDataset(Dataset):
    """Dataset of standardised samples loaded from two ``.npy`` files.

    The sample array is expected to be 3-D, ``(num_samples, length, channels)``.
    Each sample is standardised independently per channel: the mean over
    axis 1 is subtracted and the result is divided by the standard deviation
    over axis 1 (plus a small epsilon). Labels are the argmax along axis 1 of
    the label file (presumably one-hot rows -- confirm against the data files).
    Everything is converted to tensors once, at construction time.
    """

    def __init__(self, x_path, y_path):
        """Load, standardise and tensorise the arrays at the given paths."""
        X = np.load(x_path)
        y = np.argmax(np.load(y_path), axis=1)

        # Normalising in place on an integer array would truncate the result,
        # so promote non-floating data before doing any arithmetic.
        if not np.issubdtype(X.dtype, np.floating):
            X = X.astype(np.float32)

        # Vectorised per-sample, per-channel statistics over axis 1. This
        # works for any number of channels rather than a hard-coded six.
        mean = X.mean(axis=1, keepdims=True)
        std = X.std(axis=1, keepdims=True)
        X -= mean
        X /= std + 1e-8  # epsilon guards against constant (zero-variance) channels

        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` tensors stored at ``idx``."""
        return self.X[idx], self.y[idx]

class CNNBiLSTMEncoder(nn.Module):
    """Convolutional front end followed by a bidirectional LSTM.

    Four ``Conv1d`` layers (kernel size 10, ReLU, one max pool) produce a
    256-channel sequence, a single-layer bidirectional LSTM (hidden size 128
    per direction) summarises it, and a linear layer projects the summary to
    an L2-normalised embedding of size ``emb_dim``.
    """

    def __init__(self, emb_dim=128):
        super().__init__()

        self.cnn = nn.Sequential(
            nn.Conv1d(6, 64, kernel_size=10),
            nn.ReLU(),
            nn.Conv1d(64, 64, kernel_size=10),
            nn.ReLU(),
            nn.MaxPool1d(3),

            nn.Conv1d(64, 256, kernel_size=10),
            nn.ReLU(),
            nn.Conv1d(256, 256, kernel_size=10),
            nn.ReLU()
        )

        self.bilstm = nn.LSTM(
            input_size=256,
            hidden_size=128,
            batch_first=True,
            bidirectional=True
        )

        # 256 = 128 forward + 128 backward hidden units.
        self.fc = nn.Linear(256, emb_dim)

    def forward(self, x):
        """Encode ``x`` of shape (B, seq_len, 6) into unit-norm (B, emb_dim)."""
        x = x.permute(0, 2, 1)        # (B, 6, seq_len) as Conv1d expects
        x = self.cnn(x)               # (B, 256, reduced_len)
        x = x.permute(0, 2, 1)        # (B, reduced_len, 256) for batch_first LSTM

        # Use the final hidden state of each direction. Slicing the output at
        # the last time step would pair the forward summary with a backward
        # state that has processed only a single step of the sequence.
        _, (h_n, _) = self.bilstm(x)
        feat = torch.cat([h_n[-2], h_n[-1]], dim=1)  # (B, 256): forward, backward

        z = F.normalize(self.fc(feat), dim=1)
        return z

class FullModel(nn.Module):
    """CNN+BiLSTM encoder plus a linear classification head."""

    def __init__(self, emb_dim=128, num_classes=62):
        super().__init__()
        self.encoder = CNNBiLSTMEncoder(emb_dim)
        self.classifier = nn.Linear(emb_dim, num_classes)

    def forward(self, x):
        """Return ``(embedding, logits)`` for the batch ``x``."""
        embedding = self.encoder(x)
        return embedding, self.classifier(embedding)

class MarginEmbeddingLoss(nn.Module):
    """Pairwise contrastive loss computed over every ordered pair in a batch.

    Same-label pairs contribute their squared Euclidean distance;
    different-label pairs contribute the squared shortfall of their distance
    below ``margin``. The sum is divided by the number of off-diagonal pairs.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, z, y):
        """Return the scalar loss for embeddings ``z`` (B, D) and labels ``y`` (B,)."""
        labels = y.unsqueeze(1)
        same_label = labels == labels.T
        pair_dist = torch.cdist(z, z, p=2)

        # Each sample trivially shares its own label; subtracting the identity
        # removes those self-pairs from the positive mask.
        positives = same_label.float() - torch.eye(len(z), device=z.device)
        negatives = (~same_label).float()

        pull_term = (positives * pair_dist.pow(2)).sum()
        push_term = (negatives * F.relu(self.margin - pair_dist).pow(2)).sum()

        # The epsilon keeps the division defined for a single-sample batch.
        pair_count = positives.sum() + negatives.sum() + 1e-8
        return (pull_term + push_term) / pair_count

class EarlyStopping:
    """Track the best accuracy seen so far and signal when to stop training.

    A snapshot of the model's ``state_dict`` is kept for the best accuracy so
    the weights can be restored afterwards.
    """

    def __init__(self, patience=10):
        """Create a stopper that tolerates ``patience`` non-improving steps."""
        self.patience = patience
        self.best_acc = -1
        self.counter = 0
        self.best_state = None

    def step(self, acc, model):
        """Record ``acc`` and return True when training should stop.

        A strictly higher accuracy resets the counter and snapshots the model;
        anything else increments the counter, and True is returned once the
        counter reaches ``patience``.
        """
        if acc > self.best_acc:
            self.best_acc = acc
            # ``copy=True`` forces a fresh tensor. A plain ``.cpu()`` returns
            # the very same tensor when the model already lives on the CPU,
            # so the snapshot would alias the live parameters and silently
            # follow every later optimizer update.
            self.best_state = {
                name: tensor.detach().to("cpu", copy=True)
                for name, tensor in model.state_dict().items()
            }
            self.counter = 0
            return False

        self.counter += 1
        return self.counter >= self.patience

    def restore(self, model):
        """Load the best snapshot into ``model``; do nothing if none was taken."""
        if self.best_state is None:
            return
        model.load_state_dict(self.best_state)

def train_and_eval(model, train_loader, test_loader, margin_loss,
                   alpha=1.0, epochs=500, patience=10):
    """Train ``model`` with cross-entropy plus a weighted margin loss.

    After every epoch the accuracy on ``test_loader`` is printed and passed to
    an ``EarlyStopping`` tracker. When training ends (epoch budget exhausted or
    early stop) the best snapshot is loaded back into ``model``.

    Args:
        model: module returning ``(embedding, logits)`` for a batch.
        train_loader: iterable of ``(x, y)`` training batches.
        test_loader: iterable of ``(x, y)`` batches used for the accuracy.
        margin_loss: callable ``(embedding, labels) -> scalar loss``.
        alpha: weight applied to the margin loss term.
        epochs: maximum number of epochs.
        patience: non-improving epochs tolerated before stopping.

    Returns:
        The highest per-epoch accuracy observed on ``test_loader``.
    """
    criterion = nn.CrossEntropyLoss()
    opt = Adam(model.parameters(), lr=5e-4)
    stopper = EarlyStopping(patience)

    for epoch in range(epochs):
        # ---- optimisation pass over the training batches ----
        model.train()
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            embeddings, logits = model(inputs)
            combined = criterion(logits, targets) + alpha * margin_loss(embeddings, targets)

            opt.zero_grad()
            combined.backward()
            opt.step()

        # ---- accuracy on the evaluation batches (no gradients) ----
        model.eval()
        hits = 0
        seen = 0
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                logits = model(inputs)[1]
                hits += (logits.argmax(1) == targets).sum().item()
                seen += targets.size(0)

        acc = hits / seen
        print(f"Epoch {epoch+1:02d} | Test Acc: {acc:.4f}")

        should_stop = stopper.step(acc, model)
        if should_stop:
            print("Early stopping triggered")
            break

    stopper.restore(model)
    return stopper.best_acc

# Root directory holding the per-split Train/ and Test/ .npy files.
BASE = "models/Data/Data/62_classes/UserDependenet"
# A single loss instance is shared by every split.
margin_loss = MarginEmbeddingLoss(margin=1.0)
# Best accuracy returned by train_and_eval for each split.
all_accs = []

# Iterate over splits numbered 1 through 19.
# NOTE(review): the earlier cell iterates splits 1-10 only; confirm that
# files for splits 11-19 exist under BASE.
for split in range(1, 20):
    print(f"\n========== SPLIT {split} ==========")

    train_ds = EMGDataset(
        f"{BASE}/Train/X_train_{split}.npy",
        f"{BASE}/Train/y_train_{split}.npy"
    )
    test_ds = EMGDataset(
        f"{BASE}/Test/X_test_{split}.npy",
        f"{BASE}/Test/y_test_{split}.npy"
    )

    train_loader = DataLoader(train_ds, batch_size=128, shuffle=True)
    test_loader = DataLoader(test_ds, batch_size=128, shuffle=False)

    # A freshly initialised model is trained for every split.
    model = FullModel(emb_dim=128, num_classes=62).to(device)

    # NOTE(review): train_and_eval picks its best epoch on this same test
    # loader, so the accuracy reported here is the maximum over epochs.
    acc = train_and_eval(
        model,
        train_loader,
        test_loader,
        margin_loss,
        alpha=1.0,
        epochs=500,
        patience=10
    )

    print(f"Best Split Accuracy: {acc:.4f}")
    all_accs.append(acc)

    # Drop the model and release cached CUDA memory before the next split.
    del model
    torch.cuda.empty_cache()

print("\nFINAL RESULT")
# Mean of the per-split best accuracies.
print("Average Accuracy (CNN+BiLSTM + CE + Margin):", np.mean(all_accs))