In [None]:
import numpy as np
import pandas as pd
import os
from scipy.signal import butter, filtfilt, iirnotch, welch, detrend
from sklearn.decomposition import FastICA
from scipy.stats import skew, kurtosis
import warnings
warnings.filterwarnings("ignore")

# ---------------------------------------
# CONFIGURATION
# ---------------------------------------
channels = ["Fp1", "Fp2", "F7", "F3", "FZ", "F4", "F8", "C2"]
sampling_rate = 250
datasets = ["Arithmetic_Data", "Stroop_Data"]
tasks = ["natural", "lowlevel", "midlevel", "highlevel"]
num_subjects = 15

window_size = 4              
window_samples = window_size * sampling_rate
overlap = 0.5
step_size = int(window_samples * (1 - overlap))
z_threshold = 3


# ---------------------------------------
# ADVANCED PREPROCESSING
# ---------------------------------------
def bandpass_filter(signal, low=1, high=45, fs=250, order=4):
    nyq = 0.5 * fs
    b, a = butter(order, [low/nyq, high/nyq], btype="band")
    return filtfilt(b, a, signal)

def notch_filter_50hz(signal, fs=250, Q=30):
    nyq = 0.5 * fs
    w0 = 50 / nyq
    b, a = iirnotch(w0, Q)
    return filtfilt(b, a, signal)

def remove_outliers(signal, z_thresh=3):
    z = (signal - np.mean(signal)) / (np.std(signal) + 1e-12)
    signal[z > z_thresh] = np.nan
    signal[z < -z_thresh] = np.nan

    nans = np.isnan(signal)
    if np.any(nans):
        valid = np.flatnonzero(~nans)
        signal[nans] = np.interp(np.flatnonzero(nans), valid, signal[valid])
    return signal

def baseline_drift_removal(signal):
    return detrend(signal, type="linear")

def apply_ica_multi_channel(df):
    try:
        ica = FastICA(n_components=len(channels), random_state=42)
        cleaned = ica.fit_transform(df.values)
        return pd.DataFrame(cleaned, columns=channels)
    except:
        return df

def preprocess_signal(signal):
    signal = remove_outliers(signal)
    signal = baseline_drift_removal(signal)
    signal = bandpass_filter(signal, 1, 45, sampling_rate)
    signal = notch_filter_50hz(signal, sampling_rate)
    return signal


# ---------------------------------------
# FREQUENCY BANDS
# ---------------------------------------
bands = {
    "delta": (0.5, 4),
    "theta": (4, 8),
    "alpha": (8, 13),
    "beta": (13, 30),
    "gamma": (30, 40),
    "sigma": (12, 16)
}

# ---------------------------------------
# FEATURE EXTRACTION (UPDATED)
# ---------------------------------------
def extract_required_features(signal, fs=250):
    feats = {}

    # -------- BASIC TIME FEATURES --------
    feats["mean"] = np.mean(signal)
    feats["median"] = np.median(signal)
    feats["variance"] = np.var(signal)
    feats["std"] = np.std(signal)
    feats["skewness"] = skew(signal)
    feats["kurtosis"] = kurtosis(signal)

    # -------- POWER SPECTRAL FEATURES --------
    f, Pxx = welch(signal, fs=fs, nperseg=fs)
    band_powers = {}

    for band, (low, high) in bands.items():
        mask = (f >= low) & (f < high)
        power = np.sum(Pxx[mask])
        feats[f"{band}_power"] = power
        band_powers[band] = power

    # avoid zero division
    d = band_powers["delta"] + 1e-8
    t = band_powers["theta"] + 1e-8
    a = band_powers["alpha"] + 1e-8
    b = band_powers["beta"] + 1e-8
    g = band_powers["gamma"] + 1e-8
    s = band_powers["sigma"] + 1e-8

    # -------- REQUIRED RATIOS FROM BOTH IMAGES --------
    # (Group 1)
    feats["theta_alpha_ratio"] = t / a
    feats["beta_alpha_ratio"] = b / a
    feats["theta_plus_alpha_over_beta"] = (t + a) / b

    # (Group 2 - More ratios)
    feats["theta_beta_ratio"] = t / b
    feats["theta_plus_alpha_over_alpha_plus_beta"] = (t + a) / (a + b)
    feats["gamma_delta_ratio"] = g / d
    feats["gamma_plus_beta_over_delta_plus_alpha"] = (g + b) / (d + a)

    return feats


