# Human Activity Recognition
This notebook provides some guidelines for building a classifier for the MotionSense dataset.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [None]:
# Colab-only: mount Google Drive at /content/drive.
# NOTE(review): the loaders below read from the relative path "drive/MyDrive/...",
# so this presumably relies on the working directory being /content -- confirm.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Generate time-series data
The original MotionSense dataset comes in a slightly cumbersome format, but the authors do provide a few functions to produce Pandas DataFrames with control over what is produced. For our purposes we are not concerned with all of the target data, as we only need to know the activity that was being recorded.

In [None]:
def get_ds_infos():
    """
    Load the CSV file that describes the data subjects.

    EEE4114: Technically we do not need these data, as we are not concerned about identifying the subjects.

    Data Columns:
    0: code [1-24]
    1: weight [kg]
    2: height [cm]
    3: age [years]
    4: gender [0:Female, 1:Male]

    Returns:
        A pandas DataFrame with one row per data subject and the attributes listed above.
    """
    subjects_csv = "drive/MyDrive/motion-sense-master/data/data_subjects_info.csv"
    subjects_info = pd.read_csv(subjects_csv)
    print("[INFO] -- Data subjects' information is imported.")
    return subjects_info

def set_data_types(data_types=("userAcceleration",)):
    """
    Select the sensors and the mode to shape the final dataset.

    EEE4114F: Choose sensors that you think are useful or would like to include in training.
    You can choose all of them, or you could opt to try a limited set of input features

    Args:
        data_types: An iterable of sensor data types from this list:
            [attitude, gravity, rotationRate, userAcceleration].
            A single sensor name given as a plain string is treated as a one-element list.

    Returns:
        A list with one entry per sensor; each entry is the list of the three
        column names to read for that sensor ("<sensor>.roll/.pitch/.yaw" for
        attitude, "<sensor>.x/.y/.z" for everything else).
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(data_types, str):
        data_types = [data_types]

    dt_list = []
    for sensor in data_types:
        # Attitude is the only sensor expressed as angles instead of x/y/z axes.
        suffixes = ("roll", "pitch", "yaw") if sensor == "attitude" else ("x", "y", "z")
        dt_list.append([f"{sensor}.{suffix}" for suffix in suffixes])

    return dt_list


def create_time_series(dt_list, act_labels, trial_codes, mode="mag", labeled=True):
    """
    EEE4114F: This defines what data you would like to include for a given set.

    Reads one CSV per (subject, activity, trial) and stacks all rows, in that
    loop order, into a single DataFrame.

    Args:
        dt_list: A list of column-name triples (as returned by set_data_types), one per sensor.
        act_labels: list of activities
        trial_codes: list of trial-number lists, aligned with act_labels
        mode: It can be "raw" which means you want raw data
        for every dimension of each data type,
        [attitude(roll, pitch, yaw); gravity(x, y, z); rotationRate(x, y, z); userAcceleration(x,y,z)].
        or it can be "mag" which means you only want the magnitude for each data type: (x^2+y^2+z^2)^(1/2)
        labeled: True, if we want a labeled dataset. False, if we only want sensor values.

    Returns:
        A DataFrame of sensor data. In "raw" mode the columns are the original
        axis names; in "mag" mode there is one column per sensor named after the
        sensor. When labeled is True a final "act" column holds the index of
        the activity within act_labels.

    Raises:
        ValueError: if mode is neither "mag" nor "raw".
    """
    if mode not in ("mag", "raw"):
        # Fail before reading any file instead of on the final DataFrame build.
        raise ValueError(f'mode must be "mag" or "raw", got {mode!r}')

    num_data_cols = len(dt_list) if mode == "mag" else len(dt_list) * 3
    # "+1" --> [act]; we do not need the other labels
    num_cols = num_data_cols + 1 if labeled else num_data_cols

    ds_list = get_ds_infos()

    print("[INFO] -- Creating Time-Series")
    # Collect the per-file arrays and concatenate once at the end: appending to
    # the growing array inside the loop copies all previous rows on every file.
    # The leading empty chunk keeps the result well-formed when no file is read.
    chunks = [np.zeros((0, num_cols))]
    for sub_id in ds_list["code"]:
        for act_id, act in enumerate(act_labels):
            for trial in trial_codes[act_id]:
                fname = 'drive/MyDrive/motion-sense-master/data/A_DeviceMotion_data/'+act+'_'+str(trial)+'/sub_'+str(int(sub_id))+'.csv'
                raw_data = pd.read_csv(fname)
                raw_data = raw_data.drop(['Unnamed: 0'], axis=1)
                vals = np.zeros((len(raw_data), num_data_cols))
                for x_id, axes in enumerate(dt_list):
                    if mode == "mag":
                        vals[:, x_id] = (raw_data[axes]**2).sum(axis=1)**0.5
                    else:
                        vals[:, x_id*3:(x_id+1)*3] = raw_data[axes].values
                if labeled:
                    lbls = np.full((len(raw_data), 1), act_id)
                    vals = np.concatenate((vals, lbls), axis=1)
                chunks.append(vals)
    dataset = np.concatenate(chunks, axis=0)

    cols = []
    for axes in dt_list:
        if mode == "raw":
            cols += axes
        else:
            # Strip the axis suffix whatever its length (".x" as well as ".roll").
            cols += [axes[0].rsplit(".", 1)[0]]

    if labeled:
        cols += ["act"]

    dataset = pd.DataFrame(data=dataset, columns=cols)
    return dataset

You need to decide what to include in your dataset. For example, if you want to try a reduced dataset with only a few sensors, you can alter the `sdt` list.

In [None]:
# Activity codes used in the folder names of the dataset.
ACT_LABELS = ["dws", "ups", "wlk", "jog", "std", "sit"]
# Trial numbers available for each activity code.
TRIAL_CODES = {
    "dws": [1, 2, 11],
    "ups": [3, 4, 12],
    "wlk": [7, 8, 15],
    "jog": [9, 16],
    "std": [6, 14],
    "sit": [5, 13],
}

# Parameters for building a labeled time-series from "(A)DeviceMotion_data".
# Available sensors: attitude(roll, pitch, yaw); gravity(x, y, z);
# rotationRate(x, y, z); userAcceleration(x, y, z)
sdt = ["attitude", "userAcceleration"]
print(f"[INFO] -- Selected sensor data types: {sdt}")

act_labels = ACT_LABELS[:6]
print(f"[INFO] -- Selected activites: {act_labels}")

# One list of trial numbers per selected activity, in the same order as act_labels.
trial_codes = [TRIAL_CODES[label] for label in act_labels]
dt_list = set_data_types(sdt)
dataset = create_time_series(dt_list, act_labels, trial_codes, mode="raw", labeled=True)
print(f"[INFO] -- Shape of time-Series dataset:{dataset.shape}")
dataset.head()

[INFO] -- Selected sensor data types: ['attitude', 'userAcceleration']
[INFO] -- Selected activites: ['dws', 'ups', 'wlk', 'jog', 'std', 'sit']
[INFO] -- Data subjects' information is imported.
[INFO] -- Creating Time-Series
[INFO] -- Shape of time-Series dataset:(1412865, 7)


Unnamed: 0,attitude.roll,attitude.pitch,attitude.yaw,userAcceleration.x,userAcceleration.y,userAcceleration.z,act
0,1.528132,-0.733896,0.696372,0.294894,-0.184493,0.377542,0.0
1,1.527992,-0.716987,0.677762,0.219405,0.035846,0.114866,0.0
2,1.527765,-0.706999,0.670951,0.010714,0.134701,-0.167808,0.0
3,1.516768,-0.704678,0.675735,-0.008389,0.136788,0.094958,0.0
4,1.493941,-0.703918,0.672994,0.199441,0.353996,-0.044299,0.0


# Data Pre-processing
There are numerous approaches to solving this task and your choice of algorithm will determine what form your input data should take. For example, a feed-forward neural network could be used, but what type of representation would make sense? Without being able to account for the temporal structure of the data, feed-forward networks might fail. One way around this is to potentially use summarized statistics about the data to simplify the features in a way that is not reliant on temporal information. E.g. instead of the raw signal for the acceleration, what if you worked with the mean? This might not be the best approach, compared to other algorithms, but it could be useful for a feed-forward network.

What about convolutional neural networks? Well, you could use 1D convolutions directly on time-series data, which would result in fewer model parameters compared to a fully-connected network, with the added benefit of retaining temporal information. Alternatively, you could transform the time-series data into 2D data using something such as an FFT to produce a spectrogram. Or, for something simpler, you can create fixed windows to divide your dataset into chunks of 2D data, e.g. for 6 sensors and a window of 400 samples you would generate a 6 x 400 input.

In [None]:
def windows(dataset, window_size=400, stride=200):
    """
    Placeholder for splitting the input data into fixed-length windows.

    Args:
        dataset: DataFrame of the time-series data
        window_size: (int) the size of the windows in number of samples.
        stride: (int) the stride length of the windows.

    Returns:
        windows_df: DataFrame of the windowed time-series data. Each window should have the shape (n_sensors, window_size).
            The number of examples will change depending on the stride.
            As written, none of the arguments are used and an empty DataFrame is returned.

    """
    # Template only: the windowing logic is left for the reader to implement.
    return pd.DataFrame()

# Pytorch Dataset
If using PyTorch, it is useful to create datasets using its dataset structures. This may require modification if you use additional transformations or formats.

In [None]:
class MotionSense(Dataset):
    """
    Map-style PyTorch dataset over a pair of pandas objects.

    Args:
        X: pandas object whose ``.values`` holds the inputs, one example per row.
        Y: pandas object whose ``.values`` holds the targets, aligned with X.
        transform: optional callable applied to each input example when it is
            fetched; targets are returned unchanged.
    """

    def __init__(self, X, Y, transform=None):
        # torch.tensor keeps the NumPy dtype of ``.values``.
        # NOTE(review): with default pandas floats this gives float64 tensors;
        # cast before feeding a float32 model or a loss that expects integer
        # class labels -- confirm against the consumer.
        self.X = torch.tensor(X.values)
        self.Y = torch.tensor(Y.values)
        self.transform = transform

    def __getitem__(self, index):
        """Return the ``(input, target)`` pair stored at ``index``."""
        # Accept a scalar tensor as an index.
        if torch.is_tensor(index):
            index = int(index.item())

        x = self.X[index]
        y = self.Y[index]

        # The transform used to be stored but never applied.
        if self.transform is not None:
            x = self.transform(x)

        return x, y

    def __len__(self):
        """Return the number of examples."""
        return len(self.X)

# Resampling
You can split the dataset using sklearn, but you need to take care with your specific data. For example, randomly splitting raw sensor data will result in jumbled time-series signals.

In [None]:
# Keep inputs and targets in separate pandas objects.
# Only the columns of the first selected sensor (dt_list[0]) are used as input here.
X = dataset.loc[:, dt_list[0]]
Y = dataset["act"]

In [None]:
# Deliberate pitfall demo: calling train_test_split directly on raw sensor rows
# mixes up the samples, so the temporal order of the signals is not retained
# (see the jumbled index in the output of the next cell).
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.1)

In [None]:
# Show the first training rows; the index reveals the original row positions.
X_train.head()

Unnamed: 0,attitude.roll,attitude.pitch,attitude.yaw
469480,0.531458,-0.550554,0.118097
966526,-0.183789,-1.257339,1.791805
648119,-2.918224,-0.133874,-1.27948
972448,-1.952808,-1.253952,-1.748129
672867,-2.803317,-1.429967,2.86234


Notice that the indices are all jumbled? If using windowed data, the usual splitting methods work, as each window retains the order of sensor values, as long as you split the data along the correct axis.

If using summarized data e.g. taking the mean, max, min etc. as input features rather than the raw data then you should be fine using train_test_split.

If working with raw sensor data, there is an sklearn function for splitting time-series data, which makes splitting raw signals easier if you want to work with that instead.

In [None]:
# Example of TimeSeriesSplit. It produces several (train, test) pairs for
# cross-validation rather than a single holdout split.
ts_cv = TimeSeriesSplit(
    n_splits=5,
    gap=20,
    max_train_size=10000,
    test_size=1000
)

# Each element is a (train_indices, test_indices) pair: positions, not the data itself.
all_splits = list(ts_cv.split(X ,Y)) # These are indices, not the data itself.

Once you have the indices you can sample the data

In [None]:
# all_splits[0] is the first split; its element [0] holds the training indices.
X.iloc[all_splits[0][0]].head() # This is the training input data from the first split

Unnamed: 0,attitude.roll,attitude.pitch,attitude.yaw
1397845,-0.958415,-1.2734,2.505994
1397846,-0.960113,-1.27276,2.506021
1397847,-0.961836,-1.272516,2.506426
1397848,-0.961993,-1.272238,2.508011
1397849,-0.959931,-1.272258,2.51163


In [None]:
# Same training indices applied to the targets, so rows stay aligned with the inputs shown above.
Y.iloc[all_splits[0][0]].head() # training target data from the first split. You can see the indices still match, and are ordered correctly.

Unnamed: 0,act
1397845,4.0
1397846,4.0
1397847,4.0
1397848,4.0
1397849,4.0


Once you have your splits, you can convert them to Pytorch datasets for easier loading. Technically you could offload the sampling methods to Pytorch samplers, but this can be trickier to implement.

In [None]:
# Training indices of the first time-series split.
first_train_idx = all_splits[0][0]

# Reset the index so that rows are numbered from 0 inside the dataset.
train_1 = MotionSense(
    X.iloc[first_train_idx].reset_index(drop=True),
    Y.iloc[first_train_idx].reset_index(drop=True),
)
train_1_loader = DataLoader(train_1, batch_size=5)

# Models
As mentioned previously there are a few options for the types of models you end up using.

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class CNN1D(nn.Module):
    """
    Small 1D CNN that classifies a flat feature vector.

    forward() expects a tensor of shape (batch, n_features) and treats the
    feature axis as a single-channel sequence. The two halving pool stages must
    leave a length of 4 to match the first linear layer, i.e. n_features must be
    between 16 and 19 (the notebook uses 18 summary features: 18 -> 9 -> 4).

    Args:
        in_channels: accepted for interface compatibility but not used; forward()
            always adds exactly one channel, so the first convolution has a
            fixed input channel count of 1.
        num_classes: number of output logits.
    """

    def __init__(self, in_channels=18, num_classes=6):
        super().__init__()
        # Two "same"-padded convolutions, each followed by the shared pooling
        # layer, which halves the sequence length.
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        # kernel_size=8 shrank 18 -> 6 and left the second pooling stage with an
        # input shorter than its kernel ("Invalid computed output size: 0").
        # kernel_size=2, stride=2 gives 18 -> 9 -> 4, matching fc1 below.
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)

        # Following the convolutional layers, 2 linear layers map the flattened
        # feature maps (64 channels x length 4) to the class logits.
        self.fc1_input_features = 64 * 4
        self.fc1 = nn.Linear(self.fc1_input_features, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for x of shape (batch, n_features)."""
        x = x.unsqueeze(1)  # (batch, n_features) -> (batch, 1, n_features)

        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))

        # Flatten per example. Keeping the batch dimension fixed makes a wrong
        # input length fail in fc1 instead of silently changing the batch size.
        x = x.view(x.size(0), -1)

        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
