In [None]:
import numpy as np
import scipy.io
from scipy import signal
from scipy.stats import pearsonr
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from scipy.interpolate import CubicSpline
from scipy.ndimage import gaussian_filter1d
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.linear_model import Ridge
from sklearn.metrics import make_scorer

# ========== Parameters ==========
fs = 1000
window_len = 100
overlap = 50
step = window_len - overlap
N_wind = 3
n_splits = 5

# ========== Gaussian Smoothing (All points) ==========
def gaussian_smoothing(pred, sigma=2.25):
    pred_smooth = pred.copy()
    for i in range(pred.shape[1]):
        pred_smooth[:, i] = gaussian_filter1d(pred[:, i], sigma=sigma)
    return pred_smooth

# ========== Outlier Suppression (Set to 0) ==========
def suppress_low_outliers(pred, threshold_multiplier=2):
    pred_cleaned = pred.copy()
    for i in range(pred.shape[1]):
        col = pred[:, i]
        mean = np.mean(col)
        std = np.std(col)
        threshold = mean - threshold_multiplier * std
        pred_cleaned[:, i] = np.where(col < threshold, 0, col)
    return pred_cleaned

# ========== Bandpass Filter ==========
def bandpass_filter(data, fs, lowcut, highcut, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data, axis=0)

# ========== Feature Extraction ==========
def get_features(window, fs=1000):
    freq_bands = [(5, 15), (20, 25), (75, 115), (125, 160), (160, 175)]
    n_channels = window.shape[1]
    features = np.zeros((n_channels, 6))
    features[:, 0] = np.mean(window, axis=0)
    for i, (low, high) in enumerate(freq_bands):
        band_filtered = bandpass_filter(window, fs, low, high)
        features[:, i + 1] = np.mean(np.abs(band_filtered), axis=0)
    return features

# ========== Sliding Window ==========
def get_windowed_feats(ecog, fs, win_len, overlap):
    step = win_len - overlap
    num_windows = (ecog.shape[0] - overlap) // step
    feats = []
    for i in tqdm(range(num_windows), desc="Extracting features"):
        start = i * step
        end = start + win_len
        if end > ecog.shape[0]:
            break
        window = ecog[start:end, :]
        feats.append(get_features(window, fs).flatten())
    return np.array(feats)

# ========== Create R Matrix ==========
def create_R_matrix(features, N_wind):
    num_windows, num_feats = features.shape
    pad = np.tile(features[0], (N_wind - 1, 1))
    padded = np.vstack([pad, features])
    R = np.zeros((num_windows, 1 + N_wind * num_feats))
    for i in range(num_windows):
        context = padded[i:i + N_wind].flatten()
        R[i] = np.concatenate(([1], context))
    return R

# ========== make scorer to maximise pearsonr ==========
def mean_pearsonr(y_true, y_pred):
    rs = [pearsonr(y_true[:,i], y_pred[:,i])[0]
          for i in range(y_true.shape[1])]
    return np.mean(rs)

pearson_scorer = make_scorer(mean_pearsonr, greater_is_better=True)


# ========== Cross-validation with Ridge ==========
train_data   = scipy.io.loadmat('raw_training_data.mat')
train_ecogs  = train_data['train_ecog']
train_gloves = train_data['train_dg']



for subj_idx in range(3):
    print(f"\n=== Subject {subj_idx+1} – Ridge CV ===")
    ecog  = train_ecogs[subj_idx].item()
    glove = train_gloves[subj_idx].item()

    feats      = get_windowed_feats(ecog, fs, window_len, overlap)
    R          = create_R_matrix(feats, N_wind)
    glove_down = signal.decimate(glove,
                                 glove.shape[0] // R.shape[0],
                                 axis=0, zero_phase=True)[:R.shape[0]]
    # smooth labels
    for i in range(glove_down.shape[1]):
        glove_down[:,i] = gaussian_filter1d(glove_down[:,i], sigma=2.25)

    kf    = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    r_vals = []

    for fold_idx, (train_idx, test_idx) in enumerate(kf.split(R)):
        # split
        R_train, R_test = R[train_idx], R[test_idx]
        y_train, y_test = glove_down[train_idx], glove_down[test_idx]

        # scale features (skip intercept column at 0)
        scaler = StandardScaler()
        R_train[:,1:] = scaler.fit_transform(R_train[:,1:])
        R_test [:,1:] = scaler.transform(   R_test[:,1:])

        # ——— GridSearchCV to pick α that maximizes mean Pearson-r ———
        alphas = np.logspace(2, 5, 30)
        param_grid = {'alpha': alphas}
        inner_cv = KFold(n_splits=3, shuffle=True, random_state=42)

        grid = GridSearchCV(
            Ridge(),
            param_grid,
            scoring=pearson_scorer,
            cv=inner_cv,
            n_jobs=-1
        )
        grid.fit(R_train[:,1:], y_train)
        best_alpha = grid.best_params_['alpha']
        best_model = grid.best_estimator_

        print(f"    → fold {fold_idx+1}: best α = {best_alpha:.3e}")

        y_pred = best_model.predict(R_test[:,1:])

        # post-process
        y_pred = suppress_low_outliers(y_pred, threshold_multiplier=2)
        y_pred = gaussian_smoothing(y_pred, sigma=2.25)

        # evaluate
        r = [pearsonr(y_test[:,i], y_pred[:,i])[0] for i in range(y_test.shape[1])]
        r_vals.append(np.mean(r))

        '''
        # optional: plot for forst fold
        if fold_idx == 0:
            max_tp = min(int(60*fs/step), len(y_test))
            time   = np.arange(max_tp)*step/fs
            fig, axs = plt.subplots(5,1, figsize=(12,8), sharex=True)
            fig.suptitle(f"Subject {subj_idx+1} Fold 1", fontsize=14)
            for ch in range(5):
                axs[ch].plot(time, y_test[:max_tp,ch], label='True')
                axs[ch].plot(time, y_pred[:max_tp,ch], '--', label='Pred')
                axs[ch].set_ylabel(f"Finger {ch+1}")
                axs[ch].legend(loc='upper right')
            axs[-1].set_xlabel("Time (s)")
            plt.tight_layout(rect=[0,0,1,0.95])
            plt.show()
        '''

    print(f" → Avg Pearson r (Ridge): {np.mean(r_vals):.3f}")


=== Subject 1 – Ridge CV ===


Extracting features: 100%|██████████| 5999/5999 [00:26<00:00, 223.53it/s]


    → fold 1: best α = 5.736e+03
    → fold 2: best α = 5.736e+03
    → fold 3: best α = 5.736e+03
    → fold 4: best α = 5.736e+03
    → fold 5: best α = 5.736e+03
 → Avg Pearson r (Ridge): 0.607

