In [None]:
from google.colab import drive
# Attach Google Drive to the Colab runtime; the dataset CSV is read from this mount.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
import pandas as pd

# Read the CSV without a dtype specification. low_memory=False makes pandas infer
# each column's dtype from the whole file rather than chunk by chunk.
df = pd.read_csv('/content/drive/My Drive/subset_participant1_labeled.csv', low_memory=False)

# Keep a handle on the raw 'Mz' column so unparsable entries can still be inspected.
raw_mz = df['Mz']
mz_numeric = pd.to_numeric(raw_mz, errors='coerce')

# A row is "invalid" when a value was present but could not be parsed as a number.
# This must be determined BEFORE the column is overwritten: once coerced, every
# entry is a float (possibly NaN), so a type check afterwards can never fail.
invalid_rows = df[mz_numeric.isna() & raw_mz.notna()]

# Replace the column with its numeric version (unparsable entries are now NaN).
df['Mz'] = mz_numeric

# Report how many entries are NaN after conversion.
nan_count = df['Mz'].isna().sum()
print(f"Number of NaN values in magnetometer_z after conversion: {nan_count}")

# Drop rows whose 'Mz' is NaN. Alternative: df['Mz'] = df['Mz'].fillna(0).
df = df.dropna(subset=['Mz'])

# Show the rows (with their original raw values) that failed numeric parsing.
print(f"Number of invalid rows in 'Mz': {len(invalid_rows)}")
print(invalid_rows.head())



Number of NaN values in magnetometer_z after conversion: 0
Number of invalid rows in 'Mz': 0
Empty DataFrame
Columns: [participant_id, activity_position, Ax, Ay, Az, Lx, Ly, Lz, Gx, Gy, Gz, Mx, My, Mz, normalized, activity, position]
Index: []


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler
import torch
from torch.utils.data import TensorDataset, DataLoader

# **Load the dataset**
data_path = '/content/drive/My Drive/subset_participant1_labeled.csv'
df = pd.read_csv(data_path, low_memory=False)

# **Convert 'Mz' to numeric (unparsable entries become NaN) and drop those rows**
df['Mz'] = pd.to_numeric(df['Mz'], errors='coerce')
df = df.dropna(subset=['Mz']).copy()  # .copy() so adding a column below acts on an independent frame

# **Define feature columns for sensor readings**
FEATURE_COLUMNS = ['Ax', 'Ay', 'Az',
                   'Lx', 'Ly', 'Lz',
                   'Gx', 'Gy', 'Gz',
                   'Mx', 'My', 'Mz']

# **Encode activity labels as integers 0..K-1**
label_encoder = LabelEncoder()
df['activity_encoded'] = label_encoder.fit_transform(df['activity'])

# **Define window size and overlap (both in raw rows) and the downsampling factor**
WINDOW_SIZE = 2000
OVERLAP_SIZE = 1000
DOWNSAMPLE_FACTOR = 4

if not 0 <= OVERLAP_SIZE < WINDOW_SIZE:
    raise ValueError("OVERLAP_SIZE must satisfy 0 <= OVERLAP_SIZE < WINDOW_SIZE")

# Consecutive windows start STEP_SIZE rows apart so that they share OVERLAP_SIZE rows.
# (Using OVERLAP_SIZE itself as the stride is only right when it is half the window.)
STEP_SIZE = WINDOW_SIZE - OVERLAP_SIZE

# **Rows kept per window after downsampling, and raw rows spanned by one window**
DOWNSAMPLED_WINDOW_SIZE = WINDOW_SIZE // DOWNSAMPLE_FACTOR
RAW_WINDOW_SPAN = DOWNSAMPLED_WINDOW_SIZE * DOWNSAMPLE_FACTOR

# Pull the columns out once; slicing NumPy arrays in the loop avoids rebuilding a
# DataFrame for every window.
feature_values = df[FEATURE_COLUMNS].to_numpy()
encoded_labels = df['activity_encoded'].to_numpy()

