In [None]:
from hyppo.ksample import KSample
from hyppo.independence import Dcorr
from combat import combat
import pandas as pd
import glob
import os
import graspy as gp
import numpy as np
from dask.distributed import Client, progress
import dask.dataframe as ddf
from scipy.stats import zscore, rankdata, mannwhitneyu
import copy
import math
import networkx as nx
from graspy.models import SIEMEstimator as siem

In [84]:
def get_sub(fname):
    stext = os.path.basename(fname).split('_')
    return('{}_{}_{}'.format(stext[0], stext[1], stext[3]))

def get_sub_pheno_dat(subid, scan, pheno_dat):
    matches = pheno_dat.index[pheno_dat["SUBID"] == int(subid)].tolist()
    match = np.min(matches)
    return(int(pheno_dat.iloc[match]["SEX"]))

def get_age_pheno_dat(subid, scan, pheno_dat):
    matches = pheno_dat.index[pheno_dat["SUBID"] == int(subid)].tolist()
    match = np.min(matches)
    return(int(pheno_dat.iloc[match]["AGE_AT_SCAN_1"]))

def apply_along_dataset(scs, dsets, fn):
    scs_xfmd = np.zeros(scs.shape)
    for dset in np.unique(dsets):
        scs_xfmd[dsets == dset,:] = np.apply_along_axis(fn, 0, scs[dsets == dset,:])
    return(scs_xfmd)

def apply_along_individual(scs, fn):
    scs_xfmd = np.zeros(scs.shape)

def ptr(x):
    x_ch = copy.deepcopy(x)
    nz = x[x != 0]
    x_rank = rankdata(nz)*2/(len(nz) + 1)
    x_ch[x_ch != 0] = x_rank
    if (np.min(x_ch) != np.max(x_ch)):
        x_ch = (x_ch - np.min(x_ch))/(np.max(x_ch) - np.min(x_ch))
    return(x_ch)

In [None]:
basepath = '/data/corr/'
pheno_basepath = '/data/corr/phenotypic/'
datasets = os.listdir(basepath)
datasets.remove("phenotypic")
print(datasets)

In [4]:
fmri_dict = {}
pheno_dat = {}

for i, dataset in enumerate(datasets):
    try:
        pheno_dat[dataset] = pd.read_csv('{}{}_phenotypic_data.csv'.format(pheno_basepath, dataset))
        scan_dict = {}
        sex_dict = []
        age_dict = []
        dset_dir = os.path.join('{}{}/graphs/FSL_nff_nsc_gsr_des'.format(basepath, dataset), '*.ssv')
        for f in glob.glob(dset_dir):
            gr_dat = gp.utils.import_edgelist(f)
            sub = get_sub(f)
            scan_dict[sub] = gr_dat.flatten()
            scansub = sub.split('_')
            sex_dict.append(get_sub_pheno_dat(scansub[1], scansub[2], pheno_dat[dataset]))
            age_dict.append(get_age_pheno_dat(scansub[1], scansub[2], pheno_dat[dataset]))
        fmri_dict[dataset] = {}
        fmri_dict[dataset]["scans"] = np.vstack(list(scan_dict.values()))
        fmri_dict[dataset]["subs"] = list(scan_dict.keys())
        fmri_dict[dataset]["sex"] = sex_dict
        fmri_dict[dataset]["age"] = age_dict
        fmri_dict[dataset]["dataset"] = [i + 1 for j in range(0, fmri_dict[dataset]["scans"].shape[0])]
    except Exception as e:
        print("Error in {} Dataset.".format(dataset))
        print(e)

In [5]:
def run_experiment(row):
    try:
        ds1 = row[0]; ds2 = row[1]; sxfm=row[2]; dxfm = row[3]
        scans = np.vstack((fmri_dict[ds1]["scans"], fmri_dict[ds2]["scans"]))
        scans = scans[:,~np.all(scans == 0, axis=0)]
        sex = np.array(fmri_dict[ds1]["sex"] + fmri_dict[ds2]["sex"])
        age = np.array(fmri_dict[ds1]["age"] + fmri_dict[ds2]["age"])
        datasets = np.array([1 for i in range(0, fmri_dict[ds1]["scans"].shape[0])] + [2 for i in range(0, fmri_dict[ds2]["scans"].shape[0])])
        if sxfm == "ptr":
            scans = np.apply_along_axis(ptr, 1, scans)
        if dxfm == "raw":
            scans = scans
        elif dxfm == "zscore":
            scans = apply_along_dataset(scans, datasets, zscore)
        elif dxfm == "ptr":
            scans = apply_along_dataset(scans, datasets, ptr)
        elif dxfm == "combat":
            scans = np.array(combat(pd.DataFrame(scans.T), datasets, model=None, numerical_covariates=None)).T
        try:
            eff_batch = KSample("DCorr").test(scans[datasets == 1,:], scans[datasets == 2,:])
        except:
            eff_batch = (None, None)
        try:
            eff_sex = KSample("DCorr").test(scans[sex == 1,:], scans[sex == 2,:])
        except:
            eff_sex = (None, None)
        try:
            eff_age = DCorr().test(scans, age)
        except:
            eff_age = (None, None)
    except:
        eff_batch = (None, None)
        eff_sex = (None, None)
        eff_age = (None, None)
    return (row[0], row[1], row[2], eff_batch[0], eff_batch[1], eff_sex[0], eff_sex[1])