=== Subject 2 – Ridge CV ===


Extracting features: 100%|██████████| 5999/5999 [00:23<00:00, 257.06it/s]


    → fold 1: best α = 4.520e+03
    → fold 2: best α = 4.520e+03
    → fold 3: best α = 4.520e+03
    → fold 4: best α = 4.520e+03
    → fold 5: best α = 4.520e+03
 → Avg Pearson r (Ridge): 0.497

=== Subject 3 – Ridge CV ===


Extracting features: 100%|██████████| 5999/5999 [00:26<00:00, 225.96it/s]


    → fold 1: best α = 3.562e+03
    → fold 2: best α = 3.562e+03
    → fold 3: best α = 3.562e+03
    → fold 4: best α = 3.562e+03
    → fold 5: best α = 3.562e+03
 → Avg Pearson r (Ridge): 0.703


Fine tune alphas

In [None]:
import numpy as np
import scipy.io
from scipy import signal
from scipy.stats import pearsonr
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from scipy.interpolate import CubicSpline
from scipy.ndimage import gaussian_filter1d
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.linear_model import Ridge
from sklearn.metrics import make_scorer

# ========== Parameters ==========
fs = 1000
window_len = 100
overlap = 50
step = window_len - overlap
N_wind = 3
n_splits = 5
coarse_alphas = [5.736e3, 4.520e3, 3.562e3]

# ========== Gaussian Smoothing (All points) ==========
def gaussian_smoothing(pred, sigma=2.25):
    pred_smooth = pred.copy()
    for i in range(pred.shape[1]):
        pred_smooth[:, i] = gaussian_filter1d(pred[:, i], sigma=sigma)
    return pred_smooth

# ========== Outlier Suppression (Set to 0) ==========
def suppress_low_outliers(pred, threshold_multiplier=2):
    pred_cleaned = pred.copy()
    for i in range(pred.shape[1]):
        col = pred[:, i]
        mean = np.mean(col)
        std = np.std(col)
        threshold = mean - threshold_multiplier * std
        pred_cleaned[:, i] = np.where(col < threshold, 0, col)
    return pred_cleaned

# ========== Bandpass Filter ==========
def bandpass_filter(data, fs, lowcut, highcut, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data, axis=0)

# ========== Feature Extraction ==========
def get_features(window, fs=1000):
    freq_bands = [(5, 15), (20, 25), (75, 115), (125, 160), (160, 175)]
    n_channels = window.shape[1]
    features = np.zeros((n_channels, 6))
    features[:, 0] = np.mean(window, axis=0)
    for i, (low, high) in enumerate(freq_bands):
        band_filtered = bandpass_filter(window, fs, low, high)
        features[:, i + 1] = np.mean(np.abs(band_filtered), axis=0)
    return features

# ========== Sliding Window ==========
def get_windowed_feats(ecog, fs, win_len, overlap):
    step = win_len - overlap
    num_windows = (ecog.shape[0] - overlap) // step
    feats = []
    for i in tqdm(range(num_windows), desc="Extracting features"):
        start = i * step
        end = start + win_len
        if end > ecog.shape[0]:
            break
        window = ecog[start:end, :]
        feats.append(get_features(window, fs).flatten())
    return np.array(feats)

# ========== Create R Matrix ==========
def create_R_matrix(features, N_wind):
    num_windows, num_feats = features.shape
    pad = np.tile(features[0], (N_wind - 1, 1))
    padded = np.vstack([pad, features])
    R = np.zeros((num_windows, 1 + N_wind * num_feats))
    for i in range(num_windows):
        context = padded[i:i + N_wind].flatten()
        R[i] = np.concatenate(([1], context))
    return R

# ========== make scorer to maximise pearsonr ==========
def mean_pearsonr(y_true, y_pred):
    rs = [pearsonr(y_true[:,i], y_pred[:,i])[0]
          for i in range(y_true.shape[1])]
    return np.mean(rs)

pearson_scorer = make_scorer(mean_pearsonr, greater_is_better=True)


# ========== Cross-validation with Ridge ==========
train_data   = scipy.io.loadmat('raw_training_data.mat')
train_ecogs  = train_data['train_ecog']
train_gloves = train_data['train_dg']



for subj_idx in range(3):
    print(f"\n=== Subject {subj_idx+1} – Ridge CV ===")
    ecog  = train_ecogs[subj_idx].item()
    glove = train_gloves[subj_idx].item()

    feats      = get_windowed_feats(ecog, fs, window_len, overlap)
    R          = create_R_matrix(feats, N_wind)
    glove_down = signal.decimate(glove,
                                 glove.shape[0] // R.shape[0],
                                 axis=0, zero_phase=True)[:R.shape[0]]
    # smooth labels
    for i in range(glove_down.shape[1]):
        glove_down[:,i] = gaussian_filter1d(glove_down[:,i], sigma=2.25)

    kf    = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    r_vals = []

    for fold_idx, (train_idx, test_idx) in enumerate(kf.split(R)):
        # split
        R_train, R_test = R[train_idx], R[test_idx]
        y_train, y_test = glove_down[train_idx], glove_down[test_idx]

        # scale features (skip intercept column at 0)
        scaler = StandardScaler()
        R_train[:,1:] = scaler.fit_transform(R_train[:,1:])
        R_test [:,1:] = scaler.transform(   R_test[:,1:])

        # ——— GridSearchCV to pick α that maximizes mean Pearson-r ———
        # pick the coarse α for this subject
        coarse_alpha = coarse_alphas[subj_idx]

        # build a tight, high-res grid around it (±×3, 50 points)
        lowalpha  = coarse_alpha / 3
        highalpha = coarse_alpha * 3
        alphas_refined = np.logspace(np.log10(lowalpha), np.log10(highalpha), 50)

        param_grid = {'alpha': alphas_refined}
        inner_cv = KFold(n_splits=3, shuffle=True, random_state=42)

        grid = GridSearchCV(
            Ridge(),
            param_grid,
            scoring=pearson_scorer,
            cv=inner_cv,
            n_jobs=-1
        )
        grid.fit(R_train[:,1:], y_train)
        best_alpha = grid.best_params_['alpha']
        best_model = grid.best_estimator_

        print(f"    → fold {fold_idx+1}: best α = {best_alpha:.3e}")

        y_pred = best_model.predict(R_test[:,1:])

        # post-process
        y_pred = suppress_low_outliers(y_pred, threshold_multiplier=2)
        y_pred = gaussian_smoothing(y_pred, sigma=2.25)

        # evaluate
        r = [pearsonr(y_test[:,i], y_pred[:,i])[0] for i in range(y_test.shape[1])]
        r_vals.append(np.mean(r))

        '''
        # optional: plot for forst fold
        if fold_idx == 0:
            max_tp = min(int(60*fs/step), len(y_test))
            time   = np.arange(max_tp)*step/fs
            fig, axs = plt.subplots(5,1, figsize=(12,8), sharex=True)
            fig.suptitle(f"Subject {subj_idx+1} Fold 1", fontsize=14)
            for ch in range(5):
                axs[ch].plot(time, y_test[:max_tp,ch], label='True')
                axs[ch].plot(time, y_pred[:max_tp,ch], '--', label='Pred')
                axs[ch].set_ylabel(f"Finger {ch+1}")
                axs[ch].legend(loc='upper right')
            axs[-1].set_xlabel("Time (s)")
            plt.tight_layout(rect=[0,0,1,0.95])
            plt.show()
        '''

    print(f" → Avg Pearson r (Ridge): {np.mean(r_vals):.3f}")


