In [34]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset

class SensorDataset(Dataset):
    """
    Custom Dataset class for loading sensor data from CSV files.

    Args:
        data_dir (str): Path to the directory containing the CSV files.
        transform (callable, optional): Optional transform to apply to each sample.
    """
    def __init__(self, data_dir, transform=None):
        """
        Initialize the SensorDataset.
    
        Args:
            data_dir (str): Path to the directory containing the CSV files.
            transform (callable, optional): Optional transform to be applied to each sample.
        """
        self.data_dir = data_dir
        self.files = [f for f in os.listdir(data_dir) if f.endswith('.csv')]
        self.transform = transform

    def __len__(self):
        """Returns the number of files in the dataset."""
        return len(self.files)

    def __getitem__(self, idx):
        """
        Loads a single sample from the dataset.

        Args:
            idx (int): Index of the file to load.

        Returns:
            dict: A dictionary with the features tensor and the corresponding label tensor.
                  The keys are:
                  - 'features' (torch.Tensor): The feature data from the CSV file.
                  - 'label' (torch.Tensor): The label, where [1, 0] represents 'ADL' and [0, 1] represents 'Fall'.
        """
        file_path = os.path.join(self.data_dir, self.files[idx])
        data = pd.read_csv(file_path, delimiter=';')
        data.drop(data.columns[len(data.columns)-1], axis=1, inplace=True)

        X = data.values
        y = torch.Tensor([0, 1]) if '_Fall' in file_path else torch.Tensor([1, 0]) # 0 ADL, 1 Fall
        
        sample = {
            'features': torch.tensor(X, dtype=torch.float32).clone().detach(), 
            'label': y.clone().detach().int()
        }
        if self.transform:
            sample = self.transform(sample)

        return sample

In [21]:
# BLOCCO DA RIMUOVERE (?!?!?!)

from torch import nn

class LSTMClassifier(nn.Module):
    """LSTM-based time-series classifier."""
    
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.rnn = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.batch_size = None
        self.hidden = None
    
    def forward(self, x):
        h0, c0 = self.init_hidden(x)
        out, (hn, cn) = self.rnn(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out
    
    def init_hidden(self, x):
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim)
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim)
        return [t.cuda() for t in (h0, c0)]

In [35]:
train = SensorDataset(data_dir="../../data/processed/train", transform=None)
valid = SensorDataset(data_dir="../../data/processed/valid", transform=None)

In [36]:
from torch.utils.data import DataLoader

train_loader = DataLoader(train, batch_size=128, shuffle=True)
valid_loader =  DataLoader(valid, batch_size=128, shuffle=True)

In [None]:
import torch
import torch.nn as nn
import torch.optim
from torch.utils.data import DataLoader

class LSTMClassifier(nn.Module):
    """
    LSTM model for classification tasks.

    Args:
        input_dim (int): The number of input features.
        hidden_dim (int): The number of features in the hidden state of the LSTM.
        layer_dim (int): The number of recurrent layers in the LSTM.
        output_dim (int): The number of output classes.
    """
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(LSTMClassifier, self).__init__()
        self.layer_dim = layer_dim
        self.hidden_dim = hidden_dim
        self.lstm = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """
        Defines the forward pass of the LSTM model.

        Args:
            x (torch.Tensor): The input tensor of shape (batch_size, seq_dim, input_dim).

        Returns:
            torch.Tensor: The output tensor of shape (batch_size, output_dim).
        """
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

input_dim = 6    
hidden_dim = 256
layer_dim = 3
output_dim = 2
seq_dim = 128

lr = 0.0005
n_epochs = 1000
best_acc = 0
patience, trials = 100, 0

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LSTMClassifier(input_dim, hidden_dim, layer_dim, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
opt = torch.optim.RMSprop(model.parameters(), lr=lr)

print('Start model training')

for epoch in range(1, n_epochs + 1):
    model.train()
    for i, batch in enumerate(train_loader):
        x_batch = batch["features"].to(device)
        y_batch = batch["label"].to(device)

        opt.zero_grad()
        out = model(x_batch)
        y_batch = y_batch.type(torch.FloatTensor).to(device)
        loss = criterion(out, y_batch)
        loss.backward()
        opt.step()
    
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for batch in valid_loader:
            x_val = batch["features"].to(device)
            y_val = batch["label"].to(device)
            out = model(x_val)
            preds = torch.argmax(out, dim=1)
            y_val_indices = torch.argmax(y_val, dim=1)  
            total += y_val_indices.size(0)
            correct += (preds == y_val_indices).sum().item()
    
    acc = correct / total

    if epoch % 5 == 0:
        print(f'Epoch: {epoch:3d}. Loss: {loss.item():.4f}. Acc.: {acc:2.2%}')

    if acc > best_acc:
        trials = 0
        best_acc = acc
        torch.save(model.state_dict(), 'best.pth')
        print(f'Epoch {epoch} best model saved with accuracy: {best_acc:2.2%}')
    else:
        trials += 1
        if trials >= patience:
            print(f'Early stopping on epoch {epoch}')
            break