def windows(df, window_size=400, stride=200):
    """
    Cut a labelled time-series into fixed-length, possibly overlapping windows.

    Windows are taken over consecutive rows of df in their stored order.
    NOTE(review): if df is a concatenation of several recordings, a window can
    straddle two of them; it is then labelled with the majority activity.

    Args:
        df: DataFrame with one column per sensor channel plus an "act" column
            holding non-negative integer-valued activity labels.
        window_size: (int) number of rows per window; must be positive.
        stride: (int) number of rows between the starts of consecutive windows;
            must be positive.

    Returns:
        A tuple (X_windows, Y_windows):
            X_windows: array of shape (n_windows, n_sensors, window_size).
            Y_windows: integer array of shape (n_windows,) holding the most
                frequent "act" value inside each window.
        When df has fewer than window_size rows both arrays have n_windows == 0.

    Raises:
        ValueError: if window_size or stride is not positive.
    """
    # A non-positive stride would never advance the window start.
    if window_size <= 0 or stride <= 0:
        raise ValueError(
            f"window_size and stride must be positive, got {window_size} and {stride}"
        )

    # Select the sensor channels by name so the result does not depend on
    # "act" being the last column.
    sensor_data = df.drop(columns=["act"]).values.T
    labels = df["act"].values.astype(int)
    n_sensors, n_samples = sensor_data.shape

    X_windows = []
    Y_windows = []
    for start in range(0, n_samples - window_size + 1, stride):
        stop = start + window_size
        X_windows.append(sensor_data[:, start:stop])
        # Majority vote over the labels covered by this window.
        Y_windows.append(np.bincount(labels[start:stop]).argmax())

    if not X_windows:
        # Keep the documented rank even when no window fits.
        return (
            np.empty((0, n_sensors, window_size), dtype=sensor_data.dtype),
            np.empty((0,), dtype=np.int64),
        )

    return np.array(X_windows), np.array(Y_windows)