=== Subject 1 – Ridge CV ===


Extracting features: 100%|██████████| 5999/5999 [00:37<00:00, 161.67it/s]


    → fold 1: best α = 5.363e+03
    → fold 2: best α = 5.609e+03
    → fold 3: best α = 5.866e+03
    → fold 4: best α = 5.609e+03
    → fold 5: best α = 5.363e+03
 → Avg Pearson r (Ridge): 0.607

=== Subject 2 – Ridge CV ===


Extracting features: 100%|██████████| 5999/5999 [00:28<00:00, 212.56it/s]


    → fold 1: best α = 5.056e+03
    → fold 2: best α = 4.622e+03
    → fold 3: best α = 4.622e+03
    → fold 4: best α = 4.834e+03
    → fold 5: best α = 5.056e+03
 → Avg Pearson r (Ridge): 0.497

=== Subject 3 – Ridge CV ===


Extracting features: 100%|██████████| 5999/5999 [00:27<00:00, 215.53it/s]


    → fold 1: best α = 3.985e+03
    → fold 2: best α = 3.810e+03
    → fold 3: best α = 3.985e+03
    → fold 4: best α = 3.810e+03
    → fold 5: best α = 3.810e+03
 → Avg Pearson r (Ridge): 0.702


In [None]:
tuned_alphas = [5.609e+03, 4.834e+03, 3.810e+03]

In [None]:
import numpy as np
import scipy.io
from scipy import signal
from scipy.stats import pearsonr
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from scipy.interpolate import CubicSpline
from scipy.ndimage import gaussian_filter1d
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.metrics import make_scorer
from sklearn.linear_model import Ridge, ElasticNet
from sklearn.cross_decomposition import PLSRegression


# ========== Parameters ==========
fs = 1000
window_len = 100
overlap = 50
step = window_len - overlap
N_wind = 3
n_splits = 5
final_alphas = [5.609e3, 4.834e3, 3.810e3]

# ========== Gaussian Smoothing (All points) ==========
def gaussian_smoothing(pred, sigma=2.25):
    pred_smooth = pred.copy()
    for i in range(pred.shape[1]):
        pred_smooth[:, i] = gaussian_filter1d(pred[:, i], sigma=sigma)
    return pred_smooth

# ========== Outlier Suppression (Set to 0) ==========
def suppress_low_outliers(pred, threshold_multiplier=2):
    pred_cleaned = pred.copy()
    for i in range(pred.shape[1]):
        col = pred[:, i]
        mean = np.mean(col)
        std = np.std(col)
        threshold = mean - threshold_multiplier * std
        pred_cleaned[:, i] = np.where(col < threshold, 0, col)
    return pred_cleaned

# ========== Bandpass Filter ==========
def bandpass_filter(data, fs, lowcut, highcut, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data, axis=0)

# ========== Feature Extraction ==========
def get_features(window, fs=1000):
    freq_bands = [(5, 15), (20, 25), (75, 115), (125, 160), (160, 175)]
    n_channels = window.shape[1]
    features = np.zeros((n_channels, 6))
    features[:, 0] = np.mean(window, axis=0)
    for i, (low, high) in enumerate(freq_bands):
        band_filtered = bandpass_filter(window, fs, low, high)
        features[:, i + 1] = np.mean(np.abs(band_filtered), axis=0)
    return features

# ========== Sliding Window ==========
def get_windowed_feats(ecog, fs, win_len, overlap):
    step = win_len - overlap
    num_windows = (ecog.shape[0] - overlap) // step
    feats = []
    for i in tqdm(range(num_windows), desc="Extracting features"):
        start = i * step
        end = start + win_len
        if end > ecog.shape[0]:
            break
        window = ecog[start:end, :]
        feats.append(get_features(window, fs).flatten())
    return np.array(feats)

# ========== Create R Matrix ==========
def create_R_matrix(features, N_wind):
    num_windows, num_feats = features.shape
    pad = np.tile(features[0], (N_wind - 1, 1))
    padded = np.vstack([pad, features])
    R = np.zeros((num_windows, 1 + N_wind * num_feats))
    for i in range(num_windows):
        context = padded[i:i + N_wind].flatten()
        R[i] = np.concatenate(([1], context))
    return R

# ========== make scorer to maximise pearsonr ==========
def mean_pearsonr(y_true, y_pred):
    rs = [pearsonr(y_true[:,i], y_pred[:,i])[0]
          for i in range(y_true.shape[1])]
    return np.mean(rs)

pearson_scorer = make_scorer(mean_pearsonr, greater_is_better=True)


# ========== Cross-validation with Ridge ==========
train_data   = scipy.io.loadmat('raw_training_data.mat')
train_ecogs  = train_data['train_ecog']
train_gloves = train_data['train_dg']



