# EEG Data Preprocessing

## Converting Files to CSVs

In [1]:
# Notebook: EEG File Converter to CSV with Integrated Annotations
import os
import mne
import pandas as pd

# Suppress MNE warnings
mne.set_log_level('ERROR')

# Define paths
root_path = './data/selfmade_dataset/'       # adjust if necessary
output_path = './csv_output/'

# Ensure output directory exists
os.makedirs(output_path, exist_ok=True)

# Channels to mark as misc
misc_chs = ['Aux1', 'Aux2', 'x_dir', 'y_dir', 'z_dir']

# Iterate over Person and Recording folders
for person_dir in sorted(os.listdir(root_path)):
    person_path = os.path.join(root_path, person_dir)
    if not os.path.isdir(person_path):
        continue

    for recording_dir in sorted(os.listdir(person_path)):
        recording_path = os.path.join(person_path, recording_dir)
        if not os.path.isdir(recording_path):
            continue

        # Process .vhdr files
        for file in sorted(os.listdir(recording_path)):
            if not file.endswith('.vhdr'):
                continue

            vhdr_path = os.path.join(recording_path, file)
            basename = file[:-5]  # strip .vhdr

            # Fix .vhdr references
            with open(vhdr_path, 'r', encoding='utf-8') as f:
                lines = f.readlines()
            with open(vhdr_path, 'w', encoding='utf-8') as f:
                for line in lines:
                    if line.startswith('DataFile='):
                        f.write(f'DataFile={basename}.eeg\n')
                    elif line.startswith('MarkerFile='):
                        f.write(f'MarkerFile={basename}.vmrk\n')
                    else:
                        f.write(line)

            # Load raw data, specifying eog as empty list and misc channels
            raw = mne.io.read_raw_brainvision(
                vhdr_path,
                preload=True,
                eog=[],
                misc=misc_chs
            )

            # Convert to DataFrame
            df = raw.to_data_frame()

            # Add annotation column, default ''
            df['annotation'] = ''
            if raw.annotations and len(raw.annotations) > 0:
                for onset, duration, desc in zip(raw.annotations.onset,
                                                 raw.annotations.duration,
                                                 raw.annotations.description):
                    mask = (df['time'] >= onset) & (df['time'] < onset + duration)
                    df.loc[mask, 'annotation'] = desc

            # Save combined CSV
            out_csv = os.path.join(output_path, f'{basename}.csv')
            df.to_csv(out_csv, index=False)
            print(f'Saved combined CSV: {out_csv}')


Saved combined CSV: ./csv_output/Person1Recording1.csv
Saved combined CSV: ./csv_output/Person1Recording2.csv
Saved combined CSV: ./csv_output/Person1Recording3.csv
Saved combined CSV: ./csv_output/Person1Recording3.csv
Saved combined CSV: ./csv_output/Person2Recording1.csv
Saved combined CSV: ./csv_output/Person2Recording2.csv
Saved combined CSV: ./csv_output/Person2Recording3.csv
Saved combined CSV: ./csv_output/Person3Recording1.csv
Saved combined CSV: ./csv_output/Person3Recording2.csv
Saved combined CSV: ./csv_output/Person3Recording3.csv
Saved combined CSV: ./csv_output/Person4Recording1.csv
Saved combined CSV: ./csv_output/Person4Recording2.csv
Saved combined CSV: ./csv_output/Person5Recording4.csv
Saved combined CSV: ./csv_output/Person6Recording4.csv
Saved combined CSV: ./csv_output/Person6Recording5.csv
Saved combined CSV: ./csv_output/Person6Recording6.csv
Saved combined CSV: ./csv_output/Person7Recording7.csv
Saved combined CSV: ./csv_output/Person8Recording8.csv


## Extend annotations for all rows and clean annotations 

In [2]:
import os, pandas as pd, numpy as np

for file in sorted(os.listdir(output_path)):
    if not file.endswith('.csv'):
        continue
    csv_file = os.path.join(output_path, file)
    df = pd.read_csv(csv_file)

    # Replace empty strings with NaN for ffill
    df['annotation'] = df['annotation'].replace('', np.nan)
    # Remove 'Stimulus/' prefix
    df['annotation'] = df['annotation'].str.replace(r'^Stimulus/', '', regex=True)
    # Forward-fill
    df['annotation'] = df['annotation'].ffill()
    df['annotation'] = df['annotation'].fillna('')

    df.to_csv(csv_file, index=False)
    print(f'Updated annotations in: {csv_file}')


