# EEG Signal Processing Pipeline

This notebook implements advanced signal processing for EEG data analysis, including:
- Data loading and preprocessing
- Signal filtering (bandpass and notch)
- Spectral analysis using Welch's method
- Band power calculations
- Advanced visualization

In [None]:
# Install required packages
!pip install numpy pandas scipy matplotlib plotly mne scikit-learn

# Import required libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import signal
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import mne
from sklearn.preprocessing import StandardScaler
from matplotlib.backends.backend_pdf import PdfPages

# Configure plotting
plt.style.use('seaborn')
%matplotlib inline

In [None]:
# --- CONFIGURATION ---
data_folder = 'data/raw'  # Local data folder path
output_folder = 'output'  # Local output folder path
sampling_rate = 250  # Hz
plot_duration_seconds = 210  # 3.5 minutes
max_samples = plot_duration_seconds * sampling_rate

# EEG frequency bands with more detailed beta bands
FREQ_BANDS = {
    'delta': (0.5, 4),
    'theta': (4, 8),
    'alpha': (8, 13),
    'beta': (13, 30),
    'gamma': (30, 45)
}

# Channel labels
channel_labels = [
    'Fz (ch1)', 'C3 (ch2)', 'Cz (ch3)', 'C4 (ch4)',
    'Pz (ch5)', 'PO7 (ch6)', 'Oz (ch7)', 'PO8 (ch8)',
    'Acc1 (ch9)', 'Acc2 (ch10)', 'Acc3 (ch11)',
    'Gyr1 (ch12)', 'Gyr2 (ch13)', 'Gyr3 (ch14)',
    'Counter (ch15)', 'Valid (ch16)', 'DeltaTime (ch17)', 'Trigger (ch18)'
]

# Make sure output folder exists
os.makedirs(output_folder, exist_ok=True)

In [None]:
# --- Signal Processing Functions ---
def bandpass_filter(data, lowcut, highcut, fs, order=4):
    """Apply a bandpass filter to the signal.
    
    Args:
        data (array): Input signal
        lowcut (float): Lower cutoff frequency
        highcut (float): Upper cutoff frequency
        fs (float): Sampling frequency
        order (int): Filter order
        
    Returns:
        array: Filtered signal
    """
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data)

def notch_filter(data, freq, fs, q=30):
    """Apply a notch filter to remove power line noise.
    
    Args:
        data (array): Input signal
        freq (float): Notch frequency (e.g., 50Hz or 60Hz)
        fs (float): Sampling frequency
        q (float): Quality factor
        
    Returns:
        array: Filtered signal
    """
    w0 = freq / (fs/2)
    b, a = signal.iirnotch(w0, q)
    return signal.filtfilt(b, a, data)

def compute_psd(data, fs, nperseg=None):
    """Compute power spectral density using Welch's method.
    
    Args:
        data (array): Input signal
        fs (float): Sampling frequency
        nperseg (int): Length of each segment
        
    Returns:
        tuple: Frequencies and PSD values
    """
    if nperseg is None:
        nperseg = min(256, len(data))
    freqs, psd = signal.welch(data, fs, nperseg=nperseg)
    return freqs, psd

def calculate_band_powers(psd, freqs, bands=FREQ_BANDS):
    """Calculate power in specific frequency bands.
    
    Args:
        psd (array): Power spectral density values
        freqs (array): Frequency points
        bands (dict): Dictionary of frequency bands
        
    Returns:
        dict: Power values for each band
    """
    powers = {}
    for band_name, (low, high) in bands.items():
        mask = (freqs >= low) & (freqs <= high)
        powers[band_name] = np.trapz(psd[mask], freqs[mask])
    return powers

In [None]:
def process_eeg_data(filepath, channels=None):
    """Process EEG data from a file.
    
    Args:
        filepath (str): Path to the data file
        channels (list): List of channel indices to process
        
    Returns:
        tuple: Processed data and time points
    """
    # Read data
    df = pd.read_csv(filepath, header=None)
    df = df.iloc[:max_samples]
    
    if channels is None:
        channels = list(range(8))  # Process first 8 EEG channels by default
    
    time = np.arange(len(df)) / sampling_rate
    processed_data = {}
    
    for channel_index in channels:
        try:
            # Convert to numeric and handle missing values
            data = pd.to_numeric(df[channel_index], errors='coerce').dropna()
            time_cut = time[:len(data)]
            
            # Apply filters
            filtered = bandpass_filter(data, 0.5, 45, sampling_rate)  # Broadband filter
            filtered = notch_filter(filtered, 50, sampling_rate)  # Remove power line noise
            
            # Calculate PSD
            freqs, psd = compute_psd(filtered, sampling_rate)
            
            # Calculate band powers
            powers = calculate_band_powers(psd, freqs)
            
            processed_data[channel_index] = {
                'raw': data,
                'filtered': filtered,
                'time': time_cut,
                'freqs': freqs,
                'psd': psd,
                'powers': powers
            }
            
        except Exception as e:
            print(f"Error processing channel {channel_index}: {e}")
    
    return processed_data

