# Imports

In [2]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Dense, Dropout, BatchNormalization, GlobalAveragePooling1D
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from collections import deque
from scipy.signal import butter, lfilter

# Parameters

In [4]:
sampling_rate = 20
window_size = 3 * sampling_rate  # 3s
wand_classes = ["Wave", "Circle", "Square", "Triangle", "Infinity", "Zigzag", "None"]
num_channels_online = 45
num_channels_wand = 6

# Processing Functions

In [6]:
def butter_lowpass(cutoff, fs, order=4):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a

def lowpass_filter(data, cutoff=5, fs=50, order=4):
    b, a = butter_lowpass(cutoff, fs, order=order)
    return lfilter(b, a, data)

def normalize_window(window):
    return (window - np.mean(window, axis=0)) / (np.std(window, axis=0) + 1e-8)

def pad_or_crop(sample, window_size):
    n_rows, n_cols = sample.shape
    if n_rows > window_size:
        return sample[:window_size, :]
    elif n_rows < window_size:
        pad_width = ((0, window_size - n_rows), (0, 0))
        return np.pad(sample, pad_width, mode='constant')
    else:
        return sample

# Data cleaning

In [8]:
def clean_data(df):
    """Remove NaNs and reset index."""
    df = df.dropna().reset_index(drop=True)
    return df
    