# **Create overlapping, downsampled windows**
X = []
y = []
for start in range(0, len(df) - RAW_WINDOW_SPAN + 1, STEP_SIZE):
    rows = slice(start, start + RAW_WINDOW_SPAN, DOWNSAMPLE_FACTOR)  # every DOWNSAMPLE_FACTOR-th row

    # Flatten row-major: all channels of step 0, then all channels of step 1, ...
    X.append(feature_values[rows].flatten())

    # Most common label in the window; ties resolve to the smallest label,
    # the same choice Series.mode()[0] makes.
    y.append(np.bincount(encoded_labels[rows]).argmax())

if not X:
    raise ValueError(
        f"Need at least {RAW_WINDOW_SPAN} rows to build one window, got {len(df)}"
    )

# **Convert lists to numpy arrays**
X = np.array(X)
y = np.array(y)

print(f"Feature sequence shape: {X.shape}")
print(f"Labels shape: {y.shape}")

Feature sequence shape: (259, 6000)
Labels shape: (259,)


In [None]:
# **Scale every column of `X` to zero mean and unit variance**
# NOTE(review): the scaler is fitted on all windows, before any train/test split
# is made — confirm that letting test-set statistics into the scaling is acceptable.
scaler = StandardScaler().fit(X)
X_standardized = scaler.transform(X)

print(f"Standardized feature sequence shape: {X_standardized.shape}")


Standardized feature sequence shape: (259, 6000)


In [None]:
import torch
import numpy as np

# **Define constants for reshaping**
NUM_FEATURES = 12  # Number of sensor features (Ax, Ay, Az, Lx, Ly, Lz, Gx, Gy, Gz, Mx, My, Mz)
MAX_LENGTH = 512   # Maximum number of tokens (time steps) per sequence for BERT

X_standardized = np.asarray(X_standardized)

flat_length = X_standardized.shape[1]
if flat_length % NUM_FEATURES != 0:
    raise ValueError(
        f"Flattened window length {flat_length} is not a multiple of NUM_FEATURES={NUM_FEATURES}"
    )

# Each row is a window flattened step by step (all channels of step 0, then step 1, ...),
# so a row-major reshape recovers one NUM_FEATURES-vector per time step.
time_steps = flat_length // NUM_FEATURES
X_sequences = X_standardized.reshape(-1, time_steps, NUM_FEATURES)

# The length limit counts tokens (time steps), not individual scalars. Applying it to
# the flat vector would keep only MAX_LENGTH // NUM_FEATURES steps of every window and
# silently drop the rest, so truncate along the time axis instead.
SEQUENCE_LENGTH = min(time_steps, MAX_LENGTH)
X_sequences = np.ascontiguousarray(X_sequences[:, :SEQUENCE_LENGTH, :])

# **Convert to a torch tensor of shape (num_samples, sequence_length, num_features)**
# The name `X_padded` is kept for the cells that follow; all windows already share one
# length, so no padding is required.
X_padded = torch.tensor(X_sequences, dtype=torch.float32)

# **Convert labels to tensor**
y_tensor = torch.tensor(y, dtype=torch.long)

# **Check the shape to ensure truncating and reshaping worked correctly**
print(f"Padded and reshaped input shape: {X_padded.shape}, Labels shape: {y_tensor.shape}")


Padded and reshaped input shape: torch.Size([259, 42, 12]), Labels shape: torch.Size([259])


In [None]:
# **Create the Dataset and DataLoader**
batch_size = 16
SEED = 42  # fixes the split and the shuffle order so reruns are comparable
dataset = TensorDataset(X_padded, y_tensor)

# **Split dataset into training and testing sets (80% train, 20% test)**
# A seeded generator makes the split identical on every run.
# NOTE(review): this is a purely random split over windows; if neighbouring windows
# overlap, windows sharing raw samples can land on both sides — confirm this is intended.
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SEED),
)

