In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader


In [2]:
df=pd.read_csv(r'filtered_data.csv')

In [3]:
# Assuming your DataFrame `df` is already loaded
sensor_columns = ['chest_ACC_x', 'chest_ACC_y', 'chest_ACC_z', 'chest_ECG', 'chest_EMG', 'chest_EDA', 'chest_Temp', 'chest_Resp']

# 1. Normalize sensor data
scaler = StandardScaler()
df[sensor_columns] = scaler.fit_transform(df[sensor_columns])

# 2. Prepare data into sequences
sequence_length = 100  # Set this based on your dataset
class WESADDataset(Dataset):
    def __init__(self, df, sequence_length):
        self.labels = df['label'].values
        self.data = df[sensor_columns].values
        self.sequence_length = sequence_length

    def __len__(self):
        return len(self.data) - self.sequence_length

    def __getitem__(self, idx):
        x = self.data[idx:idx + self.sequence_length]
        y = self.labels[idx + self.sequence_length - 1]
        return x, y

dataset = WESADDataset(df, sequence_length)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)


In [5]:
dataloader

<torch.utils.data.dataloader.DataLoader at 0x218ca64ae90>

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim

class Autoencoder(nn.Module):
    def __init__(self, input_dim, hidden_dim=64, encoding_dim=32):
        super(Autoencoder, self).__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, encoding_dim),
            nn.ReLU()
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(encoding_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded

# Initialize Autoencoder
input_dim = sequence_length * len(sensor_columns)
autoencoder = Autoencoder(input_dim=input_dim)

# Train the autoencoder
def train_autoencoder(autoencoder, dataloader, epochs=10):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)

    for epoch in range(epochs):
        for sequences, _ in dataloader:
            sequences = sequences.view(sequences.size(0), -1).float()  # Flatten the sequences
            encoded, decoded = autoencoder(sequences)
            loss = criterion(decoded, sequences)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}')

train_autoencoder(autoencoder, dataloader)


Epoch 1/10, Loss: 0.11941670626401901
Epoch 2/10, Loss: 0.11615324765443802
Epoch 3/10, Loss: 0.07671599090099335
Epoch 4/10, Loss: 0.10358940064907074
Epoch 5/10, Loss: 0.11077097803354263
Epoch 6/10, Loss: 0.11354660242795944
Epoch 7/10, Loss: 0.12414565682411194
Epoch 8/10, Loss: 0.11730360984802246
Epoch 9/10, Loss: 0.10712585598230362
Epoch 10/10, Loss: 0.09497424215078354


In [8]:
compressed_features = []
labels = []
with torch.no_grad():
    for sequences, label in dataloader:
        sequences = sequences.view(sequences.size(0), -1).float()
        encoded, _ = autoencoder(sequences)
        compressed_features.append(encoded)
        labels.append(label)

compressed_features = torch.cat(compressed_features)
labels = torch.cat(labels)


In [9]:
# View the shape of compressed features
print("Shape of compressed features:", compressed_features.shape)

# View the first few rows of compressed features
print("Sample compressed features:\n", compressed_features[:5])  # Display first 5 sequences

# View the shape and contents of the corresponding labels
print("Shape of labels:", labels.shape)
print("Sample labels:", labels[:5])