# ---------------------------------------
# SEGMENTATION
# ---------------------------------------
def segment_signal(signal):
    return [
        signal[i:i + window_samples]
        for i in range(0, len(signal) - window_samples + 1, step_size)
    ]


# ---------------------------------------
# MAIN EXTRACTION LOOP
# ---------------------------------------
def build_feature_dataset(base_path):
    all_rows = []

    for dataset in datasets:
        print("\nProcessing dataset:", dataset)
        folder = os.path.join(base_path, dataset)

        for subject in range(1, num_subjects + 1):

            subject_data = {}
            lengths = {}

            for task in tasks:
                file_path = os.path.join(folder, f"{task}-{subject}.txt")
                if not os.path.exists(file_path):
                    continue

                df = pd.read_csv(file_path, header=None).iloc[:, 1:1+len(channels)]
                df.columns = channels

                for ch in channels:
                    df[ch] = df[ch].astype(str).str.rstrip(',').astype(float)

                subject_data[task] = df
                lengths[task] = len(df)

            if len(subject_data) < len(tasks):
                continue

            min_len = min(lengths.values())
            trim_start = 50
            trim_end = min_len - 50

            print(f"  Subject {subject}: trimmed length = {trim_end - trim_start}")

            for task in tasks:

                df = subject_data[task].iloc[trim_start:trim_end].reset_index(drop=True)

                df = apply_ica_multi_channel(df)

                for ch in channels:
                    df[ch] = preprocess_signal(df[ch].values)

                for start in range(0, len(df) - window_samples, step_size):
                    row = {}

                    for ch in channels:
                        seg = df[ch].values[start:start + window_samples]
                        feats = extract_required_features(seg)
                        for k, v in feats.items():
                            row[f"{ch}_{k}"] = v

                    row["subject"] = subject
                    row["task"] = task
                    row["dataset"] = dataset
                    row["window_id"] = start

                    all_rows.append(row)

    return pd.DataFrame(all_rows)


# ---------------------------------------
# RUN PIPELINE
# ---------------------------------------
base_path = "/kaggle/input/tempora/Cognitive Load Assessment Through EEG A Dataset from Arithmetic and Stroop Tasks"
features_df = build_feature_dataset(base_path)

features_df.to_csv("frequency_features_extended.csv", index=False)

print("\nâœ… Saved: frequency_features_extended.csv (ALL ratios included!)")


In [None]:
# ===========================
# 1. IMPORTS
# ===========================
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score

# Models
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

import warnings
warnings.filterwarnings("ignore")


# ===========================
# 2. LOAD DATA
# ===========================
df = pd.read_csv("frequency_features_extended.csv")


# ===========================
# 3. CLEAN
# ===========================
df["dataset"] = df["dataset"].astype(str).str.strip().str.lower()
df["task"] = df["task"].astype(str).str.strip().str.lower()


# ===========================
# 4. FILTER DESIRED DATA
# Keep all 4 levels: natural, lowlevel, midlevel, highlevel
# ===========================
df = df[
    (df["dataset"] == "stroop_data") &
    (df["task"].isin(["natural", "lowlevel", "midlevel", "highlevel"]))
].reset_index(drop=True)

# FOUR-CLASS label mapping
label_map = {
    "natural": 0,
    "lowlevel": 1,
    "midlevel": 2,
    "highlevel": 3
}

df["task_label"] = df["task"].map(label_map)

# Drop unused columns
df = df.drop(columns=["dataset", "task", "window_id"], errors="ignore")


# ===========================
# 5. SPLIT FEATURES / TARGET
# ===========================
X = df.drop(columns=["task_label"])
y = df["task_label"]

# Scale inputs
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# split
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42, stratify=y
)


# ===========================
# 6. DEFINE MULTI-CLASS MODELS (4 classes)
# ===========================
models = {
    "XGBoost": XGBClassifier(
        objective="multi:softprob",
        num_class=4,
        eval_metric="mlogloss",
        n_estimators=400,
        learning_rate=0.05,
        max_depth=8,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
    ),

    "Random Forest": RandomForestClassifier(
        n_estimators=400, random_state=42
    ),

    "Logistic Regression": LogisticRegression(
        max_iter=3000, solver="lbfgs", multi_class="multinomial"
    ),

    "SVM (RBF)": SVC(
        kernel="rbf", probability=True, C=3, gamma="scale"
    ),

    "KNN": KNeighborsClassifier(
        n_neighbors=7
    ),

    "Gradient Boosting": GradientBoostingClassifier(
        n_estimators=300, learning_rate=0.05, random_state=42
    ),
}