Updated annotations in: ./csv_output/Person1Recording1.csv
Updated annotations in: ./csv_output/Person1Recording2.csv
Updated annotations in: ./csv_output/Person1Recording3.csv
Updated annotations in: ./csv_output/Person2Recording1.csv
Updated annotations in: ./csv_output/Person2Recording2.csv
Updated annotations in: ./csv_output/Person2Recording3.csv
Updated annotations in: ./csv_output/Person3Recording1.csv
Updated annotations in: ./csv_output/Person3Recording2.csv
Updated annotations in: ./csv_output/Person3Recording3.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person4Recording1.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person4Recording2.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person5Recording4.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person6Recording4.csv
Updated annotations in: ./csv_output/Person6Recording5.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person6Recording6.csv
Updated annotations in: ./csv_output/Person7Recording7.csv
Updated annotations in: ./csv_output/Person8Recording8.csv


## Add columns for Person and Recording

In [3]:
import os, re, pandas as pd

for file in sorted(os.listdir(output_path)):
    if not file.endswith('.csv'):
        continue
    csv_file = os.path.join(output_path, file)
    m = re.match(r'Person(\d+)Recording(\d+)\.csv$', file)
    if not m:
        print(f"Filename does not match pattern: {file}")
        continue
    person_num, recording_num = map(int, m.groups())

    df = pd.read_csv(csv_file)
    df.insert(0, 'Recording', recording_num)
    df.insert(0, 'Person',    person_num)
    df.to_csv(csv_file, index=False)
    print(f'Added Person and Recording columns in: {csv_file}')


Added Person and Recording columns in: ./csv_output/Person1Recording1.csv
Added Person and Recording columns in: ./csv_output/Person1Recording2.csv
Added Person and Recording columns in: ./csv_output/Person1Recording3.csv
Added Person and Recording columns in: ./csv_output/Person2Recording1.csv
Added Person and Recording columns in: ./csv_output/Person2Recording2.csv
Added Person and Recording columns in: ./csv_output/Person2Recording3.csv
Added Person and Recording columns in: ./csv_output/Person3Recording1.csv
Added Person and Recording columns in: ./csv_output/Person3Recording2.csv
Added Person and Recording columns in: ./csv_output/Person3Recording3.csv
Added Person and Recording columns in: ./csv_output/Person4Recording1.csv
Added Person and Recording columns in: ./csv_output/Person4Recording2.csv
Added Person and Recording columns in: ./csv_output/Person5Recording4.csv
Added Person and Recording columns in: ./csv_output/Person6Recording4.csv
Added Person and Recording columns in:

## Remove New Segment, START, END annotations

In [4]:
import os, pandas as pd

unwanted = ['New Segment/', 'START', 'END']
for file in sorted(os.listdir(output_path)):
    if not file.endswith('.csv'):
        continue
    csv_file = os.path.join(output_path, file)
    df = pd.read_csv(csv_file)

    df = df[~df['annotation'].isin(unwanted)]
    df.to_csv(csv_file, index=False)
    print(f'Removed unwanted annotations in: {csv_file}')


Removed unwanted annotations in: ./csv_output/Person1Recording1.csv
Removed unwanted annotations in: ./csv_output/Person1Recording2.csv
Removed unwanted annotations in: ./csv_output/Person1Recording3.csv
Removed unwanted annotations in: ./csv_output/Person2Recording1.csv
Removed unwanted annotations in: ./csv_output/Person2Recording2.csv
Removed unwanted annotations in: ./csv_output/Person2Recording3.csv
Removed unwanted annotations in: ./csv_output/Person3Recording1.csv
Removed unwanted annotations in: ./csv_output/Person3Recording2.csv
Removed unwanted annotations in: ./csv_output/Person3Recording3.csv
Removed unwanted annotations in: ./csv_output/Person4Recording1.csv
Removed unwanted annotations in: ./csv_output/Person4Recording2.csv
Removed unwanted annotations in: ./csv_output/Person5Recording4.csv
Removed unwanted annotations in: ./csv_output/Person6Recording4.csv
Removed unwanted annotations in: ./csv_output/Person6Recording5.csv
Removed unwanted annotations in: ./csv_output/Pe

## Replace lost-sample marker with REST

In [5]:
import os, pandas as pd

for fname in sorted(os.listdir(output_path)):
    if not fname.endswith('.csv'):
        continue
    fullpath = os.path.join(output_path, fname)
    df = pd.read_csv(fullpath)

    df['annotation'] = df['annotation'].replace(
        'New Segment/LostSamples: 1', 'REST'
    )
    df.to_csv(fullpath, index=False)
    print(f"Oppdatert annotering i {fname}")


