### Settings

In [1]:
import os
import pathlib
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import torch
import math
import time

### Data Preparation

In [2]:
# ---------------------------------
# Load Brainwave files
# ---------------------------------
directory_path = "/Users/projects/Avatar/Data/EEG_Signals/brainwave_readings/"
core_dir = pathlib.Path(directory_path)
skip_dirs = ["Group1-8channels"] 

dfs = []

for item in core_dir.rglob('*.txt'):
    try:
        if set(item.parts).isdisjoint(skip_dirs):
            df = pd.read_csv(item, sep=',', header=4, on_bad_lines='skip')
            df["src_filename"] = str(item)
            dfs.append(df)
    except:
        pass # Future consideration, add a step to track files that are skipped and revisit them


# Filter out empty DataFrames
dfs_nonempty = [df for df in dfs if not df.empty]

# Concatenate only non-empty DataFrames
eeg_data = pd.concat(dfs_nonempty, ignore_index=True)

# -----------------------------
# Set Labels
# -----------------------------
eeg_data_fnl = eeg_data.copy()
eeg_data_fnl["label_txt"] = ""
label_names = ["backward","forward","landing","left","right","takeoff","fowward"]

for label in label_names:
    eeg_data_fnl["label_txt"] = np.where(
        (eeg_data_fnl["label_txt"] == "") & eeg_data_fnl["src_filename"].str.lower().str.contains(label),
        label,
        eeg_data_fnl["label_txt"]
    )

# Correct typo
eeg_data_fnl["label_txt"] = np.where(
    eeg_data_fnl["label_txt"] == "fowward", "forward",
    eeg_data_fnl["label_txt"]
)

# Encode labels
le = LabelEncoder()
eeg_data_fnl["label"] = le.fit_transform(eeg_data_fnl["label_txt"])

# -----------------------------
# Features / Labels
# -----------------------------
X_df = eeg_data_fnl.iloc[:, 1:17]
X_array = X_df.to_numpy()
y_array = eeg_data_fnl["label"].to_numpy()

X = torch.tensor(X_array, dtype=torch.float32)
y = torch.tensor(y_array, dtype=torch.long)

X_column_names = X_df.columns.tolist()

# Determine which features are categorical
is_categorical = [int(len(np.unique(X_df[col])) < 9) for col in X_column_names]
is_categorical = torch.tensor(is_categorical)

# -----------------------------
# Train/Test Split
# -----------------------------
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)


### Create Model

In [3]:
# ----------------------------
# Decision Tree Class
# ----------------------------
class DecisionTree:
    def __init__(self, max_depth=12, nb_features=None, is_categorical=None, n_bins=32, min_samples_split=50):
        self.max_depth = max_depth
        self.nb_features = nb_features
        self.is_categorical = is_categorical
        self.n_bins = n_bins
        self.min_samples_split = min_samples_split
        self.tree = None  # will store nodes

    # ----------------------------
    # Feature binning
    # ----------------------------
    def _bin_features(self, X):
        X_binned = X.clone()
        numeric_cols = [i for i in range(X.shape[1]) if not self.is_categorical[i]]
        if not numeric_cols:
            return X_binned.long()

        for i in numeric_cols:
            col = X[:, i]
            edges = torch.linspace(col.min(), col.max(), self.n_bins, device=X.device)
            X_binned[:, i] = torch.searchsorted(edges, col.contiguous(), right=False)

        return X_binned.long()

    # ----------------------------
    # Fit tree recursively
    # ----------------------------
    def fit(self, X, y):
        X_binned = self._bin_features(X)
        self.n_classes = int(y.max().item() + 1)
        self.nb_features = self.nb_features or int(math.sqrt(X.shape[1]))
        self.tree = self._build_tree(X_binned, y, depth=0)

    # ----------------------------
    # Node building
    # ----------------------------
    def _build_tree(self, X, y, depth):
        node = {}
        counts = torch.bincount(y, minlength=self.n_classes)
        pred_class = torch.argmax(counts).item()
        node['prediction'] = pred_class

        # Stopping criteria
        if depth >= self.max_depth or len(torch.unique(y)) == 1 or X.shape[0] < self.min_samples_split:
            node['is_leaf'] = True
            return node

        feature_idx = torch.randperm(X.shape[1])[:self.nb_features]
        best_gini = float('inf')
        best_feature = None
        best_split = None

        for f in feature_idx:
            vals = X[:, f]
            unique_vals = torch.unique(vals)
            for val in unique_vals[:-1]:
                left_mask = vals <= val
                right_mask = vals > val
                if left_mask.sum() == 0 or right_mask.sum() == 0:
                    continue
                y_left, y_right = y[left_mask], y[right_mask]
                gini = self._gini_impurity(y_left, y_right)
                if gini < best_gini:
                    best_gini = gini
                    best_feature = f
                    best_split = val

        if best_feature is None:
            node['is_leaf'] = True
            return node

        node['is_leaf'] = False
        node['feature'] = best_feature
        node['split'] = best_split

        left_mask = X[:, best_feature] <= best_split
        right_mask = X[:, best_feature] > best_split

        node['left'] = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        node['right'] = self._build_tree(X[right_mask], y[right_mask], depth + 1)

        return node

    # ----------------------------
    # Gini impurity
    # ----------------------------
    def _gini_impurity(self, y_left, y_right):
        n_left = len(y_left)
        n_right = len(y_right)
        n_total = n_left + n_right
        def gini(y):
            counts = torch.bincount(y, minlength=self.n_classes).float()
            probs = counts / counts.sum()
            return 1.0 - (probs ** 2).sum()
        return (n_left / n_total) * gini(y_left) + (n_right / n_total) * gini(y_right)

    # ----------------------------
    # prediction
    # ----------------------------
    def predict(self, X):
        X_binned = self._bin_features(X)
        n_samples = X.shape[0]
        preds = torch.full((n_samples,), -1, dtype=torch.long, device=X.device)

        queue = [(self.tree, torch.ones(n_samples, dtype=torch.bool, device=X.device))]

        while queue:
            node, mask = queue.pop()
            if mask.sum() == 0:
                continue
            if node['is_leaf']:
                preds[mask] = node['prediction']
                continue
            f, s = node['feature'], node['split']
            left_mask = mask & (X_binned[:, f] <= s)
            right_mask = mask & (X_binned[:, f] > s)
            queue.append((node['left'], left_mask))
            queue.append((node['right'], right_mask))

        return preds


