# I-MOTION Model 

In [6]:
import torch
import torch.nn as nn
import random
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd

# Set a fixed random seed for reproducibility across multiple libraries
random_seed = 42
random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)
window_size = 600
# Check for CUDA (GPU support) and set device accordingly
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("CUDA is available. Using GPU.")
    torch.cuda.manual_seed(random_seed)
    torch.cuda.manual_seed_all(random_seed)  # For multi-GPU setups
    # Additional settings for ensuring reproducibility on CUDA
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
else:
    device = torch.device("cpu")
    print("CUDA not available. Using CPU.")

CUDA not available. Using CPU.


## Load Data

In [12]:
data_x = np.loadtxt("./train/raw_data/Acc_x.txt", delimiter=" ")
data_y = np.loadtxt("./train/raw_data/Acc_y.txt", delimiter=" ")
data_z = np.loadtxt("./train/raw_data/Acc_z.txt", delimiter=" ")
train_label = np.loadtxt("./train/train_label.txt", delimiter=" ")
train_order = np.loadtxt("./train/train_order.txt", dtype=float)
acc_data = np.dstack((data_x, data_y, data_z, train_label))
acc_data = acc_data[train_order.astype(np.int_).flatten() - 1, :, :]
print(acc_data.shape)
window_data = np.empty((acc_data.shape[0] * (acc_data.shape[1]/window_size), window_size, 3), dtype=np.float32)
windowed_labels = np.empty((10000, 1), dtype=np.float32)

window_index = 0
for frame_index in range(acc_data.shape[0]):
    
    frame_features = acc_data[frame_index, :, :3]
    frame_labels = acc_data[frame_index, :, 3]
    
    for start in range(0, acc_data.shape[1], window_size):
        end = start + window_size
        window_data[window_index] = frame_features[start:end, :]
        # Assuming all samples in a window have the same label, take the label of the first sample
        windowed_labels[window_index] = frame_labels[start]
        window_index += 1

# Verify the shapes
print("Features shape:", window_data.shape)  # Should be (10000, 600, 3)
print("Labels shape:", windowed_labels.shape)  # Should be (10000, 1)
    

(16310, 6000, 4)


## Helper Function

In [3]:
def calculate_padding(kernel_size):
    return (kernel_size - 1) // 2