# ===========================
# 7. TRAIN + EVALUATE MODELS
# ===========================
results = []

for name, model in models.items():
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    acc = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average="weighted")   # weighted for multi-class

    results.append((name, acc, f1))

    print("\n===========================")
    print(f"ðŸ“Œ MODEL: {name}")
    print("===========================")
    print(f"Accuracy = {acc:.4f}")
    print(f"F1 Score = {f1:.4f}")


# ===========================
# 8. MODEL COMPARISON SUMMARY
# ===========================
print("\n===========================")
print("ðŸ“Š MODEL COMPARISON SUMMARY (4-CLASS)")
print("===========================")

df_results = pd.DataFrame(results, columns=["Model", "Accuracy", "F1 Score"])
print(df_results.sort_values("Accuracy", ascending=False))


In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score

from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

import warnings
warnings.filterwarnings("ignore")

# ===========================
# 1. LOAD DATA
# ===========================
df = pd.read_csv("frequency_features_extended.csv")

# ===========================
# 2. CLEAN
# ===========================
df["dataset"] = df["dataset"].astype(str).str.lower().str.strip()
df["task"] = df["task"].astype(str).str.lower().str.strip()

# ===========================
# 3. TASK LABEL: Arithmetic vs Stroop
# ===========================
df["task_type"] = df["dataset"].map(lambda x: 0 if x == "arithmetic_data" else 1)

# Remove columns not needed
df_task = df.drop(columns=["task", "window_id"], errors="ignore")

# ===========================
# 4. SPLIT
# ===========================
X_task = df_task.drop(columns=["dataset", "task_type"])
y_task = df_task["task_type"]

scaler_task = StandardScaler()
X_task_scaled = scaler_task.fit_transform(X_task)

X_train_t, X_test_t, y_train_t, y_test_t = train_test_split(
    X_task_scaled, y_task, test_size=0.2, random_state=42, stratify=y_task
)

# ===========================
# 5. MODELS
# ===========================
models_task = {
    "XGBoost": XGBClassifier(
        objective="binary:logistic",
        eval_metric="logloss",
        n_estimators=300,
        learning_rate=0.05,
        max_depth=6,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
    ),
    "Random Forest": RandomForestClassifier(n_estimators=300, random_state=42),
    "Logistic Regression": LogisticRegression(max_iter=2000),
    "SVM (RBF)": SVC(kernel="rbf", probability=True),
    "KNN": KNeighborsClassifier(n_neighbors=7),
    "Gradient Boosting": GradientBoostingClassifier(n_estimators=300),
}

# ===========================
# 6. TRAIN + EVALUATE
# ===========================
print("\n==============================")
print("ðŸ”µ TASK CLASSIFICATION RESULTS")
print("==============================")

task_results = []

for name, model in models_task.items():

    model.fit(X_train_t, y_train_t)
    y_pred_t = model.predict(X_test_t)

    acc = accuracy_score(y_test_t, y_pred_t)
    f1 = f1_score(y_test_t, y_pred_t)

    task_results.append((name, acc, f1))

    print(f"\nðŸ“Œ Model: {name}")
    print(f"Accuracy = {acc:.4f}")
    print(f"F1 Score = {f1:.4f}")

df_task_results = pd.DataFrame(task_results, columns=["Model", "Accuracy", "F1 Score"])
print("\nTask Classification Summary:")
print(df_task_results.sort_values("Accuracy", ascending=False))


In [None]:
import numpy as np
import pandas as pd
import os
from scipy.signal import butter, filtfilt, iirnotch, welch, detrend, hilbert
from sklearn.decomposition import FastICA
from scipy.stats import skew, kurtosis
import warnings
warnings.filterwarnings("ignore")

# ---------------------------------------
# CONFIGURATION
# ---------------------------------------
channels = ["Fp1", "Fp2", "F7", "F3", "FZ", "F4", "F8", "C2"]
sampling_rate = 250
datasets = ["Arithmetic_Data", "Stroop_Data"]
tasks = ["natural", "lowlevel", "midlevel", "highlevel"]
num_subjects = 15