for subj_idx in range(3):
    print(f"\n=== Subject {subj_idx+1} – Ensemble CV ===")
    ecog  = train_ecogs[subj_idx].item()
    glove = train_gloves[subj_idx].item()

    feats      = get_windowed_feats(ecog, fs, window_len, overlap)
    R          = create_R_matrix(feats, N_wind)
    glove_down = signal.decimate(glove,
                                 glove.shape[0] // R.shape[0],
                                 axis=0, zero_phase=True)[:R.shape[0]]
    # smooth labels
    for i in range(glove_down.shape[1]):
        glove_down[:,i] = gaussian_filter1d(glove_down[:,i], sigma=2.25)

    kf    = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    fold_rs = []
    inner_cv = KFold(n_splits=3, shuffle=True, random_state=42)

    fold_rs  = []
    pls_bests = []

    for fold_idx, (train_idx, test_idx) in enumerate(kf.split(R)):
        # 1) Split & scale
        R_tr, R_te = R[train_idx], R[test_idx]
        y_tr, y_te = glove_down[train_idx], glove_down[test_idx]

        scaler = StandardScaler()
        R_tr[:,1:] = scaler.fit_transform(R_tr[:,1:])
        R_te[:,1:] = scaler.transform(R_te[:,1:])

        # 2) Train each model on the train split
        # 2a) Ridge
        ridge = Ridge(alpha=final_alphas[subj_idx])
        ridge.fit(R_tr[:,1:], y_tr)
        pred_ridge = ridge.predict(R_te[:,1:])

        # 2b) PLSRegression (tuned by inner CV)
        pls_cv = GridSearchCV(
            PLSRegression(),
            {'n_components': list(range(2, 20, 2))},
            scoring=pearson_scorer,
            cv=inner_cv,
            n_jobs=-1
        )
        pls_cv.fit(R_tr[:,1:], y_tr)

        best_n = pls_cv.best_params_['n_components']
        pls_bests.append(best_n)
        pred_pls = pls_cv.predict(R_te[:,1:])

        '''
        # 2c) ElasticNet (tuned by inner CV)
        enet_cv = GridSearchCV(
            ElasticNet(warm_start=True, max_iter=2000, tol=1e-3),
            {'alpha': np.logspace(-3,0,7),
            'l1_ratio': [0.1, 0.9]},
            scoring=pearson_scorer,
            cv=inner_cv,
            n_jobs=-1
        )
        enet_cv.fit(R_tr[:,1:], y_tr)
        pred_enet = enet_cv.predict(R_te[:,1:])
        '''

        # 3) Average predictions
        y_pred = 0.5 * (pred_ridge + pred_pls)


        # 4) Post-process & evaluate
        y_pred = suppress_low_outliers(y_pred, threshold_multiplier=2)
        y_pred = gaussian_smoothing(y_pred, sigma=2.25)
        r = [pearsonr(y_te[:,i], y_pred[:,i])[0] for i in range(y_te.shape[1])]
        fold_rs.append(np.mean(r))

        print(f"    → Fold {fold_idx+1} r = {fold_rs[-1]:.3f}")

    print(f"  Fold-wise best PLS components for subject {subj_idx+1}: {pls_bests}")
    print(f"  Median PLS n_components = {int(np.median(pls_bests))}")
    print(f" → Avg ensemble r = {np.mean(fold_rs):.3f}")


=== Subject 1 – Ensemble CV ===


Extracting features: 100%|██████████| 5999/5999 [00:26<00:00, 230.63it/s]


    → Fold 1 r = 0.621
    → Fold 2 r = 0.627
    → Fold 3 r = 0.609
    → Fold 4 r = 0.610
    → Fold 5 r = 0.601
  Fold-wise best PLS components for subject 1: [14, 14, 18, 14, 14]
  Median PLS n_components = 14
 → Avg ensemble r = 0.614

=== Subject 2 – Ensemble CV ===


Extracting features: 100%|██████████| 5999/5999 [00:23<00:00, 256.84it/s]


    → Fold 1 r = 0.516
    → Fold 2 r = 0.504
    → Fold 3 r = 0.487
    → Fold 4 r = 0.504
    → Fold 5 r = 0.510
  Fold-wise best PLS components for subject 2: [14, 14, 14, 14, 14]
  Median PLS n_components = 14
 → Avg ensemble r = 0.504

=== Subject 3 – Ensemble CV ===


Extracting features: 100%|██████████| 5999/5999 [00:26<00:00, 229.01it/s]


    → Fold 1 r = 0.700
    → Fold 2 r = 0.704
    → Fold 3 r = 0.704
    → Fold 4 r = 0.707
    → Fold 5 r = 0.699
  Fold-wise best PLS components for subject 3: [14, 12, 14, 12, 12]
  Median PLS n_components = 12
 → Avg ensemble r = 0.703


In [None]:
final_alphas = [5.609e3, 4.834e3, 3.810e3]
final_pls = [14, 14, 12]

Quick refine PLS

In [None]:
import numpy as np
import scipy.io
from scipy import signal
from scipy.stats import pearsonr
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from scipy.interpolate import CubicSpline
from scipy.ndimage import gaussian_filter1d
from tqdm import tqdm
from sklearn.metrics import make_scorer
from sklearn.linear_model import Ridge
from sklearn.cross_decomposition import PLSRegression

# ========== Parameters ==========
fs = 1000
window_len = 100
overlap = 50
step = window_len - overlap
N_wind = 3
n_splits = 5
final_alphas = [5.609e3, 4.834e3, 3.810e3]
median_pls   = [14, 14, 12]  # pre-computed medians per subject

# ========== Helper Functions ==========
def gaussian_smoothing(pred, sigma=2.25):
    pred_smooth = pred.copy()
    for i in range(pred.shape[1]):
        pred_smooth[:, i] = gaussian_filter1d(pred[:, i], sigma=sigma)
    return pred_smooth

def suppress_low_outliers(pred, threshold_multiplier=2):
    pred_cleaned = pred.copy()
    for i in range(pred.shape[1]):
        col = pred[:, i]
        thresh = np.mean(col) - threshold_multiplier * np.std(col)
        pred_cleaned[:, i] = np.where(col < thresh, 0, col)
    return pred_cleaned

def get_features(window, fs=1000):
    freq_bands = [(5,15),(20,25),(75,115),(125,160),(160,175)]
    n_ch = window.shape[1]
    feats = np.zeros((n_ch, 6))
    feats[:,0] = np.mean(window, axis=0)
    for idx, (low, high) in enumerate(freq_bands):
        bf = bandpass_filter(window, fs, low, high)
        feats[:,idx+1] = np.mean(np.abs(bf), axis=0)
    return feats

def bandpass_filter(data, fs, lowcut, highcut, order=4):
    nyq = 0.5 * fs
    low, high = lowcut/nyq, highcut/nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data, axis=0)

def get_windowed_feats(ecog, fs, win_len, overlap):
    step = win_len - overlap
    n_win = (ecog.shape[0] - overlap) // step
    feats = []
    for i in range(n_win):
        s, e = i*step, i*step + win_len
        if e > ecog.shape[0]: break
        feats.append(get_features(ecog[s:e, :], fs).flatten())
    return np.array(feats)

def create_R_matrix(features, N_wind):
    num_windows, num_feats = features.shape
    pad = np.tile(features[0], (N_wind-1, 1))
    padded = np.vstack([pad, features])
    R = np.zeros((num_windows, 1 + N_wind * num_feats))
    for i in range(num_windows):
        ctx = padded[i:i+N_wind].flatten()
        R[i] = np.concatenate(([1], ctx))
    return R

def mean_pearsonr(y_true, y_pred):
    rs = []
    for i in range(y_true.shape[1]):
        r, _ = pearsonr(y_true[:,i], y_pred[:,i])
        rs.append(0 if np.isnan(r) else r)
    return np.mean(rs)

