Import Necessary Libraries

In [1]:
import matplotlib.pyplot as plt
import matplotlib
import numpy as np
import mne
import os
import argparse
import pandas as pd
from typing import Dict
from loguru import logger
# from DataCollection.actions import Action
from scipy import stats
from scipy import signal
from scipy import integrate
from sklearn.decomposition import PCA
from dataclasses import dataclass

In [2]:
matplotlib.use("TkAgg")  # TkAgg is a backend that uses the Tkinter GUI toolkit for rendering interactive plots in a separate interactive window, where you can zoom, pan, or save the plot.
EPSILON = 1e-8  # constant EPSILON is a very small positive number, to avoid Division by Zero, and prevent Logarithmic Errors

Define the `Action` data class, which encapsulates information about user actions in the EEG dataset.

In [3]:
@dataclass
class Action:
    action_value: int
    text: str
    audio: str
    image: str

Visualization utility functions

In [4]:
# FFT Plotting
def plot_fft(data, sampling_frequency, fft_window_size, percent_overlap_between_windows):
    pxx, freqs, bins, im = plt.specgram(
        data,
        Fs=sampling_frequency,
        NFFT=fft_window_size,
        noverlap=int(percent_overlap_between_windows * fft_window_size),
    )
    plt.colorbar(label="Power/Frequency (dB/Hz)")
    plt.title("FFT Spectrogram")
    plt.xlabel("Time (s)")
    plt.ylabel("Frequency (Hz)")
    plt.show()
    print(f"Spectrogram shape: {pxx.shape}, Frequency bins: {freqs.shape}, Time bins: {bins.shape}")
    return pxx, freqs, bins

In [5]:
# Entropy Calculation
def plot_entropy_of_data_time_and_frequency_dimensions(pxx, freqs, times):
    frame_entropies = np.apply_along_axis(lambda x: stats.entropy(x + EPSILON), 0, pxx)
    freq_entropies = np.apply_along_axis(lambda x: stats.entropy(x + EPSILON), 1, pxx)

    plt.figure()
    plt.plot(freqs, freq_entropies, label="Frequency Band Entropy")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Entropy")
    plt.title("Entropy Across Frequencies")
    plt.legend()
    plt.show()

    plt.figure()
    plt.plot(times, frame_entropies, label="Time Frame Entropy", color="orange")
    plt.xlabel("Time (s)")
    plt.ylabel("Entropy")
    plt.title("Entropy Across Time Frames")
    plt.legend()
    plt.show()

Load Helper Functions

In [6]:
# SNR Calculation
def snr(signal):
    signal_power = np.mean(signal**2)
    noise = signal - np.mean(signal)
    noise_power = np.mean(noise**2)
    return 10 * np.log10(signal_power / noise_power)

In [7]:
def convert_timestamp_to_time_since_last_epoch(df):
    """Converts the timestamp to time since the last epoch."""
    df["timestamp"] = pd.to_datetime(df["timestamp"]).astype(int) / 10**9
    #Converts the timestamp column to pandas datetime format
    #a column of datetime64[ns] objects as the number of nanoseconds since the Unix epoch (January 1, 1970, 00:00:00 UTC).
    #Then convert the datetime64[ns] type to int, it returns the raw nanoseconds since the epoch.
    #Dividing by 10**9 (1 billion) converts the nanosecond-based timestamp into seconds.
    return df

def align_data_to_experiment_start_and_end_time(df, start_time: float, end_time: float):
    """Aligns the data to the experiment start and end time."""
    #Ensures that the end_time is greater than the start_time before proceeding.
    #If this condition is not true, the program will raise an AssertionError and stop execution.
    assert end_time > start_time
    #Selects rows where the timestamp value is greater than or equal to start_time
    #Selects rows where the timestamp value is less than or equal to end_time
    #The & operator performs an element-wise logical "AND". 
    #Only rows that satisfy both conditions will be included.
    #Returns a new DataFrame with only the rows that satisfy the time range condition
    return df[(df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)]

def time_align_accel_data_by_linearly_interpolating(accel_data, eeg_data):
    """Aligns accelerometer data with EEG data using linear interpolation."""
    #align accel_data to match the timestamps in eeg_data by interpolating the accelerometer data values.
    #Stores the column names of accel_data as accel_data_columns for later use in the final DataFrame
    #to ensure the returned DataFrame has the same structure as the original
    accel_data_columns = accel_data.columns
    #Converts accel_data to a 2D Numpy Array
    #where rows correspond to timestamps and columns correspond to accelerometer measurements.
    accel_data_np = accel_data.to_numpy()
    #Creates an empty NumPy array to store the interpolated accelerometer data.
    #The number of rows matches the number of rows in eeg_data (the number of EEG timestamps).
    #The number of columns matches the number of columns in accel_data_np.
    new_accel_data = np.zeros((len(eeg_data), accel_data_np.shape[1]))
    for i in range(accel_data_np.shape[1]):
        new_accel_data[:, i] = np.interp(
            eeg_data["timestamp"],
            accel_data["timestamp"],
            accel_data_np[:, i],
        )
    #new_accel_data is transformed back into a DataFrame using the original column names
    return pd.DataFrame(new_accel_data, columns=accel_data_columns)