Oppdatert annotering i Person1Recording1.csv
Oppdatert annotering i Person1Recording2.csv
Oppdatert annotering i Person1Recording3.csv
Oppdatert annotering i Person2Recording1.csv
Oppdatert annotering i Person2Recording2.csv
Oppdatert annotering i Person2Recording3.csv
Oppdatert annotering i Person3Recording1.csv
Oppdatert annotering i Person3Recording2.csv
Oppdatert annotering i Person3Recording3.csv
Oppdatert annotering i Person4Recording1.csv
Oppdatert annotering i Person4Recording2.csv
Oppdatert annotering i Person5Recording4.csv
Oppdatert annotering i Person6Recording4.csv
Oppdatert annotering i Person6Recording5.csv
Oppdatert annotering i Person6Recording6.csv
Oppdatert annotering i Person7Recording7.csv
Oppdatert annotering i Person8Recording8.csv


# Model

In [None]:
# -----------------------------  Block 1  -----------------------------
# Load all CSVs into a single DataFrame
import os, pandas as pd, numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedShuffleSplit
from collections import Counter

csv_dir   = './csv_output'
csv_files = [os.path.join(csv_dir, f) for f in os.listdir(csv_dir)
             if f.endswith('.csv')]
df        = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)

print(f"Loaded {len(df)} rows from {len(csv_files)} files.")
print(df.columns)


In [None]:
# -----------------------------  Block 2  -----------------------------
# Filter annotations, drop Aux, keep only relevant EEG channels
imagery = True         # False ➜ drop Imagery trials
use_aux = False        # True  ➜ keep Aux1/Aux2

if not imagery:
    df = df[~df['annotation'].str.contains('Imagery', case=False, na=False)]

aux_cols = ['Aux1', 'Aux2']
if not use_aux:
    df = df.drop(columns=[c for c in aux_cols if c in df.columns])

eeg_features = (['Person','Recording','time'] +
    ['Fp1','Fz','F3','F7','FT9','FC5','FC1','C3','T7','TP9','CP5','CP1',
     'Pz','P3','P7','O1','Oz','O2','P4','P8','TP10','CP6','CP2','Cz','C4',
     'T8','FT10','FC6','FC2','F4','F8','Fp2'])
if use_aux:
    eeg_features += aux_cols

cols_to_keep = [c for c in eeg_features + ['annotation'] if c in df.columns]
df = df[cols_to_keep]

print("Shape after filtering:", df.shape)


In [None]:
# -----------------------------  Block 3  -----------------------------
# Build feature matrix X, encoded y, and group labels
feature_cols = [c for c in df.columns
                if c not in ('Person','Recording','time','annotation')]
X = df[feature_cols].values.astype(np.float32)

le    = LabelEncoder()
y_enc = le.fit_transform(df['annotation'].values)
groups = (df['Person'].astype(str) + '_' +
          df['Recording'].astype(str)).values

print("Label mapping:", dict(zip(le.classes_, le.transform(le.classes_))))


In [None]:
# -----------------------------  Block 4  -----------------------------
# Stratified Group Shuffle → train / val / test indices
from sklearn.model_selection import StratifiedShuffleSplit
from collections import Counter
import numpy as np

def strat_group_split(y, groups, test_size=0.2, val_size=0.2, seed=42):
    """
    Returnerer rad-indekser for train / val / test
    med både stratifisering og gruppesammenheng.
    """
    uniq_g, inv = np.unique(groups, return_inverse=True)
    # Majoritetsklasse per gruppe (proxy-label til stratifisering)
    maj = np.array([np.bincount(y[inv == i]).argmax() for i in range(len(uniq_g))])

    # ---- trekk test-grupper -------------------------------------
    sss = StratifiedShuffleSplit(n_splits=1,
                                 test_size=test_size,
                                 random_state=seed)
    g_trainval, g_test = next(sss.split(uniq_g, maj))

    # ---- trekk val-grupper fra resten ---------------------------
    sss2 = StratifiedShuffleSplit(n_splits=1,
                                  test_size=val_size,
                                  random_state=seed)
    g_train, g_val = next(sss2.split(uniq_g[g_trainval], maj[g_trainval]))

    g_train = uniq_g[g_trainval][g_train]
    g_val   = uniq_g[g_trainval][g_val]
    g_test  = uniq_g[g_test]

    idx_tr = np.where(np.isin(groups, g_train))[0]
    idx_va = np.where(np.isin(groups, g_val))[0]
    idx_te = np.where(np.isin(groups, g_test))[0]
    return idx_tr, idx_va, idx_te

# kall funksjonen
train_idx, val_idx, test_idx = strat_group_split(y_enc, groups)

