In [None]:
import numpy as np
import pandas as pd
from nilearn import image, input_data, plotting
from nilearn.glm.first_level import FirstLevelModel
from nilearn.plotting import plot_stat_map
import matplotlib.pyplot as plt
from scipy import stats
import os
import pickle
from pathlib import Path
from nilearn.datasets import fetch_atlas_harvard_oxford


In [None]:
# ## Setup Project Directory Structure


In [None]:
# Create directory structure
project_dir = Path('fmri_project')
checkpoints_dir = project_dir / 'checkpoints'
results_dir = project_dir / 'results'

# Create directories if they don't exist
for dir_path in [checkpoints_dir, results_dir]:
    dir_path.mkdir(parents=True, exist_ok=True)


In [None]:
# ## Create Events DataFrame
# Create events DataFrame from the timing information


In [None]:
def save_checkpoint(data, filename):
    """Save data to checkpoint file"""
    filepath = checkpoints_dir / filename
    with open(filepath, 'wb') as f:
        pickle.dump(data, f)
    print(f"Checkpoint saved: {filepath}")

def load_checkpoint(filename):
    """Load data from checkpoint file"""
    filepath = checkpoints_dir / filename
    if filepath.exists():
        with open(filepath, 'rb') as f:
            return pickle.load(f)
    return None


In [None]:
# Check if events DataFrame already exists
events = load_checkpoint('events.pkl')

if events is None:
    # Create events DataFrame with timing information
    events = pd.DataFrame({
        'onset': [0, 60, 120, 180, 240, 300, 360, 420, 480, 540],
        'duration': [30] * 10,
        'emotion': ['calm', 'afraid', 'delighted', 'depressed',
                    'excited', 'delighted', 'depressed', 'calm',
                    'excited', 'afraid']
    })

    # Map emotions to valence categories
    valence_mapping = {
        'calm': 'neutral',
        'afraid': 'negative',
        'delighted': 'positive',
        'depressed': 'negative',
        'excited': 'positive'
    }
    events['trial_type'] = events['emotion'].map(valence_mapping)

    # Save events DataFrame
    save_checkpoint(events, 'events.pkl')
else:
    print("Loaded existing events DataFrame")


In [None]:
# ## Create ROI Masks
# Create masks for regions of interest in emotional processing


In [None]:
# Check if ROI masks already exist
roi_masks = load_checkpoint('roi_masks.pkl')

if roi_masks is None:
    # Fetch Harvard-Oxford atlases
    cortical = fetch_atlas_harvard_oxford('cort-maxprob-thr25-2mm')
    subcortical = fetch_atlas_harvard_oxford('sub-maxprob-thr25-2mm')

    # Create ROI masks
    roi_masks = {
        # Subcortical regions
        'amygdala_left': image.math_img('img == 10', img=subcortical.maps),
        'amygdala_right': image.math_img('img == 20', img=subcortical.maps),
        'hippocampus_left': image.math_img('img == 9', img=subcortical.maps),
        'hippocampus_right': image.math_img('img == 19', img=subcortical.maps),

        # Cortical regions
        'acc': image.math_img('img == 29', img=cortical.maps),
        'sup_frontal_gyrus': image.math_img('img == 3', img=cortical.maps),
        'mid_frontal_gyrus': image.math_img('img == 4', img=cortical.maps),
    }

    # Create bilateral masks
    bilateral_pairs = [
        ('amygdala', 'amygdala_left', 'amygdala_right'),
        ('hippocampus', 'hippocampus_left', 'hippocampus_right'),
    ]

    # Combine bilateral pairs
    for name, left, right in bilateral_pairs:
        roi_masks[f'{name}_bilateral'] = image.math_img(
            "img1 + img2",
            img1=roi_masks[left],
            img2=roi_masks[right]
        )

    # Save ROI masks
    save_checkpoint(roi_masks, 'roi_masks.pkl')
else:
    print("Loaded existing ROI masks")


In [None]:
# ## Process Individual Subjects
# Loop through subjects and perform first-level analysis


