In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
import os
import numpy as np
from scipy.signal import butter, filtfilt, welch
from scipy.stats import skew, kurtosis
from scipy.spatial.distance import cdist
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from joblib import Parallel, delayed
import math
import warnings
warnings.filterwarnings("ignore")

In [3]:
dataset_path = '/content/drive/MyDrive/EEG_Project/datasets/FNOSZ'
label_map = {'F': 0, 'N': 1, 'O': 2, 'S': 3, 'Z': 4}

In [4]:
#filtering and windowing
def bandpass_filter(sig, low=0.5, high=50, fs=173.61, order=4):
    nyq = 0.5 * fs
    low_cut = low / nyq
    high_cut = high / nyq
    b, a = butter(order, [low_cut, high_cut], btype='band')
    return filtfilt(b, a, sig)

def sliding_windows(sig, window_s=2.0, overlap=0.5, fs=173.61):
    win_samples = int(window_s * fs)
    step = int(win_samples * (1 - overlap))
    if step <= 0:
        step = win_samples
    windows = []
    for start in range(0, len(sig) - win_samples + 1, step):
        windows.append(sig[start:start + win_samples])
    return windows


In [5]:
#pfen
def ordinal_patterns_vectorized(seq, m=3, tau=1):
    # returns integer rank patterns shape (N, m)
    N = len(seq) - (m - 1) * tau
    if N <= 0:
        return np.empty((0, m), dtype=int)
    patterns = np.empty((N, m), dtype=int)
    for i in range(N):
        vec = seq[i:i + m * tau:tau]
        # ranks 0..m-1
        ranks = np.argsort(np.argsort(vec))
        patterns[i] = ranks
    return patterns

def phi_from_patterns(patterns, r, n):
    # if patterns <2, return 0
    N = patterns.shape[0]
    if N <= 1:
        return 0.0
    # Chebyshev (max absolute) distance across pattern components
    D = cdist(patterns, patterns, metric='chebyshev')  # NxN
    # take upper triangle (i<j)
    iu = np.triu_indices(N, k=1)
    dvals = D[iu]
    # fuzzy membership
    mu = np.exp(- (dvals ** n) / r)
    return np.mean(mu) if mu.size > 0 else 0.0

def permutation_fuzzy_entropy(window, m=3, tau=1, n=2, r_factor=0.2):
    # window: 1D numpy array
    std = np.std(window)
    if std == 0:
        return 0.0
    r = max(1e-12, r_factor * std)
    Pm = ordinal_patterns_vectorized(window, m=m, tau=tau)
    Pm1 = ordinal_patterns_vectorized(window, m=m+1, tau=tau)
    phi_m = phi_from_patterns(Pm, r, n)
    phi_m1 = phi_from_patterns(Pm1, r, n)
    if phi_m <= 0 or phi_m1 <= 0:
        return 0.0
    val = -np.log(phi_m1 / phi_m)
    # guard numeric issues
    if np.isfinite(val):
        return float(val)
    else:
        return 0.0

# 5) Additional features per window: bandpower via Welch
def bandpower_welch(window, fs=173.61, band=(1,4)):
    f, Pxx = welch(window, fs=fs, nperseg=min(len(window), 256))
    idx = np.logical_and(f >= band[0], f <= band[1])
    return np.trapz(Pxx[idx], f[idx]) if np.any(idx) else 0.0

In [6]:
#loading files, filtering and windowing
signals = []
labels = []
for folder, lbl in label_map.items():
    folder_path = os.path.join(dataset_path, folder)
    if not os.path.exists(folder_path):
        print("Warning: folder not found:", folder_path)
        continue
    files = [f for f in os.listdir(folder_path) if f.lower().endswith('.txt')]
    files.sort()
    for fname in files:
        path = os.path.join(folder_path, fname)
        try:
            sig = np.loadtxt(path)
            if sig.size == 0:
                continue
            # bandpass filter
            sig_f = bandpass_filter(sig, low=0.5, high=50, fs=173.61, order=4)
            # store filtered signal and label
            signals.append(sig_f)
            labels.append(lbl)
        except Exception as e:
            print("Error reading", path, ":", e)

signals = np.array(signals, dtype=object)
labels = np.array(labels)
print("Loaded signals:", len(signals))

Loaded signals: 500


In [7]:
#windows for all signals
window_params = dict(window_s=2.0, overlap=0.6, fs=173.61)  # overlap 60%
all_windows_per_signal = [sliding_windows(sig, **window_params) for sig in signals]
n_total_windows = sum(len(ws) for ws in all_windows_per_signal)
print("Total windows across dataset:", n_total_windows)


Total windows across dataset: 14000


In [9]:
#computing window level features
fs = window_params['fs']

