# Searchlight Analysis
Searchlight Analysis is used to sequentially analyze small groups of voxels in the brain in order to identify regions of interest. The technique is designed in the following steps:

1. Define a sphere of voxels around a seed voxel
2. Extract the time series from each voxel in the sphere
3. Concatenate the time series into a feature vector
4. Train a classifier on the feature vector and the labels
5. Use the classifier to predict the labels of the seed voxel
6. Repeat steps 1-5 for all seed voxels
7. Aggregate the results across all seed voxels to form a statistical map


In [1]:
# Load dependencies
from sklearn.svm import LinearSVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
from sklearn.pipeline import Pipeline
import numpy as np
import nibabel as nib
from nilearn import image as nli

### Load and initialize data

In [2]:
def prep_data_training(clean_data_audio, clean_data_visual):
    """
    Load and resample the audio and visual data
    """

    # Resample visual data to match audio data
    visual_resampled = nli.resample_img(
        clean_data_visual,
        target_affine = clean_data_audio.affine,
        target_shape = clean_data_audio.shape[:3],
        interpolation = 'linear' 
    )
    print("visual sample data before truncation: ", visual_resampled.shape)
    print("audio sample data before truncation: ", clean_data_audio.shape)

    # Convert 4D fMRI to a 2D array (samples per timepoint x features)
    min_timepoints = min(clean_data_audio.shape[-1], visual_resampled.shape[-1]) # get min z axis
    print("min timepoints: ", min_timepoints)

    audio_data = clean_data_audio.get_fdata()[...,:min_timepoints] # set length to min z axis
    visual_data = visual_resampled.get_fdata()[...,:min_timepoints]

    print("audio sample data after truncation: ", audio_data.shape)
    print("visual sample data after truncation: ", visual_data.shape)

    return audio_data, visual_data


In [3]:
# Reshape data: (x, y, z, time) -> (time, x*y*z)
def reshape_data(audio_data, visual_data):
    """
    Reshape the dataset to the feature matrix and labels to prep for training
    """
    X_audio = audio_data.reshape(audio_data.shape[-1], -1)
    X_visual = visual_data.reshape(visual_data.shape[-1], -1)

    # Create labels (0 for audio, 1 for visual)
    y_audio = np.zeros(X_audio.shape[0])
    y_visual = np.ones(X_visual.shape[0])

    # combine datasets to create a single feature matrix and labels
    X = np.vstack((X_audio, X_visual))
    y = np.concatenate((y_audio, y_visual))

    # test print
    print("Feature matrix shape: ", X.shape)
    print("Labels shape: ", y.shape)

    return X, y


### Set up Linear SVM classifier and train the new feature matrix
This will classify the data into audio and visual categories

In [4]:
# Set up model pipeline
def train_model(X, y):
    """
    X represents a feature matrix and y represents labels
    """
    pipeline = Pipeline(steps = [
        ('standardscaler', StandardScaler()),
        ('linearsvc', LinearSVC(random_state=0, tol=1e-05))])

    # Train model with feature matrix and labels
    pipeline.fit(X, y)

    # Test print the model score
    print("Model score: ", pipeline.score(X, y))

    return pipeline

def evaluate_model(pipeline, X, y):
    """
    Data represents a feature matrix and labels of the test set
    """

    print(pipeline.score(X, y))
    if pipeline.score(X, y) > 0.5:
        print("Model is performing better than chance")
    else:
        print("Model is performing worse than chance")

    return pipeline


### Set up Searchlight Analysis

In [5]:
# Mask voxels
def create_brain_mask(clean_data_audio, clean_data_visual):
    """
    create a binary mask of brain activity
    """
    audio_data = clean_data_audio.get_fdata()
    visual_data = clean_data_visual.get_fdata()

    audio_brain_mask = np.mean(audio_data, axis = 3) > 0
    visual_brain_mask = np.mean(visual_data, axis = 3) > 0

    # convert to int
    audio_brain_mask = audio_brain_mask.astype(int)
    visual_brain_mask = visual_brain_mask.astype(int)

    print("Audio brain mask shape: ", audio_brain_mask.shape)
    print("Visual brain mask shape: ", visual_brain_mask.shape)

    # create a small mask for testing
    small_mask_audio = np.zeros(audio_brain_mask.shape)
    small_mask_visual = np.zeros(visual_brain_mask.shape)

    # set a single voxel to 1 for testing
    small_mask_audio[30, 31, 13] = 1
    small_mask_visual[30, 31, 13] = 1

    return audio_brain_mask, visual_brain_mask, small_mask_audio, small_mask_visual



In [6]:
import time
from sklearn.model_selection import cross_val_score

def create_sphere_mask(x, y, z, radius):
    """
    Create a sphere mask for the current voxel
    """
    return np.sqrt((x - radius)**2 + (y - radius)**2 + (z - radius)**2) <= radius