def summarize_window(window):
    """
    Reduce one window to a flat vector of per-channel statistics.

    Args:
        window: 2D array-like of shape (n_channels, n_samples).

    Returns:
        1D array of length 3 * n_channels laid out as
        [all means, all standard deviations, all minima].
    """
    # Order matters: the output is grouped by statistic, then by channel.
    reducers = (np.mean, np.std, np.min)
    return np.concatenate([reduce_fn(window, axis=1) for reduce_fn in reducers])


In [None]:
# Window the full dataset, then collapse every window to its summary statistics.
X_raw, Y_raw = windows(dataset, window_size=400, stride=200)
X_summary = np.array(list(map(summarize_window, X_raw)))
Y_summary = Y_raw

# Split into train, val, test
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

# 70% train; the remaining 30% is halved into validation and test.
# NOTE(review): with stride < window_size neighbouring windows share samples,
# so a random split can place overlapping windows in different subsets -- confirm
# this is acceptable for the evaluation.
X_train, X_temp, Y_train, Y_temp = train_test_split(
    X_summary, Y_summary, test_size=0.3, random_state=42
)
X_val, X_test, Y_val, Y_test = train_test_split(
    X_temp, Y_temp, test_size=0.5, random_state=42
)


def _as_tensor_dataset(features, labels):
    """Wrap NumPy features/labels as a TensorDataset of float32 inputs and int64 targets."""
    return TensorDataset(
        torch.tensor(features, dtype=torch.float32),
        torch.tensor(labels, dtype=torch.long),
    )


