In [None]:
import os
import os.path as op
import nibabel as nib
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw, ImageFont, ImageEnhance
import torch
from torchvision import transforms
from tqdm import tqdm
import h5py
import pickle
os.chdir('../')

## Data preparation code, normalizes and formats into an easily loadable format

In [None]:
def zscore(x, mean=None, stddev=None, return_stats=False):
    if mean is not None:
        m = mean
    else:
        m = torch.mean(x, axis=0, keepdims=True)
    if stddev is not None:
        s = stddev
    else:
        s = torch.std(x, axis=0, keepdims=True)
    if return_stats:
        return (x - m)/(s+1e-6), m, s
    else:
        return (x - m)/(s+1e-6)

def create_whole_region_imagery_unnormalized(subject = 1):

    nsd_general = nib.load(f"data/nsddata/ppdata/subj0{subject}/func1pt8mm/roi/nsdgeneral.nii.gz").get_fdata()
    nsd_general = np.nan_to_num(nsd_general)
    nsd_general = np.where(nsd_general==1.0, True, False)

    layer_size = np.sum(nsd_general == True)
    os.makedirs("data/preprocessed_data/subject{}/".format(subject), exist_ok=True)

    whole_region = np.zeros((720, layer_size))

    nsd_general_mask = np.nan_to_num(nsd_general)
    nsd_mask = np.array(nsd_general_mask.flatten(), dtype=bool)
    beta_file = f"data/nsddata_betas/ppdata/subj0{subject}/func1pt8mm/nsdimagerybetas_fithrf_GLMdenoise_RR/betas_nsdimagery.nii.gz"

    imagery_betas = nib.load(beta_file).get_fdata()
    imagery_betas = imagery_betas.transpose((3,0,1,2))
    whole_region = torch.from_numpy(imagery_betas.reshape((len(imagery_betas), -1))[:,nsd_general.flatten()].astype(np.float32))

    torch.save(whole_region, "data/preprocessed_data/subject{}/nsd_imagery_unnormalized.pt".format(subject))
    return whole_region

def create_whole_region_imagery_normalized(subject = 1):
    img_stim_file = "data/nsddata_stimuli/stimuli/nsdimagery_stimuli.pkl3"
    ex_file = open(img_stim_file, 'rb')
    imagery_dict = pickle.load(ex_file)
    ex_file.close()
    exps = imagery_dict['exps']
    cues = imagery_dict['cues']
    meta_cond_idx = {
        'visA': np.arange(len(exps))[exps=='visA'],
        'visB': np.arange(len(exps))[exps=='visB'],
        'visC': np.arange(len(exps))[exps=='visC'],
        'imgA_1': np.arange(len(exps))[exps=='imgA_1'],
        'imgA_2': np.arange(len(exps))[exps=='imgA_2'],
        'imgB_1': np.arange(len(exps))[exps=='imgB_1'],
        'imgB_2': np.arange(len(exps))[exps=='imgB_2'],
        'imgC_1': np.arange(len(exps))[exps=='imgC_1'],
        'imgC_2': np.arange(len(exps))[exps=='imgC_2']
    }
    unnormalized_file = f"data/preprocessed_data/subject{subject}/nsd_imagery_unnormalized"
    whole_region = torch.load(unnormalized_file + ".pt")
    whole_region = whole_region / 300.
    whole_region_norm = torch.zeros_like(whole_region)

    # Normalize the data using Z scoring method for each voxel
    for c,idx in meta_cond_idx.items():
        whole_region_norm[idx] = zscore(whole_region[idx])

    # Save the tensor of normalized data
    torch.save(whole_region_norm, f"data/preprocessed_data/subject{subject}/nsd_imagery.pt")

## Data loading code, prepares and averages/nests the data for a specific subject and stimulus type

In [None]:
def condition_average(x, y, cond, nest=False):
    idx, idx_count = np.unique(cond, return_counts=True)
    idx_list = [np.array(cond)==i for i in np.sort(idx)]
    if nest:
        avg_x = torch.zeros((len(idx), idx_count.max(), x.shape[1]), dtype=torch.float32)
    else:
        avg_x = torch.zeros((len(idx), 1, x.shape[1]), dtype=torch.float32)
    for i, m in enumerate(idx_list):
        if nest:
            avg_x[i] = x[m]
        else:
            avg_x[i] = torch.mean(x[m], axis=0)

    return avg_x, y, len(idx_count)