Shape of compressed features: torch.Size([5128256, 32])
Sample compressed features:
 tensor([[1.4854, 4.5527, 4.1314, 2.7329, 2.5833, 1.5376, 0.0000, 1.2473, 3.4496,
         0.8661, 0.0000, 1.4364, 0.9917, 1.9666, 1.7550, 1.4973, 0.5905, 0.0000,
         1.8036, 4.7737, 0.7741, 0.0000, 3.1547, 0.0000, 7.2852, 1.8069, 1.1083,
         1.7209, 7.5542, 5.7626, 0.0000, 0.0000],
        [2.2259, 0.3490, 2.4058, 1.9578, 4.4107, 0.1030, 0.0000, 3.2466, 3.2064,
         6.0168, 0.0000, 0.8798, 1.6959, 5.5112, 1.9753, 1.2332, 1.2592, 2.0299,
         1.6116, 3.9006, 1.9024, 0.0000, 5.5694, 0.0000, 4.7534, 1.5819, 0.6230,
         1.6578, 0.5552, 4.8985, 0.0000, 0.0000],
        [1.2717, 3.9236, 3.8350, 1.9377, 1.4747, 1.1834, 0.0000, 2.3352, 3.6959,
         3.4154, 0.0000, 2.0893, 1.5358, 2.6810, 4.3121, 0.7366, 1.4146, 1.3679,
         0.8295, 4.1262, 0.9337, 0.0000, 4.6844, 0.0000, 2.6083, 3.8802, 1.1624,
         1.5310, 1.0191, 1.5394, 0.0000, 0.0000],
        [1.1415, 2.2642, 2.6331, 4.8

In [10]:
df.shape

(5128356, 10)

In [11]:
import torch

# Reshape data into sequences
sequence_length = 100  # Adjust based on your choice
num_sequences = len(df) // sequence_length  # Integer division to get full sequences

# Reshape and truncate to ensure all data fits into sequences
reshaped_data = df[sensor_columns].values[:num_sequences * sequence_length].reshape(num_sequences, sequence_length, -1)

# Verify reshaped dimensions
print("Reshaped data shape:", reshaped_data.shape)


Reshaped data shape: (51283, 100, 8)


In [15]:
class SequenceAutoencoder(nn.Module):
    def __init__(self, input_dim, sequence_length, hidden_dim=64, encoding_dim=32):
        super(SequenceAutoencoder, self).__init__()
        # Encoder with Conv1d to handle sequence data
        self.encoder = nn.Sequential(
            nn.Conv1d(input_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv1d(hidden_dim, encoding_dim, kernel_size=3, stride=1, padding=1),
            nn.ReLU()
        )
        # Decoder with ConvTranspose1d to reconstruct the sequence
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(encoding_dim, hidden_dim, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(hidden_dim, input_dim, kernel_size=3, stride=1, padding=1),
            nn.Sigmoid()  # Use sigmoid to bound outputs between 0 and 1
        )

    def forward(self, x):
        # x shape: (batch_size, sequence_length, input_dim)
        x = x.permute(0, 2, 1)  # Convert to (batch_size, input_dim, sequence_length) for Conv1d
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        decoded = decoded.permute(0, 2, 1)  # Convert back to (batch_size, sequence_length, input_dim)
        return encoded, decoded

# Initialize the updated autoencoder with sequence handling
input_dim = len(sensor_columns)
sequence_length = 100  # Adjust to the desired length
autoencoder = SequenceAutoencoder(input_dim=input_dim, sequence_length=sequence_length)


In [16]:
from torch.utils.data import DataLoader, TensorDataset

# Create DataLoader for sequence data
# Assuming reshaped_data is already structured as (num_sequences, sequence_length, input_dim)
sequence_data = torch.tensor(reshaped_data, dtype=torch.float32)
sequence_labels = torch.tensor(df['label'].values[:num_sequences * sequence_length].reshape(num_sequences, sequence_length)[:, -1])

sequence_dataset = TensorDataset(sequence_data, sequence_labels)
sequence_loader = DataLoader(sequence_dataset, batch_size=32, shuffle=True)



In [17]:
def train_autoencoder(autoencoder, dataloader, epochs=10):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(autoencoder.parameters(), lr=0.001)

    for epoch in range(epochs):
        for sequences, _ in dataloader:
            encoded, decoded = autoencoder(sequences)
            loss = criterion(decoded, sequences)  # MSE loss will now work correctly
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}')

train_autoencoder(autoencoder, sequence_loader)


Epoch 1/10, Loss: 0.49658674001693726
Epoch 2/10, Loss: 0.421083927154541
Epoch 3/10, Loss: 0.5324689149856567
Epoch 4/10, Loss: 0.485059529542923
Epoch 5/10, Loss: 0.43303829431533813
Epoch 6/10, Loss: 0.7635204195976257
Epoch 7/10, Loss: 0.4499130845069885
Epoch 8/10, Loss: 0.6334110498428345
Epoch 9/10, Loss: 0.4530365765094757
Epoch 10/10, Loss: 0.531245768070221


In [18]:
compressed_features = []
labels = []

# Ensure the model is in evaluation mode for inference
autoencoder.eval()

with torch.no_grad():
    for sequences, label in sequence_loader:
        # Extract the compressed (encoded) features
        encoded, _ = autoencoder(sequences)
        compressed_features.append(encoded)
        labels.append(label)

# Concatenate all batches to form the complete dataset
compressed_features = torch.cat(compressed_features)
labels = torch.cat(labels)

# View the shape of compressed features
print("Shape of compressed features:", compressed_features.shape)  # Expected: (num_sequences, encoding_dim, sequence_length)
print("Shape of labels:", labels.shape)


Shape of compressed features: torch.Size([51283, 32, 100])
Shape of labels: torch.Size([51283])


In [21]:
# Permute compressed_features if necessary
compressed_features = compressed_features.permute(0, 2, 1)  # Change to [num_sequences, sequence_length, encoding_dim]


In [22]:
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, n_layers=2):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h, _ = self.lstm(x)
        out = self.fc(h[:, -1, :])  # Take the last time step output
        return out

# Initialize LSTM model
input_dim = compressed_features.size(2)  # encoding_dim
hidden_dim = 64
output_dim = len(df['label'].unique())  # Number of classes for classification
lstm_model = LSTMModel(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)


In [23]:
import torch.optim as optim

def train_model(model, features, labels, epochs=10):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(epochs):
        total_loss = 0
        for i in range(len(features)):
            x = features[i].unsqueeze(0)  # Model expects batch input
            y = labels[i].unsqueeze(0)

            optimizer.zero_grad()
            output = model(x)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f'Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(features):.4f}')

# Train each model separately
train_model(lstm_model, compressed_features, labels)

Epoch 1/10, Loss: 0.3444
Epoch 2/10, Loss: 0.2163
Epoch 3/10, Loss: 0.1732
Epoch 4/10, Loss: 0.1560
Epoch 5/10, Loss: 0.1483
Epoch 6/10, Loss: 0.1377
Epoch 7/10, Loss: 0.1361
Epoch 8/10, Loss: 0.1384
Epoch 9/10, Loss: 0.1376
Epoch 10/10, Loss: 0.1399


In [24]:
import torch

def evaluate_model(model, features, labels):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = len(labels)

    with torch.no_grad():  # Disable gradient computation for evaluation
        for i in range(len(features)):
            x = features[i].unsqueeze(0)  # Add batch dimension
            y = labels[i].unsqueeze(0)
            
            # Forward pass
            output = model(x)
            _, predicted = torch.max(output, 1)  # Get the index of the highest logit
            
            # Check if prediction is correct
            correct += (predicted == y).sum().item()

    accuracy = (correct / total) * 100
    print(f'Accuracy: {accuracy:.2f}%')

# Evaluate accuracy of the trained model
evaluate_model(lstm_model, compressed_features, labels)


Accuracy: 95.82%


In [25]:
class AttentionLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, n_layers=2):
        super(AttentionLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, n_layers, batch_first=True)
        self.attention = nn.Linear(hidden_dim, 1)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h, _ = self.lstm(x)
        attention_weights = torch.softmax(self.attention(h), dim=1)
        context = torch.sum(attention_weights * h, dim=1)  # Weighted sum over time steps
        out = self.fc(context)
        return out

