# Purpose of this file: build training, validation, and test DataFrames from the separate data files of the 17 REALDISP participants

In [None]:
# Import libraries
from google.colab import drive
import pandas as pd
import numpy as np

# Mount Google Drive at /content/drive; the subject log files read below live under this mount point.
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Concatenate participants' data into train, validation, and test sets

In [None]:
# -----------------------------
# Purpose:
# Reads the REALDISP "ideal" log file of every subject and splits the rows into
# train, validation, and test sets based on subject ID.
# Rows whose last field (the label) is "0" are skipped (considered irrelevant or
# background). Blank lines are skipped as well, because they have no label field.
# Every kept row is stored as a list of strings; conversion to numbers happens later.
# -----------------------------

train_data = []
valid_data = []
test_data = []

# Iterate over 17 subject files
for i in range(1, 18):
    file_path = f"/content/drive/My Drive/PROJECT/REALDISP/subject{i}_ideal.log"

    # The destination depends only on the subject ID, so pick it once per file
    # instead of re-testing the ID for every row.
    if i < 12:
        subject_rows = train_data      # Subjects 1-11  -> Training set
    elif i < 15:
        subject_rows = valid_data      # Subjects 12-14 -> Validation set
    else:
        subject_rows = test_data       # Subjects 15-17 -> Test set

    with open(file_path, "r") as file:  # Automatically closes file after reading
        for line in file:
            split_line = line.split()
            if not split_line:
                # Blank line (e.g. a trailing newline at end of file):
                # indexing [-1] would raise IndexError, so skip it.
                continue
            if split_line[-1] != "0":  # Exclude rows labeled "0" (non-activity)
                subject_rows.append(split_line)


In [None]:
# Turn the collected string rows of each split into a float DataFrame
# and report its (rows, columns) shape.

df_train = pd.DataFrame(train_data).astype(float)
print(df_train.shape)

df_valid = pd.DataFrame(valid_data).astype(float)
print(df_valid.shape)

df_test = pd.DataFrame(test_data).astype(float)
print(df_test.shape)

(435064, 120)
(153507, 120)
(123251, 120)


# Preprocess features

In [None]:
# -----------------------------
# Purpose:
# Prepare the model inputs from the REALDISP frames:
# 1. Separate the label (last column) and discard the two leading timestamp columns.
# 2. Drop the quaternion columns of every sensor, which are not used in the HAR model.
# -----------------------------

# Step 1: the last column is the label; columns 2 .. second-to-last are the features
train_labels = df_train.iloc[:, -1]
train_features = df_train.iloc[:, 2:-1]

valid_labels = df_valid.iloc[:, -1]
valid_features = df_valid.iloc[:, 2:-1]

test_labels = df_test.iloc[:, -1]
test_features = df_test.iloc[:, 2:-1]

# -----------------------------
# Step 2: Remove quaternion data
# Quaternions take 4 columns per sensor and recur every 13 columns.
# Counting positions inside the feature frame from 0, the first quaternion
# block starts at position 9, the next at 22, and so on.

start = 9         # Position of the first quaternion column (0-based, within the features)
step = 13         # Distance between the starts of consecutive quaternion blocks
remove_count = 4  # Number of columns per quaternion block

n_feature_cols = train_features.shape[1]

# Enumerate every block start up to and including n_feature_cols, expand each
# block to its 4 positions, and keep only positions that actually exist.
columns_to_remove = [
    col
    for block_start in range(start, n_feature_cols + 1, step)
    for col in range(block_start, block_start + remove_count)
    if col < n_feature_cols
]
print("Columns to remove (quaternions):", columns_to_remove)

# Positions are translated to column labels per frame, then dropped along the column axis
train_features = train_features.drop(train_features.columns[columns_to_remove], axis=1)
valid_features = valid_features.drop(valid_features.columns[columns_to_remove], axis=1)
test_features = test_features.drop(test_features.columns[columns_to_remove], axis=1)

# Print final shapes for verification
print("Train features shape:", train_features.shape)
print("Validation features shape:", valid_features.shape)
print("Test features shape:", test_features.shape)

(435064, 81)
(153507, 81)
(123251, 81)


# Segment and shuffle data

In [None]:
import numpy as np