pearson_scorer = make_scorer(mean_pearsonr, greater_is_better=True)

# ========== Load Data ==========
train_data   = scipy.io.loadmat('raw_training_data.mat')
train_ecogs  = train_data['train_ecog']
train_gloves = train_data['train_dg']

# ========== Cross-Validation with PLS Refinement ==========
outer_cv = KFold(n_splits=n_splits, shuffle=True, random_state=42)
inner_cv = KFold(n_splits=3, shuffle=True, random_state=42)

for subj_idx in range(3):
    print(f"\n=== Subject {subj_idx+1} – CV with ±2 PLS refine ===")
    # build full data for this subject
    ecog = train_ecogs[subj_idx].item()
    glove = train_gloves[subj_idx].item()

    feats = get_windowed_feats(ecog, fs, window_len, overlap)
    R = create_R_matrix(feats, N_wind)
    y_raw = signal.decimate(glove, glove.shape[0]//R.shape[0], axis=0, zero_phase=True)[:R.shape[0]]
    for ch in range(y_raw.shape[1]):
        y_raw[:,ch] = gaussian_filter1d(y_raw[:,ch], sigma=2.25)

    fold_rs = []
    fold_pls_best = []
    grid_range = list(range(max(2, median_pls[subj_idx]-2), median_pls[subj_idx]+3))
    print(f"→ refining n_components in {grid_range}")

    for fold, (tr_i, te_i) in enumerate(outer_cv.split(R), 1):
        # split and scale
        R_tr, R_te = R[tr_i], R[te_i]
        y_tr, y_te = y_raw[tr_i], y_raw[te_i]
        scaler = StandardScaler().fit(R_tr[:,1:])
        X_tr = scaler.transform(R_tr[:,1:])
        X_te = scaler.transform(R_te[:,1:])

        # Ridge prediction
        mr = Ridge(alpha=final_alphas[subj_idx]).fit(X_tr, y_tr)
        pr = mr.predict(X_te)

        # PLS refinement on this fold
        pls_cv = GridSearchCV(
            PLSRegression(),
            {'n_components': grid_range},
            scoring=pearson_scorer,
            cv=inner_cv,
            n_jobs=-1
        )
        pls_cv.fit(X_tr, y_tr)
        best_nc = pls_cv.best_params_['n_components']
        fold_pls_best.append(best_nc)
        pp = pls_cv.predict(X_te)

        # ensemble & post-process
        y_pred = 0.5 * (pr + pp)
        y_pred = suppress_low_outliers(y_pred, threshold_multiplier=2)
        y_pred = gaussian_smoothing(y_pred, sigma=2.25)

        # score
        r_vals = [pearsonr(y_te[:,i], y_pred[:,i])[0] for i in range(y_te.shape[1])]
        fold_r = np.mean(r_vals)
        fold_rs.append(fold_r)
        print(f"  Fold {fold} | best PLS={best_nc} | r={fold_r:.3f}")

    print(f"→ Fold-wise PLS components: {fold_pls_best}")
    print(f"→ Median refined PLS = {int(np.median(fold_pls_best))}")
    print(f"→ Avg ensemble r = {np.mean(fold_rs):.3f} ± {np.std(fold_rs):.3f}")


=== Subject 1 – CV with ±2 PLS refine ===
→ refining n_components in [12, 13, 14, 15, 16]
  Fold 1 | best PLS=15 | r=0.620
  Fold 2 | best PLS=14 | r=0.627
  Fold 3 | best PLS=16 | r=0.609
  Fold 4 | best PLS=15 | r=0.614
  Fold 5 | best PLS=15 | r=0.604
→ Fold-wise PLS components: [15, 14, 16, 15, 15]
→ Median refined PLS = 15
→ Avg ensemble r = 0.615 ± 0.008

=== Subject 2 – CV with ±2 PLS refine ===
→ refining n_components in [12, 13, 14, 15, 16]
  Fold 1 | best PLS=15 | r=0.518
  Fold 2 | best PLS=13 | r=0.501
  Fold 3 | best PLS=14 | r=0.487
  Fold 4 | best PLS=15 | r=0.506
  Fold 5 | best PLS=14 | r=0.510
→ Fold-wise PLS components: [15, 13, 14, 15, 14]
→ Median refined PLS = 14
→ Avg ensemble r = 0.504 ± 0.010

=== Subject 3 – CV with ±2 PLS refine ===
→ refining n_components in [10, 11, 12, 13, 14]
  Fold 1 | best PLS=13 | r=0.699
  Fold 2 | best PLS=13 | r=0.706
  Fold 3 | best PLS=13 | r=0.704
  Fold 4 | best PLS=13 | r=0.712
  Fold 5 | best PLS=13 | r=0.701
→ Fold-wise PLS 

In [None]:
refined_pls = [15, 14, 13]

Tune weight of each model

In [None]:
import numpy as np
import scipy.io
from scipy import signal
from scipy.stats import pearsonr
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from scipy.interpolate import CubicSpline
from scipy.ndimage import gaussian_filter1d
from tqdm import tqdm
from sklearn.metrics import make_scorer
from sklearn.linear_model import Ridge
from sklearn.cross_decomposition import PLSRegression

# ========== Parameters ==========
fs = 1000
window_len = 100
overlap = 50
step = window_len - overlap
N_wind = 3
n_splits = 5
# Tuned hyperparameters per subject
final_alphas = [5.609e3, 4.834e3, 3.810e3]
refined_pls   = [15, 14, 13]

# ========== Helper Functions ==========
def gaussian_smoothing(pred, sigma=2.25):
    out = pred.copy()
    for i in range(out.shape[1]):
        out[:, i] = gaussian_filter1d(out[:, i], sigma)
    return out

def suppress_low_outliers(pred, threshold_multiplier=2):
    out = pred.copy()
    for i in range(out.shape[1]):
        col = out[:, i]
        thr = col.mean() - threshold_multiplier * col.std()
        out[:, i] = np.where(col < thr, 0, col)
    return out

def bandpass_filter(data, fs, lowcut, highcut, order=4):
    nyq = 0.5 * fs
    low, high = lowcut/nyq, highcut/nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data, axis=0)

def get_features(window, fs=1000):
    bands = [(5,15),(20,25),(75,115),(125,160),(160,175)]
    n_ch  = window.shape[1]
    feats = np.zeros((n_ch, 6))
    feats[:,0] = window.mean(axis=0)
    for idx, (low, high) in enumerate(bands):
        bf = bandpass_filter(window, fs, low, high)
        feats[:,idx+1] = np.abs(bf).mean(axis=0)
    return feats

