# Univariate VLSM

This script can be used to perform univariate voxel-based lesion-symptom mapping (VLSM) or be used as feature-selection for multi-variate LSM (e.g. with support-vector regression (SVR)).

There are two assumptions on your data:
1. You have binary lesions maps (0=background, 1=lesion) in MNI space
2. The output is a continuous variable (e.g. Z-scores)

The script performs the following steps:
1. A lesion prevalence analysis, showing how many subjects have a lesion in a specific voxel. This is used to only assess voxels where a sufficient number of subjects have a lesion (e.g. >= 5 subjects)
2. A voxel-wise t-test
3. Computing a power-map per voxel
  1. Calculating the effect-size per voxel and taking the 99th percentile as the overall (fixed) effect size
  2. Computing the power-map per voxel, using the fixed effect size from (a)
4. Performing multiple testing correction
  1. FDR correction (Benjamini/Hochberg)
  2. Optionally: a permutation minT/maxP correction
5. Saving everything as nifti images in MNI space

In [None]:
import glob
import json
import os

from joblib import Parallel, delayed
import numpy as np
import pandas
from scipy.stats import ttest_ind
import SimpleITK as sitk
from statsmodels.stats.multitest import multipletests
from statsmodels.stats.power import tt_ind_solve_power
from tqdm.notebook import trange, tqdm
from tqdm.contrib import tenumerate

## Parameters

All relevant parameters for this script can be set in the box below, related to either the input, settings, or output.

### Input 

- **dir_lesion:** the directory that contains the binary lesion maps per subject
- **design_document:** spreadsheet containing two or more columns: *first column:* name of the lesion map for every subject (should exist in dir_lesion or a subfolder (see below)); *second column:* continuous output variable, e.g. a Z-score on a specific domain; *optionally* more columns with additional output variables
- **data_in_subfolders:** specifies whether the data is directly in *dir_lesion* (False) or whether it is in subfolders (True). In the latter case, the filename in the first column of the *design_document* should be: COHORT_SUBJECT.nii.gz and this file is located in *dir_lesion*/COHORT/COHORT_SUBJECT.nii.gz  
- **domain:** which column in the design_document to use as output (note: 0-based index). By default: 1.

### Settings

- **subject_threshold:** minimum number of subjects with a lesion in a voxel
- **alpha:** alpha value to use in t-test and multiple testing correction
- **perform_multiple_testing_permutation:** whether to perform multiple testing correction using permutations (minT/maxP ; True) or not (False). Note: this is extremely slow and can easily take more than 24 hours.
- **num_permutations:** number of permutations for the multiple testing correction
- **n_jobs:** number of parallel computation jobs (set at 1 when unsure)

### Output
- **output_base_path:** output directory into which all results will be written
- **output_name_\*:** output filenames for the various maps that are computed

In [None]:
# Input data
dir_lesion = r"/home/jovyan/work/WMH_memory_clinic/Lesion_maps"
design_document = r"/home/jovyan/work/WMH_memory_clinic/Analyses/WMH_MemoryClinic_MainProject/Domain_VerbalMemory.xlsx"
data_in_subfolders = True
domain = 1

# Settings
subject_threshold = 5
alpha = 0.05
perform_multiple_testing_permutation = False  # Note: super slow !
num_permutations = 1000
n_jobs = 35

# Output
output_base_path = r"/home/jovyan/work/WMH_memory_clinic/Analyses/WMH_MemoryClinic_MainProject/Domain_VerbalMemory"

# Output for lesion prevalence map
output_name_lesion_prevalence = output_base_path + "_lesion_prevalence.nii"

# Output for t-test and multiple testing correction
output_name_tdata = output_base_path + "_tmap.nii"
output_name_pdata = output_base_path + "_1-p_not_corrected.nii"
output_name_pcorrdata = output_base_path + "_1-p_fdr_corrected.nii"