def create_variable_size_windows(features, labels, min_size=96, max_size=128):
    """
    Cut the data into back-to-back windows of random length, shuffle the order
    of the windows, and concatenate them back into frame-level arrays.

    Windows are cut consecutively starting at row 0. Each window length is drawn
    with ``np.random.randint`` from [min_size, max_size] (both inclusive). Cutting
    stops when fewer than ``min_size`` rows remain, or when the drawn length does
    not fit into the remaining rows, so trailing rows may be dropped. Rows keep
    their original order inside a window; only the window order is shuffled
    (``np.random.permutation``).

    Windows are cut purely by row position, so a single window can contain more
    than one label value.

    Args:
        features (pd.DataFrame or np.ndarray): Feature matrix (n_samples, n_features).
        labels (pd.Series, pd.DataFrame or np.ndarray): Labels, one per row of ``features``.
        min_size (int): Minimum window size, at least 1 (default=96).
        max_size (int): Maximum window size, inclusive (default=128).

    Returns:
        shuffled_features (np.ndarray): Kept rows in shuffled-window order,
            shape (n_kept, n_features), same dtype as the input.
        shuffled_labels (np.ndarray): Matching labels, first dimension n_kept.
        Both are empty (zero rows) when not even one window fits.

    Raises:
        ValueError: If ``min_size`` < 1, ``max_size`` < ``min_size``, or
            ``features`` and ``labels`` have a different number of rows.
    """
    # Convert DataFrames / Series to NumPy arrays if necessary
    if not isinstance(features, np.ndarray):
        features = features.to_numpy()
    if not isinstance(labels, np.ndarray):
        labels = labels.to_numpy()

    if min_size < 1:
        # A zero-length window would never advance the cursor (infinite loop).
        raise ValueError("min_size must be at least 1")
    if max_size < min_size:
        raise ValueError("max_size must be greater than or equal to min_size")

    n_samples = features.shape[0]
    if labels.shape[0] != n_samples:
        raise ValueError(
            f"features has {n_samples} rows but labels has {labels.shape[0]}"
        )

    # Step 1: decide the (start, stop) row range of every window.
    # Only the boundaries are stored; the data itself is sliced once at the end.
    bounds = []
    i = 0
    while i + min_size <= n_samples:  # '<=' keeps a final window that fits exactly
        window_size = np.random.randint(min_size, max_size + 1)

        # The drawn size may still be larger than what is left
        if i + window_size > n_samples:
            break

        bounds.append((i, i + window_size))
        i += window_size

    if not bounds:
        # Nothing to shuffle; return correctly shaped empty arrays instead of
        # letting the concatenation fail on an empty list.
        return features[:0], labels[:0]

    # Step 2: shuffle the window order
    order = np.random.permutation(len(bounds))

    # Step 3: concatenate the windows in shuffled order.
    # Slicing the original arrays keeps their numeric dtype (no object arrays).
    shuffled_features = np.concatenate(
        [features[bounds[k][0]:bounds[k][1]] for k in order], axis=0
    )
    shuffled_labels = np.concatenate(
        [labels[bounds[k][0]:bounds[k][1]] for k in order], axis=0
    )

    return shuffled_features, shuffled_labels

# Segment and shuffle every split.
# min_size == max_size == 128, so every window is exactly 128 frames long.
# Requires train/valid/test features and labels from the preprocessing cell.
# The calls run in train -> valid -> test order.
shuffled_train_features, shuffled_train_labels = create_variable_size_windows(
    train_features, train_labels, min_size=128, max_size=128
)
shuffled_valid_features, shuffled_valid_labels = create_variable_size_windows(
    valid_features, valid_labels, min_size=128, max_size=128
)
shuffled_test_features, shuffled_test_labels = create_variable_size_windows(
    test_features, test_labels, min_size=128, max_size=128
)

# Print the shapes to verify
for caption, split_features, split_labels in (
    ("Train shape:", shuffled_train_features, shuffled_train_labels),
    ("Valid shape:", shuffled_valid_features, shuffled_valid_labels),
    ("Test shape:", shuffled_test_features, shuffled_test_labels),
):
    print(caption, split_features.shape, split_labels.shape)


Train shape: (434944, 81) (434944,)
Valid shape: (153472, 81) (153472,)
Test shape: (123136, 81) (123136,)