In [8]:
# Wiener Filter
def wiener_filter(x, filter_type, mysize=None, noise=None):
    num_epochs, num_channels, num_samples = x.shape
    if filter_type == 1:
        combined = x.reshape(-1)
        filtered = signal.wiener(combined, mysize=mysize, noise=noise)
        return filtered.reshape(x.shape)
    elif filter_type == 2:
        filtered = np.zeros_like(x)
        for channel in range(num_channels):
            combined = x[:, channel, :].reshape(-1)
            filtered[:, channel, :] = signal.wiener(combined, mysize=mysize, noise=noise).reshape(
                num_epochs, num_samples
            )
        return filtered
    elif filter_type == 3:
        filtered = np.zeros_like(x)
        for epoch in range(num_epochs):
            combined = x[epoch, :, :].reshape(-1)
            filtered[epoch, :, :] = signal.wiener(combined, mysize=mysize, noise=noise).reshape(
                num_channels, num_samples
            )
        return filtered
    else:
        filtered = np.copy(x)
        for epoch in range(num_epochs):
            for channel in range(num_channels):
                filtered[epoch, channel] = signal.wiener(
                    x[epoch, channel], mysize=mysize, noise=noise
                )
        return filtered

Load Data from Directory

In [9]:
def get_data_from_directory(data_directory_path: str):
    #Joins the directory path with the file names to create the full paths to the files.
    eeg_file = os.path.join(data_directory_path, "eeg_data_raw.csv")
    accel_file = os.path.join(data_directory_path, "accelerometer_data.csv")
    action_file = os.path.join(data_directory_path, "action_data.csv")

    if not all(map(os.path.exists, [eeg_file, accel_file, action_file])):
        logger.warning(f"Missing files in {data_directory_path}")
        return None, None, None

    eeg_data = pd.read_csv(eeg_file)
    accel_data = pd.read_csv(accel_file)
    action_data = pd.read_csv(action_file)

    print(f"EEG Data Shape: {eeg_data.shape}\n{eeg_data.head()}")
    print(f"Accelerometer Data Shape: {accel_data.shape}\n{accel_data.head()}")
    print(f"Action Data Shape: {action_data.shape}\n{action_data.head()}")
    
    #return DataFrames
    return eeg_data, accel_data, action_data


Preprocessing Steps

In [10]:
def preprocess_data(eeg_data, accel_data, action_data):
    # Convert timestamps
    eeg_data = convert_timestamp_to_time_since_last_epoch(eeg_data)
    accel_data = convert_timestamp_to_time_since_last_epoch(accel_data)
    action_data = convert_timestamp_to_time_since_last_epoch(action_data)

    # Align timestamps
    start_time = max(eeg_data["timestamp"].min(), accel_data["timestamp"].min())
    end_time = min(eeg_data["timestamp"].max(), accel_data["timestamp"].max())
    eeg_data = align_data_to_experiment_start_and_end_time(eeg_data, start_time, end_time)
    accel_data = align_data_to_experiment_start_and_end_time(accel_data, start_time, end_time)

    print(f"Aligned EEG Data Shape: {eeg_data.shape}")
    print(f"Aligned Accelerometer Data Shape: {accel_data.shape}")
    return eeg_data, accel_data, action_data


Feature Extraction

In [11]:
def feature_extract(x):
    print(f"Original Shape: {x.shape}")
    num_epochs, num_channels, num_samples = x.shape

    # Perform PCA
    x_reshaped = x.reshape(num_epochs * num_channels, -1)
    pca = PCA(n_components=10)
    features = pca.fit_transform(x_reshaped)
    print(f"PCA Features Shape: {features.shape}")

    return features

Main Execution

In [14]:
if __name__ == "__main__":
    actions = {
        "end_collection": Action(action_value=5, text="End of Collection", audio=None, image=None),
    }

    subject_id = 105
    visit_number = 1
    data_directory = os.path.abspath(f"../DataCollection/data/{subject_id}/{visit_number}/")
    print(f"Resolved Data Directory: {data_directory}")
    
    eeg_data, accel_data, action_data = get_data_from_directory(data_directory)
    if eeg_data is not None:
        eeg_data, accel_data, action_data = preprocess_data(eeg_data, accel_data, action_data)

        # Apply preprocessing
        eeg_data_array = eeg_data.drop(columns=["timestamp"]).to_numpy()
        filtered_data = wiener_filter(eeg_data_array, filter_type=1)
        print(f"Filtered Data Shape: {filtered_data.shape}")

        # Feature extraction
        features = feature_extract(filtered_data)



Resolved Data Directory: /Users/yuelei/Desktop/Universum-clean/DataCollection/data/105/1