In [None]:
def plot_processed_data(processed_data, filename, output_pdf=None):
    """Create comprehensive visualizations of processed EEG data.
    
    Args:
        processed_data (dict): Dictionary containing processed data
        filename (str): Name of the data file
        output_pdf (PdfPages): PDF pages object for saving
    """
    n_channels = len(processed_data)
    fig = plt.figure(figsize=(15, 25))
    gs = plt.GridSpec(4, 1, height_ratios=[2, 2, 2, 3])
    
    # Plot 1: Time domain (Raw vs Filtered)
    ax1 = fig.add_subplot(gs[0])
    for ch_idx, ch_data in processed_data.items():
        ax1.plot(ch_data['time'], ch_data['raw'], '--', alpha=0.3,
                label=f'{channel_labels[ch_idx]} (Raw)')
        ax1.plot(ch_data['time'], ch_data['filtered'],
                label=f'{channel_labels[ch_idx]} (Filtered)')
    ax1.set_title(f'{filename} - Time Domain', fontsize=12)
    ax1.set_xlabel('Time (s)')
    ax1.set_ylabel('Amplitude')
    ax1.grid(True)
    ax1.legend(fontsize=8, bbox_to_anchor=(1.05, 1), loc='upper left')
    
    # Plot 2: Frequency domain (PSD)
    ax2 = fig.add_subplot(gs[1])
    for ch_idx, ch_data in processed_data.items():
        ax2.semilogy(ch_data['freqs'], ch_data['psd'],
                    label=channel_labels[ch_idx])
    ax2.set_title('Power Spectral Density', fontsize=12)
    ax2.set_xlabel('Frequency (Hz)')
    ax2.set_ylabel('Power/Frequency (dB/Hz)')
    ax2.grid(True)
    ax2.legend(fontsize=8, bbox_to_anchor=(1.05, 1), loc='upper left')
    
    # Plot 3: Band powers per channel
    ax3 = fig.add_subplot(gs[2:])  # Take up remaining space
    
    channels = list(processed_data.keys())
    bands = list(FREQ_BANDS.keys())
    x = np.arange(len(channels))
    width = 0.15  # Width of bars
    
    # Plot bars for each frequency band
    for i, band in enumerate(bands):
        powers = [processed_data[ch]['powers'][band] for ch in channels]
        offset = width * (i - len(bands)/2)
        ax3.bar(x + offset, powers, width, label=band)
    
    ax3.set_title('Frequency Band Powers by Channel', fontsize=12)
    ax3.set_xlabel('Channels')
    ax3.set_ylabel('Power')
    ax3.set_xticks(x)
    ax3.set_xticklabels([channel_labels[ch] for ch in channels], rotation=45)
    ax3.grid(True)
    ax3.legend(title='Frequency Bands', bbox_to_anchor=(1.05, 1), loc='upper left')
    
    plt.tight_layout()
    
    if output_pdf is not None:
        output_pdf.savefig(fig, bbox_inches='tight')
    plt.close()

In [None]:
def create_interactive_visualization(processed_data, filename):
    """Create an interactive visualization using Plotly.
    
    Args:
        processed_data (dict): Dictionary containing processed data
        filename (str): Name of the data file
    """
    fig = make_subplots(
        rows=4, cols=1,
        subplot_titles=[f'{filename} - Time Domain',
                       'Power Spectral Density',
                       'Band Powers by Channel',
                       'Band Power Distribution']
    )
    
    # Plot 1: Time domain
    for ch_idx, ch_data in processed_data.items():
        fig.add_trace(
            go.Scatter(x=ch_data['time'], y=ch_data['raw'],
                      name=f'{channel_labels[ch_idx]} (Raw)',
                      line=dict(dash='dash'), opacity=0.3),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(x=ch_data['time'], y=ch_data['filtered'],
                      name=f'{channel_labels[ch_idx]} (Filtered)'),
            row=1, col=1
        )
    
    # Plot 2: PSD
    for ch_idx, ch_data in processed_data.items():
        fig.add_trace(
            go.Scatter(x=ch_data['freqs'], y=ch_data['psd'],
                      name=channel_labels[ch_idx]),
            row=2, col=1
        )
    
    # Plot 3: Band powers by channel
    channels = list(processed_data.keys())
    bands = list(FREQ_BANDS.keys())
    
    for band in bands:
        powers = [processed_data[ch]['powers'][band] for ch in channels]
        fig.add_trace(
            go.Bar(name=band,
                   x=[channel_labels[ch] for ch in channels],
                   y=powers),
            row=3, col=1
        )
    
    # Plot 4: Band power distribution across all channels
    for ch_idx, ch_data in processed_data.items():
        fig.add_trace(
            go.Scatter(x=list(ch_data['powers'].keys()),
                      y=list(ch_data['powers'].values()),
                      name=channel_labels[ch_idx],
                      mode='lines+markers'),
            row=4, col=1
        )
    
    # Update layout
    fig.update_layout(
        height=1200,
        showlegend=True,
        legend=dict(x=1.05, y=1)
    )
    
    # Update axes labels
    fig.update_xaxes(title_text='Time (s)', row=1, col=1)
    fig.update_yaxes(title_text='Amplitude', row=1, col=1)
    fig.update_xaxes(title_text='Frequency (Hz)', row=2, col=1)
    fig.update_yaxes(title_text='Power/Frequency', type='log', row=2, col=1)
    fig.update_xaxes(title_text='Channel', row=3, col=1)
    fig.update_yaxes(title_text='Power', row=3, col=1)
    fig.update_xaxes(title_text='Frequency Band', row=4, col=1)
    fig.update_yaxes(title_text='Power', row=4, col=1)
    
    fig.show()

In [None]:
# Process files and generate visualizations
print('Starting EEG signal processing...')

# Get list of CSV files
csv_files = sorted([f for f in os.listdir(data_folder) if f.endswith('.csv')])

# Create PDF report
output_pdf_path = os.path.join(output_folder, 'eeg_analysis_report.pdf')
with PdfPages(output_pdf_path) as pdf:
    for filename in csv_files:
        print(f'\nProcessing {filename}...')
        filepath = os.path.join(data_folder, filename)
        
        # Process data
        processed_data = process_eeg_data(filepath, channels=list(range(8)))
        
        # Create and save visualizations
        plot_processed_data(processed_data, filename, pdf)
        create_interactive_visualization(processed_data, filename)
        
print(f'\nPDF report saved to: {output_pdf_path}')