In [None]:
import os
import numpy as np
import pandas as pd
import scipy.signal as signal
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from scipy.signal import butter, filtfilt, iirnotch

# Load & Preprocessing

In [4]:
# Paths
DATA_DIR = "data"
OUTPUT_DIR = "processed_spectrograms"

# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

# EEG Channels to use from Muse 2
EEG_CHANNELS = ["TP9", "Fp1", "Fp2", "TP10", "Fpz"]

In [5]:
# Band-pass filter (3-30Hz)
def bandpass_filter(data, lowcut=3, highcut=30, fs=256, order=4):
    nyquist = 0.5 * fs
    low, high = lowcut / nyquist, highcut / nyquist
    b, a = butter(order, [low, high], btype="band")
    return filtfilt(b, a, data, axis=0)

# Notch filter (50Hz)
def notch_filter(data, notch_freq=50, fs=256, quality=30):
    b, a = iirnotch(notch_freq / (fs / 2), quality)
    return filtfilt(b, a, data, axis=0)

# Compute STFT Spectrogram
def compute_stft(data, fs=256, nperseg=256, noverlap=128):
    spectrograms = []
    for channel in range(data.shape[1]):
        f, t, Sxx = signal.spectrogram(data[:, channel], fs=fs, nperseg=nperseg, noverlap=noverlap)
        spectrograms.append(Sxx)
    return np.array(spectrograms)

# Normalize using Z-score
def normalize_data(data):
    scaler = StandardScaler()
    return scaler.fit_transform(data)

# Save Spectrogram as Image
def save_spectrograms(spectrograms, filename, label):
    output_folder = os.path.join(OUTPUT_DIR, label)
    os.makedirs(output_folder, exist_ok=True)

    for i, spec in enumerate(spectrograms):
        plt.figure(figsize=(6, 4))
        plt.imshow(10 * np.log10(spec + 1e-10), aspect="auto", cmap="jet")
        plt.colorbar()
        plt.title(f"Channel {EEG_CHANNELS[i]} Spectrogram")
        plt.xlabel("Time")
        plt.ylabel("Frequency")
        plt.savefig(os.path.join(output_folder, f"{filename}_channel_{EEG_CHANNELS[i]}.png"))
        plt.close()

In [None]:
# Process a single EEG file
def process_file(csv_path, label):
    filename = os.path.basename(csv_path).split(".")[0]
    
    # Load EEG Data
    df = pd.read_csv(csv_path)
    if not all(channel in df.columns for channel in EEG_CHANNELS):
        print(f"Skipping {csv_path}, missing EEG channels!")
        return
    
    eeg_data = df[EEG_CHANNELS].values

    # Apply filtering and preprocessing
    eeg_data = bandpass_filter(eeg_data)
    eeg_data = notch_filter(eeg_data)
    eeg_data = normalize_data(eeg_data)
    spectrograms = compute_stft(eeg_data)

    # Save spectrogram images
    save_spectrograms(spectrograms, filename, label)

# Process all files in "true" and "false" folders
def process_all():
    for label in ["true", "false"]:
        folder_path = os.path.join(DATA_DIR, label)
        if not os.path.exists(folder_path):
            print(f"Skipping missing folder: {folder_path}")
            continue

        for file in os.listdir(folder_path):
            if file.endswith(".csv"):
                file_path = os.path.join(folder_path, file)
                process_file(file_path, label)

In [11]:
# Run the pipeline
process_all()

print("Processing complete! Check the 'processed_spectrograms' folder for results.")

Skipping missing folder: data\true
Skipping missing folder: data\false
Processing complete! Check the 'processed_spectrograms' folder for results.