# **Data loaders** (training batches are shuffled with their own seeded generator)
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    generator=torch.Generator().manual_seed(SEED),
)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f"Train DataLoader size: {len(train_loader)}, Test DataLoader size: {len(test_loader)}")

Train DataLoader size: 13, Test DataLoader size: 4


In [None]:
import torch
import torch.nn as nn
from transformers import BertConfig, BertModel

# Define a custom BERT-based model
class TimeSeriesBERT(nn.Module):
    """Sequence classifier that feeds projected sensor vectors into a BERT encoder.

    Each time step's feature vector is linearly projected to the encoder's hidden
    size and passed to BERT as `inputs_embeds` (no token ids are used). BERT's
    pooled output is then mapped to class logits.
    """

    def __init__(self, num_classes, num_features, hidden_size=128,
                 num_attention_heads=4, num_hidden_layers=4, intermediate_size=256):
        """Build the projection, the encoder and the classification head.

        Args:
            num_classes: Number of output logits.
            num_features: Size of the feature vector at each time step.
            hidden_size: Width of the BERT encoder; also the projection's output
                size and the classifier's input size.
            num_attention_heads: Attention heads per encoder layer.
            num_hidden_layers: Number of encoder layers.
            intermediate_size: Width of each layer's feed-forward sub-layer.
        """
        super().__init__()

        # Define the BERT configuration
        self.bert_config = BertConfig(
            hidden_size=hidden_size,
            num_attention_heads=num_attention_heads,
            num_hidden_layers=num_hidden_layers,
            intermediate_size=intermediate_size
        )

        # Initialize the BERT model (from the config, i.e. not from pretrained weights)
        self.bert = BertModel(self.bert_config)

        # Linear layer to map input features to BERT hidden size
        self.linear_mapping = nn.Linear(num_features, hidden_size)

        # Classification layer
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits for a batch of sequences.

        Args:
            x: Float tensor whose last dimension is `num_features`.
               # assumes shape (batch, sequence_length, num_features) — TODO confirm

        Returns:
            Tensor of logits with `num_classes` entries per sequence.
        """
        # Linear mapping to match BERT's hidden size
        x = self.linear_mapping(x)

        # Pass the projected vectors through BERT in place of token embeddings
        bert_output = self.bert(inputs_embeds=x)['pooler_output']

        # Classification layer
        return self.classifier(bert_output)

# Number of activity classes: one logit per label the encoder knows, so every
# encoded label is a valid target index even if some class never ends up as a
# window's majority label.
num_classes = len(label_encoder.classes_)

# Initialize the model; the per-step feature count comes from the same constant
# that was used to shape the input tensor.
model = TimeSeriesBERT(num_classes=num_classes, num_features=NUM_FEATURES)

# Move the model to the GPU when one is available, otherwise stay on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)


In [None]:
# One pass over the training loader that only moves each batch to the device and
# casts it; nothing is accumulated, so only the last batch's tensors remain bound.
for batch in train_loader:
    inputs = batch[0].to(device).to(torch.float32)  # features -> device, float32
    labels = batch[1].to(device).to(torch.long)     # targets  -> device, int64


In [None]:
import torch.optim as optim
import torch.nn as nn
from sklearn.utils.class_weight import compute_class_weight

# One weight per model output, so the weight vector always matches the logits.
num_outputs = model.classifier.out_features
all_classes = np.arange(num_outputs)

# Labels of the training subset only
train_labels = y_tensor[train_dataset.indices].numpy()

# compute_class_weight refuses classes that do not occur in `y`, and a random split
# can leave a rare class out of the training subset. Balance over the classes that
# are actually present and keep a neutral weight of 1.0 for the others.
present_classes = np.unique(train_labels)
if present_classes.size and present_classes.max() >= num_outputs:
    raise ValueError(
        f"Training label {present_classes.max()} has no matching model output "
        f"(model has {num_outputs} outputs)"
    )
present_weights = compute_class_weight(
    class_weight='balanced', classes=present_classes, y=train_labels
)
class_weights = np.ones(num_outputs, dtype=np.float64)
class_weights[present_classes] = present_weights

# Convert to tensor and move to the appropriate device
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float32).to(device)

# Define the loss function with class weights
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# No gradient clipping here: clipping only has an effect between backward() and
# optimizer.step(), when gradients exist, so it belongs inside the training loop.

# Specify number of epochs for training
num_epochs = 20




In [None]:
def initialize_weights(m):
    """Re-initialize one module in place; intended for use with `Module.apply`.

    - `nn.Linear`: Xavier-uniform weight, zero bias (when a bias exists).
    - `nn.Embedding`: Xavier-uniform weight; the `padding_idx` row, if any, is
      reset to zeros afterwards so the padding vector stays the zero vector.
    - `nn.LayerNorm`: weight 1 and bias 0, each only when the parameter exists
      (they are `None` when the layer was built without affine parameters).

    Modules of any other type are left untouched.

    Args:
        m: The module to initialize.
    """
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        if m.bias is not None:
            nn.init.zeros_(m.bias)

    elif isinstance(m, nn.Embedding):
        nn.init.xavier_uniform_(m.weight)
        if m.padding_idx is not None:
            # Xavier just overwrote the padding row; restore it to zeros.
            with torch.no_grad():
                m.weight[m.padding_idx].zero_()

    elif isinstance(m, nn.LayerNorm):
        # Identity affine transform: scale 1, shift 0.
        if m.weight is not None:
            nn.init.ones_(m.weight)
        if m.bias is not None:
            nn.init.zeros_(m.bias)

# Run the initializer on the model and each of its submodules; `apply` returns
# the model, so leaving it as the last expression displays the architecture.
model.apply(fn=initialize_weights)



TimeSeriesBERT(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 128, padding_idx=0)
      (position_embeddings): Embedding(512, 128)
      (token_type_embeddings): Embedding(2, 128)
      (LayerNorm): LayerNorm((128,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-3): 4 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=128, out_features=128, bias=True)
              (key): Linear(in_features=128, out_features=128, bias=True)
              (value): Linear(in_features=128, out_features=128, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=128, out_features=128, bias=True)
              (LayerNorm): LayerNorm((128,), eps=1e-12, elementwise_

In [None]:
# Impute NaNs in place, one sequence at a time, using that sequence's own mean
# over its non-NaN entries (0 when the whole sequence is NaN).
num_sequences = X_padded.size(0)
for i in range(num_sequences):
    seq = X_padded[i]  # a view: writes below land in X_padded itself
    nan_mask = torch.isnan(seq)
    observed = seq[~nan_mask]
    if observed.numel() > 0:
        mean_value = observed.mean()
    else:
        mean_value = 0
    seq[nan_mask] = mean_value

# Report any remaining NaN or infinite values
nan_total = torch.isnan(X_padded).sum().item()
inf_total = torch.isinf(X_padded).sum().item()
print(f"Number of NaN values in X_padded: {nan_total}")
print(f"Number of infinite values in X_padded: {inf_total}")


Number of NaN values in X_padded: 0
Number of infinite values in X_padded: 0


In [None]:
# Upper bound on the global gradient norm, applied before every optimizer step
max_grad_norm = 1.0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for batch in train_loader:
        # Split the batch and put both parts on the training device
        features, targets = batch
        features = features.to(device).to(torch.float32)  # model expects float32 inputs
        targets = targets.to(device).to(torch.long)       # loss expects int64 class ids

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass and loss
        loss = criterion(model(features), targets)

        # Backward pass, then clip to guard against exploding gradients
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)

        optimizer.step()
        running_loss += loss.item()

    # Mean batch loss over the epoch
    avg_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

Epoch [1/20], Loss: 2.1990
Epoch [2/20], Loss: 1.8125
Epoch [3/20], Loss: 1.6336
Epoch [4/20], Loss: 1.5847
Epoch [5/20], Loss: 1.5064
Epoch [6/20], Loss: 1.3976
Epoch [7/20], Loss: 1.3552
Epoch [8/20], Loss: 1.3612
Epoch [9/20], Loss: 1.2933
Epoch [10/20], Loss: 1.2705
Epoch [11/20], Loss: 1.2511
Epoch [12/20], Loss: 1.1966
Epoch [13/20], Loss: 1.1763
Epoch [14/20], Loss: 1.1620
Epoch [15/20], Loss: 1.1524
Epoch [16/20], Loss: 1.1150
Epoch [17/20], Loss: 1.0774
Epoch [18/20], Loss: 1.0685
Epoch [19/20], Loss: 1.1079
Epoch [20/20], Loss: 1.1117


In [None]:
import torch
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import numpy as np

# Set the model to evaluation mode (disables dropout)
model.eval()

# Collect true and predicted labels over the whole test set
y_true = []
y_pred = []

# No gradients are needed for evaluation
with torch.no_grad():
    for batch in test_loader:
        # Unpack the batch and move data to the device
        inputs, labels = batch
        inputs = inputs.to(device).to(torch.float32)
        labels = labels.to(device).to(torch.long)

        # Forward pass to get logits
        outputs = model(inputs)

        # Predicted class = index of the highest logit
        _, predicted = torch.max(outputs, 1)

        # Store true and predicted labels
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Calculate and print accuracy
accuracy = accuracy_score(y_true, y_pred)
print(f"Test Accuracy: {accuracy:.4f}")

# Report on every encoded class explicitly. Without `labels`, the report only covers
# classes that occur in y_true/y_pred, so `target_names` would stop lining up with
# them as soon as a class is missing from the test split and the predictions.
class_ids = np.arange(len(label_encoder.classes_))

# Print the classification report (precision, recall, F1-score for each class).
# zero_division=0 reports 0.0 without a warning for classes that were never predicted.
print("Classification Report:")
print(
    classification_report(
        y_true,
        y_pred,
        labels=class_ids,
        target_names=label_encoder.classes_,
        zero_division=0,
    )
)

# Print confusion matrix (rows: true class, columns: predicted class, in class_ids order)
conf_matrix = confusion_matrix(y_true, y_pred, labels=class_ids)
print("Confusion Matrix:")
print(conf_matrix)


Test Accuracy: 0.1731
Classification Report:
                    precision    recall  f1-score   support

            Biking       0.08      0.10      0.09        10
           Running       1.00      1.00      1.00         3
           Sitting       0.08      0.17      0.11         6
          Standing       0.00      0.00      0.00        13
           Walking       1.00      0.67      0.80         6
Walking Downstairs       0.00      0.00      0.00         4
  Walking Upstairs       0.00      0.00      0.00        10

          accuracy                           0.17        52
         macro avg       0.31      0.28      0.29        52
      weighted avg       0.20      0.17      0.18        52

Confusion Matrix:
[[1 0 3 3 0 2 1]
 [0 3 0 0 0 0 0]
 [1 0 1 2 0 1 1]
 [6 0 3 0 0 0 4]
 [0 0 1 1 4 0 0]
 [0 0 2 2 0 0 0]
 [4 0 2 3 0 1 0]]


In [None]:
# Print how often each class id occurs among the true and the predicted test labels
for heading, label_array in (
    ("True label distribution:", y_true),
    ("Predicted label distribution:", y_pred),
):
    print(heading)
    class_ids_seen, occurrences = np.unique(label_array, return_counts=True)
    print(dict(zip(class_ids_seen, occurrences)))


True label distribution:
{0: 10, 1: 3, 2: 6, 3: 13, 4: 6, 5: 4, 6: 10}
Predicted label distribution:
{0: 12, 1: 3, 2: 12, 3: 11, 4: 4, 5: 4, 6: 6}
