# Feature Extraction for EMG signals

## using LSTM --->  TRAINING + EVALUATION + EXTRACTION

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from models import LSTM_Emb_Classifier, EMG_Feature_Extractor
from utils.loaders import ActionNetEmgDataset
from torch.utils.data import DataLoader
from utils.logger import logger
from tqdm import tqdm
import pickle
import os
from utils.loaders import FeaturesExtendedEMGDataset
from models import MLP

### Training [ + EVALUATION ]

In [None]:
BATCH_SIZE = 32
LR = 0.008
MOMENTUM = 0.9
WEIGHT_DECAY = 1e-4
STEP_SIZE = 20
GAMMA = 0.1
NUM_EPOCHS = 50

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
if torch.backends.mps.is_available():
    DEVICE = 'mps'
    logger.info("------ USING APPLE SILICON GPU ------")
 

2024-06-09 17:01:29 LOG INFO ------ USING APPLE SILICON GPU ------


In [None]:
# Parametri del modello
input_dim = 16
hidden_dim = 128
embedding_dim = 64
output_dim = 20  # Definisci il numero di classi

# Inizializzazione del modello, della loss function e dell'ottimizzatore
model = LSTM_Emb_Classifier(input_dim=input_dim, hidden_dim=hidden_dim, embedding_dim=embedding_dim, num_class=output_dim)
model = model.to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=STEP_SIZE, gamma=GAMMA)

train_dataset = ActionNetEmgDataset('train', 25, 5, True, './action-net', "./action-net/saved_emg", 2) # Inserisci il path al dataset di training
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True, drop_last=True) # Inserisci il dataloader per il training

val_dataset = ActionNetEmgDataset('test', 25, 5, True, './action-net', "./action-net/saved_emg", 2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True, drop_last=True)

logger.info(f"Model: {model}")
logger.info(f"len train_dataset: {len(train_dataset)}")
logger.info(f"len train_loader: {len(train_loader)}")


2024-06-09 17:01:30 LOG INFO Model: LSTM_Emb_Classifier(
  (lstm): LSTM(16, 128, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc1): Linear(in_features=128, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=20, bias=True)
)
2024-06-09 17:01:30 LOG INFO len train_dataset: 1795
2024-06-09 17:01:30 LOG INFO len train_loader: 56