window_size = 4              
window_samples = window_size * sampling_rate
overlap = 0.5
step_size = int(window_samples * (1 - overlap))
z_threshold = 3


# ---------------------------------------
# ADVANCED PREPROCESSING
# ---------------------------------------
def bandpass_filter(signal, low=1, high=45, fs=250, order=4):
    nyq = 0.5 * fs
    b, a = butter(order, [low/nyq, high/nyq], btype="band")
    return filtfilt(b, a, signal)

def notch_filter_50hz(signal, fs=250, Q=30):
    nyq = 0.5 * fs
    w0 = 50 / nyq
    b, a = iirnotch(w0, Q)
    return filtfilt(b, a, signal)

def remove_outliers(signal, z_thresh=3):
    z = (signal - np.mean(signal)) / (np.std(signal) + 1e-12)
    signal[z > z_thresh] = np.nan
    signal[z < -z_thresh] = np.nan

    # interpolate nans
    nans = np.isnan(signal)
    if np.any(nans):
        valid = np.flatnonzero(~nans)
        signal[nans] = np.interp(np.flatnonzero(nans), valid, signal[valid])
    return signal

def baseline_drift_removal(signal):
    return detrend(signal, type="linear")

def apply_ica_multi_channel(df):
    try:
        ica = FastICA(n_components=len(channels), random_state=42)
        cleaned = ica.fit_transform(df.values)
        return pd.DataFrame(cleaned, columns=channels)
    except:
        return df


def preprocess_signal(signal):
    signal = remove_outliers(signal)
    signal = baseline_drift_removal(signal)
    signal = bandpass_filter(signal, 1, 45, sampling_rate)
    signal = notch_filter_50hz(signal, sampling_rate)
    return signal


# ---------------------------------------
# TIME DOMAIN FEATURE EXTRACTION
# ---------------------------------------

# Hjorth parameters
def hjorth_params(x):
    dx = np.diff(x)
    ddx = np.diff(dx)

    var_x = np.var(x)
    var_dx = np.var(dx)
    var_ddx = np.var(ddx)

    activity = var_x
    mobility = np.sqrt(var_dx / var_x) if var_x != 0 else 0
    complexity = np.sqrt(var_ddx / var_dx) / mobility if var_dx != 0 else 0

    return activity, mobility, complexity

# Instantaneous frequency
def instantaneous_frequency(signal, fs):
    analytic = hilbert(signal)
    phase = np.unwrap(np.angle(analytic))
    inst_freq = np.diff(phase) * (fs / (2 * np.pi))
    return np.mean(inst_freq) if len(inst_freq) > 1 else 0


def extract_time_features(signal, fs=250):

    activity, mobility, complexity = hjorth_params(signal)
    inst_freq = instantaneous_frequency(signal, fs)

    feats = {
        "mean": np.mean(signal),
        "median": np.median(signal),
        "variance": np.var(signal),
        "std": np.std(signal),
        "skewness": skew(signal),
        "kurtosis": kurtosis(signal),
        "hjorth_activity": activity,
        "hjorth_mobility": mobility,
        "hjorth_complexity": complexity,
        "instantaneous_frequency": inst_freq,
    }

    return feats


# ---------------------------------------
# SEGMENTATION
# ---------------------------------------
def segment_signal(signal):
    return [
        signal[i:i + window_samples]
        for i in range(0, len(signal) - window_samples + 1, step_size)
    ]


# ---------------------------------------
# MAIN EXTRACTION LOOP
# ---------------------------------------
def build_feature_dataset(base_path):
    all_rows = []

    for dataset in datasets:
        print("\nProcessing dataset:", dataset)
        folder = os.path.join(base_path, dataset)

        for subject in range(1, num_subjects + 1):

            subject_data = {}
            lengths = {}

            for task in tasks:
                file_path = os.path.join(folder, f"{task}-{subject}.txt")
                if not os.path.exists(file_path):
                    continue

                df = pd.read_csv(file_path, header=None).iloc[:, 1:1+len(channels)]
                df.columns = channels

                for ch in channels:
                    df[ch] = df[ch].astype(str).str.rstrip(',').astype(float)

                subject_data[task] = df
                lengths[task] = len(df)

            if len(subject_data) < len(tasks):
                continue

            min_len = min(lengths.values())
            trim_start = 50
            trim_end = min_len - 50

            print(f"  Subject {subject}: trimmed length = {trim_end - trim_start}")

            for task in tasks:

                df = subject_data[task].iloc[trim_start:trim_end].reset_index(drop=True)

                df = apply_ica_multi_channel(df)

                for ch in channels:
                    df[ch] = preprocess_signal(df[ch].values)

                for start in range(0, len(df) - window_samples, step_size):
                    row = {}

                    for ch in channels:
                        seg = df[ch].values[start:start + window_samples]
                        feats = extract_time_features(seg)

                        for k, v in feats.items():
                            row[f"{ch}_{k}"] = v

                    row["subject"] = subject
                    row["task"] = task
                    row["dataset"] = dataset
                    row["window_id"] = start

                    all_rows.append(row)

    return pd.DataFrame(all_rows)


