# Imports

In [71]:
import os
import torch
from torch.utils.data import Dataset
from transformers import BertTokenizer, BertModel
import pandas as pd
import numpy as np
import torch.nn as nn
from torch.utils.data import random_split
from sklearn.metrics import confusion_matrix, classification_report
from tqdm import tqdm

# Loader

In [72]:
CLASSES = ["wearing", "packaging", "drinking", "passing"]

class IMUTextDataset(Dataset):
    def __init__(self, root_dir, tokenizer, bert_model):
        self.root_dir = root_dir
        self.tokenizer = tokenizer
        self.bert_model = bert_model
        self.samples = self._prepare_dataset()

    def _prepare_dataset(self):
        samples = []
        for dir_name in os.listdir(self.root_dir):
            label = dir_name.split('_')[0]
            full_path = os.path.join(self.root_dir, dir_name)
            if os.path.isdir(full_path):
                samples.append((full_path, label))
        return samples

    def _aggregate_samples(self, data, window_size=10):
        selected_data = data.iloc[:, :6]   
        while len(selected_data) < window_size:
            selected_data = selected_data.append(pd.Series(), ignore_index=True)
        aggregated_data = selected_data.groupby(np.arange(len(selected_data)) // window_size).mean()
        return aggregated_data

    def _get_text_embedding(self, text):
        inputs = self.tokenizer(text, return_tensors='pt', padding=True, truncation=True, max_length=128)
        with torch.no_grad():
            outputs = self.bert_model(**inputs)
        return outputs.last_hidden_state.mean(dim=1)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        path, label = self.samples[idx]
        imu_data = pd.read_csv(os.path.join(path, 'out.csv'))
        imu_aggregated = self._aggregate_samples(imu_data)

        text_embeddings = []
        for txt_file in sorted(os.listdir(path)):
            if txt_file.endswith('.txt'):
                try:
                    with open(os.path.join(path, txt_file), 'r') as file:
                        text_data = file.read()
                        text_embedding = self._get_text_embedding(text_data)
                        text_embeddings.append(text_embedding)
                except FileNotFoundError:
                    text_embeddings.append(torch.zeros(768))  # Handle missing text file

        # Truncate or pad text_embeddings to match imu_aggregated length
        text_embeddings = text_embeddings[:len(imu_aggregated)]
        while len(text_embeddings) < len(imu_aggregated):
            text_embeddings.append(torch.zeros(768))

        # Concatenate imu data and text embeddings
        combined_data = []
        for imu, text in zip(imu_aggregated.values, text_embeddings):
            imu_tensor = torch.tensor(imu, dtype=torch.float32)
            if imu_tensor.ndim == 1:
                imu_tensor = imu_tensor.unsqueeze(0)
            if text.ndim == 1:
                text = text.unsqueeze(0)
            combined = torch.cat((imu_tensor, text), dim=1)
            combined_data.append(combined)

        # Stack the combined data
        # This will be of shape [seq_len, features], where features = imu + text embedding size
        batch_data = torch.cat(combined_data, dim=0)

        sample = {
            'combined_data': batch_data,
            'label': CLASSES.index(label)  # Convert label to tensor
        }
        return sample

# Model

In [73]:
class IMUTextClassifier(nn.Module):
    def __init__(self, combined_input_size, hidden_size, num_classes):
        super(IMUTextClassifier, self).__init__()
        self.lstm = nn.LSTM(input_size=combined_input_size, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, combined_input):
        # The combined_input is already a combination of IMU data and text data
        lstm_out, _ = self.lstm(combined_input)
        lstm_out = lstm_out[:, -1, :]  # Get the output of the last time step
        out = self.fc(lstm_out)
        return out

# Split

In [74]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')
# Assuming 'samples' is a list of all your samples (file paths and labels)

dataset = IMUTextDataset(root_dir='../data/', tokenizer=tokenizer, bert_model=bert_model)

# Assume 'dataset' is your initialized PyTorch dataset
dataset_size = len(dataset)
train_size = int(0.75 * dataset_size)
test_size = dataset_size - train_size

# Split the dataset into train and test sets
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# You can now use these subsets with a DataLoader
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False)

# Train

In [75]:
combined_input_size = 774  # 6 IMU features + 768 text embedding size
hidden_size = 128
num_classes = 4

model = IMUTextClassifier(combined_input_size, hidden_size, num_classes)


In [76]:
# Assuming model is already defined and initialized
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10  # Define the number of epochs
for epoch in tqdm(range(num_epochs)):
    model.train()  # Set the model to training mode
    for batch in train_loader:
        combined_data, labels = batch['combined_data'], batch['label']
        outputs = model(combined_data)  # Pass the combined data directly to the model
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

100%|██████████| 10/10 [04:02<00:00, 24.21s/it]


# Eval

In [77]:
# Function to convert tensors to numpy arrays
def to_numpy(tensor):
    return tensor.cpu().detach().numpy()

# Evaluation
model.eval()
all_labels = []
all_predictions = []

with torch.no_grad():
    for batch in test_loader:
        combined_data, labels = batch['combined_data'], batch['label']
        outputs = model(combined_data)
        _, predicted = torch.max(outputs, 1)
        
        all_labels.extend(to_numpy(labels))
        all_predictions.extend(to_numpy(predicted))

# Convert all labels and predictions to numpy arrays for sklearn functions
all_labels = np.array(all_labels)
all_predictions = np.array(all_predictions)

# Calculate and print metrics
print(classification_report(all_labels, all_predictions))

# Calculate and print the confusion matrix
conf_matrix = confusion_matrix(all_labels, all_predictions)
print("Confusion Matrix:")
print(conf_matrix)

              precision    recall  f1-score   support

           0       1.00      1.00      1.00         2
           1       1.00      1.00      1.00         4
           2       1.00      1.00      1.00         2
           3       1.00      1.00      1.00         2

    accuracy                           1.00        10
   macro avg       1.00      1.00      1.00        10
weighted avg       1.00      1.00      1.00        10

Confusion Matrix:
[[2 0 0 0]
 [0 4 0 0]
 [0 0 2 0]
 [0 0 0 2]]


# Save

In [78]:
# Save the entire model
torch.save(model, '../model/imu_text_rnn.pth')

# Load

In [None]:
# To load the model later
loaded_model = torch.load('../model/imu_text_rnn.pth')