In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix, ConfusionMatrixDisplay

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Names of the six sensor channels; each name is also the filename prefix of the
# CSV file that holds that channel ("<name>_train_1.csv").
sensor_names = ['Acc_x', 'Acc_y', 'Acc_z', 'Gyr_x', 'Gyr_y', 'Gyr_z']
# Human-readable activity names, one per class.
# NOTE(review): presumably ordered to match label values 1..4 -- confirm against the label file.
label_names = ["Resting", "Walking", "Running", "Driving"]
# Last row of training data for train/test split
train_end_index = 3511

In [2]:
# Load the provided ("real") recordings.
# Generated and real data are loaded separately so that the distributions of the
# train and test sets stay even.

labels = np.loadtxt('labels_train_1.csv', dtype='int')

# One CSV per sensor channel; stack them along a new trailing axis so that
# data has shape (rows, columns, number of sensors).
data = np.stack(
    [np.loadtxt(name + '_train_1.csv', delimiter=',') for name in sensor_names],
    axis=-1,
)

# The first three channels are the accelerometer axes, the last three the gyroscope axes.
acc_data = data[..., 0:3]
gyr_data = data[..., 3:6]

# Collapse each 3-axis sensor to its Euclidean magnitude.
total_acc = np.linalg.norm(acc_data, axis=-1)
total_gyr = np.linalg.norm(gyr_data, axis=-1)

# Two magnitude channels per reading: (rows, columns, 2).
total_data = np.stack((total_acc, total_gyr), axis=-1)

# Rows 0..train_end_index (inclusive) form the training split; the rest is held out.
given_train_data, given_test_data = np.split(total_data, [train_end_index + 1])
given_train_labels, given_test_labels = np.split(labels, [train_end_index + 1])

In [3]:
# Choose the arrays used for training and evaluation.
# Only the given ("real") data is used at the moment; to also include generated
# data, re-enable the concatenation below in place of the plain aliases.

# train_data = np.concatenate((given_train_data, gen_train_data), axis=0)
# train_labels = np.concatenate((given_train_labels, gen_train_labels), axis=0)
# test_data = np.concatenate((given_test_data, gen_test_data), axis=0)
# test_labels = np.concatenate((given_test_labels, gen_test_labels), axis=0)

train_data, train_labels = given_train_data, given_train_labels
test_data, test_labels = given_test_data, given_test_labels


In [4]:
import torch
import torch.nn as nn