train_ds = _as_tensor_dataset(X_train, Y_train)
val_ds = _as_tensor_dataset(X_val, Y_val)
test_ds = _as_tensor_dataset(X_test, Y_test)

# Only the training batches are shuffled.
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=32)
test_loader = DataLoader(test_ds, batch_size=32)


# Training
With your model defined you will need to decide on an optimizer used to perform parameter updates. The choice is yours, but for the most part default optimizers like Adam should work for CNNs, although SGD can be more stable but take longer to train.

You can then load in data. For example, if you use the loader described before, the loop shown below will iterate through batches of data.

In [None]:
def train(model, loader, optimizer, criterion, device):
    """
    Run one training pass over ``loader`` and update the model in place.

    Args:
        model: network to train; switched to training mode.
        loader: iterable yielding (inputs, targets) batches.
        optimizer: optimizer stepped once per batch.
        criterion: loss called as ``criterion(outputs, targets)``.
        device: device the batches are moved to.

    Returns:
        (epoch_loss, accuracy): the sample-weighted mean loss and the fraction
        of correctly predicted samples over the whole pass.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, targets)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so the epoch value is a per-sample mean.
        loss_sum += batch_loss.item() * inputs.size(0)
        predictions = torch.max(logits, 1)[1]
        n_seen += targets.size(0)
        n_correct += (predictions == targets).sum().item()

    return loss_sum / n_seen, n_correct / n_seen


def evaluate(model, loader, criterion, device):
    """
    Score the model on ``loader`` without updating any parameter.

    Args:
        model: network to evaluate; switched to evaluation mode.
        loader: iterable yielding (inputs, targets) batches.
        criterion: loss called as ``criterion(outputs, targets)``.
        device: device the batches are moved to.

    Returns:
        (epoch_loss, accuracy): the sample-weighted mean loss and the fraction
        of correctly predicted samples.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    # Gradients are not needed for scoring.
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model(inputs)
            batch_loss = criterion(logits, targets)

            loss_sum += batch_loss.item() * inputs.size(0)
            predictions = torch.max(logits, 1)[1]
            n_seen += targets.size(0)
            n_correct += (predictions == targets).sum().item()

    return loss_sum / n_seen, n_correct / n_seen

