# Produce npy dataset required for fine-tuning

### Load pkl

In [1]:
import pickle
import numpy as np
import pandas as pd
from pathlib import Path

In [2]:
# Load the pkl file
pkl_file = '../../data/pt_decoding_data_S62.pkl'

with open(pkl_file, 'rb') as f:
    data = pickle.load(f)

print(f"Loaded: {pkl_file}")
print(f"Type: {type(data)}")

  data = pickle.load(f)


Loaded: ../../data/pt_decoding_data_S62.pkl
Type: <class 'dict'>


In [5]:
data.keys()

dict_keys(['S14', 'S26', 'S23', 'S33', 'S22', 'S39', 'S58', 'S62'])

In [6]:
data['S14'].keys()

dict_keys(['ID', 'X1', 'X1_map', 'y1', 'X2', 'X2_map', 'y2', 'X3', 'X3_map', 'y3', 'y_full_phon', 'X_collapsed', 'y_phon_collapsed', 'y_artic_collapsed', 'pre_pts'])

In [7]:
for patient_name in data.keys():
    print(data[patient_name]['X1_map'].shape)

(144, 8, 16, 200)
(148, 8, 16, 200)
(151, 8, 16, 200)
(46, 12, 24, 200)
(151, 8, 16, 200)
(137, 24, 12, 200)
(141, 24, 12, 200)
(178, 24, 12, 200)


In [8]:
y_coords, x_coords = np.meshgrid(
    np.linspace(0, 1, 8),
    np.linspace(0, 1, 16),
    indexing="ij"
)

In [9]:
actual_positions = np.column_stack([x_coords.ravel(), y_coords.ravel()])


### X_train.npy, y_train.npy, X_test.npy, y_test.npy [test = one patient, train = the rest]

#### first unify the channels across patients

In [10]:
import numpy as np
from scipy.spatial.distance import cdist

def spatial_resample(data, tgt_H, tgt_W):
    """
    Resample 4D neural data (B, H, W, T) to a target spatial grid (tgt_H, tgt_W).

    Parameters
    ----------
    data : np.ndarray
        Input array of shape (B, H, W, T)
    tgt_H, tgt_W : int
        Target grid height and width

    Returns
    -------
    np.ndarray
        Resampled data of shape (B, tgt_H * tgt_W, T)
    """

    B, H, W, T = data.shape

    # 1Ô∏è‚É£ Flatten the spatial grid -> (B, H*W, T)
    x = data.reshape(B, H * W, T)

    # 2Ô∏è‚É£ Build coordinate grids for the original layout (normalized 0‚Äì1)
    y_coords, x_coords = np.meshgrid(
        np.linspace(0, 1, H),
        np.linspace(0, 1, W),
        indexing="ij"
    )
    actual_positions = np.column_stack([x_coords.ravel(), y_coords.ravel()])

    # 3Ô∏è‚É£ Build coordinate grids for the target layout
    y_t, x_t = np.meshgrid(
        np.linspace(0, 1, tgt_H),
        np.linspace(0, 1, tgt_W),
        indexing="ij"
    )
    target_positions = np.column_stack([x_t.ravel(), y_t.ravel()])

    # 4Ô∏è‚É£ Compute inverse-distance interpolation weights
    dist = cdist(target_positions, actual_positions)
    weights = 1 / (dist + 1e-6)
    weights /= weights.sum(axis=1, keepdims=True)  # normalize each row

    # 5Ô∏è‚É£ Map each trial into the target grid
    x_mapped = np.zeros((B, tgt_H * tgt_W, T))
    for b in range(B):
        x_mapped[b] = weights @ x[b]

    return x_mapped.astype(np.float32)

In [11]:
from scipy.linalg import fractional_matrix_power

def EA(x):
    """
    Parameters
    ----------
    x : numpy array
        data of shape (num_samples, num_channels, num_time_samples)

    Returns
    ----------
    XEA : numpy array
        data of shape (num_samples, num_channels, num_time_samples)
    """
    cov = np.zeros((x.shape[0], x.shape[1], x.shape[1]))
    for i in range(x.shape[0]):
        cov[i] = np.cov(x[i])
    refEA = np.mean(cov, 0)
    sqrtRefEA = fractional_matrix_power(refEA, -0.5) 
    XEA = np.zeros(x.shape)
    for i in range(x.shape[0]):
        XEA[i] = np.dot(sqrtRefEA, x[i])
    return XEA