In [None]:
# Check if subject results already exist
all_subject_contrasts = load_checkpoint('subject_contrasts.pkl')
all_subject_roi_signals = load_checkpoint('subject_roi_signals.pkl')

if all_subject_contrasts is None or all_subject_roi_signals is None:
    # Initialize lists to store results
    n_subjects = 40
    all_subject_contrasts = []
    all_subject_roi_signals = []

    # Process each subject
    for subject_id in range(1, n_subjects + 1):
        subject_id_str = f'{subject_id:02d}'
        print(f'Processing subject {subject_id_str}...')

        # Check if individual subject results exist
        subject_results = load_checkpoint(f'subject_{subject_id_str}_results.pkl')

        if subject_results is None:
            # Load subject data
            func_file = f'ds005700/sub-{subject_id_str}/func/sub-{subject_id_str}_task-fe_bold.nii.gz'
            anat_file = f'ds005700/sub-{subject_id_str}/anat/sub-{subject_id_str}_T1w.nii.gz'

            func_img = image.load_img(func_file)
            anat_img = image.load_img(anat_file)

            # Create and fit first-level model
            model = FirstLevelModel(
                t_r=2.02697,  # Time repetition from .json file
                noise_model='ar1',
                standardize=True,
                hrf_model='spm',
                drift_model='cosine'
            )

            model.fit(func_img, events)

            # Define and compute contrasts
            contrasts = {
                'positive_vs_neutral': 'positive - neutral',
                'negative_vs_neutral': 'negative - neutral',
                'positive_vs_negative': 'positive - negative'
            }

            contrast_maps = {}
            for contrast_id, contrast_def in contrasts.items():
                contrast_maps[contrast_id] = model.compute_contrast(contrast_def)

            # Extract ROI signals
            roi_signals = {}
            for roi_name, roi_mask in roi_masks.items():
                masker = input_data.NiftiMasker(mask_img=roi_mask)
                roi_signals[roi_name] = masker.fit_transform(func_img)

            # Save individual subject results
            subject_results = {'contrasts': contrast_maps, 'roi_signals': roi_signals}
            save_checkpoint(subject_results, f'subject_{subject_id_str}_results.pkl')
        else:
            print(f"Loaded existing results for subject {subject_id_str}")
            contrast_maps = subject_results['contrasts']
            roi_signals = subject_results['roi_signals']

        all_subject_contrasts.append(contrast_maps)
        all_subject_roi_signals.append(roi_signals)

    # Save all subject results
    save_checkpoint(all_subject_contrasts, 'subject_contrasts.pkl')
    save_checkpoint(all_subject_roi_signals, 'subject_roi_signals.pkl')
else:
    print("Loaded existing subject results")


In [None]:
# ## Group Analysis
# Perform second-level analysis across all subjects


In [None]:
# Check if group results already exist
group_results = load_checkpoint('group_results.pkl')

if group_results is None:
    from nilearn.glm import second_level

    # Create and fit second-level model
    group_model = second_level.SecondLevelModel()

    # Compute group results for each contrast
    group_results = {}
    for contrast_id in all_subject_contrasts[0].keys():
        contrast_maps = [subj_contrasts[contrast_id]
                         for subj_contrasts in all_subject_contrasts]
        group_results[contrast_id] = group_model.fit(contrast_maps).compute_contrast()

    # Save group results
    save_checkpoint(group_results, 'group_results.pkl')
else:
    print("Loaded existing group results")


In [None]:
# ## Plot Results
# Generate and save visualization of group-level results


In [None]:
# Plot and save results for each contrast
for contrast_id, stat_map in group_results.items():
    output_file = results_dir / f'{contrast_id}_group_map.png'

    # Only generate plot if it doesn't exist
    if not output_file.exists():
        display = plotting.plot_stat_map(
            stat_map,
            threshold=3.0,
            title=contrast_id
        )
        plt.savefig(output_file)
        plt.close()
        print(f"Generated plot: {output_file}")
    else:
        print(f"Plot already exists: {output_file}")