print("Train:", Counter(y_enc[train_idx]))
print("Val  :", Counter(y_enc[val_idx]))
print("Test :", Counter(y_enc[test_idx]))


In [None]:
# -----------------------------  Block 5  -----------------------------
# MNE filtering  →  1-s z-scored windows (50 % overlap)
import mne
from mne.filter import filter_data, notch_filter

sfreq, win_s = 256, 1.0
win, step    = int(win_s*sfreq), int(win_s*sfreq)//2

def preprocess(rec_df):
    dat = rec_df[feature_cols].values.T
    dat = notch_filter(dat, sfreq, freqs=[50], verbose=False)
    dat = filter_data(dat, sfreq, 8, 30, verbose=False)
    return dat - dat.mean(0, keepdims=True)  # CAR

def make_windows(idx_subset):
    Xw, yw = [], []
    for g in np.unique(groups[idx_subset]):
        df_rec = df[groups==g].sort_values('time')
        if len(df_rec) < sfreq*3: continue
        arr, labels = preprocess(df_rec), df_rec['annotation'].values
        for s in range(0, arr.shape[1]-win+1, step):
            seg_lab = labels[s:s+win]
            if (seg_lab==seg_lab[0]).mean() >= .8:
                seg = arr[:, s:s+win]
                seg = (seg-seg.mean(1,keepdims=True))/ (seg.std(1,keepdims=True)+1e-6)
                Xw.append(seg); yw.append(seg_lab[0])
    return np.stack(Xw), np.array(yw)

X_tr, y_tr = make_windows(train_idx)
X_va, y_va = make_windows(val_idx)
X_te, y_te = make_windows(test_idx)
print("Windows:", X_tr.shape, X_va.shape, X_te.shape)


In [None]:
# -----------------------------  Block 6  -----------------------------
# Feature transformer: Covariances→TangentSpace + Band-power
from pyriemann.estimation import Covariances
from pyriemann.tangentspace import TangentSpace
from scipy.signal import welch
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