# Initialize Attention model
attention_model = AttentionLSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)


In [26]:
import torch.optim as optim

def train_model(model, features, labels, epochs=10):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(epochs):
        total_loss = 0
        for i in range(len(features)):
            x = features[i].unsqueeze(0)  # Model expects batch input
            y = labels[i].unsqueeze(0)

            optimizer.zero_grad()
            output = model(x)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f'Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(features):.4f}')


train_model(attention_model, compressed_features, labels)


Epoch 1/10, Loss: 0.2894
Epoch 2/10, Loss: 0.1644
Epoch 3/10, Loss: 0.1387
Epoch 4/10, Loss: 0.1251
Epoch 5/10, Loss: 0.1162
Epoch 6/10, Loss: 0.1128
Epoch 7/10, Loss: 0.1123
Epoch 8/10, Loss: 0.1118
Epoch 9/10, Loss: 0.1329
Epoch 10/10, Loss: 0.1517


In [27]:

evaluate_model(attention_model, compressed_features, labels)


Accuracy: 94.87%


In [28]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AdvancedAttentionLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, n_layers=2, n_heads=4, attention_dim=64):
        super(AdvancedAttentionLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, n_layers, batch_first=True, bidirectional=True)
        self.multihead_attention = nn.MultiheadAttention(embed_dim=2*hidden_dim, num_heads=n_heads, batch_first=True)
        self.fc1 = nn.Linear(2 * hidden_dim, attention_dim)  # Intermediate fully connected layer
        self.fc2 = nn.Linear(attention_dim, output_dim)      # Output layer

    def forward(self, x):
        # LSTM layer
        lstm_out, _ = self.lstm(x)  # lstm_out shape: [batch_size, sequence_length, 2*hidden_dim]
        
        # Multi-Head Attention
        attention_out, _ = self.multihead_attention(lstm_out, lstm_out, lstm_out)  # Self-attention on LSTM output
        
        # Aggregate across time steps by mean pooling or weighted attention pooling
        pooled_out = torch.mean(attention_out, dim=1)  # Mean pooling across time
        
        # Fully connected layers
        fc1_out = F.relu(self.fc1(pooled_out))         # Apply non-linearity after first FC layer
        out = self.fc2(fc1_out)                        # Final output layer for classification

        return out

# Initialize the advanced attention model
input_dim = compressed_features.size(2)  # Should match the encoding_dim from the autoencoder
hidden_dim = 64
output_dim = len(df['label'].unique())  # Number of classes
attention_dim = 64
n_heads = 4