def calculate_output_length(input_length, kernel_size, stride, padding):
    return ((input_length + 2 * padding - kernel_size) // stride) + 1

def calculate_pooling_padding(window_size, stride, input_size, output_size):
    return ((output_size - 1) * stride - input_size + window_size) // 2

## Preprocessing

In [None]:
def preprocess_accelerometer_data(accel_data, alpha=0.8, m=5):
    # Initialize gravity and linear acceleration
    gravity = np.zeros((1, 3))
    linear_acceleration = np.zeros_like(accel_data)
    
    # Step 1: Remove Gravity
    for i in range(accel_data.shape[0]):
        gravity = alpha * gravity + (1 - alpha) * accel_data[i, :]
        linear_acceleration[i, :] = accel_data[i, :] - gravity

    # Step 2: Smooth Data
    K = len(linear_acceleration.shape(0))
    smoothed_data = np.zeros_like(linear_acceleration)
    
    for k in range(1, K + 1):  # k is 1-indexed in the mathematical formula
        if k <= m // 2:
            # Early data points: smaller window size that grows
            smoothed_data[k - 1, :] = np.sum(linear_acceleration[:2 * k - 1, :], axis=0) / (2 * k - 1)
        elif k > K - m // 2:
            # Late data points: smaller window size that shrinks
            smoothed_data[k - 1, :] = np.sum(linear_acceleration[2 * k - K - 1:, :]) / (2 * (K - k) + 1)
        else:
            # Middle data points: fixed window size
            smoothed_data[k - 1, :] = np.sum(linear_acceleration[k - m // 2 - 1 : k + m // 2, :]) / m

    # Step 3: Calculate Magnitude
    magnitudes = np.sqrt(np.sum(smoothed_data**2, axis=1))

    return magnitudes

# Example usage
# Assuming `accel_data` is a numpy array of shape (512, 3) representing x, y, z acceleration
accel_data = np.random.rand(512, 3)  # Random data for demonstration purposes
processed_data = preprocess_accelerometer_data(accel_data)

print(processed_data.shape)  # Output the shape of the processed data


## Model Construction

In [None]:
class IMOTION_CNN(nn.Module):
    def __init__(self, in_feature, in_channel, out_feature, pool_window_size, pool_stride_size, 
                 pool_padding, conv_filter_list, conv_kernel_list, conv_stride, full_connection_size):
        super(IMOTION_CNN, self).__init__()
        self.pool = nn.MaxPool1d(kernel_size=pool_window_size, stride=pool_stride_size, padding=pool_padding)
        assert len(conv_filter_list) == len(conv_kernel_list)
        self.conv_layers = nn.ModuleList([
            nn.Conv1d(in_channels=in_channel if i == 0 else conv_filter_list[i-1], 
                      out_channels=conv_filter_list[i], 
                      kernel_size=conv_kernel_list[i], stride=conv_stride)
            for i in range(len(conv_filter_list))
        ])
        self.final_matrix_size = conv_filter_list[-1] * (in_feature / (2 ** len(conv_filter_list)))
        self.fc1 = nn.Linear(self.final_matrix_size, full_connection_size) 
        self.fc2 = nn.Linear(full_connection_size, out_feature)


    def forward(self, x):
        for conv in self.conv_layers:
            x = self.pool(torch.relu(conv(x)))
        x = x.view(-1, self.final_matrix_size)  # Flattening the tensor
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

## Training & Evaluation

In [6]:
def train_model(model, train_loader, optimizer, criterion, epochs=100, device='cpu'):
    model.train()  # Set the model to training mode
    loss_values = []  # Initialize a list to store the average loss per epoch

    for epoch in range(epochs):
        total_loss = 0  # Track total loss for each epoch

        for X, A, labels in train_loader:
            # Move data to the specified device
            X, A, labels = X.to(device), A.to(device), labels.to(device)

            optimizer.zero_grad()  # Clear gradients for the next train step
            output = model(X, A)  # Forward pass
            loss = criterion(output, labels)  # Compute the loss
            loss.backward()  # Backward pass to compute gradients
            optimizer.step()  # Update model parameters

            total_loss += loss.item()  # Accumulate the loss

        avg_loss = total_loss / len(train_loader)  # Calculate average loss
        loss_values.append(avg_loss)  # Append average loss to list

        # Print the average loss for the current epoch
        print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}')

    return loss_values


def evaluate_model(model, test_loader, device='cpu'):
    """
    Evaluates the model on a test dataset.

    Args:
        model (torch.nn.Module): The neural network model to be evaluated.
        test_loader (torch.utils.data.DataLoader): DataLoader for the test data.
        device (str, optional): The device to run the model on ('cpu' or 'cuda'). Defaults to 'cpu'.

    Returns:
        tuple: A tuple containing the accuracy, precision, recall, and F1 score of the model on the test dataset.

    This function performs a forward pass on the test dataset to obtain the model's predictions,
    then calculates and returns various evaluation metrics including accuracy, precision, recall, and F1 score.
    """
    model.eval()  # Set the model to evaluation mode
    true_labels = []  # List to store actual labels
    predictions = []  # List to store model predictions

    with torch.no_grad():  # Disable gradient computation
        for X, A, labels in test_loader:
            # Move data to the specified device
            X, A, labels = X.to(device), A.to(device), labels.to(device)

            output = model(X, A)  # Forward pass
            _, predicted = torch.max(output.data, 1)  # Get the index of the max log-probability

            true_labels += labels.tolist()  # Append actual labels
            predictions += predicted.tolist()  # Append predicted labels

    # Calculate evaluation metrics
    accuracy = accuracy_score(true_labels, predictions)
    precision = precision_score(true_labels, predictions, average='weighted')
    recall = recall_score(true_labels, predictions, average='weighted')
    f1 = f1_score(true_labels, predictions, average='weighted')

    return accuracy, precision, recall, f1

## Main Script

## Test