def features_from_window(window):
    # Basic stats
    m = float(np.mean(window))
    s = float(np.std(window))
    sk = float(skew(window))
    kt = float(kurtosis(window))
    # PFEn
    pfen_val = permutation_fuzzy_entropy(window, m=3, tau=1, n=2, r_factor=0.2)
    # bandpowers (common EEG bands)
    delta = bandpower_welch(window, fs=fs, band=(0.5,4))
    theta = bandpower_welch(window, fs=fs, band=(4,8))
    alpha = bandpower_welch(window, fs=fs, band=(8,13))
    beta  = bandpower_welch(window, fs=fs, band=(13,30))
    # normalized bandpowers relative to total power
    total = delta + theta + alpha + beta + 1e-12
    d_rel = delta/total; t_rel = theta/total; a_rel = alpha/total; b_rel = beta/total
    return [pfen_val, m, s, sk, kt, delta, theta, alpha, beta, d_rel, t_rel, a_rel, b_rel]

# flatten windows and keep indices mapping
window_list = []
window_label_list = []
signal_index_for_window = []  # which signal index this window came from
for sig_idx, windows in enumerate(all_windows_per_signal):
    for w in windows:
        window_list.append(w)
        window_label_list.append(labels[sig_idx])
        signal_index_for_window.append(sig_idx)

# Convert window_list to a numpy array with a specific dtype
window_list_np = np.array(window_list, dtype=np.float32)

print("Computing features for each window (parallel)...")
# parallel compute
n_jobs = -1  # use all cores
window_features = Parallel(n_jobs=n_jobs, prefer="threads")(
    delayed(features_from_window)(w) for w in window_list_np
)
window_features = np.array(window_features)  # shape (n_windows, n_features)
window_labels = np.array(window_label_list)
signal_index_for_window = np.array(signal_index_for_window)

print("Window features shape:", window_features.shape)

Computing features for each window (parallel)...
Window features shape: (14000, 13)


In [10]:
#aggregate window level features
n_win_features = window_features.shape[1]
n_signals = len(signals)
agg_features = []

for sig_idx in range(n_signals):
    idxs = np.where(signal_index_for_window == sig_idx)[0]
    feats = window_features[idxs]  # shape (#windows_for_signal, n_win_features)
    if feats.shape[0] == 0:
        # fallback zeros
        agg = np.zeros(n_win_features * 5)
    else:
        # for each feature compute: mean, std, 25th, 50th(median), 75th
        stats = []
        for j in range(n_win_features):
            col = feats[:, j]
            stats.extend([np.nanmean(col), np.nanstd(col), np.nanpercentile(col,25),
                          np.nanpercentile(col,50), np.nanpercentile(col,75)])
        agg = np.array(stats)
    agg_features.append(agg)

X = np.vstack(agg_features)  # shape (n_signals, n_win_features*5)
y = labels.copy()
print("Aggregated feature matrix shape:", X.shape)

Aggregated feature matrix shape: (500, 65)


In [11]:
#train test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print("Train shape:", X_train.shape, "Test shape:", X_test.shape)

Train shape: (400, 65) Test shape: (100, 65)


In [12]:
#scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled  = scaler.transform(X_test)


In [13]:
#random forest classifier
clf = RandomForestClassifier(n_estimators=300, random_state=42, class_weight="balanced", n_jobs=-1)
clf.fit(X_train_scaled, y_train)


In [14]:
#evaluation
y_pred = clf.predict(X_test_scaled)
print("Accuracy:", accuracy_score(y_test, y_pred))
print("\nClassification Report:\n", classification_report(y_test, y_pred))
print("\nConfusion Matrix:\n", confusion_matrix(y_test, y_pred))


Accuracy: 0.86

Classification Report:
               precision    recall  f1-score   support

           0       0.83      0.75      0.79        20
           1       0.80      0.80      0.80        20
           2       0.86      0.95      0.90        20
           3       0.91      1.00      0.95        20
           4       0.89      0.80      0.84        20

    accuracy                           0.86       100
   macro avg       0.86      0.86      0.86       100
weighted avg       0.86      0.86      0.86       100


Confusion Matrix:
 [[15  4  0  1  0]
 [ 2 16  0  1  1]
 [ 0  0 19  0  1]
 [ 0  0  0 20  0]
 [ 1  0  3  0 16]]


In [15]:
# feature importances and save model
import joblib
model_path = '/content/drive/MyDrive/EEG_Project/models/pfen_rf_model.pkl'
scaler_path = '/content/drive/MyDrive/EEG_Project/models/scaler.pkl'
os.makedirs(os.path.dirname(model_path), exist_ok=True)
joblib.dump(clf, model_path)
joblib.dump(scaler, scaler_path)
print("Saved model to:", model_path)

Saved model to: /content/drive/MyDrive/EEG_Project/models/pfen_rf_model.pkl
