In [1]:
import h5py
import numpy as np
import mne
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import matplotlib
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
matplotlib.use("Qt5Agg")
import os

Channels marked as bad:
none


In [2]:
def read_and_concatenate_fif(directory_path, preload=True, 
                              use_common_channels=True, verbose=False):
    """
    Read all FIF files from a directory and concatenate them.
    
    Parameters
    ----------
    directory_path : str
        Path to directory containing FIF files
    preload : bool
        Whether to load data into memory (default: True)
    use_common_channels : bool
        If True, only keep channels common to all files (default: True)
    verbose : bool
        Whether to print verbose output (default: False)
    
    Returns
    -------
    raw_concat : mne.io.Raw
        Concatenated Raw object
    """
    raw_list = []
    filenames = []
    
    # First pass: load all files
    for filename in sorted(os.listdir(directory_path)):
        if filename.endswith('.fif'):
            full_path = os.path.join(directory_path, filename)
            raw = mne.io.read_raw_fif(full_path, preload=False, verbose=verbose)
            raw_list.append(raw)
            filenames.append(filename)
    
    if len(raw_list) == 0:
        raise ValueError(f"No FIF files found in {directory_path}")
    
    print(f"Found {len(raw_list)} FIF files")
    
    # Find common channels if requested
    if use_common_channels and len(raw_list) > 1:
        common_channels = set(raw_list[0].ch_names)
        for raw in raw_list[1:]:
            common_channels &= set(raw.ch_names)
        
        common_channels = sorted(list(common_channels))
        print(f"Using {len(common_channels)} common channels")
        
        # Pick common channels from all files
        for i, raw in enumerate(raw_list):
            raw_list[i] = raw.copy().pick_channels(common_channels, ordered=True)
    
    # Concatenate
    print("Concatenating files...")
    raw_concat = mne.concatenate_raws(raw_list, preload=preload, verbose=verbose)
    
    print(f"Concatenation complete: {raw_concat.times[-1]:.2f} seconds total")
    
    return raw_concat

def read_and_concatenate_fif_all_channels(directory_path, 
                                          preload=True, 
                                          fill_value=0.0,
                                          verbose=False):
    """
    Read all FIF files from a directory and concatenate them using ALL channels.
    Missing channels in individual files are filled with specified value.
    
    Parameters
    ----------
    directory_path : str
        Path to directory containing FIF files
    preload : bool
        Whether to load data into memory (default: True)
    fill_value : float
        Value to use for missing channels (default: 0.0)
        Common options: 0.0, np.nan, or None (will use 0.0)
    verbose : bool
        Whether to print verbose output (default: False)
    
    Returns
    -------
    raw_concat : mne.io.Raw
        Concatenated Raw object with all channels
    channel_info : dict
        Information about which channels were present in which files
    """
    
    if fill_value is None:
        fill_value = 0.0
    
    print(f"Loading FIF files from: {directory_path}")
    print(f"Missing channels will be filled with: {fill_value}")
    print("=" * 80)
    
    # Step 1: Load all files and collect channel information
    raw_list = []
    filenames = []
    all_channels_sets = []
    
    for filename in sorted(os.listdir(directory_path)):
        if filename.endswith('.fif'):
            full_path = os.path.join(directory_path, filename)
            raw = mne.io.read_raw_fif(full_path, preload=False, verbose=verbose)
            raw_list.append(raw)
            filenames.append(filename)
            all_channels_sets.append(set(raw.ch_names))
            
            print(f"Loaded: {filename}")
            print(f"  Channels: {len(raw.ch_names)}")
    
    if len(raw_list) == 0:
        raise ValueError(f"No FIF files found in {directory_path}")
    
    print(f"\nTotal files loaded: {len(raw_list)}")
    
    # Step 2: Find union of all channels
    all_channels_union = set()
    for ch_set in all_channels_sets:
        all_channels_union |= ch_set
    
    all_channels_sorted = sorted(list(all_channels_union))
    
    print(f"\n" + "=" * 80)
    print(f"Total unique channels across all files: {len(all_channels_sorted)}")
    
    # Step 3: Analyze channel presence across files
    channel_presence = {ch: [] for ch in all_channels_sorted}
    for i, ch_set in enumerate(all_channels_sets):
        for ch in all_channels_sorted:
            channel_presence[ch].append(ch in ch_set)
    
    # Report channels that are missing from some files
    missing_in_some = {ch: sum([not present for present in presences]) 
                       for ch, presences in channel_presence.items()}
    channels_missing_somewhere = {ch: count for ch, count in missing_in_some.items() if count > 0}
    
    if channels_missing_somewhere:
        print(f"\nChannels missing from some files:")
        for ch, count in sorted(channels_missing_somewhere.items(), key=lambda x: x[1], reverse=True):
            print(f"  {ch}: missing from {count}/{len(raw_list)} files")
    else:
        print(f"\nAll channels present in all files!")
    
    # Step 4: Process each file to add missing channels
    print(f"\n" + "=" * 80)
    print("Processing files and adding missing channels...")
    print("=" * 80)
    
    processed_raws = []
    
    for i, (raw, filename) in enumerate(zip(raw_list, filenames)):
        print(f"\nProcessing: {filename}")
        
        # Find missing channels for this file
        missing_channels = [ch for ch in all_channels_sorted if ch not in raw.ch_names]
        
        if missing_channels:
            print(f"  Adding {len(missing_channels)} missing channels filled with {fill_value}")
            
            # Load data if not already loaded
            if not raw.preload:
                raw.load_data()
            
            # Get channel info from an existing channel to use as template
            existing_ch_idx = 0
            template_ch_name = raw.ch_names[existing_ch_idx]
            
            # Create info for the missing channels
            info_missing = mne.create_info(
                ch_names=missing_channels,
                sfreq=raw.info['sfreq'],
                ch_types='eeg'  # Default to EEG, will try to match types below
            )
            
            # Try to infer channel type from name patterns
            for ch_name in missing_channels:
                ch_idx = info_missing.ch_names.index(ch_name)
                
                # Try to match channel type based on naming conventions
                if 'STI' in ch_name or 'TRIG' in ch_name or ch_name.startswith('STI'):
                    info_missing['chs'][ch_idx]['kind'] = mne.io.constants.FIFF.FIFFV_STIM_CH
                elif 'EOG' in ch_name:
                    info_missing['chs'][ch_idx]['kind'] = mne.io.constants.FIFF.FIFFV_EOG_CH
                elif 'ECG' in ch_name:
                    info_missing['chs'][ch_idx]['kind'] = mne.io.constants.FIFF.FIFFV_ECG_CH
                # Otherwise keep as EEG (default)
            
            # Create data array filled with the specified value
            n_samples = raw.n_times
            missing_data = np.full((len(missing_channels), n_samples), fill_value)
            
            # Create Raw object for missing channels
            raw_missing = mne.io.RawArray(missing_data, info_missing)
            
            # Add the missing channels to the original raw
            raw = raw.add_channels([raw_missing], force_update_info=True)
            
            print(f"  New channel count: {len(raw.ch_names)}")
        else:
            print(f"  No missing channels")
            if not raw.preload:
                raw.load_data()
        
        # Ensure channels are in the same order
        raw.reorder_channels(all_channels_sorted)
        
        processed_raws.append(raw)
    
    # Step 5: Concatenate all processed files
    print(f"\n" + "=" * 80)
    print("Concatenating all files...")
    print("=" * 80)
    
    raw_concat = mne.concatenate_raws(processed_raws, preload=preload, verbose=verbose)
    
    print(f"\nConcatenation complete!")
    print(f"  Total channels: {len(raw_concat.ch_names)}")
    print(f"  Total duration: {raw_concat.times[-1]:.2f} seconds")
    print(f"  Sampling frequency: {raw_concat.info['sfreq']} Hz")
    
    # Prepare summary info
    channel_info = {
        'all_channels': all_channels_sorted,
        'n_channels': len(all_channels_sorted),
        'channels_missing_somewhere': channels_missing_somewhere,
        'files_processed': filenames,
        'fill_value_used': fill_value
    }
    
    return raw_concat, channel_info