# Output for power calculation
output_name_powermap = output_base_path + "_power_map.nii"
output_name_effectmap = output_base_path + "_effect_map.nii"

# Output for permutation tests (optional, depending on perform_multiple_testing_permutation==True)
output_name_ppermcorrdata = output_base_path + "_1-p_permutation_corrected.nii"
output_name_tstatistics = output_base_path + "_tstatistics.dat"

### Do not make changes below here

In [None]:
# Load the design document and load the Z-scores
df = pandas.read_excel(design_document, header=None)
z_score = df.iloc[:, domain].to_numpy()

In [None]:
# Initialize lesion matrix and load all lesion data
lesion_filename = df[0][0]
if data_in_subfolders:
    subfolder = lesion_filename.split("_")[0]
    lesion_filename = os.path.join(subfolder, lesion_filename)
nii = sitk.ReadImage(os.path.join(dir_lesion, lesion_filename))

# The raw_lesion_matrix has the shape: number of subjects, number of voxels
raw_lesion_matrix = np.zeros((len(df.index), sitk.GetArrayViewFromImage(nii).size), np.int8)
for i, lesion_filename in tenumerate(df[0]):
    if data_in_subfolders:
        subfolder = lesion_filename.split("_")[0]
        lesion_filename = os.path.join(subfolder, lesion_filename)

    nii = sitk.ReadImage(os.path.join(dir_lesion, lesion_filename))
    raw_lesion_matrix[i] = sitk.GetArrayViewFromImage(nii).ravel()

In [None]:
# Compute the lesion prevalence map
lesion_prevalence = np.sum(raw_lesion_matrix, axis=0)

In [None]:
# Determine which voxels to test (sufficient number of subjects per voxel)
index_test_mask = lesion_prevalence >= subject_threshold
index_test_voxels = np.argwhere(index_test_mask)
n_test_voxels = len(index_test_voxels)

In [None]:
# Perform t-test
t_data_v = np.zeros(n_test_voxels)
pvalue_v = np.zeros(n_test_voxels)


def do_ttest(i):
    group_nc = raw_lesion_matrix[:, index_test_voxels[i][0]] == 0
    group_damage = raw_lesion_matrix[:, index_test_voxels[i][0]] > 0
    
    return ttest_ind(z_score[group_nc], z_score[group_damage], equal_var=True)


result = Parallel(n_jobs=n_jobs)(delayed(do_ttest)(i) for i in trange(n_test_voxels))
t_data_v, pvalue_v = zip(*result)

In [None]:
# Perform multiple testing correction using permutations. Results are stored (per permutation) in
# a text file. This way, you can resume and append the results.
if perform_multiple_testing_permutation:
    rng = np.random.default_rng()

    for _ in trange(num_permutations):
        permuted_z_score = rng.permutation(z_score)
        
        
        def do_permuted_ttest(i):
            group_nc = raw_lesion_matrix[:, index_test_voxels[i][0]] == 0
            group_damage = raw_lesion_matrix[:, index_test_voxels[i][0]] > 0

            return ttest_ind(permuted_z_score[group_nc], permuted_z_score[group_damage], equal_var=True)
        
        
        permuted_result = Parallel(n_jobs=n_jobs)(delayed(do_permuted_ttest)(i) for i in trange(n_test_voxels, leave=False))
        permuted_t_data_v, _ = zip(*permuted_result)

        with open(output_name_tstatistics, 'ab') as tstatistics_file:
            np.savetxt(tstatistics_file, np.array([max(permuted_t_data_v)]))

In [None]:
# Load in the results from the previous cell and compute the multiple testing correction
if perform_multiple_testing_permutation:
    with open(output_name_tstatistics, 'rb') as tstatistics_file:
        max_t_distribution = np.loadtxt(tstatistics_file)

    pvalue_permutation_corrected = np.mean(max_t_distribution[:, np.newaxis] > t_data_v, axis=0)