# Experiments

## Effects

In [6]:
ncores = 6
client = Client(threads_per_worker=1, n_workers=ncores)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40809 instead
  http_address["port"], self.http_server.port


0,1
Client  Scheduler: tcp://127.0.0.1:44643  Dashboard: http://127.0.0.1:40809/status,Cluster  Workers: 6  Cores: 6  Memory: 33.51 GB


In [None]:
exps = []
datasets = list(fmri_dict.keys())
for sxfm in ["raw", "ptr"]:
    for i, ds1 in enumerate(datasets):
        for j in range(i+1, len(datasets)):
            for dxfm in ["raw", "ptr", "zscore", "combat"]:
                exps.append([ds1, datasets[j], sxfm, dxfm])
sim_exps = pd.DataFrame(exps, columns=["Dataset1", "Dataset2", "Sxfm", "Dxfm"])
print(sim_exps.head())

In [None]:
sim_exps = ddf.from_pandas(sim_exps, npartitions=ncores)
sim_results = sim_exps.apply(lambda x: run_experiment(x), axis=1, result_type='expand',
                             meta={0: str, 1: str, 2: str, 3: float, 4: float, 5: float, 6: float})
sim_results

In [None]:
sim_results = sim_results.compute(scheduler="multiprocessing")
sim_results = sim_results.rename(columns={0: "Dataset1", 1: "Dataset2", 2: "Transform", 3: "Effect.Batch",
                                          4: "pvalue.Batch", 5: "Effect.Sex", 6: "pvalue.Sex"})
sim_results.to_csv('./data/batch_results.csv')

## Preservation of Network Statistics

In [7]:
def diag_edges(n):
    """
    A function for generating diagonal SIEM edge communities.
    """
    m = int(n/2)
    edge_comm = np.zeros((n,n))
    for i in range(n):
        for j in range(n):
            if (i == j + m) or (j == i + m):
                edge_comm[i,j] = 1
            else:
                edge_comm[i,j] = 2
    return edge_comm

des_diag = diag_edges(70)

def modular_edges(n):
    """
    A function for generating modular sbm edge communities.
    """
    m = int(n/2)
    edge_comm = np.zeros((n,n))
    for i in range(n):
        for j in range(n):
            if ( (i<m) & (j<m)) or ( (i>=m ) & (j>=m) ):
                edge_comm[i,j] = 1
            else:
                edge_comm[i,j] = 2   
    return edge_comm

des_modular = modular_edges(70)

def mww(G, C):
    A = G[C == 1]
    B = G[C == 2]
    return(mannwhitneyu(A, B, alternative='greater')[1])

In [85]:
dset_ls = [fmri_dict[ds]["scans"] for ds in fmri_dict.keys()]
raw_dat = np.vstack(dset_ls)
datasets = np.array([j for ds in fmri_dict.keys() for j in fmri_dict[ds]["dataset"]])
# get the subject ids and dataset ids as a big list
subjects = np.array([j for ds in fmri_dict.keys() for j in fmri_dict[ds]["subs"]])

def prepare_aggregate_data(scans, datasets):
    newdat = {}
    newdat["raw"] = copy.deepcopy(scans)
    # copy the raw data over
    newdat["zscore"] = copy.deepcopy(scans)
    newdat["ptr"] = copy.deepcopy(scans)
    newdat["combat"] = copy.deepcopy(scans)

    # remove stationary edges for combat
    combat_rem_edges = ~np.all(newdat["combat"] == 0, axis=0)

    # apply relevant transforms en-masse
    newdat["zscore"] = apply_along_dataset(newdat["zscore"], datasets, zscore)
    # replace nans with zeros
    newdat["zscore"][np.isnan(newdat["zscore"])] = 0
    newdat["ptr"] = apply_along_dataset(newdat["ptr"], datasets, ptr)
    newdat["combat"][:,combat_rem_edges] = np.array(combat(pd.DataFrame(newdat["combat"][:,combat_rem_edges].T), datasets, model=None, numerical_covariates=None)).T
    return(newdat)

data_preproc = {}
data_preproc["raw"] = prepare_aggregate_data(raw_dat, datasets)
data_preproc["ptr"] = prepare_aggregate_data(np.apply_along_axis(ptr, 1, raw_dat), datasets)