def print_channel_summary(channel_info):
    """
    Print a summary of channel information from concatenation.
    
    Parameters
    ----------
    channel_info : dict
        Dictionary returned by read_and_concatenate_fif_all_channels
    """
    print("\n" + "=" * 80)
    print("CHANNEL SUMMARY")
    print("=" * 80)
    
    print(f"\nTotal unique channels: {channel_info['n_channels']}")
    print(f"Files processed: {len(channel_info['files_processed'])}")
    print(f"Fill value used: {channel_info['fill_value_used']}")
    
    if channel_info['channels_missing_somewhere']:
        print(f"\nChannels with missing data (filled with {channel_info['fill_value_used']}):")
        for ch, count in sorted(channel_info['channels_missing_somewhere'].items(), 
                               key=lambda x: x[1], reverse=True):
            print(f"  {ch}: missing from {count} file(s)")
    else:
        print("\nAll channels present in all files - no filling needed!")


In [3]:
#loading data from subject 6
subject_6, s6_channel_info = read_and_concatenate_fif_all_channels("Data_converted/Subject_06")
# Then load the data
events = mne.find_events(subject_6, 
                         stim_channel='STI', 
                         consecutive=True,
                         shortest_event=0,   # FIX: Allow 1-sample events
                         min_duration=0,      # FIX: Include all durations
                         initial_event=True)  # BONUS: Detects initial value
event_id = {
    'fixation': 1,
    'encoding': 2,
    'maintenance': 3,
    'retrieval': 4,
    'response': 5
}
#loading data from subject 2
subject_8, s8_channel_info = read_and_concatenate_fif_all_channels("Data_converted/Subject_08")
# Then load the data
events_8 = mne.find_events(subject_8, 
                         stim_channel='STI', 
                         consecutive=True,
                         shortest_event=0,   # FIX: Allow 1-sample events
                         min_duration=0,      # FIX: Include all durations
                         initial_event=True)  # BONUS: Detects initial value
event_id = {
    'fixation': 1,
    'encoding': 2,
    'maintenance': 3,
    'retrieval': 4,
    'response': 5
}
#Plotting the raw file
subject_8.plot()

Loading FIF files from: Data_converted/Subject_06
Missing channels will be filled with: 0.0
Loaded: Data_Subject_06_Session_01.h5_seeg_raw.fif
  Channels: 11
Loaded: Data_Subject_06_Session_02.h5_seeg_raw.fif
  Channels: 11
Loaded: Data_Subject_06_Session_03.h5_seeg_raw.fif
  Channels: 11
Loaded: Data_Subject_06_Session_04.h5_seeg_raw.fif
  Channels: 11
Loaded: Data_Subject_06_Session_05.h5_seeg_raw.fif
  Channels: 11
Loaded: Data_Subject_06_Session_06.h5_seeg_raw.fif
  Channels: 11
Loaded: Data_Subject_06_Session_07.h5_seeg_raw.fif
  Channels: 11

Total files loaded: 7

Total unique channels across all files: 11

All channels present in all files!

Processing files and adding missing channels...

Processing: Data_Subject_06_Session_01.h5_seeg_raw.fif
  No missing channels
Reading 0 ... 79999  =      0.000 ...   399.995 secs...