def get_windowed_feats(ecog, fs, win_len, overlap):
    step = win_len - overlap
    n_win = (ecog.shape[0] - overlap)//step
    feats = []
    for i in tqdm(range(n_win), desc="Extracting features"):
        s, e = i*step, i*step + win_len
        if e > ecog.shape[0]: break
        feats.append(get_features(ecog[s:e,:], fs).flatten())
    return np.array(feats)

def create_R_matrix(features, N_wind):
    n_w, n_f = features.shape
    pad   = np.tile(features[0], (N_wind-1,1))
    concat= np.vstack([pad, features])
    R     = np.zeros((n_w, 1 + N_wind*n_f))
    for i in range(n_w):
        ctx = concat[i:i+N_wind].ravel()
        R[i] = np.concatenate(([1], ctx))
    return R

def mean_pearsonr(y_true, y_pred):
    rs = []
    for i in range(y_true.shape[1]):
        r, _ = pearsonr(y_true[:,i], y_pred[:,i])
        rs.append(0 if np.isnan(r) else r)
    return np.mean(rs)

pearson_scorer = make_scorer(mean_pearsonr, greater_is_better=True)

def tune_weight(X, y, alpha, n_comp, n_splits=5):
    """
    Returns (best_w, best_r) by mixing post-processed Ridge and PLS on OOF preds.
    """
    oof_pr = np.zeros_like(y)
    oof_pp = np.zeros_like(y)
    oof_y  = np.zeros_like(y)
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=0)
    for tr_i, te_i in kf.split(X):
        # train models
        mr = Ridge(alpha=alpha).fit(X[tr_i], y[tr_i])
        pl = PLSRegression(n_components=n_comp).fit(X[tr_i], y[tr_i])
        # raw preds
        pr = mr.predict(X[te_i])
        pp = pl.predict(X[te_i])
        # post-process
        pr = suppress_low_outliers(pr, threshold_multiplier=2)
        pr = gaussian_smoothing(pr, sigma=2.25)
        pp = suppress_low_outliers(pp, threshold_multiplier=2)
        pp = gaussian_smoothing(pp, sigma=2.25)
        # store OOF
        oof_pr[te_i] = pr
        oof_pp[te_i] = pp
        oof_y [te_i] = y[te_i]
    best_w, best_r = 0.0, -np.inf
    for w in np.linspace(0,1,21):
        pred = w*oof_pr + (1-w)*oof_pp
        rs   = [pearsonr(oof_y[:,i], pred[:,i])[0] for i in range(y.shape[1])]
        r    = np.mean(rs)
        if r > best_r:
            best_r, best_w = r, w
    return best_w, best_r

# ========== Load Data ==========
train = scipy.io.loadmat('raw_training_data.mat')
train_ecogs  = train['train_ecog']
train_gloves = train['train_dg']

# ========== CV with PLS Refinement & Weight Tuning ==========
outer_cv = KFold(n_splits=n_splits, shuffle=True, random_state=42)
inner_cv = KFold(n_splits=3, shuffle=True, random_state=42)

for subj_idx in range(3):
    print(f"\n=== Subject {subj_idx+1} – CV w/ PLS refine & w-tuning ===")
    # full-train features & labels
    E = train_ecogs[subj_idx].item()
    G = train_gloves[subj_idx].item()
    feats = get_windowed_feats(E, fs, window_len, overlap)
    R     = create_R_matrix(feats, N_wind)
    y_raw = signal.decimate(G, G.shape[0]//R.shape[0], axis=0, zero_phase=True)[:R.shape[0]]
    for ch in range(y_raw.shape[1]):
        y_raw[:,ch] = gaussian_filter1d(y_raw[:,ch], sigma=2.25)
    # scale full data
    scaler = StandardScaler().fit(R[:,1:])
    X_full = scaler.transform(R[:,1:])
    y_full = y_raw
    # tune ensemble weight
    w_opt, r_opt = tune_weight(
        X_full, y_full,
        alpha = final_alphas[subj_idx],
        n_comp= refined_pls[subj_idx],
        n_splits=5
    )
    print(f"→ Optimal w = {w_opt:.2f}, global OOF r = {r_opt:.3f}")
    # outer CV folds
    fold_rs, fold_pls_best = [], []
    grid_range = list(range(max(2, refined_pls[subj_idx]-2),
                             refined_pls[subj_idx]+3))
    print(f"→ refining PLS comps in {grid_range}")
    for fold, (tr_i, te_i) in enumerate(outer_cv.split(R), 1):
        R_tr, R_te = R[tr_i], R[te_i]
        y_tr, y_te = y_full[tr_i], y_full[te_i]
        X_tr = scaler.transform(R_tr[:,1:])
        X_te = scaler.transform(R_te[:,1:])
        pr = Ridge(alpha=final_alphas[subj_idx]).fit(X_tr, y_tr).predict(X_te)
        pls_cv = GridSearchCV(
            PLSRegression(), {'n_components': grid_range},
            scoring=pearson_scorer, cv=inner_cv, n_jobs=-1
        )
        pls_cv.fit(X_tr, y_tr)
        best_nc = pls_cv.best_params_['n_components']
        fold_pls_best.append(best_nc)
        pp = pls_cv.predict(X_te)
        y_pred = w_opt*pr + (1-w_opt)*pp
        y_pred = suppress_low_outliers(y_pred, 2)
        y_pred = gaussian_smoothing(y_pred, sigma=2.25)
        rs = [pearsonr(y_te[:,i], y_pred[:,i])[0] for i in range(y_te.shape[1])]
        fold_r = np.mean(rs)
        fold_rs.append(fold_r)
        print(f"  Fold {fold} | best PLS={best_nc} | r={fold_r:.3f}")
    print(f"→ Fold-wise PLS picks: {fold_pls_best}")
    print(f"→ Median PLS = {int(np.median(fold_pls_best))}")
    print(f"→ Avg  r = {np.mean(fold_rs):.3f} ± {np.std(fold_rs):.3f}")


=== Subject 1 – CV w/ PLS refine & w-tuning ===


Extracting features: 100%|██████████| 5999/5999 [00:22<00:00, 262.06it/s]


→ Optimal w = 0.50, global OOF r = 0.608
→ refining PLS comps in [13, 14, 15, 16, 17]
  Fold 1 | best PLS=15 | r=0.620
  Fold 2 | best PLS=14 | r=0.627
  Fold 3 | best PLS=17 | r=0.610
  Fold 4 | best PLS=17 | r=0.610
  Fold 5 | best PLS=15 | r=0.604
→ Fold-wise PLS picks: [15, 14, 17, 17, 15]
→ Median PLS = 15
→ Avg  r = 0.614 ± 0.008

=== Subject 2 – CV w/ PLS refine & w-tuning ===


Extracting features: 100%|██████████| 5999/5999 [00:21<00:00, 277.88it/s]


→ Optimal w = 0.35, global OOF r = 0.502
→ refining PLS comps in [12, 13, 14, 15, 16]
  Fold 1 | best PLS=15 | r=0.517
  Fold 2 | best PLS=13 | r=0.498
  Fold 3 | best PLS=14 | r=0.487
  Fold 4 | best PLS=15 | r=0.507
  Fold 5 | best PLS=14 | r=0.510
→ Fold-wise PLS picks: [15, 13, 14, 15, 14]
→ Median PLS = 14
→ Avg  r = 0.504 ± 0.010

=== Subject 3 – CV w/ PLS refine & w-tuning ===


Extracting features: 100%|██████████| 5999/5999 [00:23<00:00, 259.60it/s]


→ Optimal w = 0.65, global OOF r = 0.701
→ refining PLS comps in [11, 12, 13, 14, 15]
  Fold 1 | best PLS=13 | r=0.699
  Fold 2 | best PLS=13 | r=0.707
  Fold 3 | best PLS=13 | r=0.704
  Fold 4 | best PLS=13 | r=0.712
  Fold 5 | best PLS=13 | r=0.701
→ Fold-wise PLS picks: [13, 13, 13, 13, 13]
→ Median PLS = 13
→ Avg  r = 0.705 ± 0.005


In [None]:
import numpy as np
import scipy.io
from scipy import signal
from scipy.stats import pearsonr
from sklearn.model_selection import KFold, GridSearchCV
from sklearn.preprocessing import StandardScaler
from scipy.interpolate import CubicSpline
from scipy.ndimage import gaussian_filter1d
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.linear_model import Ridge
from sklearn.cross_decomposition import PLSRegression
from sklearn.metrics import make_scorer

# ========== Parameters ==========
fs = 1000
window_len = 100
overlap = 50
step = window_len - overlap
N_wind = 3

# ========== Gaussian Smoothing ==========
def gaussian_smoothing(pred, sigma=2.25):
    pred_smooth = pred.copy()
    for i in range(pred.shape[1]):
        pred_smooth[:, i] = gaussian_filter1d(pred[:, i], sigma=sigma)
    return pred_smooth

# ========== Outlier Suppression ==========
def suppress_low_outliers(pred, threshold_multiplier=2):
    pred_cleaned = pred.copy()
    for i in range(pred.shape[1]):
        col = pred[:, i]
        mean = np.mean(col)
        std = np.std(col)
        thresh = mean - threshold_multiplier * std
        pred_cleaned[:, i] = np.where(col < thresh, 0, col)
    return pred_cleaned

# ========== Bandpass Filter ==========
def bandpass_filter(data, fs, lowcut, highcut, order=4):
    nyq = 0.5 * fs
    low, high = lowcut/nyq, highcut/nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data, axis=0)

