In [1]:
import sys, copy, torch, argparse
import numpy as np
import torch.nn.functional as F
from torch import nn
from tqdm.notebook import tqdm
from torch.utils.data import DataLoader

from sEMG_transformer_cVAE_loo import load_raw_signals, sEMGSignalDataset, sEMGtransformerVAE, count_correct

In [2]:
parser = argparse.ArgumentParser(description="sEMG transformer training configurations")
# experiment config
parser.add_argument('--sub_idx', type=int, default=0, help="subject index")
# training config
parser.add_argument('--seed', type=int, default=0, help="random seed")
parser.add_argument('--epochs', type=int, default=1, help="number of epochs")
parser.add_argument('--bsz', type=int, default=64, help="batch size")
# optimizer config
parser.add_argument('--lr', type=float, default=0.001, help="learning rate")
parser.add_argument('--wd', type=float, default=0.001, help="weight decay")
parser.add_argument('--step_size', type=int, default=500, help="lr scheduler step size")
parser.add_argument('--gamma', type=float, default=0.8, help="lr scheduler gamma")
# model config
parser.add_argument('--psz', type=int, default=64, help="signal patch size")
parser.add_argument('--d_model', type=int, default=256, help="transformer embedding dim")
parser.add_argument('--nhead', type=int, default=8, help="transformer number of attention heads")
parser.add_argument('--dim_feedforward', type=int, default=1024, help="transformer feed-forward dim")
parser.add_argument('--num_layers', type=int, default=3, help="number of transformer encoder layers")
parser.add_argument('--dropout', type=float, default=0.3, help="dropout rate")

sys.argv = ['f']
config = parser.parse_args()

In [3]:
signals, labels, vfi_1, sub_id, sub_skinfold = load_raw_signals("../data/subjects_40_v6.mat")
np.random.seed(config.seed)
torch.manual_seed(config.seed)

<torch._C.Generator at 0x7f793845ce50>

In [4]:
sub_test = config.sub_idx
print(f"Subject R{sub_id[config.sub_idx][0][0][0]}")

X, Y, C = [], [], []
for i in range(40):
  # stack all inputs into [N,C,L] format
  x = np.stack(signals[i], axis=1)

  # one-hot encode the binary labels
  N = labels[i][0].shape[0]
  mapped_indices = (labels[i][0] == 1).astype(int)
  y_onehot = np.zeros((N, 2))
  y_onehot[np.arange(N), mapped_indices.flatten()] = 1

  X.append(x)
  Y.append(y_onehot)
  C.append(sub_skinfold[i][0].mean(axis=1))

Subject R44


In [5]:
# normalize the signals channel-wise
X_means = np.mean(np.concatenate(X, axis=0), axis=(0,2))
X_stds = np.std(np.concatenate(X, axis=0), axis=(0,2))
for i in range(40):
  X[i] = (X[i] - X_means[np.newaxis,:,np.newaxis]) / X_stds[np.newaxis,:,np.newaxis]
print(f"X {np.concatenate(X, axis=0).shape}")

# leave-one-out split
X_test, Y_test, C_test = X[sub_test], Y[sub_test], C[sub_test]
X, Y, C = X[:sub_test] + X[sub_test+1:], Y[:sub_test] + Y[sub_test+1:], C[:sub_test] + C[sub_test+1:]
X, Y, C = np.concatenate(X, axis=0), np.concatenate(Y, axis=0), np.concatenate(C, axis=0)

num_samples = X.shape[0]
indices = np.arange(num_samples)
np.random.shuffle(indices)
split_idx = int(num_samples*0.9)
train_idx, valid_idx = indices[:split_idx], indices[split_idx:]

X_train, X_valid = X[train_idx], X[valid_idx]
Y_train, Y_valid = Y[train_idx], Y[valid_idx]
C_train, C_valid = C[train_idx], C[valid_idx]
print(f"X_train {X_train.shape}")
print(f"X_valid {X_valid.shape}")
print(f"X_test {X_test.shape}")

X (6472, 4, 4000)
X_train (5676, 4, 4000)
X_valid (631, 4, 4000)
X_test (165, 4, 4000)


In [8]:
print(C.shape)
print(C.max())
print(C.min())

(6307,)
15.85
3.35


In [None]:
dataset_train = sEMGSignalDataset(X_train, Y_train)
dataset_valid = sEMGSignalDataset(X_valid, Y_valid)
dataset_test  = sEMGSignalDataset(X_test, Y_test)

dataloader_train = DataLoader(dataset_train, batch_size=config.bsz, shuffle=True)
dataloader_valid = DataLoader(dataset_valid, batch_size=config.bsz, shuffle=False)
dataloader_test  = DataLoader(dataset_test,  batch_size=config.bsz, shuffle=False)