bands = [(0.5,4),(4,8),(8,12),(13,30)]
def bandpow(seg):
    f,p = welch(seg, sfreq, nperseg=sfreq//2)
    return np.concatenate([p[:,(f>=lo)&(f<=hi)].mean(1) for lo,hi in bands])

class BandPower(BaseEstimator, TransformerMixin):
    def fit(self,X,y=None): return self
    def transform(self,X):  return np.array([bandpow(x) for x in X])

cov_ts = Pipeline([('cov', Covariances('oas')),
                   ('ts',  TangentSpace(metric='riemann'))])

feat_pipe = Pipeline([
    ('features', ColumnTransformer(
        [('ts', cov_ts, slice(None)),
         ('bp', BandPower(), slice(None))])),
    ('sc', StandardScaler())
])


In [None]:
# -----------------------------  Block 7  -----------------------------
# Balanced batch generator + EEGNet training
import tensorflow as tf, math, random
from tensorflow.keras.utils import Sequence, to_categorical
from tensorflow.keras.callbacks import EarlyStopping

n_classes = len(le.classes_)
def EEGNet(nb_classes, Chans, Samples, dropout=0.4):
    from tensorflow.keras.layers import (Input, Conv2D, DepthwiseConv2D,
         SeparableConv2D, BatchNormalization, Activation, AveragePooling2D,
         SpatialDropout2D, Flatten, Dense)
    inp = Input((Chans,Samples,1))
    x = Conv2D(8,(1,64),padding='same',use_bias=False)(inp)
    x = BatchNormalization()(x)
    x = DepthwiseConv2D((Chans,1),depth_multiplier=2,use_bias=False)(x)
    x = BatchNormalization()(x); x=Activation('elu')(x)
    x = AveragePooling2D((1,4))(x); x=SpatialDropout2D(dropout)(x)
    x = SeparableConv2D(16,(1,16),padding='same',use_bias=False)(x)
    x = BatchNormalization()(x); x=Activation('elu')(x)
    x = AveragePooling2D((1,8))(x); x=SpatialDropout2D(dropout)(x)
    x = Flatten()(x)
    out = Dense(nb_classes, activation='softmax')(x)
    return tf.keras.Model(inp,out)

class BalBatch(Sequence):
    def __init__(self,X,y,batch=64):
        self.X, self.y = X, np.array(le.transform(y))
        self.b, self.cls = batch, np.unique(self.y)
        self.idxs = {c:np.where(self.y==c)[0] for c in self.cls}
    def __len__(self): return math.ceil(len(self.y)/self.b)
    def __getitem__(self,i):
        per = max(1,self.b//len(self.cls)); idx=[]
        for c in self.cls: idx+=random.sample(self.idxs[c].tolist(),per)
        random.shuffle(idx)
        return (self.X[idx,...,np.newaxis],
                to_categorical(self.y[idx], n_classes))

train_gen = BalBatch(X_tr, y_tr, 64)
val_gen   = BalBatch(X_va, y_va, 64)

model = EEGNet(n_classes, X_tr.shape[1], X_tr.shape[2])
model.compile('adam', 'categorical_crossentropy', metrics=['accuracy'])
es = EarlyStopping(monitor='val_accuracy', patience=8, restore_best_weights=True)

model.fit(train_gen, epochs=60, validation_data=val_gen,
          callbacks=[es], verbose=1)

X_te4D = X_te[..., np.newaxis]
print("Test accuracy:",
      model.evaluate(X_te4D,
      to_categorical(le.transform(y_te), n_classes), verbose=0)[1])


In [None]:
# -----------------------------  Block 8  -----------------------------
# Confusion matrix on test-set
from sklearn.metrics import ConfusionMatrixDisplay, classification_report
import matplotlib.pyplot as plt

y_pred = model.predict(X_te4D, verbose=0).argmax(1)
y_true = le.transform(y_te)

print("\nTest classification report:")
print(classification_report(y_true, y_pred, target_names=le.classes_))

fig, ax = plt.subplots(figsize=(7,7))
ConfusionMatrixDisplay.from_predictions(
    y_true, y_pred,
    display_labels=le.classes_, cmap='Blues',
    xticks_rotation=45, ax=ax, colorbar=False
)
ax.set_title("EEGNet – Confusion Matrix (Test)")
plt.tight_layout(); plt.show()


In [None]:
# -----------------------------  Block 9  -----------------------------
# Lagre alle artefakter for sanntids-inferens
# --------------------------------------------------------------------
import os, json, numpy as np, tensorflow as tf
save_dir = "./saved_artifacts"
os.makedirs(save_dir, exist_ok=True)

# 1) TF-modell
model.save(os.path.join(save_dir, "EEGNet_MI"))

# 2) Etiketter  →  json + npy
lbl_json = os.path.join(save_dir, "label_classes.json")
lbl_npy  = os.path.join(save_dir, "label_classes.npy")
with open(lbl_json, "w", encoding="utf-8") as f:
    json.dump(le.classes_.tolist(), f, indent=2)
np.save(lbl_npy, le.classes_)                 # back-compat
print("✅ etiketter →", lbl_json, "og", lbl_npy)

# 3) Kanal-rekkefølge
with open(os.path.join(save_dir, "eeg_channels.json"), "w") as f:
    json.dump(feature_cols, f, indent=2)

# 4) Pre-proc-meta
meta_info = {
    "sfreq": sfreq, "window_s": win_s, "window_len": win,
    "step_len": step, "overlap": 0.5,
    "zscore": True, "bandpass": [8, 30], "notch": 50
}
with open(os.path.join(save_dir, "preproc_meta.json"), "w") as f:
    json.dump(meta_info, f, indent=2)


# NY MODELL

In [None]:
# -----------------------------  Block 1 -----------------------------
import os, pandas as pd, numpy as np

csv_dir = './csv_output'
csv_files = [os.path.join(csv_dir, f) for f in os.listdir(csv_dir) if f.endswith('.csv')]
df = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)

# Behold kun REST og IMAGERY
df = df[df['annotation'].str.contains('REST|IMAGERY', case=False, na=False)]

# Lag binær annotasjon
df['label'] = np.where(df['annotation'].str.contains('IMAGERY'), 'IMAGERY', 'REST')
df.reset_index(drop=True, inplace=True)

print("Klassefordeling:", df['label'].value_counts())


In [None]:
# -----------------------------  Block 2 -----------------------------
epoch_duration = 4   # kortere epochs gir mer data
fs = 500             # samplingfrekvens

def create_epochs(df, epoch_duration, fs):
    epochs, labels = [], []
    n_samples_epoch = epoch_duration * fs
    for label in ['REST', 'IMAGERY']:
        df_label = df[df['label'] == label]
        n_epochs = len(df_label) // n_samples_epoch
        for i in range(n_epochs):
            epoch = df_label.iloc[i*n_samples_epoch:(i+1)*n_samples_epoch]
            epoch_data = epoch.drop(columns=['Person','Recording','time','annotation','label']).values.T
            epochs.append(epoch_data)
            labels.append(label)
    return np.array(epochs), np.array(labels)

epochs, labels = create_epochs(df, epoch_duration, fs)
print(f'Epochs shape: {epochs.shape}, labels shape: {labels.shape}')


In [None]:
# -----------------------------  Block 3 -----------------------------
cut_seconds = 1
cut_samples = cut_seconds * fs

epochs = epochs[:, :, cut_samples:]
print("Shape etter kutt:", epochs.shape)


In [None]:
# -----------------------------  Block 4 -----------------------------
# I stedet for 0.5–40 Hz, prøv f.eks. 8–30 Hz (alfa + beta-bånd)
epochs_filtered = mne.filter.filter_data(
    epochs, sfreq=fs, l_freq=8, h_freq=30, verbose=False
)



In [None]:
# -----------------------------  Block 5 -----------------------------
from sklearn.model_selection import train_test_split

X_trainval, X_test, y_trainval, y_test = train_test_split(
    epochs_filtered, labels, test_size=0.15, stratify=labels, random_state=42
)

X_train, X_val, y_train, y_val = train_test_split(
    X_trainval, y_trainval, test_size=0.15, stratify=y_trainval, random_state=42
)

print(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")


In [None]:
# -----------------------------  Block 6 -----------------------------
from mne.decoding import CSP

csp = CSP(n_components=8, reg=None, log=True, norm_trace=False)

X_train_csp = csp.fit_transform(X_train, y_train)
X_val_csp = csp.transform(X_val)
X_test_csp = csp.transform(X_test)

print(f'CSP train: {X_train_csp.shape}')


In [None]:
# -----------------------------  Block 7 -----------------------------
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_csp)
X_val_scaled = scaler.transform(X_val_csp)
X_test_scaled = scaler.transform(X_test_csp)


In [None]:
# -----------------------------  Ny Block 7b -----------------------------
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter

print("Original training label distribution:", Counter(y_train))

rus = RandomUnderSampler(random_state=42)
X_train_balanced, y_train_balanced = rus.fit_resample(X_train_scaled, y_train)

print("Balanced training label distribution:", Counter(y_train_balanced))


In [None]:
# -----------------------------  Block 8 -----------------------------
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

svm = SVC(kernel='rbf', random_state=42)

param_grid = {
    'C': [0.1, 1, 10, 100],
    'gamma': ['scale', 0.01, 0.1, 1],
}

grid = GridSearchCV(svm, param_grid, cv=3, scoring='accuracy', verbose=1, n_jobs=-1)
grid.fit(X_train_balanced, y_train_balanced)

print("Beste parametre:", grid.best_params_)
print("Beste CV accuracy:", grid.best_score_)


In [None]:
# -----------------------------  Block 9 -----------------------------
from sklearn.metrics import classification_report, accuracy_score

model = grid.best_estimator_

# Valideringsresultater
y_val_pred = model.predict(X_val_scaled)
print("Valideringsresultater:")
print(classification_report(y_val, y_val_pred))
print("Valideringsnøyaktighet:", accuracy_score(y_val, y_val_pred))

# Testresultater
y_test_pred = model.predict(X_test_scaled)
print("Testresultater:")
print(classification_report(y_test, y_test_pred))
print("Testnøyaktighet:", accuracy_score(y_test, y_test_pred))


In [None]:
# -----------------------------  Block 10 (Confusion Matrix) -----------------------------
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

cm = confusion_matrix(y_test, y_test_pred, labels=['IMAGERY', 'REST'])

# Plot confusion matrix
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['IMAGERY', 'REST'], yticklabels=['IMAGERY', 'REST'])
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix (Test Set)')
plt.show()