# ========== Feature Extraction ==========
def get_features(window, fs=1000):
    freq_bands = [(5,15),(20,25),(75,115),(125,160),(160,175)]
    n_ch = window.shape[1]
    feats = np.zeros((n_ch, 6))
    feats[:,0] = np.mean(window,axis=0)
    for i,(low,high) in enumerate(freq_bands):
        bf = bandpass_filter(window, fs, low, high)
        feats[:,i+1] = np.mean(np.abs(bf),axis=0)
    return feats

# ========== Sliding Window ==========
def get_windowed_feats(ecog, fs, win_len, overlap):
    step = win_len - overlap
    n_win = (ecog.shape[0] - overlap)//step
    feats = []
    for i in tqdm(range(n_win), desc="Extracting features"):
        s, e = i*step, i*step+win_len
        if e > ecog.shape[0]: break
        feats.append(get_features(ecog[s:e,:], fs).flatten())
    return np.array(feats)

# ========== Create R Matrix ==========
def create_R_matrix(features, N_wind):
    n_w, n_f = features.shape
    pad = np.tile(features[0], (N_wind-1,1))
    concat = np.vstack([pad, features])
    R = np.zeros((n_w, 1 + N_wind*n_f))
    for i in range(n_w):
        ctx = concat[i:i+N_wind].flatten()
        R[i] = np.concatenate(([1], ctx))
    return R

# ========== Scorer for Pearson-r ==========
def mean_pearsonr(y_true, y_pred):
    rs = [pearsonr(y_true[:,i], y_pred[:,i])[0]
          for i in range(y_true.shape[1])]
    return np.mean(rs)

pearson_scorer = make_scorer(mean_pearsonr, greater_is_better=True)

# ==========Tuned parameters ==========
final_alphas = [5609, 4834, 3810]
refined_pls  = [15  ,   14,   13]
weights      = [0.50, 0.35, 0.65]

# ========== Predict on Leaderboard ==========
train_data        = scipy.io.loadmat('raw_training_data.mat')
test_data         = scipy.io.loadmat('leaderboard_data.mat')
train_ecogs       = train_data['train_ecog']
train_gloves      = train_data['train_dg']
leaderboard_ecogs = test_data['leaderboard_ecog']
predicted_dg      = np.empty((3,1),dtype=object)

for subj_idx in range(3):
    print(f"\n=== Subject {subj_idx+1} ===")
    # load
    ecog_tr = train_ecogs[subj_idx].item()
    glove_tr= train_gloves[subj_idx].item()
    ecog_te = leaderboard_ecogs[subj_idx].item()

    # train features & labels
    feats_tr  = get_windowed_feats(ecog_tr, fs, window_len, overlap)
    R_tr      = create_R_matrix(feats_tr, N_wind)
    glove_ds  = signal.decimate(glove_tr,
                   glove_tr.shape[0]//R_tr.shape[0], axis=0,
                   zero_phase=True)[:R_tr.shape[0]]
    # smooth labels
    for i in range(glove_ds.shape[1]):
        glove_ds[:,i] = gaussian_filter1d(glove_ds[:,i], sigma=2.25)

    # scale
    scaler = StandardScaler()
    R_tr[:,1:] = scaler.fit_transform(R_tr[:,1:])

    #Fit
    #Ridge
    modridge = Ridge(alpha=final_alphas[subj_idx]).fit(R_tr[:,1:], glove_ds)

    #PLS
    pls = PLSRegression(n_components=refined_pls[subj_idx]).fit(R_tr[:,1:], glove_ds)

    # test features & predict
    feats_te   = get_windowed_feats(ecog_te, fs, window_len, overlap)
    R_te       = create_R_matrix(feats_te, N_wind)
    R_te[:,1:] = scaler.transform(R_te[:,1:])
    pr = modridge.predict(R_te[:,1:])
    pp = pls.predict(R_te[:,1:])
    w  = weights[subj_idx]
    pred = w*pr + (1-w)*pp

    # post-process
    pred = suppress_low_outliers(pred, threshold_multiplier=2)
    pred = gaussian_smoothing(pred, sigma=2.25)

    # upsample back to 1kHz
    x_old = np.arange(pred.shape[0]) * step
    x_new = np.arange(ecog_te.shape[0])
    interp = np.zeros((len(x_new), pred.shape[1]))
    for i in range(pred.shape[1]):
        cs = CubicSpline(x_old, pred[:,i], bc_type='natural')
        interp[:,i] = cs(x_new)

    predicted_dg[subj_idx,0] = interp