found 2 batches
found 0 numerical covariates...
found 0 categorical variables:	
Standardizing Data across genes.
Fitting L/S model and finding priors
Finding parametric adjustments


Adjusting data


found 2 batches
found 0 numerical covariates...
found 0 categorical variables:	
Standardizing Data across genes.
Fitting L/S model and finding priors
Finding parametric adjustments


Adjusting data


In [87]:
exps = []

for i, sub in enumerate(subjects):
    sspl = sub.split('_')
    dset = sspl[0]
    for sxfm in ["raw", "ptr"]:
        for dxfm in ["raw", "zscore", "ptr", "combat"]:
            exps.append([dset, sspl[1], sspl[2], i, sub, sxfm, dxfm])
sim_exps = pd.DataFrame(exps, columns=["Dataset", "Subject", "Retest", "Ix", "Fullname", "Sxfm", "Dxfm"])
print(sim_exps.head())

  Dataset  Subject Retest  Ix        Fullname Sxfm    Dxfm
0    BNU1  0025880      2   0  BNU1_0025880_2  raw     raw
1    BNU1  0025880      2   0  BNU1_0025880_2  raw  zscore
2    BNU1  0025880      2   0  BNU1_0025880_2  raw     ptr
3    BNU1  0025880      2   0  BNU1_0025880_2  raw  combat
4    BNU1  0025880      2   0  BNU1_0025880_2  ptr     raw


In [88]:
def singlegraph_exp(row):
    # grab data, and reshape it to nv x nv matrix
    flat_gr = data_preproc[row[5]][row[6]][subjects == row[4]][0,:]
    nv = int(np.sqrt(np.max(flat_gr.shape)))
    exp_gr = flat_gr.reshape((nv, nv))
    G = nx.from_numpy_matrix(exp_gr)
    cc = nx.average_clustering(G, weight="weight")
    deg = np.array(list(dict(G.degree(weight="weight")).values())).mean()
    homophil_stat = mww(exp_gr, des_modular)
    homotop_stat = mww(exp_gr, des_diag)
    return(row[0], row[1], row[2], row[3], row[4], row[5], row[6], cc, deg, homophil_stat, homotop_stat)

In [89]:
sim_exps = ddf.from_pandas(sim_exps, npartitions=ncores)
sim_results = sim_exps.apply(lambda x: singlegraph_exp(x), axis=1, result_type='expand',
                             meta={0: str, 1: str, 2: str, 3:str, 4:str, 5:str, 6:str,
                                   7: float, 8: float, 9: float, 10: float})
sim_results

Unnamed: 0_level_0,0,1,2,3,4,5,6,7,8,9,10
npartitions=6,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,object,object,object,object,object,object,object,float64,float64,float64,float64
534,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...
2670,...,...,...,...,...,...,...,...,...,...,...
3199,...,...,...,...,...,...,...,...,...,...,...


In [90]:
sim_results = sim_results.compute(scheduler="multiprocessing")
sim_results = sim_results.rename(columns={0: "Dataset", 1: "Subject", 2: "Retest", 3: "Ix",
                                          4: "Fullname", 5: "Sxfm", 6: "Dxfm", 7: "Clustering",
                                          8: "Degree", 9: "Homophilic", 10: "Homotopic"})
sim_results.to_csv('./data/batch_statistics.csv')

In [91]:
sim_results

Unnamed: 0,Dataset,Subject,Retest,Ix,Fullname,Sxfm,Dxfm,Clustering,Degree,Homophilic,Homotopic
0,BNU1,0025880,2,0,BNU1_0025880_2,raw,raw,0.175662+0.000000j,12.861285,0.874698,0.926181
1,BNU1,0025880,2,0,BNU1_0025880_2,raw,zscore,0.127331+0.063867j,0.202158,0.388454,0.803617
2,BNU1,0025880,2,0,BNU1_0025880_2,raw,ptr,0.427748+0.000000j,34.424098,0.950291,0.645351
3,BNU1,0025880,2,0,BNU1_0025880_2,raw,combat,0.179922+0.001882j,13.343396,0.892313,0.943219
4,BNU1,0025880,2,0,BNU1_0025880_2,ptr,raw,0.426502+0.000000j,34.510715,0.874698,0.926181
...,...,...,...,...,...,...,...,...,...,...,...
3195,HNU1,0025435,10,399,HNU1_0025435_10,raw,combat,0.178570+0.000692j,13.078171,0.996401,0.002714
3196,HNU1,0025435,10,399,HNU1_0025435_10,ptr,raw,0.425659+0.000000j,34.510715,0.997091,0.003268
3197,HNU1,0025435,10,399,HNU1_0025435_10,ptr,zscore,0.214425+0.110408j,0.551420,0.956672,0.000062
3198,HNU1,0025435,10,399,HNU1_0025435_10,ptr,ptr,0.427754+0.000000j,34.566747,0.999482,0.000037