model = sEMGtransformerVAE(patch_size=config.psz, d_model=config.d_model, nhead=config.nhead, dim_feedforward=config.dim_feedforward,
                           dropout=config.dropout, num_layers=config.num_layers)
model.to("cuda")

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=config.lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=config.step_size, gamma=config.gamma)
scaler = torch.cuda.amp.GradScaler()

In [None]:

accuracy_valid_best = 0
accuracy_test_best = 0
model_best = None
for epoch in tqdm(range(500), desc="Training"):
  loss_train = 0
  correct_train = 0
  model.train()
  for batch, (inputs, targets) in enumerate(dataloader_train):
    inputs, targets = inputs.to("cuda"), targets.to("cuda")

    # make sure inputs are divisiable by the patch_size
    # convert from raw signals to signal patches
    inputs = inputs[:, :, :(inputs.shape[2]//config.psz)*config.psz]
 
    optimizer.zero_grad()
    with torch.autocast(device_type="cuda", dtype=torch.float16):
      outputs, mu, logvar = model(inputs)
      #print(f"mu {mu.shape}")
      #print(f"logvar {logvar.shape}")
      #print(f"inputs {inputs.flatten(1).shape}")
      #print(f"outputs {outputs.flatten(1).shape}")
      loss_mse = F.mse_loss(inputs.flatten(1), outputs.flatten(1), reduction='sum')
      # print(loss_mse)
      loss_kl = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
      # print(loss_kl)
      loss = loss_mse + loss_kl

    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()

    # correct_train += count_correct(outputs, targets)
    loss_train += loss.item()

  # loss_valid = 0
  # correct_valid = 0
  # model.eval()
  # for inputs, targets in dataloader_valid:
  #   inputs, targets = inputs.to("cuda"), targets.to("cuda")
  #   outputs = model(inputs)
  #   loss = criterion(outputs, targets)
  #   correct_valid += count_correct(outputs, targets)
  #   loss_valid += loss.item()

  # if correct_valid/len(dataset_valid) > accuracy_valid_best: 
  #   accuracy_valid_best = correct_valid/len(dataset_valid)
  #   print(f"accuracy_valid_best: {accuracy_valid_best}")
  #   model_best = copy.deepcopy(model)
  #   correct_test = 0
  #   for inputs, targets in dataloader_test:
  #     inputs, targets = inputs.to("cuda"), targets.to("cuda")
  #     outputs = model(inputs)
  #     correct_test += count_correct(outputs, targets)
  #   accuracy_test_best = correct_test/len(dataset_test)
  #   print(f"accuracy_test_best: {accuracy_test_best}")

  scheduler.step()


In [None]:
C_train = C[train_idx]
Y_train_cpt = np.argmax(Y_train, axis=1)
Y_pred = []
dataloader_train = DataLoader(dataset_train, batch_size=config.bsz, shuffle=False)
for inputs, targets in dataloader_train:
  inputs, targets = inputs.to("cuda"), targets.to("cuda")
  outputs = model_best(inputs)
  _, predicted = torch.max(F.softmax(outputs, dim=1), 1)
  Y_pred.append(predicted.cpu().numpy())
Y_pred_cpt = np.concatenate(Y_pred, axis=0)

In [None]:
from sklearn.metrics import accuracy_score

accuracy = accuracy_score(Y_train_cpt, Y_pred_cpt)
print(f"Accuracy: {accuracy:.4f}")

In [None]:
from mlconfound.stats import partial_confound_test

ret = partial_confound_test(Y_train_cpt, Y_pred_cpt, C_train, cat_y=True, cat_yhat=True, cat_c=False)
print(ret.p)

In [None]:
import torch
import torch.nn as nn

class TransformerVAE(nn.Module):
    def __init__(self, d_model, latent_dim):
        super(TransformerVAE, self).__init__()
        self.encoder = nn.TransformerEncoderLayer(d_model=d_model, nhead=8)
        self.decoder = nn.TransformerDecoderLayer(d_model=d_model, nhead=8)
        
        # Linear layers for mean and log-variance
        self.fc_mu = nn.Linear(d_model, latent_dim)
        self.fc_logvar = nn.Linear(d_model, latent_dim)
        
        # Linear layer to map latent space back to d_model
        self.fc_decode = nn.Linear(latent_dim, d_model)
    
    def encode(self, x):
        h = self.encoder(x)
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar
    
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + std * eps
    
    def decode(self, z):
        z = self.fc_decode(z)
        return self.decoder(z)
    
    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

# Example usage
d_model = 512
latent_dim = 256
model = TransformerVAE(d_model, latent_dim)
x = torch.randn(10, 63, d_model)  # Example input tensor
reconstructed_x, mu, logvar = model(x)