# ---------------------------------------
# RUN PIPELINE
# ---------------------------------------
base_path = "/kaggle/input/tempora/Cognitive Load Assessment Through EEG A Dataset from Arithmetic and Stroop Tasks"
features_df = build_feature_dataset(base_path)

features_df.to_csv("time_domain_features_updated.csv", index=False)

print("\nâœ… Saved: time_domain_features_updated.csv")


In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score

from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

import warnings
warnings.filterwarnings("ignore")

# ===========================
# 1. LOAD TIME-DOMAIN DATA
# ===========================
df = pd.read_csv("time_domain_features_updated.csv")   # UPDATED

# ===========================
# 2. CLEAN
# ===========================
df["dataset"] = df["dataset"].astype(str).str.lower().str.strip()
df["task"] = df["task"].astype(str).str.lower().str.strip()

# ===========================
# 3. TASK LABEL: Arithmetic vs Stroop
# ===========================
df["task_type"] = df["dataset"].map(lambda x: 0 if x == "arithmetic_data" else 1)

# ===========================
# 4. REMOVE NON-FEATURE COLUMNS
# ===========================
df_task = df.drop(columns=["task", "window_id"], errors="ignore")

# features = all columns except dataset + task_type
X_task = df_task.drop(columns=["dataset", "task_type"], errors="ignore")
y_task = df_task["task_type"]

# ===========================
# 5. SCALE FEATURES
# ===========================
scaler_task = StandardScaler()
X_task_scaled = scaler_task.fit_transform(X_task)

# ===========================
# 6. TRAIN/TEST SPLIT
# ===========================
X_train_t, X_test_t, y_train_t, y_test_t = train_test_split(
    X_task_scaled, y_task, test_size=0.2, random_state=42, stratify=y_task
)

# ===========================
# 7. MODELS
# ===========================
models_task = {
    "XGBoost": XGBClassifier(
        objective="binary:logistic",
        eval_metric="logloss",
        n_estimators=300,
        learning_rate=0.05,
        max_depth=6,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
    ),

    "Random Forest": RandomForestClassifier(
        n_estimators=300, random_state=42
    ),

    "Logistic Regression": LogisticRegression(
        max_iter=3000
    ),

    "SVM (RBF)": SVC(
        kernel="rbf", probability=True, C=2
    ),

    "KNN": KNeighborsClassifier(
        n_neighbors=7
    ),

    "Gradient Boosting": GradientBoostingClassifier(
        n_estimators=300, learning_rate=0.05, random_state=42
    ),
}

# ===========================
# 8. TRAIN + EVALUATE MODELS
# ===========================
print("\n==============================")
print("ðŸ”µ TASK CLASSIFICATION RESULTS (Time-Domain Features)")
print("==============================")

task_results = []

for name, model in models_task.items():

    model.fit(X_train_t, y_train_t)
    y_pred_t = model.predict(X_test_t)

    acc = accuracy_score(y_test_t, y_pred_t)
    f1 = f1_score(y_test_t, y_pred_t)

    task_results.append((name, acc, f1))

    print(f"\nðŸ“Œ Model: {name}")
    print(f"Accuracy = {acc:.4f}")
    print(f"F1 Score = {f1:.4f}")

df_task_results = pd.DataFrame(task_results, columns=["Model", "Accuracy", "F1 Score"])

print("\nTask Classification Summary:")
print(df_task_results.sort_values("Accuracy", ascending=False))

In [None]:
# ===========================
# 1. IMPORTS
# ===========================
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score

# Models
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