def classify_sphere(clean_data_audio, clean_data_visual, x, y, z, radius, sphere_mask):
    """
    Classify the accuracy of the data in the sphere
    """
    # extract data from the sphere for both conditions
    sphere_data_audio = clean_data_audio.get_fdata()[sphere_mask]
    sphere_data_visual = clean_data_visual.get_fdata()[sphere_mask]

    # retrieve time series for voxels in the sphere
    # reshape to the data (timepoints x voxels_in_sphere)
    X_audio = sphere_data_audio[
        x-radius:x+radius+1,
        y-radius:y+radius+1,
        z-radius:z+radius+1
    ][sphere_mask].T
    
    X_visual = sphere_data_visual[
        x-radius:x+radius+1,
        y-radius:y+radius+1,
        z-radius:z+radius+1
    ][sphere_mask].T
    
    # Create feature matrix and labels
    X = np.vstack((X_audio, X_visual))
    y = np.concatenate([
        np.zeros(X_audio.shape[0]),  # Audio labels
        np.ones(X_visual.shape[0])   # Visual labels
    ])
    
    # Train classifier on this sphere
    clf = Pipeline([
        ('scaler', StandardScaler()),
        ('svm', LinearSVC(random_state=42))
    ])
    
    # Use cross-validation to get accuracy
    scores = cross_val_score(clf, X, y, cv=5)
    
    return np.mean(scores)


def apply_searchlight(clean_data_audio, clean_data_visual, radius = 3):
    """
    Apply searchlight analysis to the data
    """
    # start timer for runtime
    begin_time = time.time()

    # initialize accuracy map
    accuracy_map_audio = np.zeros(clean_data_audio.shape[:3])
    accuracy_map_visual = np.zeros(clean_data_visual.shape[:3])

    # iterate through each voxel in the brain mask
    for x in range(radius, clean_data_audio.shape[0] - radius): # iterate through x from the radius to the end of the x axis
        for y in range(radius, clean_data_audio.shape[1] - radius):
            for z in range(radius, clean_data_audio.shape[2] - radius):
                # extract sphere data for the current voxel
                sphere_mask = create_sphere_mask(x, y, z, radius)

                # classify the sphere data
                sphere_accuracy = classify_sphere(
                    clean_data_audio,
                    clean_data_visual,
                    x,
                    y,
                    z,
                    radius,
                    sphere_mask
                )

                # update the accuracy map
                accuracy_map_audio[x, y, z] = sphere_accuracy


### Run the Program


In [7]:

def classification_pipeline():
    """ 
    Run the classification pipeline
    """
    # Set up data
    clean_data_audio = nib.load('../results/audio/cleaned_data_audio.nii.gz')
    clean_data_visual = nib.load('../results/visual/cleaned_data_visual.nii.gz')

    # Preprocess data
    audio_data, visual_data = prep_data_training(clean_data_audio, clean_data_visual)
    X, y = reshape_data(audio_data, visual_data)

    # Train model
    pipeline = train_model(X, y)
    evaluate_model(pipeline, X, y)

    return pipeline

pipeline = classification_pipeline()
print(pipeline)

visual sample data before truncation:  (120, 120, 28, 156)
audio sample data before truncation:  (120, 120, 28, 200)
min timepoints:  156
audio sample data after truncation:  (120, 120, 28, 156)
visual sample data after truncation:  (120, 120, 28, 156)
Feature matrix shape:  (312, 403200)
Labels shape:  (312,)
Model score:  1.0
1.0
Model is performing better than chance
Pipeline(steps=[('standardscaler', StandardScaler()),
                ('linearsvc', LinearSVC(random_state=0, tol=1e-05))])


In [8]:
def searchlight_pipeline():
    """
    Run the searchlight pipeline
    """
    # Load data
    clean_data_audio = nib.load('../results/audio/cleaned_data_audio.nii.gz')
    clean_data_visual = nib.load('../results/visual/cleaned_data_visual.nii.gz')

    # Create brain masks
    audio_brain_mask, visual_brain_mask, small_mask_audio, small_mask_visual = create_brain_mask(clean_data_audio, clean_data_visual)

    # Apply searchlight
    accuracy_map_audio = apply_searchlight(clean_data_audio, clean_data_visual, radius = 3)
    accuracy_map_visual = apply_searchlight(clean_data_visual, clean_data_audio, radius = 3)

    return accuracy_map_audio, accuracy_map_visual

accuracy_map_audio, accuracy_map_visual = searchlight_pipeline()
print(accuracy_map_audio)

Audio brain mask shape:  (120, 120, 28)
Visual brain mask shape:  (64, 64, 32)


ValueError: all the input array dimensions except for the concatenation axis must match exactly, but along dimension 1, the array at index 0 has size 28 and the array at index 1 has size 32