In [None]:
# Give the shuffled splits the short names used by the rest of the notebook.
# "normal" marks the un-augmented training data.
X_train_normal, y_train_normal = shuffled_train_features, shuffled_train_labels
X_valid, y_valid = shuffled_valid_features, shuffled_valid_labels
X_test, y_test = shuffled_test_features, shuffled_test_labels

# Shapes in the order: train X, train y, valid X, valid y, test X, test y
for split_array in (X_train_normal, y_train_normal, X_valid, y_valid, X_test, y_test):
    print(split_array.shape)

(434944, 81)
(434944,)
(153472, 81)
(153472,)
(123136, 81)
(123136,)


# Augmenting training data

In [None]:
num_sensors = 9
sensor_length = 9  # Each sensor has 9 features

# Slice the flat training matrix into one [frames, 9] block per sensor,
# taking consecutive groups of `sensor_length` columns from left to right.
sensor_set = []
for sensor_position in range(num_sensors):
    first_col = sensor_position * sensor_length
    sensor_set.append(X_train_normal[:, first_col:first_col + sensor_length])

# Sensors to resample during augmentation.
# The numbers are 1-based: augment() compares them against (list position + 1).
sensor_index_for_light_up = [3, 9]  # Right Calf, Right Upper Arm
sensor_index_for_light_down = [7, 8]  # Left Upper Arm, Right Lower Arm
sensor_index_for_heavy_up = [7, 8, 2]  # Left Upper Arm, Right Lower Arm, Left Thigh
sensor_index_for_heavy_down = [3, 9, 4]  # Right Calf, Right Upper Arm, Right Thigh

In [None]:
from scipy.signal import resample

def resample_signal(data, labels, strategy="low", factor=1.0):
    """
    Resamples a 2D sensor data array [frames, axes] along the frame axis and
    returns labels resampled to the same number of frames.

    The new frame count is ``int(frames * factor)``. The data is resampled with
    ``scipy.signal.resample`` (all axes at once, along axis 0). Every output
    frame receives the label of the nearest original frame, found by spreading
    the output positions evenly over the original index range. Data and labels
    therefore always come back with the same length.

    Args:
        data (numpy.ndarray): Input sensor data of shape [frames, axes].
        labels (numpy.ndarray): Corresponding labels of shape [frames].
        strategy (str): Not used by this function; kept so existing callers
            that pass it positionally keep working.
        factor (float): Resampling factor (>1.0 for upsampling, <1.0 for downsampling).

    Returns:
        tuple: (Resampled sensor data [new_frames, axes], labels [new_frames]).
        Both have zero rows when ``int(frames * factor)`` is less than 1.

    Raises:
        ValueError: If ``data`` and ``labels`` do not have the same number of frames.
    """
    data = np.asarray(data, dtype=float)
    labels = np.asarray(labels)

    n_frames = data.shape[0]
    if labels.shape[0] != n_frames:
        raise ValueError(
            f"data has {n_frames} frames but labels has {labels.shape[0]}"
        )

    new_length = int(n_frames * factor)  # Compute new frame count (truncates)
    if new_length < 1:
        # Nothing to resample to; return empty but correctly shaped outputs.
        return np.zeros((0, data.shape[1])), labels[:0]

    # Resample every axis independently along the frame dimension
    resampled_data = resample(data, new_length, axis=0)

    # For each output frame, the index of the closest original frame.
    # Indexing (rather than deleting/inserting) guarantees exactly
    # new_length labels, one per resampled frame.
    nearest_original = np.round(np.linspace(0, n_frames - 1, new_length)).astype(int)
    resampled_labels = labels[nearest_original]

    return resampled_data, resampled_labels


def frequency_warp(data, warp_factor_range=(0.9, 1.1)):
    """
    Scales every axis of a 2D sensor data array [frames, axes] by its own
    random factor.

    NOTE: despite the name, no frequency content is shifted. The operation
    this function stands for is "multiply the entire spectrum of an axis by
    one scalar, then transform back". Because the Fourier transform is linear,
    that is exactly the time-domain signal multiplied by the same scalar, so
    the gain is applied directly without an FFT round trip.
    NOTE(review): if a genuine frequency-axis warp is wanted, it still has to
    be implemented.

    One factor per axis is drawn with ``np.random.uniform`` from
    ``warp_factor_range``, in axis order.

    Args:
        data (numpy.ndarray): Input sensor data of shape [frames, axes].
        warp_factor_range (tuple): (low, high) range from which the random
            per-axis factor is selected.

    Returns:
        numpy.ndarray: Float array of the same shape as ``data`` with every
        column multiplied by its factor. The input is not modified.
    """
    # Work in float so integer input is not truncated when scaled
    data = np.asarray(data, dtype=float)

    # One scalar draw per axis, in axis order
    gains = np.array(
        [np.random.uniform(*warp_factor_range) for _ in range(data.shape[1])],
        dtype=float,
    )

    # Broadcasting multiplies column j by gains[j]
    return data * gains