In [None]:
# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = CNN1D(in_channels=1, num_classes=6).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

num_epochs = 20

# Per-epoch metric histories, plotted in the next cell.
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []

for epoch in range(num_epochs):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)

    for history, value in (
        (train_losses, train_loss),
        (val_losses, val_loss),
        (train_accuracies, train_acc),
        (val_accuracies, val_acc),
    ):
        history.append(value)

    print(f"Epoch {epoch+1:02d} | "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

RuntimeError: max_pool1d() Invalid computed output size: 0

In [None]:
# One figure per metric: training curve against validation curve.
for metric_name, train_curve, val_curve in (
    ("Loss", train_losses, val_losses),
    ("Accuracy", train_accuracies, val_accuracies),
):
    plt.figure()
    plt.plot(train_curve, label=f"Train {metric_name}")
    plt.plot(val_curve, label=f"Validation {metric_name}")
    plt.xlabel("Epoch")
    plt.ylabel(metric_name)
    plt.title(f"Training vs. Validation {metric_name}")
    plt.legend()
    plt.grid(True)
    plt.show()

# Evaluation
Make sure to correctly evaluate your models. Justify your methods given your particular model and data, e.g. if you use a random holdout, is it justified? What do your performance metrics tell you? For example: was the model overfitting or underfitting; are there early stages in the training process where the model performs well compared to the end of training; which model do you choose to save; and are the model outputs "reasonable", i.e. are there errors that might make sense given the data?

In [None]:

def evaluate(model, loader, criterion, device):
    """
    Compute the mean loss and the accuracy of ``model`` over ``loader``.

    The model is put in evaluation mode and no gradients are tracked.

    Args:
        model: network to evaluate.
        loader: iterable yielding (inputs, targets) batches.
        criterion: loss called as ``criterion(outputs, targets)``.
        device: device the batches are moved to.

    Returns:
        (epoch_loss, accuracy) averaged over all samples seen.
    """
    model.eval()
    totals = {"loss": 0.0, "correct": 0, "count": 0}

    with torch.no_grad():
        for batch_inputs, batch_targets in loader:
            batch_inputs, batch_targets = batch_inputs.to(device), batch_targets.to(device)
            scores = model(batch_inputs)
            loss_value = criterion(scores, batch_targets).item()

            # Batch losses are means, so scale by batch size before summing.
            totals["loss"] += loss_value * batch_inputs.size(0)
            _, best_class = torch.max(scores, 1)
            totals["count"] += batch_targets.size(0)
            totals["correct"] += (best_class == batch_targets).sum().item()

    epoch_loss = totals["loss"] / totals["count"]
    accuracy = totals["correct"] / totals["count"]
    return epoch_loss, accuracy


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

def run_experiment(train_loader, val_loader, test_loader, num_epochs=20, lr=0.001):
    """
    Train a fresh CNN1D with Adam and report its accuracy on the test loader.

    Args:
        train_loader: batches used for the parameter updates.
        val_loader: batches scored after every epoch (the result is not used).
        test_loader: batches scored once after training.
        num_epochs: number of passes over train_loader.
        lr: Adam learning rate.

    Returns:
        The test accuracy of the trained model.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # A new model is built on every call, so runs do not share weights.
    net = CNN1D(in_channels=1, num_classes=6).to(device)
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(net.parameters(), lr=lr)

    for _ in range(num_epochs):
        train(net, train_loader, adam, loss_fn, device)
        # Validation metrics are computed each epoch but discarded.
        evaluate(net, val_loader, loss_fn, device)

    final_loss, final_acc = evaluate(net, test_loader, loss_fn, device)
    print(f"Finished one run. Test Loss: {final_loss:.4f}, Test Accuracy: {final_acc:.4f}")

    return final_acc

# Number of independent training runs used to estimate the spread of the test accuracy.
n_runs = 20

# One test accuracy per run, in run order.
test_accuracies_across_runs = []

print(f"Starting {n_runs} training runs...")
for i in range(n_runs):
    print(f"\n--- Running experiment {i+1}/{n_runs} ---")
    # Every run receives the same three loaders; run_experiment builds a new model each time.
    acc = run_experiment(train_loader, val_loader, test_loader, num_epochs=20, lr=0.001)
    test_accuracies_across_runs.append(acc)

print("\nAll runs finished.")
print("Test accuracies:", test_accuracies_across_runs)


# Box plot of the per-run test accuracies.
# NOTE(review): the tick label says "FCNet" while run_experiment builds a CNN1D -- confirm the intended name.
plt.figure(figsize=(8, 6))
plt.boxplot(test_accuracies_across_runs, labels=['FCNet (Summary Features)'])
plt.ylabel("Test Accuracy")
plt.title(f"Distribution of Test Accuracy Across {n_runs} Runs")
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

In [None]:
# Score the notebook-level `model` on the held-out test loader.
# run_experiment keeps its models local, so this is not one of the models from the repeated runs.
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
print(f"\nFinal Test Loss: {test_loss:.4f}, Final Test Accuracy: {test_acc:.4f}")

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np
import torch

# Collect the predictions of the notebook-level model on the test set and plot
# them as a confusion matrix.
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

all_predictions = []
all_labels = []

with torch.no_grad():
    # Dedicated loop names keep the notebook-level X / Y objects intact.
    for batch_inputs, batch_targets in test_loader:
        batch_inputs, batch_targets = batch_inputs.to(device), batch_targets.to(device)
        outputs = model(batch_inputs)
        _, predicted = torch.max(outputs, 1)
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(batch_targets.cpu().numpy())

ACT_LABELS = ["dws","ups", "wlk", "jog", "std", "sit"]
used_act_labels = ACT_LABELS[0:6]

# Fix the class list explicitly. Without `labels=` the matrix covers the union
# of true and predicted classes, which can differ from the classes present in
# the true labels and leave the tick labels misaligned or of the wrong length.
class_ids = list(range(len(used_act_labels)))
cm = confusion_matrix(all_labels, all_predictions, labels=class_ids)

used_act_labels_for_display = used_act_labels

fig, ax = plt.subplots(figsize=(8, 8))
cmd = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=used_act_labels_for_display)
cmd.plot(ax=ax, cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()

In [None]:

# Sensor channels in dataset column order.
channels = [
    "attitude.roll",
    "attitude.pitch",
    "attitude.yaw",
    "userAcceleration.x",
    "userAcceleration.y",
    "userAcceleration.z",
]

# Statistics listed in the order they are concatenated per window.
stats = ["mean", "std", "min"]

# Names are grouped by statistic first, then by channel.
feature_names = []
for stat in stats:
    feature_names.extend(f"{stat}_{ch}" for ch in channels)

print("Features used by FCNet (18 total):")
for position, feature in enumerate(feature_names, start=1):
    print(f"{position:2d}. {feature}")