# ----------------------------
# Random Forest
# ----------------------------
class RandomForest:
    def __init__(self, 
                 n_estimators=100,
                 max_depth=12,
                 nb_features=None,
                 is_categorical=None,
                 columns=None,
                 bootstrap=True,
                 device='cpu',
                 sample_frac=0.1,
                 n_bins=32,
                 min_samples_split=50):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.nb_features = nb_features
        self.is_categorical = is_categorical
        self.bootstrap = bootstrap
        self.device = device
        self.sample_frac = sample_frac
        self.n_bins = n_bins
        self.min_samples_split = min_samples_split
        self.trees = []

    def _train_single_tree(self, X, y):
        n_samples = max(1, int(X.shape[0] * self.sample_frac))
        idx = torch.randint(0, X.shape[0], (n_samples,), device=X.device)
        X_sub, y_sub = X[idx], y[idx]
        tree = DecisionTree(
            max_depth=self.max_depth,
            nb_features=self.nb_features,
            is_categorical=self.is_categorical,
            n_bins=self.n_bins,
            min_samples_split=self.min_samples_split
        )
        tree.fit(X_sub, y_sub)
        return tree

    def fit(self, X, y):
        X, y = X.to(self.device), y.to(self.device)
        self.trees = []
        for i in range(self.n_estimators):
            self.trees.append(self._train_single_tree(X, y))
        return self

    def predict(self, X):
        X = X.to(self.device)
        preds_list = [tree.predict(X) for tree in self.trees]
        preds_tensor = torch.stack(preds_list, dim=0)
        return torch.mode(preds_tensor, dim=0).values

    def save(self, path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        torch.save(self, path)
        print(f"âœ… Model saved to {path}")

    @staticmethod
    def load(path):
        model = torch.load(path, map_location='cpu')
        print(f"âœ… Model loaded from {path}")
        return model

### Model Training

In [5]:
# -----------------------------
# Model Directory
# -----------------------------
model_save_directory = '/Users/projects/Avatar/Models/'
os.makedirs(model_save_directory, exist_ok=True)

# -----------------------------
# Initialize Random Forest
# -----------------------------
rf_model = RandomForest(
    n_estimators=300,
    max_depth=20,
    nb_features=None,
    is_categorical=is_categorical,
    columns=X_column_names,
    bootstrap=True,
    device='cpu',          # switch to 'cuda' if GPU is available
    sample_frac=0.4,       # 40% of rows per tree
    min_samples_split=50,   # test/change as needed for speed and reduce overfitting 
    n_bins=64              # bin continuous features
)

# -----------------------------
# Train the Model
# -----------------------------
start_time = time.time()
rf_model.fit(X_train, y_train)
elapsed = time.time() - start_time
print(f"âœ… Training completed in {elapsed:.2f} seconds")

# -----------------------------
# Evaluate Training Accuracy
# -----------------------------
start_time = time.time()
with torch.no_grad():
    preds_train = rf_model.predict(X_train)
train_acc = (preds_train == y_train).float().mean().item()
elapsed_train_eval = time.time() - start_time
print(f"ðŸ“Š Training accuracy: {train_acc:.4f} (evaluated in {elapsed_train_eval:.2f} seconds)")

# -----------------------------
# Evaluate Test Accuracy
# -----------------------------
start_time = time.time()
with torch.no_grad():
    preds_test = rf_model.predict(X_test)
test_acc = (preds_test == y_test).float().mean().item()
elapsed_test_eval = time.time() - start_time
print(f"ðŸ“Š Test accuracy: {test_acc:.4f} (evaluated in {elapsed_test_eval:.2f} seconds)")

# -----------------------------
# Save the Model
# -----------------------------
save_path = os.path.join(model_save_directory, "custom_rf.pt") 
rf_model.save(save_path)
print("âœ… Model saved")

âœ… Training completed in 2860.19 seconds
ðŸ“Š Training accuracy: 0.8309 (evaluated in 269.62 seconds)
ðŸ“Š Test accuracy: 0.4939 (evaluated in 53.04 seconds)
âœ… Model saved to /Users/projects/Avatar/Models/custom_rf.pt
âœ… Model saved


### Load Model

In [8]:
# -------
# Load
# -------

model_load_directory = '/Users/projects/Avatar/Models/' # Change directory as needed
load_path = os.path.join(model_load_directory, "custom_rf.pt") # Change name as needed

rf_model = torch.load(load_path, weights_only=False)

print("âœ… Model loaded successfully")

âœ… Model loaded successfully


In [9]:
# -----------------------------
# Evaluate Test Accuracy
# Just to show the loaded model works
# -----------------------------
start_time = time.time()
with torch.no_grad():
    preds_test = rf_model.predict(X_test)
test_acc = (preds_test == y_test).float().mean().item()
elapsed_test_eval = time.time() - start_time
print(f"ðŸ“Š Test accuracy: {test_acc:.4f} (evaluated in {elapsed_test_eval:.2f} seconds)")

ðŸ“Š Test accuracy: 0.4939 (evaluated in 50.95 seconds)