Processing: Data_Subject_06_Session_02.h5_seeg_raw.fif
  No missing channels
Reading 0 ... 79999  =      0.000 ...   399.995 secs...

Processing: Data_Subject_

<mne_qt_browser._pg_figure.MNEQtBrowser at 0x1cc318545f0>

In [4]:
#Filtering th data
seeg_raw_filtered_8 = subject_8.copy().filter(l_freq=0.1, h_freq=40)
seeg_raw_filtered_ica_8 = subject_8.copy().filter(l_freq=1,h_freq=40)

#Plotting the filtered data
seeg_raw_filtered_ica_8.plot_sensors(kind = "3d")

Filtering raw data in 5 contiguous segments
Setting up band-pass filter from 0.1 - 40 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.10
- Lower transition bandwidth: 0.10 Hz (-6 dB cutoff frequency: 0.05 Hz)
- Upper passband edge: 40.00 Hz
- Upper transition bandwidth: 10.00 Hz (-6 dB cutoff frequency: 45.00 Hz)
- Filter length: 6601 samples (33.005 s)

Filtering raw data in 5 contiguous segments
Setting up band-pass filter from 1 - 40 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 1.00
- Lower transition bandwidth: 1.00 Hz (-6 dB cutoff frequency: 0.50 Hz)
- Upper passband e

<Figure size 640x640 with 1 Axes>

In [5]:
# Load the data
print("\nLoading data into memory...")
all_loaded.load_data()

# Extract events
print("Extracting events...")
events = mne.find_events(all_loaded, stim_channel='STI', consecutive=True)

event_id = {
    'fixation': 1,
    'encoding': 2,
    'maintenance': 3,
    'retrieval': 4,
    'response': 5
}

print(f"\nFinal concatenated data:")
print(f"  Channels: {len(all_loaded.ch_names)}")
print(f"  Duration: {all_loaded.times[-1]:.2f} seconds")
print(f"  Events found: {len(events)}")

# Show event distribution
unique_events, counts = np.unique(events[:, 2], return_counts=True)
print(f"\nEvent distribution:")
for evt, count in zip(unique_events, counts):
    event_name = [k for k, v in event_id.items() if v == evt]
    name = event_name[0] if event_name else 'unknown'
    print(f"  {name} ({evt}): {count} events")



Loading data into memory...


NameError: name 'all_loaded' is not defined

In [4]:
"""
Reaction Time Analysis for FIF Format Files (MNE-Python)
Extracts, analyzes, and visualizes reaction times from probe onset to response
"""

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import mne
from scipy import stats as scipy_stats
import pandas as pd

# Set style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 8)


def extract_reaction_times_from_fif(fif_file_path, probe_event_id=None, response_event_id=None):
    """
    Extract reaction times from MNE FIF format file.
    RT = Response time - Probe onset time
    
    Parameters:
    -----------
    fif_file_path : str or Path
        Path to the FIF format file
    probe_event_id : int or str or list, optional
        Event ID(s) for probe/retrieval onset
        If None, will attempt to detect automatically
    response_event_id : int or str or list, optional
        Event ID(s) for response/button press
        If None, will attempt to detect automatically
        
    Returns:
    --------
    dict : Dictionary containing RT data and metadata
    """
    data = {
        'reaction_times': [],
        'trial_numbers': [],
        'correct_trials': [],
        'probe_times': [],
        'response_times': [],
        'probe_event_ids': [],
        'response_event_ids': []
    }
    
    print(f"Loading FIF file: {fif_file_path}")
    
    # Load the raw or epochs file
    file_path = str(fif_file_path)
    
    try:
        # Try loading as Raw first
        if 'epo' in file_path or 'epochs' in file_path.lower():
            print("Loading as Epochs file...")
            epochs = mne.read_epochs(file_path, preload=False, verbose=False)
            events = epochs.events
            event_id = epochs.event_id
            sfreq = epochs.info['sfreq']
        else:
            print("Loading as Raw file...")
            raw = mne.io.read_raw_fif(file_path, preload=False, verbose=False)
            events = mne.find_events(raw, verbose=False)
            sfreq = raw.info['sfreq']
            
            # Try to get event_id from annotations or create generic mapping
            if raw.annotations:
                event_id = {}
                for desc in set(raw.annotations.description):
                    if desc.isdigit():
                        event_id[desc] = int(desc)
                    else:
                        # Try to find corresponding event code
                        unique_events = np.unique(events[:, 2])
                        if len(event_id) < len(unique_events):
                            event_id[desc] = unique_events[len(event_id)]
            else:
                # Create generic event_id mapping
                unique_event_codes = np.unique(events[:, 2])
                event_id = {f"Event_{code}": code for code in unique_event_codes}
        
        print(f"Found {len(events)} events")
        print(f"Sampling frequency: {sfreq} Hz")
        print(f"Event IDs: {event_id}")
        
        # Determine probe and response events
        if probe_event_id is None:
            # Auto-detect: look for keywords
            probe_keys = [k for k in event_id.keys() if any(
                word in str(k).lower() for word in ['probe', 'retrieval', 'test', 'cue']
            )]
            if probe_keys:
                probe_event_id = [event_id[k] for k in probe_keys]
                print(f"Auto-detected probe events: {probe_keys} -> {probe_event_id}")
            else:
                print("WARNING: Could not auto-detect probe events. Please specify probe_event_id.")
                return data
        elif isinstance(probe_event_id, str):
            probe_event_id = [event_id[probe_event_id]]
        elif isinstance(probe_event_id, int):
            probe_event_id = [probe_event_id]
        
        if response_event_id is None:
            # Auto-detect: look for keywords
            response_keys = [k for k in event_id.keys() if any(
                word in str(k).lower() for word in ['response', 'button', 'answer', 'resp']
            )]
            if response_keys:
                response_event_id = [event_id[k] for k in response_keys]
                print(f"Auto-detected response events: {response_keys} -> {response_event_id}")
            else:
                print("WARNING: Could not auto-detect response events. Please specify response_event_id.")
                return data
        elif isinstance(response_event_id, str):
            response_event_id = [event_id[response_event_id]]
        elif isinstance(response_event_id, int):
            response_event_id = [response_event_id]
        
        # Extract probe and response times
        probe_mask = np.isin(events[:, 2], probe_event_id)
        response_mask = np.isin(events[:, 2], response_event_id)
        
        probe_events = events[probe_mask]
        response_events = events[response_mask]
        
        print(f"Found {len(probe_events)} probe events")
        print(f"Found {len(response_events)} response events")
        
        # Convert sample indices to time (in seconds)
        probe_times = probe_events[:, 0] / sfreq
        response_times = response_events[:, 0] / sfreq
        
        # Match each probe to the next response
        for i, (probe_sample, probe_time, probe_code) in enumerate(zip(
            probe_events[:, 0], probe_times, probe_events[:, 2]
        )):
            # Find the next response after this probe
            future_response_mask = response_events[:, 0] > probe_sample
            
            if np.any(future_response_mask):
                next_response_idx = np.where(future_response_mask)[0][0]
                response_sample = response_events[next_response_idx, 0]
                response_time = response_times[next_response_idx]
                response_code = response_events[next_response_idx, 2]
                
                # Calculate RT in milliseconds
                rt = (response_time - probe_time) * 1000
                
                # Only include reasonable RTs (100ms - 3000ms)
                if 100 <= rt <= 3000:
                    data['reaction_times'].append(rt)
                    data['trial_numbers'].append(i + 1)
                    data['probe_times'].append(probe_time)
                    data['response_times'].append(response_time)
                    data['probe_event_ids'].append(probe_code)
                    data['response_event_ids'].append(response_code)
                    
                    # Placeholder for correctness (would need task info)
                    data['correct_trials'].append(True)
        
        print(f"\nExtracted {len(data['reaction_times'])} valid trials (100-3000ms)")
        
    except Exception as e:
        print(f"Error extracting reaction times: {e}")
        import traceback
        traceback.print_exc()
    
    return data