class CNN_RNN_Model(nn.Module):
    """Per-example 1-D CNN feature extractor followed by an LSTM classifier.

    Every element of an input sequence is a multi-channel 1-D signal. The same
    two-stage CNN is applied to each element independently, the flattened CNN
    features are fed through a stacked LSTM, and the LSTM output at the last
    sequence position is mapped to class logits by a linear layer.

    Args:
        num_classes: Number of output logits.
        input_channels: Number of channels of each 1-D signal.
        feature_length: Number of samples in each 1-D signal; together with the
            CNN structure it fixes the LSTM input size.
        rnn_hidden_size: Hidden size of the LSTM.
        num_rnn_layers: Number of stacked LSTM layers.
    """

    def __init__(self, num_classes=4, input_channels=2, feature_length=60, rnn_hidden_size=128, num_rnn_layers=2):
        super().__init__()

        # CNN for feature extraction: two conv/ReLU/max-pool stages.
        # The convolutions preserve length (kernel 3, padding 1); each pooling halves it.
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels=input_channels, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
        )

        # Flattened CNN output per sequence element: 32 channels, length reduced
        # by the two poolings. Adjust this if the CNN structure changes.
        cnn_output_size = 32 * (feature_length // 4)

        # RNN for temporal dependency learning across the sequence elements
        self.rnn = nn.LSTM(input_size=cnn_output_size,
                           hidden_size=rnn_hidden_size,
                           num_layers=num_rnn_layers,
                           batch_first=True)

        # Fully connected layer for classification
        self.fc = nn.Linear(rnn_hidden_size, num_classes)

    def forward(self, x):
        """Compute class logits for a batch of sequences.

        Args:
            x: Tensor of shape (batch_size, seq_length, input_channels, feature_length).
                It does not have to be contiguous.

        Returns:
            Tensor of shape (batch_size, num_classes) with unnormalised logits.
        """
        batch_size, seq_length, channels, feature_length = x.size()

        # Merge batch and sequence dimensions so the CNN sees one signal per row.
        # reshape (rather than view) also accepts inputs whose batch/sequence
        # dimensions cannot be merged without a copy, e.g. a transposed
        # sequence-first tensor.
        x = x.reshape(batch_size * seq_length, channels, feature_length)

        # Pass through CNN: (batch_size * seq_length, 32, feature_length // 4)
        x = self.cnn(x)

        # Restore the sequence dimension and flatten the CNN features:
        # (batch_size, seq_length, cnn_output_size)
        x = x.reshape(batch_size, seq_length, -1)

        # Pass through RNN: (batch_size, seq_length, rnn_hidden_size)
        rnn_out, _ = self.rnn(x)

        # Use the output at the last sequence position for classification
        final_output = rnn_out[:, -1, :]

        # (batch_size, num_classes)
        return self.fc(final_output)

# Build a model with the notebook's settings and show its layer structure.
model = CNN_RNN_Model(input_channels=2, feature_length=60, num_classes=4)
print(model)

# Smoke test with a random batch:
# 8 sequences, 10 elements each, 2 channels, 60 samples per channel.
input_data = torch.rand(size=(8, 10, 2, 60))
output = model(input_data)
print("Output shape:", output.shape)  # expected: (8, 4)


CNN_RNN_Model(
  (cnn): Sequential(
    (0): Conv1d(2, 16, kernel_size=(3,), stride=(1,), padding=(1,))
    (1): ReLU()
    (2): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv1d(16, 32, kernel_size=(3,), stride=(1,), padding=(1,))
    (4): ReLU()
    (5): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (rnn): LSTM(480, 128, num_layers=2, batch_first=True)
  (fc): Linear(in_features=128, out_features=4, bias=True)
)
Output shape: torch.Size([8, 4])


In [18]:
# create sequences of examples to learn the temporal relationship between them.

def create_data_sequences(features, labels, seq_length):
    """Build overlapping windows of consecutive examples (stride 1).

    Window ``i`` contains examples ``i .. i + seq_length - 1`` and is labelled
    with the label of its last example.

    Args:
        features: Array-like of shape (num_examples, feature_length, channels).
        labels: Array-like of shape (num_examples,), one label per example.
        seq_length: Number of consecutive examples per window; must be >= 1.

    Returns:
        A tuple ``(sequences, sequence_labels)``:
        ``sequences`` is a float32 tensor of shape
        (num_sequences, seq_length, channels, feature_length) and
        ``sequence_labels`` is an int64 tensor of shape (num_sequences,), where
        ``num_sequences = max(num_examples - seq_length + 1, 0)``. When there
        are fewer examples than ``seq_length`` both tensors are empty but keep
        their trailing dimensions.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1, ``features`` is not
            3-dimensional, or ``features`` and ``labels`` differ in length.
    """
    features = np.asarray(features)
    labels = np.asarray(labels)

    if seq_length < 1:
        raise ValueError(f"seq_length must be >= 1, got {seq_length}")
    if features.ndim != 3:
        raise ValueError(
            f"features must have shape (num_examples, feature_length, channels), got {features.shape}"
        )
    if len(features) != len(labels):
        raise ValueError(
            f"features and labels must have the same length, got {len(features)} and {len(labels)}"
        )

    num_sequences = max(len(features) - seq_length + 1, 0)

    # Row i of window_index holds the example indices i .. i + seq_length - 1.
    window_index = np.arange(num_sequences)[:, None] + np.arange(seq_length)[None, :]

    # Gather all windows at once: (num_sequences, seq_length, feature_length, channels),
    # then swap the last two axes to the channels-first layout expected by Conv1d.
    windows = features[window_index].transpose(0, 1, 3, 2)

    # Convert through a single contiguous float32 array instead of a Python list of
    # arrays, which torch.tensor handles very slowly (and warns about).
    sequences = torch.from_numpy(np.ascontiguousarray(windows, dtype=np.float32))

    # Label of the last example of every window.
    last_labels = labels[seq_length - 1:seq_length - 1 + num_sequences]
    sequence_labels = torch.tensor(last_labels, dtype=torch.long)

    return sequences, sequence_labels

In [19]:
# Build the training windows: 5 consecutive examples per sequence.
sequences, sequence_labels = create_data_sequences(train_data, train_labels, seq_length=5)

# Targets are passed as class indices shifted down by one, so no one-hot encoding is used.
# NOTE(review): this assumes the stored labels start at 1 -- confirm against the label file.
dataset = TensorDataset(sequences, sequence_labels - 1)
dataloader = DataLoader(dataset, shuffle=True, batch_size=32)

# Fresh model, loss and optimizer for training.
model = CNN_RNN_Model(input_channels=2, feature_length=60, num_classes=4)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# Training loop
num_epochs = 50
# Use the GPU when available; to force CPU execution use torch.device("cpu") instead.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch_sequences, batch_labels in dataloader:
        batch_sequences, batch_labels = batch_sequences.to(device), batch_labels.to(device)

        # Forward pass. Predictions are not needed during training, so no argmax
        # is computed here; class predictions are derived at evaluation time.
        outputs = model(batch_sequences)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Print the mean of the per-batch losses for this epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / len(dataloader):.4f}")

print("Training complete!")

Epoch [1/10], Loss: 0.9261
Epoch [2/10], Loss: 0.5655
Epoch [3/10], Loss: 0.4680
Epoch [4/10], Loss: 0.4611
Epoch [5/10], Loss: 0.4188
Epoch [6/10], Loss: 0.3903
Epoch [7/10], Loss: 0.4359
Epoch [8/10], Loss: 0.3821
Epoch [9/10], Loss: 0.3586
Epoch [10/10], Loss: 0.3792
Training complete!


In [39]:
# Inspect the shape of the held-out split: (rows, CSV columns, 2 magnitude channels)
test_data.shape

(1699, 60, 2)

In [41]:
# Build test windows with the same window length (5) that was used for training.
test_sequences, test_sequence_labels = create_data_sequences(test_data, test_labels, seq_length=5)

model.eval()  # Set model to evaluation mode
with torch.no_grad():
    # Only the windowed sequences are fed to the model, so no tensor copy of the
    # raw test array is created.
    test_sequences = test_sequences.to(device)

    test_outputs = model(test_sequences)

    # Predicted class index per window
    predicted_classes = torch.argmax(test_outputs, dim=1)

tensor(1)

In [42]:
# List the distinct class indices that occur among the predictions
predicted_classes.unique()

tensor([0, 1, 2, 3], device='cuda:0')

In [49]:
# Score the window-level predictions.
# The labels are shifted down by one, the same shift applied to the training
# targets, so they are comparable with the predicted class indices.
y_true = test_sequence_labels - 1
y_pred = predicted_classes.cpu()  # sklearn needs the predictions on the CPU

# Accuracy plus micro- and macro-averaged F1 scores
accuracy = accuracy_score(y_true, y_pred)
micro_f1 = f1_score(y_true, y_pred, average='micro')
macro_f1 = f1_score(y_true, y_pred, average='macro')

print(f'Accuracy: {accuracy}')
print(f'Micro-averaged F1 score: {micro_f1}')
print(f'Macro-averaged F1 score: {macro_f1}')

Accuracy: 0.8106194690265487
Micro-averaged F1 score: 0.8106194690265487
Macro-averaged F1 score: 0.8584841858877553