import warnings
warnings.filterwarnings("ignore")


# ===========================
# 2. LOAD TIME-DOMAIN DATA
# ===========================
df = pd.read_csv("time_domain_features_updated.csv")   # UPDATED FILE


# ===========================
# 3. CLEAN STRINGS
# ===========================
df["dataset"] = df["dataset"].astype(str).str.strip().str.lower()
df["task"] = df["task"].astype(str).str.strip().str.lower()


# ===========================
# 4. FILTER FOR STROOP 4-LEVEL TASKS
# ===========================
df = df[
    (df["dataset"] == "stroop_data") &
    (df["task"].isin(["natural", "lowlevel", "midlevel", "highlevel"]))
].reset_index(drop=True)

# 4-class mapping
label_map = {
    "natural": 0,
    "lowlevel": 1,
    "midlevel": 2,
    "highlevel": 3
}

df["task_label"] = df["task"].map(label_map)


# ===========================
# 5. DROP UNUSED COLUMNS
# ===========================
df = df.drop(columns=["dataset", "task", "window_id"], errors="ignore")


# ===========================
# 6. SPLIT FEATURES / TARGET
# ===========================
X = df.drop(columns=["task_label"])   # all time-domain feature columns
y = df["task_label"]

# Standard scaling (required for SVM, KNN, Logistic Regression)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42, stratify=y
)


# ===========================
# 7. DEFINE MULTI-CLASS MODELS
# ===========================
models = {
    "XGBoost": XGBClassifier(
        objective="multi:softprob",
        num_class=4,
        eval_metric="mlogloss",
        n_estimators=400,
        learning_rate=0.05,
        max_depth=8,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
    ),

    "Random Forest": RandomForestClassifier(
        n_estimators=400, random_state=42
    ),

    "Logistic Regression": LogisticRegression(
        max_iter=3000, solver="lbfgs", multi_class="multinomial"
    ),

    "SVM (RBF)": SVC(
        kernel="rbf", probability=True, C=3
    ),

    "KNN": KNeighborsClassifier(
        n_neighbors=7
    ),

    "Gradient Boosting": GradientBoostingClassifier(
        n_estimators=300, learning_rate=0.05, random_state=42
    ),
}


# ===========================
# 8. TRAIN + EVALUATE MODELS
# ===========================
results = []

for name, model in models.items():

    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    acc = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average="weighted")

    results.append((name, acc, f1))

    print("\n===========================")
    print(f"ðŸ“Œ MODEL: {name}")
    print("===========================")
    print(f"Accuracy = {acc:.4f}")
    print(f"F1 Score = {f1:.4f}")


# ===========================
# 9. COMPARISON SUMMARY
# ===========================
print("\n===========================")
print("ðŸ“Š MODEL COMPARISON SUMMARY (4-CLASS, TIME DOMAIN)")
print("===========================")

df_results = pd.DataFrame(results, columns=["Model", "Accuracy", "F1 Score"])
print(df_results.sort_values("Accuracy", ascending=False))


In [None]:
import numpy as np
import pandas as pd
import os
import warnings
warnings.filterwarnings("ignore")

from scipy.signal import butter, filtfilt, iirnotch, detrend, welch, hilbert
from sklearn.decomposition import FastICA
from scipy.stats import skew, kurtosis
import matplotlib.pyplot as plt
plt.style.use("seaborn")

# ====================================================================
# CONFIGURATION
# ====================================================================
channels = ["Fp1", "Fp2", "F7", "F3", "FZ", "F4", "F8", "C2"]
sampling_rate = 250
datasets = ["Arithmetic_Data", "Stroop_Data"]
tasks = ["natural", "lowlevel", "midlevel", "highlevel"]
num_subjects = 15
window_size = 4
window_samples = sampling_rate * window_size
step_size = window_samples // 2

# ====================================================================
# PLOTTING (ONLY FIRST CHANNEL OF FIRST SUBJECT+TASK)
# ====================================================================
PLOT_ENABLED = True
PLOT_DONE = False

def safe_plot(func, *args, **kwargs):
    global PLOT_DONE
    if PLOT_ENABLED and not PLOT_DONE:
        func(*args, **kwargs)

# ====================================================================
# PREPROCESSING FUNCTIONS
# ====================================================================
def bandpass_filter(sig, low=1, high=45, fs=250, order=4):
    nyq = 0.5 * fs
    b, a = butter(order, [low / nyq, high / nyq], btype="band")
    return filtfilt(b, a, sig)