def random_dropout(data, labels, dropout_rate=0.05):
    """
    Randomly removes frames from a 2D sensor data array [frames, axes] and
    removes the same frames from the labels.

    One uniform random number in [0, 1) is drawn per frame with
    ``np.random.rand``; a frame survives when its number is strictly greater
    than ``dropout_rate``.

    Args:
        data (numpy.ndarray): Input sensor data of shape [frames, axes].
        labels (numpy.ndarray): Corresponding labels of shape [frames].
        dropout_rate (float): Probability with which each frame is dropped.

    Returns:
        tuple: (Surviving sensor data, Matching labels)
    """
    frame_scores = np.random.rand(data.shape[0])  # One draw per frame
    survives = frame_scores > dropout_rate        # Boolean keep-mask

    kept_data = data[survives]
    kept_labels = labels[survives]  # Same mask keeps data and labels aligned
    return kept_data, kept_labels

Apply Augmentation

In [None]:
import numpy as np

def augment(sensor_set, labels, idx_up, idx_down, strategy="low"):
    """
    Augment every sensor on its own, then re-assemble the sensors into one array.

    Per sensor, in this order: optional resampling (up or down, depending on
    which index list names the sensor), `frequency_warp`, and `random_dropout`.
    The random dropout is drawn separately for each sensor, so after this step
    the sensors generally differ in length; all of them are cut to the shortest
    one before stacking. The label of each output frame is the most frequent
    label at that position across the sensors (the smallest label wins ties).

    Args:
        sensor_set (list of np.ndarray): List of sensors, each of shape [frames, axes].
        labels (np.ndarray): Corresponding labels of shape [frames].
        idx_up (list): 1-based sensor numbers to be upsampled.
        idx_down (list): 1-based sensor numbers to be downsampled
            (ignored for a sensor that is also listed in idx_up).
        strategy (str): "low" for light augmentation; any other value selects
            the heavy settings.

    Returns:
        tuple: (Augmented sensor data [sensors, min_frames, axes], Voted labels [min_frames])
    """
    # Settings of the selected strategy: warp range, dropout rate, resampling factors
    if strategy == "low":
        warp_range, drop_rate = (0.95, 1.05), 0.05
        up_factor, down_factor = 1.000072, 0.999931
    else:
        warp_range, drop_rate = (0.9, 1.1), 0.1
        up_factor, down_factor = 1.000142, 0.999862

    per_sensor_frames = []
    per_sensor_labels = []

    # Sensors are numbered from 1 to match the entries of idx_up / idx_down
    for sensor_number, sensor in enumerate(sensor_set, start=1):
        if sensor_number in idx_up:
            frames, frame_labels = resample_signal(sensor, labels, strategy, up_factor)
        elif sensor_number in idx_down:
            frames, frame_labels = resample_signal(sensor, labels, strategy, down_factor)
        else:
            # No resampling: work on a copy so the caller's labels stay untouched
            frames, frame_labels = sensor, labels.copy()

        # Warp first, then drop frames (data and labels together)
        frames = frequency_warp(frames, warp_factor_range=warp_range)
        frames, frame_labels = random_dropout(frames, frame_labels, dropout_rate=drop_rate)

        per_sensor_frames.append(frames)
        per_sensor_labels.append(frame_labels)

    # Shortest sensor decides the common length
    min_length = min(frames.shape[0] for frames in per_sensor_frames)

    # Shape: [sensors, min_frames, axes]
    stacked_sensors = np.stack([frames[:min_length] for frames in per_sensor_frames], axis=0)
    # Shape: [sensors, min_frames]
    stacked_labels = np.stack([frame_labels[:min_length] for frame_labels in per_sensor_labels], axis=0)

    # Majority vote per frame: count the labels in each column and take the most frequent
    label_matrix = stacked_labels.astype(int)
    voted_labels = np.apply_along_axis(
        lambda column: np.bincount(column).argmax(), axis=0, arr=label_matrix
    )

    return stacked_sensors, voted_labels