In [12]:
processed_data = {k: [] for k in data.keys()}

for patient_name in data.keys():
    original_patient_data = data[patient_name]['X1_map']
    processed_patient_data = spatial_resample(original_patient_data, 16, 8)
    processed_data[patient_name] = processed_patient_data

#### create test set & train set

In [26]:
test_patient = 'S14'

X_test = processed_data[test_patient]
y_test = data[test_patient]['y1']

In [25]:
X = []
y = []

X_train = []
y_train = []

for patient_name in data.keys():

    y_train.append(data[patient_name]['y1'])
    X_train.append(processed_data[patient_name])

# Concatenate all patients along the trial axis
X_train = np.concatenate(X_train, axis=0)  # (total_patients, trials, chan, time) -> (total_trials, chan, time)
y_train = np.concatenate(y_train, axis=0)  # (total_patients, trials) -> (total_trials,)

# Convert labels from 1‚Äì9 ‚Üí 0‚Äì8
y_train = y_train - 1

print(X_test.shape)
print(y_test.shape)
print(X_train.shape)
print(y_train.shape)
print(f"Label range: {y_train.min()} to {y_train.max()}")

(44, 128, 200)
(144,)
(1096, 128, 200)
(1096,)
Label range: 0 to 8


In [15]:
# ==== CONFIG ====
DATASET_NAME = "basic"
WEIGHT_PATH = "./weight/MIRepNet.pth"   # pretrained weights (4-class)
BATCH_SIZE = 32

# ==== LOAD DATA ====
X = np.load(f'../../data/{DATASET_NAME}/X.npy')   # (N, 111, 200)
y = np.load(f'../../data/{DATASET_NAME}/labels.npy')  # (N,)
print("Loaded data:", X.shape, y.shape)

Loaded data: (1096, 128, 200) (1096,)


In [16]:
np.save('../../data/X.npy', X_train)
np.save('../../data/labels.npy', y_train)
np.save('../../data/X_test.npy', X_test)
np.save('../../data/labels_test.npy', y_test)

### Create per-patient datasets (phoneme 1 only)

In [34]:
import numpy as np
from sklearn.model_selection import train_test_split
import os

# ==== CONFIG ====
current_patient = 'S14'
test_proportion = 0.3
val_proportion = 0.2       # 20% of train data goes to validation
augmentation_factor = 10   # how many times to augment the training set
augmentation_strength = 0.5  # 0 = identical copies, 1 = strong augmentation

# ==== LOAD ORIGINAL EEG DATA ====
all_trials = data[current_patient]['X1_map']
all_trials = all_trials.reshape(
    all_trials.shape[0],
    all_trials.shape[1] * all_trials.shape[2],
    all_trials.shape[3]
)
all_labels = data[current_patient]['y1'] - 1  # shift labels to start at 0

# ==== TRAIN/TEST SPLIT ====
train_trials, test_trials, train_labels, test_labels = train_test_split(
    all_trials, all_labels,
    test_size=test_proportion,
    random_state=42,
    shuffle=True
)

# ==== TRAIN/VAL SPLIT (from train only) ====
train_trials, val_trials, train_labels, val_labels = train_test_split(
    train_trials, train_labels,
    test_size=val_proportion,
    random_state=42,
    shuffle=True
)

# ==== SAVE ORIGINAL (UN-AUGMENTED) TRAINING COPY ====
save_dir = f'../../data/{current_patient}'
os.makedirs(save_dir, exist_ok=True)

pre_aug_train_X_path = f"{save_dir}/X_same_training_data_before_aug.npy"
pre_aug_train_y_path = f"{save_dir}/labels_same_training_data_before_aug.npy"
np.save(pre_aug_train_X_path, train_trials)
np.save(pre_aug_train_y_path, train_labels)