In [None]:
# Compute all effect sizes
z_score_std = np.std(z_score)
def do_effect_size(i):
    group_nc = raw_lesion_matrix[:, index_test_voxels[i][0]] == 0
    group_damage = raw_lesion_matrix[:, index_test_voxels[i][0]] > 0    
    
    z_score_nc = np.mean(z_score[group_nc])
    z_score_damage = np.mean(z_score[group_damage])
    
    return (z_score_nc - z_score_damage) / z_score_std

result_effect = Parallel(n_jobs=n_jobs)(delayed(do_effect_size)(i) for i in trange(n_test_voxels))

# We use the 99th percentile as a fixed effect size
fixed_effect_size = np.percentile(result_effect, 99)

In [None]:
# Compute power maps. There are two functions:
# do_tt_ind_solve_power: determine the effect size per voxel and compute the power
# do_tt_ind_solve_power_fixed_effect_size: use the fixed effect size (computed above) for all voxels

z_score_std = np.std(z_score)
def do_tt_ind_solve_power(i):
    group_nc = raw_lesion_matrix[:, index_test_voxels[i][0]] == 0
    group_damage = raw_lesion_matrix[:, index_test_voxels[i][0]] > 0    
    
    z_score_nc = np.mean(z_score[group_nc])
    z_score_damage = np.mean(z_score[group_damage])
    effect_size = (z_score_nc - z_score_damage) / z_score_std

    nobs1 = np.sum(group_nc)
    ratio = np.sum(group_damage) / nobs1
    
    return tt_ind_solve_power(effect_size=effect_size, nobs1=nobs1, alpha=alpha, power=None, ratio=ratio, alternative='two-sided')


def do_tt_ind_solve_power_fixed_effect_size(i):   
    group_nc = raw_lesion_matrix[:, index_test_voxels[i][0]] == 0
    group_damage = raw_lesion_matrix[:, index_test_voxels[i][0]] > 0    
     
    nobs1 = np.sum(group_nc)
    ratio = np.sum(group_damage) / nobs1

    return tt_ind_solve_power(effect_size=fixed_effect_size, nobs1=nobs1, alpha=alpha, power=None, ratio=ratio, alternative='two-sided')


# Change the function in delayed() to the desired way of computing the power.
result_power = Parallel(n_jobs=n_jobs)(delayed(do_tt_ind_solve_power_fixed_effect_size)(i) for i in trange(n_test_voxels))

In [None]:
# Correction for multiple testing
_, pvals_corrected, _, _ = multipletests(pvalue_v, alpha=alpha, method='fdr_bh')

In [None]:
# Load the MNI template, to use as a reference image for all output images
ref_nii = sitk.ReadImage(os.path.join(os.getcwd(), 'src', 'Atlas', 'LSM_reference_1mm_MNI152.nii'))
ref_nii_arr = sitk.GetArrayViewFromImage(ref_nii)

# Save all maps as nii
nii = sitk.GetImageFromArray(lesion_prevalence.reshape(ref_nii_arr.shape).astype(float))
nii.CopyInformation(ref_nii)
sitk.WriteImage(nii, output_name_lesion_prevalence)


def save_result_image(data, output_name):
    data_arr = np.zeros(ref_nii_arr.shape, np.float32)
    data_arr[index_test_mask.reshape(ref_nii_arr.shape)] = data
    data_img = sitk.GetImageFromArray(data_arr)
    data_img.CopyInformation(ref_nii)
    sitk.WriteImage(data_img, output_name)


save_result_image(np.array(t_data_v), output_name_tdata)
save_result_image(1-np.array(pvalue_v), output_name_pdata)
save_result_image(1-pvals_corrected, output_name_pcorrdata)
if perform_multiple_testing_permutation:
    save_result_image(1-pvalue_permutation_corrected, output_name_ppermcorrdata)
save_result_image(np.array(result_power), output_name_powermap)
save_result_image(np.array(result_effect), output_name_effectmap)

In [None]:
# TODO: save description of input, settings, outputs, date/time, etc