## **BCI Competition III Dataset V**

#### **1. Load training and test dataset**

In [70]:
import os
import pandas as pd
import numpy as np

# The competition data is looked up relative to the current working
# directory; the PSD files sit in its "data_psd" subfolder.
data_path = os.path.join(os.getcwd(), *("..", "data", "bci_competition"))
data_psd_path = os.path.join(data_path, "data_psd")

# Starting column of each electrode's block of 12 PSD features.
# Each entry is 12 * (position of the electrode in a PSD row).
left_channels = [12 * slot for slot in (0, 3, 5)]  # C3, CP1, P3
right_channels = [12 * slot for slot in (2, 4, 7)]  # C4, CP2, P4


def load_psd_file(file_path, include_labels=True):
    """
    Load a whitespace-delimited PSD file and split it into features and labels.

    Parameters
    ----------
    file_path : str or file-like
        Location of the PSD file; passed straight to ``pandas.read_csv``.
    include_labels : bool, default True
        When True, column 96 is returned as the label series. Pass False for
        files that carry no label column.

    Returns
    -------
    features : pandas.DataFrame
        The first 96 columns of the file.
    labels : pandas.Series or None
        Column 96 when ``include_labels`` is True, otherwise None.
    """
    # sep=r"\s+" is the supported spelling of the deprecated
    # delim_whitespace=True and parses the file identically, without
    # emitting a FutureWarning on every call.
    data = pd.read_csv(file_path, sep=r"\s+", header=None)
    features = data.iloc[:, :96]
    labels = data.iloc[:, 96] if include_labels else None
    return features, labels

#### **2. Extract left and right regions (electrodes)**

The dataset has 96 PSD features (12 frequency bands for 8 channels). We need to extract the left and right brain regions.

Left channels: **[C3, CP1, P3]** → 3 channels → 3×12=36 features.
Right channels: **[C4, CP2, P4]** → 3 channels → also 36 features.

In [71]:
def extract_left_right_features(
    features, left_starts=None, right_starts=None, n_bands=12
):
    """
    Extract left and right brain features from the PSD data.

    Each electrode occupies ``n_bands`` consecutive columns of ``features``;
    an electrode is identified by the index of its first column.

    Parameters
    ----------
    features : pandas.DataFrame
        PSD feature table, one row per sample.
    left_starts, right_starts : sequence of int, optional
        First-column indices of the electrodes to pick for each side.
        When omitted, the module-level ``left_channels`` /
        ``right_channels`` lists are used.
    n_bands : int, default 12
        Number of consecutive columns belonging to one electrode.

    Returns
    -------
    (left_features, right_features) : tuple of pandas.DataFrame
        Column-wise concatenation of the selected electrode blocks, in the
        order the start indices were given.
    """
    if left_starts is None:
        left_starts = left_channels
    if right_starts is None:
        right_starts = right_channels

    def _gather(starts):
        # One slice of n_bands columns per electrode, stitched side by side.
        return pd.concat(
            [features.iloc[:, s : s + n_bands] for s in starts], axis=1
        )

    return _gather(left_starts), _gather(right_starts)

In [72]:
# os.listdir() yields directory entries in arbitrary order. The rows of the
# concatenated frames below form the time axis that the later resampling
# step averages over, so the files are sorted to make that order
# deterministic across machines and runs.
train_files = sorted(
    f for f in os.listdir(data_psd_path) if f.startswith("train")
)

# Per-file (left features, right features, labels), keyed by file name.
train_data = {}
for file in train_files:
    file_path = os.path.join(data_psd_path, file)
    features, labels = load_psd_file(file_path)
    left_features, right_features = extract_left_right_features(features)
    train_data[file] = (left_features, right_features, labels)

# Stack all training files row-wise, preserving the sorted file order
# (dicts keep insertion order).
train_left = pd.concat([data[0] for data in train_data.values()])
train_right = pd.concat([data[1] for data in train_data.values()])
train_labels = pd.concat([data[2] for data in train_data.values()])

  data = pd.read_csv(file_path, delim_whitespace=True, header=None)
  data = pd.read_csv(file_path, delim_whitespace=True, header=None)
  data = pd.read_csv(file_path, delim_whitespace=True, header=None)
  data = pd.read_csv(file_path, delim_whitespace=True, header=None)
  data = pd.read_csv(file_path, delim_whitespace=True, header=None)
  data = pd.read_csv(file_path, delim_whitespace=True, header=None)
  data = pd.read_csv(file_path, delim_whitespace=True, header=None)
  data = pd.read_csv(file_path, delim_whitespace=True, header=None)
  data = pd.read_csv(file_path, delim_whitespace=True, header=None)


#### **3. Resample to get predictions every 0.5 seconds**

As mentioned in the competition, we need to average every 8 consecutive samples to predict the task every 0.5 seconds (since the sampling rate is 16 Hz, each 8 samples correspond to 0.5 seconds).

In [73]:
def resample_features(features, step=8):
    """
    Downsample a feature table by averaging blocks of consecutive rows.

    Rows are grouped into non-overlapping blocks of ``step`` rows and each
    block is replaced by its column-wise mean. Trailing rows that do not
    fill a complete block are dropped. The result is a new DataFrame with
    default integer row and column labels.
    """
    width = features.shape[1]
    usable_rows = (len(features) // step) * step

    # Shape (blocks, step, width): axis 1 runs over the rows of one block.
    blocks = features[:usable_rows].to_numpy().reshape(-1, step, width)
    block_means = blocks.mean(axis=1)
    return pd.DataFrame(block_means)


# Downsample both hemispheres with the default block size (left first,
# then right).
train_left_resampled, train_right_resampled = (
    resample_features(side) for side in (train_left, train_right)
)

# Aliases used by the rest of the notebook:
# xt = left electrode features, yt = right electrode features.
xt, yt = train_left_resampled, train_right_resampled

#### **4. Combine and prepare for the Adaptive CCA algorithm**

**x_t**: 36-dimensional left-hemisphere feature vector for each time interval.
**y_t**: 36-dimensional right-hemisphere feature vector for each time interval.

In [76]:
# Report the dimensions of both feature sets.
for caption, matrix in (("xt shape:", xt), ("yt shape:", yt)):
    print(caption, matrix.shape)

# Write both feature sets to .npy files in the current working directory.
for target, matrix in (("xt.npy", xt), ("yt.npy", yt)):
    np.save(target, matrix)

xt shape: (3902, 36)
yt shape: (3902, 36)


#### **5. Adaptive CCA implementation**

##### **A. Preprocessing**

In [77]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score

# Reload the feature matrices saved earlier in the notebook.
xt = np.load("xt.npy")  # left hemisphere features (n_samples, 36)
yt = np.load("yt.npy")  # right hemisphere features (n_samples, 36)

# Standardize each column: subtract its mean, divide by its standard deviation.
xt = (xt - xt.mean(axis=0)) / xt.std(axis=0)
yt = (yt - yt.mean(axis=0)) / yt.std(axis=0)

# Problem dimensions taken from the arrays themselves.
n_samples, nx = xt.shape
_, ny = yt.shape

# Hyper-parameters of the adaptive CCA.
p, beta = 4, 0.98  # p: rank of decomposition; beta: forgetting factor (try 0.99, 0.98, 0.95)

In [78]:
# Covariance estimates start at identity; the cross-covariance starts at zero.
Cx, Cy = np.eye(nx), np.eye(ny)
Cxy = np.zeros((nx, ny))

# Random subspaces, orthonormalized via the Q factor of a QR decomposition.
# The two random draws happen in the same order as before (x first, then y).
Ux = np.random.randn(nx, p)
Vy = np.random.randn(ny, p)
Ux, _ = np.linalg.qr(Ux)
Vy, _ = np.linalg.qr(Vy)

# Accumulators for the per-step reconstruction error and flagged time steps.
reconstruction_errors, detected_changes = [], []

##### **B. Computing the CCA model**

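For each time step, the algorithm updates the exponentially weighted covariance estimates, recomputes the top-p canonical directions, scores the sample by its reconstruction error, and flags a change when that error exceeds the 95th percentile of the earlier errors.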

In [None]:
def _canonical_subspaces(Cx, Cy, Cxy, p):
    """Return orthonormal bases of the top-p canonical subspaces for x and y.

    The canonical directions are the leading eigenvectors of
    ``inv(Cx) @ Cxy @ inv(Cy) @ Cxy.T`` (and its y-side counterpart). Those
    products are not symmetric, so ``np.linalg.eigh`` must not be applied to
    them directly. Instead the problem is whitened with the Cholesky factors
    ``Cx = Lx Lx^T`` and ``Cy = Ly Ly^T``: the SVD of
    ``K = inv(Lx) @ Cxy @ inv(Ly).T`` gives singular vectors ``U``/``V`` and
    the wanted eigenvectors are ``inv(Lx).T @ U`` and ``inv(Ly).T @ V``.
    """
    Lx = np.linalg.cholesky(Cx)
    Ly = np.linalg.cholesky(Cy)

    K = np.linalg.solve(Lx, Cxy)  # inv(Lx) @ Cxy
    K = np.linalg.solve(Ly, K.T).T  # ... @ inv(Ly).T

    # Singular values come back in descending order, so the first p
    # singular vectors belong to the largest canonical correlations.
    U, _, Vt = np.linalg.svd(K, full_matrices=False)
    Wx = np.linalg.solve(Lx.T, U[:, :p])
    Wy = np.linalg.solve(Ly.T, Vt[:p].T)

    # The residual step uses U @ U.T as an orthogonal projector, which
    # requires orthonormal columns; QR keeps the span and orthonormalizes.
    Ux, _ = np.linalg.qr(Wx)
    Vy, _ = np.linalg.qr(Wy)
    return Ux, Vy


def adap_cca(xt, yt, beta=0.98, p=4):
    """
    Run adaptive CCA over paired samples and flag reconstruction-error spikes.

    For every time step the exponentially weighted covariance estimates are
    updated, the top-``p`` canonical subspaces are recomputed, and the sample
    is scored by its reconstruction error. A step is flagged as a change when
    its error exceeds the 95th percentile of all earlier errors and the last
    flagged step is more than 5 steps back.

    All state (covariances, error history) is local to the call, so the
    function can be run repeatedly with different ``beta`` / ``p``.

    Parameters
    ----------
    xt, yt : array-like, shape (n_samples, nx) and (n_samples, ny)
        Paired observations; row ``t`` of each belongs to the same time step.
    beta : float, default 0.98
        Forgetting factor of the covariance updates.
    p : int, default 4
        Number of canonical directions kept.

    Returns
    -------
    reconstruction_errors : list of float
        One error value per time step.
    detected_changes : list of int
        Indices of the time steps flagged as changes.

    Raises
    ------
    ValueError
        If the inputs are not 2-D, have different numbers of rows, or
        ``p`` is smaller than 1.
    """
    xt = np.asarray(xt, dtype=float)
    yt = np.asarray(yt, dtype=float)
    if xt.ndim != 2 or yt.ndim != 2:
        raise ValueError("xt and yt must be 2-D (n_samples, n_features)")
    if xt.shape[0] != yt.shape[0]:
        raise ValueError("xt and yt must have the same number of samples")
    if p < 1:
        raise ValueError("p must be at least 1")

    n_samples, nx = xt.shape
    ny = yt.shape[1]

    # Same starting point as the notebook's initialisation cell: identity
    # covariances and a zero cross-covariance.
    Cx = np.eye(nx)
    Cy = np.eye(ny)
    Cxy = np.zeros((nx, ny))
    eye_x = np.eye(nx)
    eye_y = np.eye(ny)

    reconstruction_errors = []
    detected_changes = []

    for t in range(n_samples):
        # Current sample as column vectors
        x_t = xt[t].reshape(-1, 1)
        y_t = yt[t].reshape(-1, 1)

        # Exponentially weighted covariance updates
        Cx = beta * Cx + (1 - beta) * (x_t @ x_t.T)
        Cy = beta * Cy + (1 - beta) * (y_t @ y_t.T)
        Cxy = beta * Cxy + (1 - beta) * (x_t @ y_t.T)

        # Top-p canonical subspaces of the current estimates
        Ux, Vy = _canonical_subspaces(Cx, Cy, Cxy, p)

        # Residuals: project out the canonical subspace, then apply the
        # inverse covariance (solve() instead of forming the inverse).
        rx_t = np.linalg.solve(Cx, (eye_x - Ux @ Ux.T) @ x_t)
        ry_t = np.linalg.solve(Cy, (eye_y - Vy @ Vy.T) @ y_t)

        # Reconstruction error, averaged over both views
        c_t = 0.5 * (rx_t.T @ Cx @ rx_t / nx + ry_t.T @ Cy @ ry_t / ny)
        reconstruction_errors.append(c_t.item())

        # Change detection: compare against the history *before* this step
        if len(reconstruction_errors) > 5:
            threshold = np.percentile(reconstruction_errors[:-1], 95)
            far_enough = (
                not detected_changes or t - detected_changes[-1] > 5
            )
            if reconstruction_errors[-1] > threshold and far_enough:
                detected_changes.append(t)

    return reconstruction_errors, detected_changes

In [None]:
# Plot Results
# Plot the reconstruction error over time with one dashed line per
# detected change.
plt.figure(figsize=(12, 6))
plt.plot(reconstruction_errors, label="Reconstruction Error")
for idx, change in enumerate(detected_changes):
    # Only the first marker gets a legend label; labels starting with an
    # underscore are skipped by the legend, which avoids one duplicate
    # "Detected Change" entry per line.
    marker_label = "Detected Change" if idx == 0 else "_nolegend_"
    plt.axvline(change, color="r", linestyle="--", label=marker_label)
plt.xlabel("Time Step")
plt.ylabel("Reconstruction Error")
plt.legend()
plt.title("Reconstruction Error and Detected Changes")
plt.show()

# Evaluate Performance
# Assuming `true_changes` contains ground truth change points
true_changes = []  # Populate with known change points
y_true = np.zeros(n_samples)
y_pred = np.zeros(n_samples)

# Binary indicator vectors: 1 at change points, 0 elsewhere.
y_true[true_changes] = 1
y_pred[detected_changes] = 1

# The AUC is only defined when ground truth is available; the raw
# reconstruction error serves as the detection score.
auc = roc_auc_score(y_true, reconstruction_errors) if true_changes else None
# Compare against None explicitly: a computed AUC of 0.0 is falsy and would
# otherwise be reported as "no ground truth".
if auc is not None:
    print(f"AUC of Change Detection: {auc:.4f}")
else:
    print("No ground truth change points provided.")