In [None]:
# Light augmentation pass (strategy "low") over the per-sensor training arrays
light_augmented_sensors, light_augmented_labels = augment(
    sensor_set,
    y_train_normal,
    sensor_index_for_light_up,
    sensor_index_for_light_down,
    strategy="low",
)

In [None]:
# Heavy augmentation pass (strategy "high") over the per-sensor training arrays
heqvy_augmented_sensors, heqvy_augmented_labels = augment(
    sensor_set,
    y_train_normal,
    sensor_index_for_heavy_up,
    sensor_index_for_heavy_down,
    strategy="high",
)

In [None]:
# Shapes of the augmented data and labels: light pass first, then heavy pass
for augmented_array in (
    light_augmented_sensors,
    light_augmented_labels,
    heqvy_augmented_sensors,
    heqvy_augmented_labels,
):
    print(augmented_array.shape)

(9, 391152, 9)
(391152,)


Bring the data to the desired shape (num_of_frames, num_of_sensors * num_of_axes), and concatenate all training datasets into one
(e.g. [normal data - lightly augmented data - heavily augmented data])

In [None]:
# Step 1: Move the frame axis to the front
# (sensors, frames, axes) -> (num_of_frames, num_of_sensors, num_of_axis)
light_augmented_sensors = light_augmented_sensors.transpose(1, 0, 2)
heqvy_augmented_sensors = heqvy_augmented_sensors.transpose(1, 0, 2)

# Step 2: Merge the sensor and axis dimensions so every frame becomes one row
# -> (num_of_frames, num_of_sensors * num_of_axis)
light_augmented_sensors = light_augmented_sensors.reshape(len(light_augmented_sensors), -1)
heqvy_augmented_sensors = heqvy_augmented_sensors.reshape(len(heqvy_augmented_sensors), -1)

for flattened in (light_augmented_sensors, heqvy_augmented_sensors):
    print(flattened.shape)


(391152, 81)


In [None]:
# Stack the three training sets frame-wise: normal, then light, then heavy augmentation
train_feature_parts = [X_train_normal, light_augmented_sensors, heqvy_augmented_sensors]
train_label_parts = [y_train_normal, light_augmented_labels, heqvy_augmented_labels]

X_train = np.concatenate(train_feature_parts, axis=0)
y_train = np.concatenate(train_label_parts, axis=0)


Chain 3 copies of normal train data for the experiment comparing non-augmented data and augmented data.

In [None]:
#X_train = np.tile(X_train_normal, (3, 1))
#y_train = np.tile(y_train_normal, (3,))

Save as .mat file

In [None]:
# Select the training data that will be written to the .mat file.
# NOTE(review): this rebinds X_train / y_train to the heavy-augmented set only,
# replacing the normal + light + heavy concatenation built earlier in the
# notebook. Confirm that exporting only the heavy set is intended.
X_train, y_train = heqvy_augmented_sensors, heqvy_augmented_labels

# Row counts of the training data, then shapes of the validation and test splits
for train_part in (X_train, y_train):
    print(len(train_part))
for eval_part in (X_valid, y_valid, X_test, y_test):
    print(eval_part.shape)

391152
391152
(153472, 81)
(153472,)
(123136, 81)
(123136,)


In [None]:
from scipy.io import savemat

# Collect every split under the key it will have as a variable in the .mat file.
# Each array is converted to nested Python lists before being handed to savemat.
# NOTE(review): .tolist() is memory-heavy for arrays of this size. Passing the
# ndarrays directly would avoid the copy, but first confirm they are plain
# numeric arrays: create_variable_size_windows builds its windows with dtype=object.
data_dict = dict(
    trainData=X_train.tolist(),
    valData=X_valid.tolist(),
    testData=X_test.tolist(),
    trainLabels=y_train.tolist(),
    valLabels=y_valid.tolist(),
    testLabels=y_test.tolist(),
)

# Written to the current working directory
savemat("REALDISP_AUG_NEW_SPLIT.mat", data_dict)