In [None]:
# ----------------------------- Block 1: Last inn datasett, fjern unødvendige kanaler, binær-annotasjon -----------------------------
import os
import pandas as pd
import numpy as np

# For reproducerbarhet
np.random.seed(42)

# 1) Les inn alle CSV-filer
csv_dir = './csv_output'
csv_files = [os.path.join(csv_dir, f) for f in os.listdir(csv_dir) if f.endswith('.csv')]
df = pd.concat([pd.read_csv(f) for f in csv_files], ignore_index=True)

# 2) Standardiser annotation-tekst
df['annotation'] = df['annotation'].str.upper().str.strip()

# 3) Identifiser EEG-kanaler: ta alle kolonner unntatt metadata, fjern AUX og DIR
metadata_cols = ['Person', 'Recording', 'time', 'annotation']
all_channels  = [c for c in df.columns if c not in metadata_cols]
eeg_channels  = [c for c in all_channels if ('AUX' not in c.upper()) and ('DIR' not in c.upper())]

# 4) Filtrer bort uønskede kolonner, behold kun EEG-signaler + metadata
df = df[metadata_cols + eeg_channels].copy()

# 5) Binær-annotasjon: behold kun REST eller IMAGERY-varianter, lag label
df = df[df['annotation'].str.contains('REST|IMAGERY', na=False)].reset_index(drop=True)
df['label'] = np.where(df['annotation'].str.contains('IMAGERY'), 'IMAGERY', 'REST')