#subject: nsd subject index between 1-8
#mode: vision, imagery
#stimtype: all, simple, complex, concepts
#average: whether to average across trials, will produce x that is (stimuli, 1, voxels)
#nest: whether to nest the data according to stimuli, will produce x that is (stimuli, trials, voxels)
def load_nsd_mental_imagery(subject, mode, stimtype="all", average=False, nest=False):
    # This file has a bunch of information about the stimuli and cue associations that will make loading it easier
    img_stim_file = "data/nsddata_stimuli/stimuli/nsdimagery_stimuli.pkl3"
    ex_file = open(img_stim_file, 'rb')
    imagery_dict = pickle.load(ex_file)
    ex_file.close()
    # Indicates what experiments trials belong to
    exps = imagery_dict['exps']
    # Indicates the cues for different stimuli
    cues = imagery_dict['cues']
    # Maps the cues to the stimulus image information
    image_map  = imagery_dict['image_map']
    # Organize the indices of the trials according to the modality and the type of stimuli
    cond_idx = {
    'visionsimple': np.arange(len(exps))[exps=='visA'],
    'visioncomplex': np.arange(len(exps))[exps=='visB'],
    'visionconcepts': np.arange(len(exps))[exps=='visC'],
    'visionall': np.arange(len(exps))[np.logical_or(np.logical_or(exps=='visA', exps=='visB'), exps=='visC')],
    'imagerysimple': np.arange(len(exps))[np.logical_or(exps=='imgA_1', exps=='imgA_2')],
    'imagerycomplex': np.arange(len(exps))[np.logical_or(exps=='imgB_1', exps=='imgB_2')],
    'imageryconcepts': np.arange(len(exps))[np.logical_or(exps=='imgC_1', exps=='imgC_2')],
    'imageryall': np.arange(len(exps))[np.logical_or(
                                        np.logical_or(
                                            np.logical_or(exps=='imgA_1', exps=='imgA_2'),
                                            np.logical_or(exps=='imgB_1', exps=='imgB_2')),
                                        np.logical_or(exps=='imgC_1', exps=='imgC_2'))]}
    # Load normalized betas
    x = torch.load("data/preprocessed_data/subject{}/nsd_imagery.pt".format(subject)).requires_grad_(False).to("cpu")
    # Find the trial indices conditioned on the type of trials we want to load
    cond_im_idx = {n: [image_map[c] for c in cues[idx]] for n,idx in cond_idx.items()}
    conditionals = cond_im_idx[mode+stimtype]
    # Stimuli file is of shape (18,3,425,425), these can be converted back into PIL images using transforms.ToPILImage()
    y = torch.load("data/nsddata_stimuli/stimuli/imagery_stimuli_18.pt").requires_grad_(False).to("cpu")
    # Prune the beta file down to specific experimental mode/stimuli type
    x = x[cond_idx[mode+stimtype]]
    # If stimtype is not all, then prune the image data down to the specific stimuli type
    if stimtype == "simple":
        y = y[:6]
    elif stimtype == "complex":
        y = y[6:12]
    elif stimtype == "concepts":
        y = y[12:]

    # Average or nest the betas across trials
    if average or nest:
        x, y, sample_count = condition_average(x, y, conditionals, nest=nest)
    else:
        x = x.reshape((x.shape[0], 1, x.shape[1]))

    print(x.shape)
    return x, y

In [None]:
for subject in tqdm(range(1,9)):
    create_whole_region_imagery_unnormalized(subject)
    create_whole_region_imagery_normalized(subject)
    x, y = load_nsd_mental_imagery(subject=subject, mode="imagery", stimtype="all", average=True, nest=False)

 12%|█▎        | 1/8 [00:09<01:05,  9.42s/it]

torch.Size([18, 1, 15724])


 25%|██▌       | 2/8 [00:18<00:55,  9.24s/it]

torch.Size([18, 1, 14278])


 38%|███▊      | 3/8 [00:27<00:46,  9.23s/it]

torch.Size([18, 1, 15226])


 50%|█████     | 4/8 [00:36<00:36,  9.09s/it]

torch.Size([18, 1, 13153])


 62%|██████▎   | 5/8 [00:44<00:26,  8.68s/it]

torch.Size([18, 1, 13039])


 75%|███████▌  | 6/8 [00:55<00:18,  9.48s/it]

torch.Size([18, 1, 17907])


 88%|████████▊ | 7/8 [01:03<00:08,  8.99s/it]

torch.Size([18, 1, 12682])


100%|██████████| 8/8 [01:11<00:00,  8.99s/it]

torch.Size([18, 1, 14386])