# save
scipy.io.savemat('predicted_submission.mat',
                 {'predicted_dg': predicted_dg})
print("\n✅ Predictions saved to 'predicted_submission.mat'")


=== Subject 1 ===


Extracting features: 100%|██████████| 5999/5999 [00:25<00:00, 232.08it/s]
Extracting features: 100%|██████████| 2949/2949 [00:12<00:00, 236.11it/s]



=== Subject 2 ===


Extracting features: 100%|██████████| 5999/5999 [00:23<00:00, 256.30it/s]
Extracting features: 100%|██████████| 2949/2949 [00:11<00:00, 255.10it/s]



=== Subject 3 ===


Extracting features: 100%|██████████| 5999/5999 [00:26<00:00, 227.87it/s]
Extracting features: 100%|██████████| 2949/2949 [00:12<00:00, 234.36it/s]



✅ Predictions saved to 'predicted_submission.mat'


Final submission (tuned Ridge)
Train and save model to use on final dataset

In [3]:
import numpy as np
import scipy.io
from scipy import signal
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import Ridge
from scipy.ndimage import gaussian_filter1d
from tqdm import tqdm
import joblib

# ========== Parameters ==========
fs = 1000
window_len = 100
overlap = 50
step = window_len - overlap
N_wind = 3

# ========== Gaussian Smoothing ==========
def gaussian_smoothing(pred, sigma=2.25):
    pred_smooth = pred.copy()
    for i in range(pred.shape[1]):
        pred_smooth[:, i] = gaussian_filter1d(pred[:, i], sigma=sigma)
    return pred_smooth

# ========== Outlier Suppression ==========
def suppress_low_outliers(pred, threshold_multiplier=2):
    pred_cleaned = pred.copy()
    for i in range(pred.shape[1]):
        col = pred[:, i]
        mean = np.mean(col)
        std = np.std(col)
        thresh = mean - threshold_multiplier * std
        pred_cleaned[:, i] = np.where(col < thresh, 0, col)
    return pred_cleaned

# ========== Bandpass Filter ==========
def bandpass_filter(data, fs, lowcut, highcut, order=4):
    nyq = 0.5 * fs
    low, high = lowcut/nyq, highcut/nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data, axis=0)

# ========== Feature Extraction ==========
def get_features(window, fs=1000):
    freq_bands = [(5,15),(20,25),(75,115),(125,160),(160,175)]
    n_ch = window.shape[1]
    feats = np.zeros((n_ch, 6))
    feats[:,0] = np.mean(window,axis=0)
    for i,(low,high) in enumerate(freq_bands):
        bf = bandpass_filter(window, fs, low, high)
        feats[:,i+1] = np.mean(np.abs(bf),axis=0)
    return feats

# ========== Sliding Window ==========
def get_windowed_feats(ecog, fs, win_len, overlap):
    step = win_len - overlap
    n_win = (ecog.shape[0] - overlap)//step
    feats = []
    for i in tqdm(range(n_win), desc="Extracting features"):
        s, e = i*step, i*step+win_len
        if e > ecog.shape[0]: break
        feats.append(get_features(ecog[s:e,:], fs).flatten())
    return np.array(feats)

# ========== Create R Matrix ==========
def create_R_matrix(features, N_wind):
    n_w, n_f = features.shape
    pad = np.tile(features[0], (N_wind-1,1))
    concat = np.vstack([pad, features])
    R = np.zeros((n_w, 1 + N_wind*n_f))
    for i in range(n_w):
        ctx = concat[i:i+N_wind].flatten()
        R[i] = np.concatenate(([1], ctx))
    return R


# ==========Tuned parameters ==========
final_alphas = [5609, 4834, 3810]

# ==========Tuned parameters ==========
savemodels = []


# ========== Predict on Leaderboard ==========
train_data        = scipy.io.loadmat('raw_training_data.mat')
train_ecogs       = train_data['train_ecog']
train_gloves      = train_data['train_dg']

for subj_idx in range(3):
    print(f"\n=== Subject {subj_idx+1} ===")
    # load
    ecog_tr = train_ecogs[subj_idx].item()
    glove_tr= train_gloves[subj_idx].item()

    # train features & labels
    feats_tr  = get_windowed_feats(ecog_tr, fs, window_len, overlap)
    R_tr      = create_R_matrix(feats_tr, N_wind)
    glove_ds  = signal.decimate(glove_tr,
                   glove_tr.shape[0]//R_tr.shape[0], axis=0,
                   zero_phase=True)[:R_tr.shape[0]]
    # smooth labels
    for i in range(glove_ds.shape[1]):
        glove_ds[:,i] = gaussian_filter1d(glove_ds[:,i], sigma=2.25)



    # Build and fit pipeline: scaler + Ridge
    model = make_pipeline(
        StandardScaler(),
        Ridge(alpha=final_alphas[subj_idx])
    )
    model.fit(R_tr[:,1:], glove_ds)
    savemodels.append(model)


#dump into joblib
joblib.dump(savemodels, 'ridge_models.joblib', compress=3)
print("Saved ridgemodels to 'ridge_models.joblib'")



=== Subject 1 ===


Extracting features: 100%|██████████| 5999/5999 [00:26<00:00, 227.59it/s]



=== Subject 2 ===


Extracting features: 100%|██████████| 5999/5999 [00:23<00:00, 252.79it/s]



=== Subject 3 ===


Extracting features: 100%|██████████| 5999/5999 [00:26<00:00, 226.77it/s]


Saved ridgemodels to 'ridge_models.joblib'


In [5]:
savemodels

[Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('ridge', Ridge(alpha=5609))]),
 Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('ridge', Ridge(alpha=4834))]),
 Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('ridge', Ridge(alpha=3810))])]

In [None]:
#create test dataset to debug code with
import scipy.io

# 1. Load the .mat
data = scipy.io.loadmat('leaderboard_data.mat')

print(data.keys())

# 2. Change keys to match document
data['truetest_data'] = data['leaderboard_ecog']
del data['leaderboard_ecog']

# 3. Save as test dataset
scipy.io.savemat('truetest_data.mat', data)