def compute_rt_statistics(reaction_times):
    """
    Compute comprehensive statistics on reaction times.
    
    Parameters:
    -----------
    reaction_times : array-like
        Reaction times in milliseconds
        
    Returns:
    --------
    dict : Dictionary of statistics
    """
    rts = np.array(reaction_times)
    
    if len(rts) == 0:
        print("No reaction times to analyze!")
        return None
    
    stats_dict = {
        'mean': np.mean(rts),
        'median': np.median(rts),
        'std': np.std(rts),
        'sem': scipy_stats.sem(rts),
        'min': np.min(rts),
        'max': np.max(rts),
        'q25': np.percentile(rts, 25),
        'q75': np.percentile(rts, 75),
        'iqr': np.percentile(rts, 75) - np.percentile(rts, 25),
        'n_trials': len(rts),
        'skewness': scipy_stats.skew(rts),
        'kurtosis': scipy_stats.kurtosis(rts),
        'cv': (np.std(rts) / np.mean(rts)) * 100  # Coefficient of variation
    }
    
    return stats_dict


def detect_outliers(reaction_times, method='iqr', threshold=3):
    """
    Detect outlier trials based on reaction time.
    
    Parameters:
    -----------
    reaction_times : array-like
        Reaction times in milliseconds
    method : str
        'iqr' for interquartile range or 'zscore' for z-score method
    threshold : float
        For 'iqr': multiplier for IQR (default 3)
        For 'zscore': z-score threshold (default 3)
        
    Returns:
    --------
    tuple : (outlier_mask, outlier_indices)
    """
    rts = np.array(reaction_times)
    
    if method == 'iqr':
        q25 = np.percentile(rts, 25)
        q75 = np.percentile(rts, 75)
        iqr = q75 - q25
        
        lower_bound = q25 - threshold * iqr
        upper_bound = q75 + threshold * iqr
        
        outlier_mask = (rts < lower_bound) | (rts > upper_bound)
        
    elif method == 'zscore':
        z_scores = np.abs(scipy_stats.zscore(rts))
        outlier_mask = z_scores > threshold
    
    else:
        raise ValueError("method must be 'iqr' or 'zscore'")
    
    outlier_indices = np.where(outlier_mask)[0]
    
    return outlier_mask, outlier_indices