In [None]:
def evaluate(model, data_loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for x, y in data_loader:
            x = x.reshape(BATCH_SIZE, 5, 25, -1)
            x = x.permute(1, 0, 2, 3)
            y = y.to(device)
            
            for i in range(5):
                x_t = x[i].float().to(device)
                outputs, embeddings = model(x_t)
                _, predicted = torch.max(outputs, 1)
                total += y.size(0)
                correct += (predicted == y).sum().item()

    accuracy = correct / total
    return accuracy


In [20]:
model.train()
for epoch in range(NUM_EPOCHS):
        model.train()
        epoch_loss = [0.0, 0]
        for i_val,(x, y) in tqdm(enumerate(train_loader)):
            x = x.reshape(BATCH_SIZE, 5, 25, -1)
            x = x.permute(1, 0, 2, 3)
            y = y.to(DEVICE)
            #logger.info(f"X: {x[0][0]}")
            # Category Loss
            #logger.info(f"X: {x.size()}")
            for i in range(5):
                x_t = x[i].float().to(DEVICE)
                outputs, embeddings = model(x_t)
                # Log details about the outputs
                #logger.info(f"Output type: {cls_o.logits.shape}")
                
                criterion = nn.CrossEntropyLoss()
                loss = criterion(outputs, y.long())

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                epoch_loss[0] += loss.item()
                epoch_loss[1] += x.size(0)

                if (i_val + 1) % (len(train_loader) // 5) == 0:
                    logger.info("[{}/{}]".format(i_val + 1, len(train_loader)))
            
        scheduler.step()
        logger.info(f'[EPOCH {epoch+1}] Avg. Loss: {epoch_loss[0] / epoch_loss[1]}')


        #save checkpoint in a file
        if (epoch+1) % 10 == 0:
            train_accuracy = evaluate(model, train_loader, DEVICE)
            val_accuracy = evaluate(model, val_loader, DEVICE)
            logger.info(f'[EPOCH {epoch+1}] Train Accuracy: {train_accuracy}')
            logger.info(f'[EPOCH {epoch+1}] Val Accuracy: {val_accuracy}')
            torch.save(model.state_dict(), f'./saved_models/LSTM_Emb_Classifier/final_LSTM_Emb_epoch_{epoch+1}.pth')
        if (epoch+1) % STEP_SIZE == 0:
            logger.info(f'Current LR: {scheduler.get_last_lr()}')

6it [00:00,  8.10it/s]2024-06-09 17:01:36 LOG INFO [11/56]
2024-06-09 17:01:36 LOG INFO [11/56]
2024-06-09 17:01:36 LOG INFO [11/56]
2024-06-09 17:01:36 LOG INFO [11/56]
2024-06-09 17:01:36 LOG INFO [11/56]
17it [00:01, 22.57it/s]2024-06-09 17:01:36 LOG INFO [22/56]
2024-06-09 17:01:36 LOG INFO [22/56]
2024-06-09 17:01:36 LOG INFO [22/56]
2024-06-09 17:01:36 LOG INFO [22/56]
2024-06-09 17:01:36 LOG INFO [22/56]
32it [00:01, 33.10it/s]2024-06-09 17:01:36 LOG INFO [33/56]
2024-06-09 17:01:36 LOG INFO [33/56]
2024-06-09 17:01:36 LOG INFO [33/56]
2024-06-09 17:01:36 LOG INFO [33/56]
2024-06-09 17:01:36 LOG INFO [33/56]
43it [00:01, 41.01it/s]2024-06-09 17:01:36 LOG INFO [44/56]
2024-06-09 17:01:36 LOG INFO [44/56]
2024-06-09 17:01:36 LOG INFO [44/56]
2024-06-09 17:01:36 LOG INFO [44/56]
2024-06-09 17:01:36 LOG INFO [44/56]
53it [00:01, 42.85it/s]2024-06-09 17:01:37 LOG INFO [55/56]
2024-06-09 17:01:37 LOG INFO [55/56]
2024-06-09 17:01:37 LOG INFO [55/56]
2024-06-09 17:01:37 LOG INFO [55/56

KeyboardInterrupt: 

### Extraction

In [34]:
train_dataset = ActionNetEmgDataset('train', 25, 5, True, './action-net', "./action-net/saved_emg", 2) # Inserisci il path al dataset di training
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=4, pin_memory=True, drop_last=True) # Inserisci il dataloader per il training

val_dataset = ActionNetEmgDataset('test', 25, 5, True, './action-net', "./action-net/saved_emg", 2)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=True, num_workers=4, pin_memory=True, drop_last=True)

model.load_state_dict(torch.load(f'./saved_models/LSTM_Emb_Classifier/final_LSTM_Emb_epoch_40.pth'))
model.eval()

embeddings_list = []
labels_list = []

with torch.no_grad():
    for i_val,(x, y) in tqdm(enumerate(val_loader)):
        x = x.reshape(1, 5, 25, -1)
        x = x.permute(1, 0, 2, 3)
        y = y.to(DEVICE)
        for i in range(5):
                x_t = x[i].float().to(DEVICE)
                sample = {}
                outputs, embeddings = model(x_t)
                embeddings_list.append(embeddings.cpu())
                labels_list.append(y.cpu())

# Combina le features e le labels in una lista di dizionari
data_to_save = [{'features': emb[0], 'labels': lbl} for emb, lbl in zip(embeddings_list, labels_list)]
# print size of the data
print(f"Data size: {len(data_to_save)}")
# Salva i dati in un file pickle
'''
features_file = "saved_features/EMG_Emb_LSTM_25_dense_D1_train.pkl"
with open(features_file, 'wb') as f:
    pickle.dump(data_to_save, f)
'''

199it [00:21,  9.08it/s] 

Data size: 995





'\nfeatures_file = "saved_features/EMG_Emb_LSTM_25_dense_D1_train.pkl"\nwith open(features_file, \'wb\') as f:\n    pickle.dump(data_to_save, f)\n'

In [35]:
sample = data_to_save[0]
print(f"Data size: {sample['features'].size()}")

features_file = "saved_features/EMG_Emb_LSTM_25_dense_D1_test.pkl"
with open(features_file, 'wb') as f:
    pickle.dump(data_to_save, f)

Data size: torch.Size([64])


## EGM Feature Extraction using Signal Properties such as Integrated EMG, Mean Squared Value, Variance, Root Mean Square, Kurtosis

In [None]:
from models import EMG_Feature_Extractor


train_dataset = ActionNetEmgDataset('test', 25, 5, True, './action-net', "./action-net/saved_emg", 2) # Inserisci il path al dataset di training
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=4, pin_memory=True, drop_last=True) # Inserisci il dataloader per il training

embeddings_list = []
labels_list = []

for i_val,(x, y) in tqdm(enumerate(train_loader)):
            x = x.reshape(1, 5, 25, -1)
            x = x.permute(1, 0, 2, 3)
            y = y.to(DEVICE)
            #logger.info(f"X: {x[0][0]}")
            # Category Loss
            #logger.info(f"X: {x.size()}")
            for i in range(5):
                sample = {}
                x_t = x[i].float().to(DEVICE)
                embeddings = EMG_Feature_Extractor(x_t[0])
                embeddings_list.append(embeddings.cpu())  # Mantenere gli embeddings sulla CPU per salvare
                labels_list.append(y.cpu())  # Mantenere le etichette sulla CPU per salvare

data_to_save = [{'features': emb, 'labels': lbl} for emb, lbl in zip(embeddings_list, labels_list)]

features_file = "saved_features/EMG_Emb_Stat_25_dense_D1_test.pkl"
with open(features_file, 'wb') as f:
    pickle.dump(data_to_save, f)