def notch_filter(sig, fs=250, Q=30):
    nyq = 0.5 * fs
    b, a = iirnotch(50 / nyq, Q)
    return filtfilt(b, a, sig)

def remove_outliers(sig):
    z = (sig - np.mean(sig)) / (np.std(sig) + 1e-6)
    sig[z > 3] = np.nan
    sig[z < -3] = np.nan
    nans = np.isnan(sig)
    if np.any(nans):
        valid = np.flatnonzero(~nans)
        sig[nans] = np.interp(np.flatnonzero(nans), valid, sig[valid])
    return sig

# ====================================================================
# FEATURE EXTRACTION
# ====================================================================
def hjorth_params(x):
    dx = np.diff(x)
    ddx = np.diff(dx)
    var0 = np.var(x)
    var1 = np.var(dx)
    var2 = np.var(ddx)
    activity = var0
    mobility = np.sqrt(var1 / var0) if var0 else 0
    complexity = np.sqrt(var2 / var1) / mobility if var1 else 0
    return activity, mobility, complexity

def inst_freq(sig, fs):
    analytic = hilbert(sig)
    phase = np.unwrap(np.angle(analytic))
    inst = np.diff(phase) * fs / (2*np.pi)
    return np.mean(inst) if len(inst) else 0

def extract_features(seg):
    A, M, C = hjorth_params(seg)
    IF = inst_freq(seg, sampling_rate)
    return {
        "mean": np.mean(seg),
        "median": np.median(seg),
        "variance": np.var(seg),
        "std": np.std(seg),
        "skewness": skew(seg),
        "kurtosis": kurtosis(seg),
        "hjorth_activity": A,
        "hjorth_mobility": M,
        "hjorth_complexity": C,
        "instantaneous_frequency": IF
    }

# ====================================================================
# SEGMENTATION
# ====================================================================
def segment(sig):
    return [
        sig[i:i + window_samples]
        for i in range(0, len(sig) - window_samples, step_size)
    ]

# ====================================================================
# MAIN PIPELINE
# ====================================================================
def build_features(base_path):
    global PLOT_DONE
    all_features = []

    for dataset in datasets:
        for subject in range(1, num_subjects + 1):

            for task in tasks:
                fpath = os.path.join(base_path, dataset, f"{task}-{subject}.txt")
                if not os.path.exists(fpath):
                    continue

                # Load EEG
                df = pd.read_csv(fpath, header=None).iloc[:, 1:9]
                df.columns = channels

                # Clean format
                for ch in channels:
                    df[ch] = df[ch].astype(str).str.rstrip(",").astype(float)

                # ICA once per task (fast)
                try:
                    ica = FastICA(n_components=len(channels), random_state=42)
                    df = pd.DataFrame(ica.fit_transform(df), columns=channels)
                except:
                    pass

                for ch in channels:
                    sig = df[ch].values.copy()

                    # Only plot ONCE
                    if not PLOT_DONE:
                        t = np.arange(len(sig)) / sampling_rate
                        plt.plot(t, sig); plt.title(f"Raw Signal: {ch}"); plt.show()

                    sig1 = remove_outliers(sig)
                    safe_plot(lambda b,a: plt.plot(b); plt.plot(a); plt.title("Outlier removal"), sig, sig1)

                    sig2 = detrend(sig1)
                    sig3 = bandpass_filter(sig2)
                    sig4 = notch_filter(sig3)

                    df[ch] = sig4

                PLOT_DONE = True  # Stop plotting for rest of pipeline

                # Segmentation + Feature extraction
                for ch in channels:
                    segments = segment(df[ch].values)
                    for seg in segments:
                        feats = extract_features(seg)
                        feats["dataset"] = dataset
                        feats["task"] = task
                        feats["subject"] = subject
                        feats["channel"] = ch
                        all_features.append(feats)

    return pd.DataFrame(all_features)


# ====================================================================
# RUN PIPELINE FAST
# ====================================================================
base = "/kaggle/input/tempora/Cognitive Load Assessment Through EEG A Dataset from Arithmetic and Stroop Tasks"

features = build_features(base)
features.to_csv("time_features_final.csv", index=False)
print("âœ” FAST PIPELINE COMPLETE")