# 6) Sjekk datasett og klassedistribusjon
print("DataFrame shape:", df.shape)
print("Antall EEG-kanaler:", len(eeg_channels))
print("Klassefordeling:\n", df['label'].value_counts())


In [None]:
# ----------------------------- Block 2: Notch- og båndpass-filtrering -----------------------------
import mne
import pandas as pd

# 1) Opprett MNE RawArray fra DataFrame
sfreq = 500.0  # samplingfrekvens
info = mne.create_info(ch_names=eeg_channels, sfreq=sfreq, ch_types='eeg')
raw = mne.io.RawArray(df[eeg_channels].T.values, info)

# 2) Notch-filter ved 50 Hz for å fjerne strømnettstøy
#    - FIR-notching med smal båndbredde rundt 50 Hz
raw.notch_filter(freqs=50., fir_design='firwin', verbose=True)

# 3) Båndpass 1–40 Hz for å fange de viktigste EEG-båndene (drift ↔ gamma)
#    - 1 Hz high-pass for å fjerne DC-drift
#    - 40 Hz low-pass for å undertrykke høyfrekvent støy
raw.filter(l_freq=1.0, h_freq=40.0, fir_design='firwin', verbose=True)

# 4) Les det filtrerte signalet tilbake til en DataFrame
df_filtered = pd.DataFrame(raw.get_data().T, columns=eeg_channels)

# 5) Sjekk at filtreringen er brukt
print(raw.info)                 # skal vise 32 EEG-kanaler @ 500 Hz
print("First values (filtered):")
print(df_filtered.iloc[:5, :5])  # vis et lite utdrag av de filtrerte dataene


In [None]:
# ----------------------------- Block 3 -----------------------------



In [None]:
# ----------------------------- Block 4 (oppdatert): Epoching med gruppelabels -----------------------------
import numpy as np
import pandas as pd

data_clean = raw.get_data().T    
persons    = df['Person'].values       
recs       = df['Recording'].values    
labels_all = df['label'].values        

sfreq       = raw.info['sfreq']
epoch_samps = int(4 * sfreq)
step        = epoch_samps

# 3) Lag epoker, majoritets-label og gruppelabel per epoke
X_epochs, y_epochs, groups = [], [], []
for start in range(0, len(data_clean) - epoch_samps + 1, step):
    end = start + epoch_samps
    seg = data_clean[start:end]    
    baseline = seg[:int(0.5*sfreq), :].mean(axis=0, keepdims=True)
    seg_bc = seg - baseline
    X_epochs.append(seg_bc.T)
    lab = pd.Series(labels_all[start:end]).mode()[0]
    y_epochs.append(lab)
    groups.append(f"{persons[start]}__{recs[start]}")

X_epochs = np.array(X_epochs)    # (n_epochs, n_channels, n_times)
y_epochs = np.array(y_epochs)
groups    = np.array(groups)

# 4) Sjekk
import pandas as pd
print("Epochs shape:", X_epochs.shape)
print("Label-fordeling:\n", pd.Series(y_epochs).value_counts())
print("Antall unike grupper (Person__Recording):", len(np.unique(groups)))


In [None]:
# ----------------------------- Block 5 (oppdatert): Group-based split i train/val/test -----------------------------
from sklearn.model_selection import GroupShuffleSplit
import numpy as np
import pandas as pd

# 1) Først: train vs (val+test)
gss1 = GroupShuffleSplit(n_splits=1, test_size=0.30, random_state=42)
train_idx, vt_idx = next(gss1.split(X_epochs, y_epochs, groups))

# 2) Deretter: val vs test på det resterende
gss2 = GroupShuffleSplit(n_splits=1, test_size=0.50, random_state=42)
val_idx_rel, test_idx_rel = next(gss2.split(X_epochs[vt_idx], y_epochs[vt_idx], groups[vt_idx]))
val_idx  = vt_idx[val_idx_rel]
test_idx = vt_idx[test_idx_rel]

# 3) Del opp data
X_train, y_train = X_epochs[train_idx],    y_epochs[train_idx]
X_val,   y_val   = X_epochs[val_idx],      y_epochs[val_idx]
X_test,  y_test  = X_epochs[test_idx],     y_epochs[test_idx]