def clean_dataset(input_folder, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    for fname in os.listdir(input_folder):
        if fname.endswith('.csv'):
            df = pd.read_csv(os.path.join(input_folder, fname))
            df_clean = clean_data(df)
            df_clean.to_csv(os.path.join(output_folder, fname), index=False)
    print(f"Cleaned dataset saved to {output_folder}")

# Dataset loader

In [10]:
def load_preprocess_dataset(dataset_path, window_size=60, use_clean=True,
                            selected_channels=None, class_list=None, use_labels_folder=True):
  
    data_folder = os.path.join(dataset_path, "data_clean" if use_clean else "data")
    X_list, y_list = [], []

    data_files = sorted(os.listdir(data_folder))

    if use_labels_folder:
        label_folder = os.path.join(dataset_path, "label")
        label_files = sorted(os.listdir(label_folder))
        for data_file, label_file in zip(data_files, label_files):
            df = pd.read_csv(os.path.join(data_folder, data_file)).values
            if selected_channels:
                df = df[:, selected_channels]
            df_fixed = pad_or_crop(df, window_size)
            X_list.append(df_fixed)

            with open(os.path.join(label_folder, label_file), 'r') as f:
                raw_label = f.read().strip()
            y_list.append(raw_label)
    else:
        for data_file in data_files:
            df = pd.read_csv(os.path.join(data_folder, data_file)).values
            if selected_channels:
                df = df[:, selected_channels]
            df_fixed = pad_or_crop(df, window_size)
            X_list.append(df_fixed)

            # Infer label from filename (e.g. circle_01.csv → "Circle")
            raw_label = os.path.splitext(data_file)[0].split("_")[0].capitalize()
            y_list.append(raw_label)

    X = np.array([StandardScaler().fit_transform(sample) for sample in X_list])
    y = np.array(y_list)

    encoder = LabelEncoder()
    if class_list is None:
        encoder.fit(y)
    else:
        encoder.fit(class_list)
    y_encoded = encoder.transform(y)
    y_onehot = np.eye(len(encoder.classes_))[y_encoded]

    print(f"Loaded {len(X)} samples, {X.shape[1]} timesteps, {X.shape[2]} channels")
    print(f"Classes: {list(encoder.classes_)}")
    return X, y_onehot, encoder

# Train base model on online dataset

In [12]:
dataset_online_path = r"C:\Users\CK Cheong\Desktop\rosbag\data"
X_online, y_online, encoder_online = load_preprocess_dataset(
    dataset_online_path,
    window_size=window_size,
    use_clean=True,
    selected_channels=None,
    class_list=None,
    use_labels_folder=True
)

# Split for training/validation
X_train_online, X_val_online, y_train_online, y_val_online = train_test_split(
    X_online, y_online, test_size=0.2, stratify=y_online, random_state=42
)

Loaded 810 samples, 60 timesteps, 45 channels
Classes: ['label\n0', 'label\n1', 'label\n10', 'label\n2', 'label\n3', 'label\n4', 'label\n5', 'label\n6']


# Base CNN Model

In [14]:
base_model = Sequential([
    tf.keras.Input(shape=(window_size, num_channels_online)),
    Conv1D(64, 5, activation='relu'),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(128, 5, activation='relu'),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(256, 3, activation='relu'),
    BatchNormalization(),
    MaxPooling1D(2),

    Dropout(0.4),
    GlobalAveragePooling1D(),   
    Dense(128, activation='relu'),
    Dropout(0.3),
    Dense(len(encoder_online.classes_), activation='softmax')
])

base_model.compile(optimizer=Adam(0.001), loss='categorical_crossentropy', metrics=['accuracy'])
base_model.summary()

history_online = base_model.fit(
    X_train_online, y_train_online,
    validation_data=(X_val_online, y_val_online),
    epochs=30,
    batch_size=32
)

# Save base model
base_model.save("base_model_online.keras")

Epoch 1/30
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 81ms/step - accuracy: 0.2654 - loss: 1.9718 - val_accuracy: 0.2593 - val_loss: 1.9901
Epoch 2/30
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - accuracy: 0.6019 - loss: 1.1761 - val_accuracy: 0.3889 - val_loss: 1.8550
Epoch 3/30
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - accuracy: 0.7546 - loss: 0.8185 - val_accuracy: 0.4568 - val_loss: 1.6530
Epoch 4/30
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 34ms/step - accuracy: 0.8287 - loss: 0.5815 - val_accuracy: 0.5617 - val_loss: 1.4388
Epoch 5/30
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 33ms/step - accuracy: 0.8781 - loss: 0.4155 - val_accuracy: 0.6111 - val_loss: 1.2797
Epoch 6/30
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 33ms/step - accuracy: 0.9383 - loss: 0.2700 - val_accuracy: 0.6111 - val_loss: 1.1616
Epoch 7/30
[1m21/21[0m [32m━━━━

# Clean wand dataset and load

In [16]:
wand_dataset = r"C:\Users\CK Cheong\Documents\GitHub\CG4002-Wizard-Game-Project\AI\wand_dataset"
clean_dataset(os.path.join(wand_dataset, "data"),
              os.path.join(wand_dataset, "data_clean"))

X_wand, y_wand, encoder_wand = load_preprocess_dataset(
    wand_dataset,
    window_size=window_size,
    use_clean=True,
    selected_channels=[0,1,2,3,4,5],
    class_list=wand_classes,
    use_labels_folder=False
)

Cleaned dataset saved to C:\Users\CK Cheong\Documents\GitHub\CG4002-Wizard-Game-Project\AI\wand_dataset\data_clean
Loaded 798 samples, 60 timesteps, 6 channels
Classes: ['Circle', 'Infinity', 'None', 'Square', 'Triangle', 'Wave', 'Zigzag']


# Fine-tune model for wand dataset

In [18]:
# Resize model input if necessary for wand dataset
# Create a new model with 6 channels input but reuse weights
finetune_model = Sequential([
    tf.keras.Input(shape=(window_size, num_channels_wand)),
    Conv1D(64, 5, activation='relu'),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(128, 5, activation='relu'),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(256, 3, activation='relu'),
    BatchNormalization(),
    MaxPooling1D(2),

    Dropout(0.4),
    GlobalAveragePooling1D(),   
    Dense(128, activation='relu'),
    Dropout(0.3),
    Dense(len(encoder_wand.classes_), activation='softmax')
])

finetune_model.compile(optimizer=Adam(0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
finetune_model.summary()

history_finetune = finetune_model.fit(
    X_wand, y_wand,
    validation_split=0.2,
    epochs=20,
    batch_size=16
)

# Save fine-tuned model
finetune_model.save("cnn_finetuned_wand.keras")

Epoch 1/20
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 44ms/step - accuracy: 0.2727 - loss: 2.0057 - val_accuracy: 0.4688 - val_loss: 1.8514
Epoch 2/20
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 22ms/step - accuracy: 0.5470 - loss: 1.2565 - val_accuracy: 0.6250 - val_loss: 1.6463
Epoch 3/20
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 25ms/step - accuracy: 0.7241 - loss: 0.8975 - val_accuracy: 0.6562 - val_loss: 1.5221
Epoch 4/20
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 25ms/step - accuracy: 0.8150 - loss: 0.6576 - val_accuracy: 0.6875 - val_loss: 1.4726
Epoch 5/20
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 24ms/step - accuracy: 0.8448 - loss: 0.5348 - val_accuracy: 0.6687 - val_loss: 1.4909
Epoch 6/20
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 25ms/step - accuracy: 0.9044 - loss: 0.4156 - val_accuracy: 0.3375 - val_loss: 1.5719
Epoch 7/20
[1m40/40[0m [32m━━━━

# Evaluate fine-tuned model

In [20]:
y_pred = finetune_model.predict(X_wand)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_wand, axis=1)

print(classification_report(y_true_classes, y_pred_classes,
                            target_names=wand_classes,
                            zero_division=0))  # sets undefined metrics to 0

cm = confusion_matrix(y_true_classes, y_pred_classes)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=wand_classes, yticklabels=wand_classes)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Fine-tuned Wand)')
plt.show()

[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 25ms/step


ValueError: Number of classes, 6, does not match size of target_names, 7. Try specifying the labels parameter