def plot_rt_analysis(data, stats_dict, output_path=None):
    """
    Create comprehensive visualization of reaction time data.
    
    Parameters:
    -----------
    data : dict
        Dictionary containing RT data from extract_reaction_times_from_fif
    stats_dict : dict
        Dictionary of statistics from compute_rt_statistics
    output_path : str, optional
        Path to save the figure. If None, saves to current directory.
    """
    # Set default output path based on platform
    if output_path is None:
        output_dir = Path('/mnt/user-data/outputs')
        if not output_dir.exists():
            output_dir = Path('.')  # Use current directory
        output_path = output_dir / 'rt_analysis_fif.png'
    
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    
    rts = np.array(data['reaction_times'])
    
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle('Reaction Time Analysis (FIF Format)', fontsize=16, fontweight='bold')
    
    # 1. Histogram with KDE
    ax = axes[0, 0]
    ax.hist(rts, bins=30, alpha=0.7, color='skyblue', edgecolor='black', density=True)
    
    # Add KDE
    from scipy.stats import gaussian_kde
    kde = gaussian_kde(rts)
    x_range = np.linspace(rts.min(), rts.max(), 200)
    ax.plot(x_range, kde(x_range), 'r-', linewidth=2, label='KDE')
    
    # Add mean and median lines
    ax.axvline(stats_dict['mean'], color='green', linestyle='--', 
               linewidth=2, label=f"Mean: {stats_dict['mean']:.1f} ms")
    ax.axvline(stats_dict['median'], color='orange', linestyle='--', 
               linewidth=2, label=f"Median: {stats_dict['median']:.1f} ms")
    
    ax.set_xlabel('Reaction Time (ms)', fontsize=12)
    ax.set_ylabel('Density', fontsize=12)
    ax.set_title('Distribution of Reaction Times', fontsize=13, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # 2. Box plot with outliers
    ax = axes[0, 1]
    bp = ax.boxplot(rts, vert=True, patch_artist=True, 
                     showmeans=True, meanline=True)
    bp['boxes'][0].set_facecolor('lightblue')
    bp['medians'][0].set_color('red')
    bp['medians'][0].set_linewidth(2)
    bp['means'][0].set_color('green')
    bp['means'][0].set_linewidth(2)
    
    ax.set_ylabel('Reaction Time (ms)', fontsize=12)
    ax.set_title('Box Plot with Outliers', fontsize=13, fontweight='bold')
    ax.grid(True, alpha=0.3, axis='y')
    
    # Add text with quartiles
    ax.text(1.35, stats_dict['q75'], f"Q3: {stats_dict['q75']:.1f}", 
            fontsize=10, va='center')
    ax.text(1.35, stats_dict['median'], f"Median: {stats_dict['median']:.1f}", 
            fontsize=10, va='center')
    ax.text(1.35, stats_dict['q25'], f"Q1: {stats_dict['q25']:.1f}", 
            fontsize=10, va='center')
    
    # 3. Reaction time across trials
    ax = axes[0, 2]
    trial_nums = data['trial_numbers']
    ax.plot(trial_nums, rts, 'o-', alpha=0.6, markersize=4, linewidth=1)
    ax.axhline(stats_dict['mean'], color='red', linestyle='--', 
               linewidth=2, label='Mean', alpha=0.7)
    
    # Add trend line
    z = np.polyfit(trial_nums, rts, 1)
    p = np.poly1d(z)
    ax.plot(trial_nums, p(trial_nums), "r--", alpha=0.5, linewidth=2, 
            label=f'Trend (slope: {z[0]:.2f})')
    
    ax.set_xlabel('Trial Number', fontsize=12)
    ax.set_ylabel('Reaction Time (ms)', fontsize=12)
    ax.set_title('RT Across Trials', fontsize=13, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # 4. Q-Q plot for normality
    ax = axes[1, 0]
    scipy_stats.probplot(rts, dist="norm", plot=ax)
    ax.set_title('Q-Q Plot (Normality Check)', fontsize=13, fontweight='bold')
    ax.grid(True, alpha=0.3)
    
    # 5. Cumulative distribution
    ax = axes[1, 1]
    sorted_rts = np.sort(rts)
    cumulative = np.arange(1, len(sorted_rts) + 1) / len(sorted_rts)
    ax.plot(sorted_rts, cumulative, linewidth=2)
    ax.axhline(0.5, color='red', linestyle='--', alpha=0.5, label='50th percentile')
    ax.axvline(stats_dict['median'], color='red', linestyle='--', alpha=0.5)
    
    # Add percentile markers
    percentiles = [10, 25, 75, 90]
    for p in percentiles:
        val = np.percentile(rts, p)
        ax.axvline(val, color='gray', linestyle=':', alpha=0.3)
        ax.text(val, 0.02, f'{p}th', fontsize=8, rotation=90)
    
    ax.set_xlabel('Reaction Time (ms)', fontsize=12)
    ax.set_ylabel('Cumulative Probability', fontsize=12)
    ax.set_title('Cumulative Distribution Function', fontsize=13, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # 6. Statistics table
    ax = axes[1, 2]
    ax.axis('off')
    
    stats_text = f"""
    REACTION TIME STATISTICS
    ═══════════════════════════
    
    Central Tendency:
      Mean:         {stats_dict['mean']:.2f} ms
      Median:       {stats_dict['median']:.2f} ms
      
    Variability:
      SD:           {stats_dict['std']:.2f} ms
      SEM:          {stats_dict['sem']:.2f} ms
      CV:           {stats_dict['cv']:.2f}%
      IQR:          {stats_dict['iqr']:.2f} ms
      
    Range:
      Min:          {stats_dict['min']:.2f} ms
      Max:          {stats_dict['max']:.2f} ms
      Q25:          {stats_dict['q25']:.2f} ms
      Q75:          {stats_dict['q75']:.2f} ms
      
    Distribution:
      Skewness:     {stats_dict['skewness']:.3f}
      Kurtosis:     {stats_dict['kurtosis']:.3f}
      
    Sample:
      N trials:     {stats_dict['n_trials']}
    """
    
    ax.text(0.1, 0.95, stats_text, transform=ax.transAxes,
            fontsize=11, verticalalignment='top', fontfamily='monospace',
            bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
    
    plt.tight_layout()
    plt.savefig(output_path, dpi=300, bbox_inches='tight')
    print(f"\nPlot saved to: {output_path}")
    
    return fig


def generate_report(data, stats_dict, outlier_info=None):
    """
    Generate a text report of the analysis.
    
    Parameters:
    -----------
    data : dict
        Dictionary containing RT data
    stats_dict : dict
        Dictionary of statistics
    outlier_info : tuple, optional
        (outlier_mask, outlier_indices) from detect_outliers
    """
    print("\n" + "="*70)
    print("REACTION TIME ANALYSIS REPORT (FIF FORMAT)")
    print("="*70)
    
    print(f"\nSample Size: {stats_dict['n_trials']} trials")
    
    print("\n--- Central Tendency ---")
    print(f"Mean RT:              {stats_dict['mean']:.2f} ms")
    print(f"Median RT:            {stats_dict['median']:.2f} ms")
    print(f"Difference:           {abs(stats_dict['mean'] - stats_dict['median']):.2f} ms")
    
    print("\n--- Variability ---")
    print(f"Standard Deviation:   {stats_dict['std']:.2f} ms")
    print(f"Standard Error:       {stats_dict['sem']:.2f} ms")
    print(f"Coefficient of Var:   {stats_dict['cv']:.2f}%")
    print(f"Interquartile Range:  {stats_dict['iqr']:.2f} ms")
    
    print("\n--- Range ---")
    print(f"Minimum RT:           {stats_dict['min']:.2f} ms")
    print(f"25th Percentile:      {stats_dict['q25']:.2f} ms")
    print(f"75th Percentile:      {stats_dict['q75']:.2f} ms")
    print(f"Maximum RT:           {stats_dict['max']:.2f} ms")
    print(f"Total Range:          {stats_dict['max'] - stats_dict['min']:.2f} ms")
    
    print("\n--- Distribution Shape ---")
    print(f"Skewness:             {stats_dict['skewness']:.3f}", end="")
    if stats_dict['skewness'] > 0.5:
        print(" (right-skewed)")
    elif stats_dict['skewness'] < -0.5:
        print(" (left-skewed)")
    else:
        print(" (approximately symmetric)")
    
    print(f"Kurtosis:             {stats_dict['kurtosis']:.3f}", end="")
    if stats_dict['kurtosis'] > 1:
        print(" (heavy-tailed)")
    elif stats_dict['kurtosis'] < -1:
        print(" (light-tailed)")
    else:
        print(" (normal-like tails)")
    
    # Normality test
    _, p_value = scipy_stats.shapiro(data['reaction_times'])
    print(f"\nShapiro-Wilk Test:    p = {p_value:.4f}", end="")
    if p_value < 0.05:
        print(" (significantly non-normal)")
    else:
        print(" (not significantly different from normal)")
    
    # Outlier information
    if outlier_info is not None:
        outlier_mask, outlier_indices = outlier_info
        n_outliers = np.sum(outlier_mask)
        percent_outliers = (n_outliers / len(data['reaction_times'])) * 100
        
        print("\n--- Outlier Detection (IQR method, threshold=3) ---")
        print(f"Number of outliers:   {n_outliers} ({percent_outliers:.1f}%)")
        if n_outliers > 0:
            outlier_rts = np.array(data['reaction_times'])[outlier_mask]
            print(f"Outlier RT range:     {outlier_rts.min():.2f} - {outlier_rts.max():.2f} ms")
            print(f"Outlier trial IDs:    {np.array(data['trial_numbers'])[outlier_mask].tolist()}")
    
    # Event ID information
    if len(data['probe_event_ids']) > 0:
        unique_probe_ids = np.unique(data['probe_event_ids'])
        unique_response_ids = np.unique(data['response_event_ids'])
        print("\n--- Event Information ---")
        print(f"Probe event IDs:      {unique_probe_ids.tolist()}")
        print(f"Response event IDs:   {unique_response_ids.tolist()}")
    
    print("\n" + "="*70)


def main():
    """Main analysis pipeline for FIF files."""
    
    # Example file path - UPDATE THIS to your actual file
    fif_file = "/mnt/user-data/uploads/your_file_raw.fif"
    
    # Optional: Specify event IDs if auto-detection fails
    # probe_event_id = 3  # or [3, 4] for multiple
    # response_event_id = 5  # or [5, 6] for multiple
    probe_event_id = None  # Auto-detect
    response_event_id = None  # Auto-detect
    
    print("Starting Reaction Time Analysis for FIF File...")
    print(f"Processing file: {fif_file}")
    
    # Extract data
    data = extract_reaction_times_from_fif(fif_file, 
                                          probe_event_id=probe_event_id,
                                          response_event_id=response_event_id)
    
    if len(data['reaction_times']) == 0:
        print("\nNo reaction time data found!")
        print("Tips:")
        print("- Check that event IDs are correct")
        print("- Verify that probe and response events exist in the file")
        print("- Try specifying probe_event_id and response_event_id manually")
        return
    
    # Compute statistics
    stats_dict = compute_rt_statistics(data['reaction_times'])
    
    if stats_dict is None:
        return
    
    # Detect outliers
    outlier_mask, outlier_indices = detect_outliers(data['reaction_times'], 
                                                     method='iqr', threshold=3)
    
    # Generate report
    generate_report(data, stats_dict, (outlier_mask, outlier_indices))
    
    # Create visualizations
    plot_rt_analysis(data, stats_dict)
    
    # Save data to CSV
    output_dir = Path('/mnt/user-data/outputs')
    if not output_dir.exists():
        output_dir = Path('.')  # Use current directory
    output_dir.mkdir(parents=True, exist_ok=True)
    
    df = pd.DataFrame({
        'trial': data['trial_numbers'],
        'reaction_time_ms': data['reaction_times'],
        'probe_time_sec': data['probe_times'],
        'response_time_sec': data['response_times'],
        'probe_event_id': data['probe_event_ids'],
        'response_event_id': data['response_event_ids'],
        'is_outlier': outlier_mask
    })
    
    csv_path = output_dir / 'reaction_times_fif.csv'
    df.to_csv(csv_path, index=False)
    print(f"\nData saved to: {csv_path}")
    
    # Optional: Save cleaned data (without outliers)
    df_clean = df[~df['is_outlier']]
    csv_clean_path = output_dir / 'reaction_times_fif_clean.csv'
    df_clean.to_csv(csv_clean_path, index=False)
    print(f"Clean data (outliers removed) saved to: {csv_clean_path}")
    
    print("\nAnalysis complete!")


if __name__ == "__main__":
    main()

Starting Reaction Time Analysis for FIF File...
Processing file: /mnt/user-data/uploads/your_file_raw.fif
Loading FIF file: /mnt/user-data/uploads/your_file_raw.fif
Loading as Raw file...
Error extracting reaction times: fname does not exist: "d:\mnt\user-data\uploads\your_file_raw.fif"

No reaction time data found!
Tips:
- Check that event IDs are correct
- Verify that probe and response events exist in the file
- Try specifying probe_event_id and response_event_id manually


Traceback (most recent call last):
  File "C:\Users\RSKALA\AppData\Local\Temp\ipykernel_20756\754550572.py", line 64, in extract_reaction_times_from_fif
    raw = mne.io.read_raw_fif(file_path, preload=False, verbose=False)
  File "c:\ProgramData\anaconda3\envs\mne\Lib\site-packages\mne\io\fiff\raw.py", line 543, in read_raw_fif
    return Raw(
        fname=fname,
    ...<3 lines>...
        on_split_missing=on_split_missing,
    )
  File "<decorator-gen-206>", line 10, in __init__
  File "c:\ProgramData\anaconda3\envs\mne\Lib\site-packages\mne\io\fiff\raw.py", line 103, in __init__
    raw, next_fname, buffer_size_sec = self._read_raw_file(
                                       ~~~~~~~~~~~~~~~~~~~^
        next_fname, allow_maxshield, preload, do_check_ext
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "<decorator-gen-207>", line 12, in _read_raw_file
  File "c:\ProgramData\anaconda3\envs\mne\Lib\site-packages\mne\io\fiff\raw.py", line 198, in _read_ra

In [5]:
data = extract_reaction_times_from_fif(
    'subject01_session01_seeg_raw.fif',
    probe_event_id=4,      # or [3, 4] for multiple
    response_event_id=5    # or [5, 6] for multiple
)

stats = compute_rt_statistics(data['reaction_times'])
plot_rt_analysis(data, stats)

Loading FIF file: subject01_session01_seeg_raw.fif
Loading as Raw file...
Found 249 events
Sampling frequency: 200.0 Hz
Event IDs: {np.str_('Response'): np.int64(1), np.str_('Probe'): np.int64(2), np.str_('Maintenance'): np.int64(3), np.str_('Stimulus'): np.int64(4), np.str_('Fixation'): np.int64(5)}
Found 50 probe events
Found 50 response events

Extracted 47 valid trials (100-3000ms)

Plot saved to: rt_analysis_fif.png


<Figure size 1800x1200 with 6 Axes>

In [8]:
events = mne.find_events(all_loaded, stim_channel='STI', consecutive=True)

event_id = {
    'fixation': 1,
    'encoding': 2,
    'maintenance': 3,
    'retrieval': 4,
    'response': 5
}

print(f"\nFinal concatenated data:")
print(f"  Channels: {len(all_loaded.ch_names)}")
print(f"  Duration: {all_loaded.times[-1]:.2f} seconds")
print(f"  Events found: {len(events)}")

# Show event distribution
unique_events, counts = np.unique(events[:, 2], return_counts=True)
print(f"\nEvent distribution:")
for evt, count in zip(unique_events, counts):
    event_name = [k for k, v in event_id.items() if v == evt]
    name = event_name[0] if event_name else 'unknown'
    print(f"  {name} ({evt}): {count} events")


Finding events on: STI
Trigger channel STI has a non-zero initial value of 1 (consider using initial_event=True to detect this event)
Removing orphaned offset at the beginning of the file.
8888 events found on stim channel STI
Event IDs: [1 2 3 4 5]


ValueError: You have 6 events shorter than the shortest_event. These are very unusual and you may want to set min_duration to a larger value e.g. x / raw.info['sfreq']. Where x = 1 sample shorter than the shortest event length.

In [38]:

baseline = (0, 0)


# Now create epochs with the valid event_id
fixation_epoch = mne.Epochs(seeg_raw_filtered,
                    events=events,
                    event_id={'fixation': 1},  # This is now populated!
                    tmin=0,
                    tmax=1,
                    baseline=baseline,
                    preload=True)

encoding_epoch = mne.Epochs(seeg_raw_filtered,
                    events=events,
                    event_id={'encoding': 2},  # This is now populated!
                    tmin=0,
                    tmax=1,
                    baseline=baseline,
                    preload=True)

maintenance_epoch = mne.Epochs(seeg_raw_filtered,
                    events=events,
                    event_id={'maintenance': 3},  # This is now populated!
                    tmin=0,
                    tmax=1,
                    baseline=baseline,
                    preload=True)

probe_epoch = mne.Epochs(seeg_raw_filtered,
                    events=events,
                    event_id={'retreival': 4},  # This is now populated!
                    tmin=0,
                    tmax=1,
                    baseline=baseline,
                    preload=True)
###############################33333
fixation_epoch_ica = mne.Epochs(seeg_raw_filtered_ica,
                    events=events,
                    event_id={'fixation': 1},  # This is now populated!
                    tmin=0,
                    tmax=1,
                    baseline=None,
                    preload=True)

encoding_epoch_ica = mne.Epochs(seeg_raw_filtered_ica,
                    events=events,
                    event_id={'encoding': 2},  # This is now populated!
                    tmin=0,
                    tmax=1,
                    baseline=None,
                    preload=True)

maintenance_epoch_ica = mne.Epochs(seeg_raw_filtered_ica,
                    events=events,
                    event_id={'maintenance': 3},  # This is now populated!
                    tmin=0,
                    tmax=1,
                    baseline=None,
                    preload=True)

probe_epoch_ica = mne.Epochs(seeg_raw_filtered_ica,
                    events=events,
                    event_id={'retreival': 4},  # This is now populated!
                    tmin=0,
                    tmax=1,
                    baseline=None,
                    preload=True)


response_epoch_ica = mne.Epochs(seeg_raw_filtered_ica_8,
                    events=events,
                    event_id={'response': 5},  # This is now populated!
                    tmin=-1.160,
                    tmax=2,
                    baseline=None,
                    preload=True)

response_epoch = mne.Epochs(seeg_raw_filtered_8,
                    events=events,
                    event_id={'response': 5},  # This is now populated!
                    tmin=-1.160,
                    tmax=2,
                    baseline=(None,0),
                    preload=True)


Not setting metadata
349 matching events found
Applying baseline correction (mode: mean)
0 projection items activated
Using data from preloaded Raw for 349 events and 201 original time points ...
0 bad epochs dropped
Not setting metadata
349 matching events found
Applying baseline correction (mode: mean)
0 projection items activated
Using data from preloaded Raw for 349 events and 201 original time points ...
0 bad epochs dropped
Not setting metadata
349 matching events found
Applying baseline correction (mode: mean)
0 projection items activated
Using data from preloaded Raw for 349 events and 201 original time points ...
0 bad epochs dropped
Not setting metadata
349 matching events found
Applying baseline correction (mode: mean)
0 projection items activated
Using data from preloaded Raw for 349 events and 201 original time points ...
0 bad epochs dropped
Not setting metadata
349 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded

In [7]:
#epochs.plot()
evoked_fixation = fixation_epoch.average()
evoked_encoding = encoding_epoch.average()
evoked_maintenance = maintenance_epoch.average()
evoked_response = response_epoch.average()
#evoked_encoding = encoding_epoch.average()


In [39]:
response_epoch.plot()

<mne_qt_browser._pg_figure.MNEQtBrowser at 0x1e2562595b0>

In [None]:
mne.viz.plot_compare_evokeds([evoked_fixation, evoked_encoding, evoked_maintenance,probe_evoked])

In [40]:
n_components = 0.999  # Should normally be higher, like 0.999!!
method = 'picard'
max_iter = 500  # Should normally be higher, like 500 or even 1000!!

random_state = 42

ica = mne.preprocessing.ICA(n_components=n_components,
                            method=method,
                            max_iter=max_iter,
                            random_state=random_state)
ica.fit(response_epoch_ica )

Fitting ICA to data using 19 channels (please be patient, this may take a while)
Selecting by explained variance: 16 components
Fitting ICA took 7.3s.


0,1
Method,picard
Fit parameters,max_iter=500
Fit,93 iterations on epochs (154452 samples)
ICA components,16
Available PCA components,19
Channel types,eeg
ICA components marked for exclusion,—


In [42]:
ica.plot_components(inst=response_epoch_ica)


    Using multitaper spectrum estimation with 7 DPSS windows
Not setting metadata
244 matching events found
No baseline correction applied
0 projection items activated
    Using multitaper spectrum estimation with 7 DPSS windows
Not setting metadata
244 matching events found
No baseline correction applied
0 projection items activated
    Using multitaper spectrum estimation with 7 DPSS windows
Not setting metadata
244 matching events found
No baseline correction applied
0 projection items activated
    Using multitaper spectrum estimation with 7 DPSS windows
Not setting metadata
244 matching events found
No baseline correction applied
0 projection items activated
    Using multitaper spectrum estimation with 7 DPSS windows
Not setting metadata
244 matching events found
No baseline correction applied
0 projection items activated
    Using multitaper spectrum estimation with 7 DPSS windows
Not setting metadata
244 matching events found
No baseline correction applied
0 projection items ac

<MNEFigure size 975x967 with 16 Axes>

In [18]:
evoked_response = response_epoch.average()

In [19]:
evoked_response.plot()


<Figure size 2880x1708 with 2 Axes>

In [13]:
evoked_response.plot_topomap()

NameError: name 'evoked_response' is not defined

In [20]:
evoked_response.plot_joint()

No projector specified for this dataset. Please consider the method self.add_proj.


<Figure size 2880x1708 with 6 Axes>

In [59]:
ica.plot_sources(seeg_raw_filtered_ica, show_scrollbars=True)

Creating RawArray with float64 data, n_channels=18, n_times=160000
    Range : 0 ... 159999 =      0.000 ...   799.995 secs
Ready.


<mne_qt_browser._pg_figure.MNEQtBrowser at 0x1134e0e0ef0>

In [61]:
# This shows: topography, time series, PSD, and variance over epochs
ica.plot_properties(seeg_raw_filtered_ica, picks=range(ica.n_components_))

    Using multitaper spectrum estimation with 7 DPSS windows
Not setting metadata
200 matching events found
No baseline correction applied
0 projection items activated
Not setting metadata
200 matching events found
No baseline correction applied
0 projection items activated
Not setting metadata
200 matching events found
No baseline correction applied
0 projection items activated
Not setting metadata
200 matching events found
No baseline correction applied
0 projection items activated
Not setting metadata
200 matching events found
No baseline correction applied
0 projection items activated
Not setting metadata
200 matching events found
No baseline correction applied
0 projection items activated
Not setting metadata
200 matching events found
No baseline correction applied
0 projection items activated
Not setting metadata
200 matching events found
No baseline correction applied
0 projection items activated
Not setting metadata
200 matching events found
No baseline correction applied
0 pro

[<Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>,
 <Figure size 700x600 with 6 Axes>]