attention_model = AdvancedAttentionLSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, n_heads=n_heads, attention_dim=attention_dim)

In [29]:
# Train the advanced attention model
train_model(attention_model, compressed_features, labels)


Epoch 1/10, Loss: 0.3130
Epoch 2/10, Loss: 0.1966
Epoch 3/10, Loss: 0.1780
Epoch 4/10, Loss: 0.1687
Epoch 5/10, Loss: 0.1642
Epoch 6/10, Loss: 0.1611
Epoch 7/10, Loss: 0.1632
Epoch 8/10, Loss: 0.1590
Epoch 9/10, Loss: 0.1609
Epoch 10/10, Loss: 0.1625


In [30]:
evaluate_model(attention_model, compressed_features, labels)


Accuracy: 96.35%


In [38]:
from sklearn.model_selection import train_test_split

# Assume `compressed_features` and `labels` are tensors from the autoencoder's output
# Convert them to numpy for easier manipulation with train_test_split
compressed_features_np = compressed_features.numpy()
labels_np = labels.numpy()

# Split data into training and testing sets (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(
    compressed_features_np, labels_np, test_size=0.2, random_state=42
)

# Check the shape of each set
print("Training features shape:", X_train.shape)
print("Testing features shape:", X_test.shape)
print("Training labels shape:", y_train.shape)
print("Testing labels shape:", y_test.shape)


Training features shape: (41026, 100, 32)
Testing features shape: (10257, 100, 32)
Training labels shape: (41026,)
Testing labels shape: (10257,)


In [39]:
# Convert back to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.long)


In [43]:
# Create PyTorch datasets and dataloaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Check shapes to confirm
print("Training set shape:", X_train.shape, y_train.shape)
print("Testing set shape:", X_test.shape, y_test.shape)

Training set shape: torch.Size([41026, 100, 32]) torch.Size([41026])
Testing set shape: torch.Size([10257, 100, 32]) torch.Size([10257])


In [47]:
class TransformerModel(nn.Module):
    def __init__(self, input_size, num_heads, hidden_size, output_size, num_layers=2):
        super(TransformerModel, self).__init__()
        
        self.input_fc = nn.Linear(input_size, hidden_size)  # Project input to hidden size
        self.transformer = Transformer(d_model=hidden_size, nhead=num_heads, num_encoder_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)  # Final output layer

    def forward(self, x):
        # x: [batch_size, seq_len, input_size]
        x = self.input_fc(x)  # Map to [batch_size, seq_len, hidden_size]
        x = self.transformer(x, x)  # Pass through Transformer
        
        # Use the last output of the sequence
        x = x[:, -1, :]  # Take the last output for classification: [batch_size, hidden_size]
        
        return self.fc(x)  # [batch_size, output_size]


In [48]:
import torch.optim as optim

def train_model(model, train_loader, test_loader, num_epochs=5, learning_rate=0.001):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.float(), labels.long()
            
            optimizer.zero_grad()
            outputs = model(inputs)

            # Calculate loss and backpropagate
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        
        train_accuracy = 100 * correct / total
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Training Accuracy: {train_accuracy:.2f}%")
        
        # Evaluation phase
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.float(), labels.long()
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        test_accuracy = 100 * correct / total
        print(f"Test Accuracy after epoch {epoch+1}: {test_accuracy:.2f}%\n")


In [50]:
# Model parameters
input_size = compressed_features.size(2)  # Number of input features (e.g., 8 from compressed output)
num_heads = 4        # Number of attention heads
hidden_size = 64     # Hidden size for embedding and transformer layers
output_size = len(torch.unique(labels))  # Number of output classes (e.g., 5 for the WESAD dataset)
num_layers = 2       # Number of transformer layers

# Initialize the model
model = TransformerModel(input_size, num_heads, hidden_size, output_size, num_layers)

# Train and evaluate the model
train_model(model, train_loader, test_loader, num_epochs=5, learning_rate=0.001)


Epoch [1/5], Loss: 0.2934, Training Accuracy: 89.45%
Test Accuracy after epoch 1: 89.54%

Epoch [2/5], Loss: 0.2134, Training Accuracy: 92.50%
Test Accuracy after epoch 2: 95.14%

Epoch [3/5], Loss: 0.1849, Training Accuracy: 93.60%
Test Accuracy after epoch 3: 93.81%

Epoch [4/5], Loss: 0.1668, Training Accuracy: 94.29%
Test Accuracy after epoch 4: 92.65%

Epoch [5/5], Loss: 0.1584, Training Accuracy: 94.71%
Test Accuracy after epoch 5: 95.37%