# 4) Verifiser at ingen grupper overlapper
train_groups = set(groups[train_idx])
val_groups   = set(groups[val_idx])
test_groups  = set(groups[test_idx])
assert train_groups.isdisjoint(val_groups)
assert train_groups.isdisjoint(test_groups)
assert val_groups.isdisjoint(test_groups)
print("Ingen gruppelekkasje mellom train/val/test ✔️")

# 5) Sjekk former og fordeling
print("Train shape:", X_train.shape, " Val shape:", X_val.shape, " Test shape:", X_test.shape)
print("Train distrib:\n", pd.Series(y_train).value_counts())
print("Val distrib:\n",   pd.Series(y_val).value_counts())
print("Test distrib:\n",  pd.Series(y_test).value_counts())


In [None]:
# ----------------------------- Block 6 (alternativ): Riemann Tangent Space-features -----------------------------
from pyriemann.estimation import Covariances
from pyriemann.tangentspace import TangentSpace

# 1) Beregn kovariansmatriser for hver epoke (n_epochs, n_channels, n_channels)
cov_est = Covariances(estimator='lwf')
X_train_cov = cov_est.fit_transform(X_train)    # bruker kun X_train
X_val_cov   = cov_est.transform(X_val)
X_test_cov  = cov_est.transform(X_test)

# 2) Mapper kovarianser til tangentrom
ts = TangentSpace()
X_train_csp = ts.fit_transform(X_train_cov, y_train)   # navner output X_train_csp
X_val_csp   = ts.transform(X_val_cov)                  # navner output X_val_csp
X_test_csp  = ts.transform(X_test_cov)                 # navner output X_test_csp

# 3) Sjekk at formater samsvarer med resten av pipelinen
print("Train Riemann shape:", X_train_csp.shape)
print("Val   Riemann shape:", X_val_csp.shape)
print("Test  Riemann shape:", X_test_csp.shape)


In [None]:
# ----------------------------- Block 7: Skalering + SVM med klassevekter -----------------------------
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import classification_report, accuracy_score
# Rett import:
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

# Pakk Riemann‑transformerte features + SVM i pipeline
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('svc', SVC(kernel='rbf', class_weight='balanced', random_state=42))
])

# Gjør 5‑fold stratified CV på alle Riemann‑features
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
X_all = np.vstack([X_train_csp, X_val_csp, X_test_csp])
y_all = np.hstack([y_train, y_val, y_test])
scores = cross_val_score(pipe_full, X_all, y_all,
                         cv=cv, scoring='f1_weighted', n_jobs=-1)
print(f"5‑fold mean={scores.mean():.3f}, std={scores.std():.3f}")


# 2) Tren på treningssettets CSP-features
pipe.fit(X_train_csp, y_train)

# 3) Evaluer på valideringssettet
y_val_pred = pipe.predict(X_val_csp)
print("Valideringsnøyaktighet:", accuracy_score(y_val, y_val_pred))
print("Valideringsrapport:")
print(classification_report(y_val, y_val_pred, target_names=['REST','IMAGERY']))

# 4) Evaluer på testsettet
y_test_pred = pipe.predict(X_test_csp)
print("Testnøyaktighet:", accuracy_score(y_test, y_test_pred))
print("Testrapport:")
print(classification_report(y_test, y_test_pred, target_names=['REST','IMAGERY']))


In [None]:
# ----------------------------- Block 8: Plot Confusion Matrix for Test-set -----------------------------
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report

# Definer klasserekkefølgen én gang
labels = ['REST', 'IMAGERY']

# 1) Skriv ut klassifikasjonsrapport med riktig etikett-rekkefølge
print("Test Classification Report:")
print(classification_report(
    y_test, y_test_pred,
    labels=labels,
    target_names=labels
))

# 2) Beregn og plott forvirringsmatrisen med samme etiketter
cm = confusion_matrix(y_test, y_test_pred, labels=labels)

fig, ax = plt.subplots(figsize=(5, 4))
im = ax.imshow(cm, interpolation='nearest', cmap='Blues')
ax.figure.colorbar(im, ax=ax)
ax.set_xticks([0, 1])
ax.set_yticks([0, 1])
ax.set_xticklabels(labels)
ax.set_yticklabels(labels)
ax.set_xlabel('Predicted label')
ax.set_ylabel('True label')
ax.set_title('Confusion Matrix (Test Set)')

# Annoter hver rute med tall
thresh = cm.max() / 2
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        ax.text(j, i, cm[i, j], ha='center', va='center',
                color='white' if cm[i, j] > thresh else 'black')

plt.tight_layout()
plt.show()