# ==== DATA AUGMENTATION FUNCTION ====
def augment_eeg_data(X, strength=0.5):
    """Random temporal shift, amplitude scaling, and Gaussian noise.
       strength ‚àà [0, 1] controls how drastic the augmentations are."""
    X_aug = np.copy(X)
    max_shift = int(10 * strength)
    shift_vals = np.random.randint(-max_shift, max_shift + 1, size=X.shape[0])
    for i, s in enumerate(shift_vals):
        X_aug[i] = np.roll(X_aug[i], s, axis=-1)
    
    scale_low, scale_high = 1 - 0.1 * strength, 1 + 0.1 * strength
    scale_factors = np.random.uniform(scale_low, scale_high, size=X.shape[0])
    X_aug *= scale_factors[:, np.newaxis, np.newaxis]
    
    noise_std = 0.01 * strength
    noise = np.random.normal(0, noise_std, size=X.shape)
    X_aug += noise.astype(np.float32)
    
    return X_aug


# ==== APPLY AUGMENTATION ONLY TO TRAINING SET ====
augmented_trials, augmented_labels = [], []
for _ in range(augmentation_factor):
    X_aug = augment_eeg_data(train_trials, strength=augmentation_strength)
    augmented_trials.append(X_aug)
    augmented_labels.append(train_labels)

train_trials = np.concatenate(augmented_trials, axis=0)
train_labels = np.concatenate(augmented_labels, axis=0)


# ==== SAVE FINAL FILES ====
train_X_path = f'{save_dir}/X_train.npy'
train_y_path = f'{save_dir}/labels_train.npy'
val_X_path   = f'{save_dir}/X_val.npy'
val_y_path   = f'{save_dir}/labels_val.npy'
test_X_path  = f'{save_dir}/X_test.npy'
test_y_path  = f'{save_dir}/labels_test.npy'

np.save(train_X_path, train_trials)
np.save(train_y_path, train_labels)
np.save(val_X_path, val_trials)
np.save(val_y_path, val_labels)
np.save(test_X_path, test_trials)
np.save(test_y_path, test_labels)


# ==== PRINT SUMMARY ====
abs_dir = os.path.abspath(save_dir)
print("===============================================")
print("üìä Original data size (before augmentation):")
print(f"  train: {np.load(pre_aug_train_X_path).shape}")
print(f"  val:   {val_trials.shape}")
print(f"  test:  {test_trials.shape}")

print("\nüìà After augmentation:")
print(f"  train: {train_trials.shape}")
print(f"  val:   {val_trials.shape}")
print(f"  test:  {test_trials.shape}")
print("===============================================")

print("\nüìÅ Files saved to:")
print(f"  training data (post aug): {os.path.abspath(train_X_path)}")
print(f"  training labels (post aug): {os.path.abspath(train_y_path)}")
print(f"  validation data: {os.path.abspath(val_X_path)}")
print(f"  validation labels: {os.path.abspath(val_y_path)}")
print(f"  testing data: {os.path.abspath(test_X_path)}")
print(f"  testing labels: {os.path.abspath(test_y_path)}")
print(f"  training data (before aug): {os.path.abspath(pre_aug_train_X_path)}")
print(f"  training labels (before aug): {os.path.abspath(pre_aug_train_y_path)}")

print(f"\n‚ú® Data augmented to {train_trials.shape[0]} training trials (x{augmentation_factor} increase)")
print("===============================================")


üìä Original data size (before augmentation):
  train: (80, 128, 200)
  val:   (20, 128, 200)
  test:  (44, 128, 200)

üìà After augmentation:
  train: (800, 128, 200)
  val:   (20, 128, 200)
  test:  (44, 128, 200)

üìÅ Files saved to:
  training data (post aug): /Users/wangmaidou/Documents/EEG-Model-Fine-tune/data/S14/X_train.npy
  training labels (post aug): /Users/wangmaidou/Documents/EEG-Model-Fine-tune/data/S14/labels_train.npy
  validation data: /Users/wangmaidou/Documents/EEG-Model-Fine-tune/data/S14/X_val.npy
  validation labels: /Users/wangmaidou/Documents/EEG-Model-Fine-tune/data/S14/labels_val.npy
  testing data: /Users/wangmaidou/Documents/EEG-Model-Fine-tune/data/S14/X_test.npy
  testing labels: /Users/wangmaidou/Documents/EEG-Model-Fine-tune/data/S14/labels_test.npy
  training data (before aug): /Users/wangmaidou/Documents/EEG-Model-Fine-tune/data/S14/X_same_training_data_before_aug.npy
  training labels (before aug): /Users/wangmaidou/Documents/EEG-Model-Fine-tune/da