In [None]:
import numpy as np 
import scipy as sp
import pandas as pd
from datetime import datetime
from utils import *
from cpm import *

import sys
import os
import json

import matplotlib.pyplot as plt
from matplotlib import cm
from pylab import *
import glob
from mpl_toolkits.axes_grid1 import make_axes_locatable
from scipy import stats
from matplotlib.patches import Patch
from matplotlib.lines import Line2D
rc('axes',linewidth=2)
alpha=0.7

In [None]:
read_path = '/path/to/CPM_outputs/output_folder'
json_path = '/path/to/CPM_json/settings.json'

In [None]:
with open(json_path) as json_data:
    data = json.load(json_data)

In [None]:
t = data['t']
k = data['k']
p_thresh = data['p_thresh']
repeat = data['repeat']
num_iter = data['num_iter']
mat_path = data['mat_path']
mat_name = data['mat_name']  # should contains behav_name, num_rois, and num_contrasts
zscore = data['zscore']
mode = data['mode']
y_norm = data['y_norm']

In [None]:
data

In [None]:
"""import pickle
filename = '{}/ymodel_{}.pkl'.format(mat_path, mat_name[:-4])
transformer = pickle.load(open(filename, 'rb'))
"""
transformer = FunctionTransformer()  # identity function
i=1
df_tmp = pd.read_csv('{}/y_prediction_iter{}.csv'.format(read_path,i))
y_actual = df_tmp['y_actual'].to_numpy()
transformer.fit(y_actual)
print(y_actual.shape)

y_actual = transformer.inverse_transform(y_actual.reshape(-1,1)).flatten()
y_actual

In [None]:
transformer

### Calculate r

In [None]:
lst_of_i = list(range(1, repeat+num_iter+1))

In [None]:
r_pos = []
r_neg = []
r_both = []
for i in lst_of_i:
    df_tmp = pd.read_csv('{}/y_prediction_iter{}.csv'.format(read_path,i))
    y_actual = df_tmp['y_actual'].to_numpy()
    y_actual = transformer.inverse_transform(y_actual.reshape(-1,1)).flatten()
    y_pos = df_tmp['y_pred_pos'].to_numpy()
    y_pos = transformer.inverse_transform(y_pos.reshape(-1,1)).flatten()
    y_neg = df_tmp['y_pred_neg'].to_numpy()
    y_neg = transformer.inverse_transform(y_neg.reshape(-1,1)).flatten()
    y_both = df_tmp['y_pred_both'].to_numpy()
    y_both = transformer.inverse_transform(y_both.reshape(-1,1)).flatten()
    
    r_pos.append(stats.spearmanr(y_pos, y_actual, nan_policy='propagate')[0])
    r_neg.append(stats.spearmanr(y_neg, y_actual, nan_policy='propagate')[0])
    r_both.append(stats.spearmanr(y_both, y_actual, nan_policy='propagate')[0])

In [None]:
r_pos_null = r_pos[repeat:]
r_pos_true = r_pos[:repeat]
r_neg_null = r_neg[repeat:]
r_neg_true = r_neg[:repeat]
r_both_null = r_both[repeat:]
r_both_true = r_both[:repeat]

In [None]:
# save all rs
with open('{}/r_pos.txt'.format(read_path),'w') as f:
    for r in r_pos:
        f.write('{}\n'.format(r))

with open('{}/r_neg.txt'.format(read_path),'w') as f:
    for r in r_neg:
        f.write('{}\n'.format(r))

with open('{}/r_both.txt'.format(read_path),'w') as f:
    for r in r_both:
        f.write('{}\n'.format(r))

In [None]:
# save null model rs
with open('{}/r_pos_null.txt'.format(read_path),'w') as f:
    for r in r_pos_null:
        f.write('{}\n'.format(r))

with open('{}/r_neg_null.txt'.format(read_path),'w') as f:
    for r in r_neg_null:
        f.write('{}\n'.format(r))

with open('{}/r_both_null.txt'.format(read_path),'w') as f:
    for r in r_both_null:
        f.write('{}\n'.format(r))

In [None]:
# Check for NaN
r_neg = np.array(r_neg)
r_pos = np.array(r_pos)
r_both = np.array(r_both)
r_neg_true = np.array(r_neg_true)
r_pos_true = np.array(r_pos_true)
r_both_true = np.array(r_both_true)
r_neg_null = np.array(r_neg_null)
r_pos_null = np.array(r_pos_null)
r_both_null = np.array(r_both_null)

if np.isnan(r_neg).any():
    print('r_neg contains NaNs.')
    if np.isnan(r_neg_true).any():
        print('WARNING: NaNs detected in true behavioral data iterations (neg).')

if np.isnan(r_pos).any():
    print('r_pos contains NaNs.')
    if np.isnan(r_pos_true).any():
        print('WARNING: NaNs detected in true behavioral data iterations (pos).')

if np.isnan(r_both).any():
    print('r_both contains NaNs.')
    if np.isnan(r_both_true).any():
        print('WARNING: NaNs detected in true behavioral data iterations (both).')

print("Mean true correlation: pos {}, neg {}, both {}".format(np.nanmean(r_pos_true),np.nanmean(r_neg_true),np.nanmean(r_both_true)))

In [None]:
r_neg_null = r_neg_null[~np.isnan(r_neg_null)]
r_pos_null = r_pos_null[~np.isnan(r_pos_null)]
r_both_null = r_both_null[~np.isnan(r_both_null)]

print("Number of entries left in null model: pos {}, neg {}, both {}".format(len(r_pos_null),len(r_neg_null),len(r_both_null)))

In [None]:
# One-tailed test
if np.nanmean(r_pos_true) >= 0:
    p_pos_onetail = np.sum(r_pos_null >= np.nanmean(r_pos_true))/len(r_pos_null)
else:
    print("WARNING: pos - correlation is not positive (this should not happen often)?")
    p_pos_onetail = np.sum(r_pos_null <= np.nanmean(r_pos_true))/len(r_pos_null)

if np.nanmean(r_neg_true) >= 0:
    p_neg_onetail = np.sum(r_neg_null >= np.nanmean(r_neg_true))/len(r_neg_null)
else:
    print("WARNING: neg - correlation is not positive (this should not happen often)?")
    p_neg_onetail = np.sum(r_neg_null <= np.nanmean(r_neg_true))/len(r_neg_null)
#p_neg_onetail = np.nan

if np.nanmean(r_both_true) >= 0:
    p_both_onetail = np.sum(r_both_null >= np.nanmean(r_both_true))/len(r_both_null)
else:
    print("WARNING: both - correlation is not positive (this should not happen often)?")
    p_both_onetail = np.sum(r_both_null <= np.nanmean(r_both_true))/len(r_both_null)

In [None]:
# Two-tailed test
r_pos_null_abs = np.abs(r_pos_null)
p_pos_twotail = np.sum(r_pos_null_abs >= np.abs(np.nanmean(r_pos_true)))/len(r_pos_null_abs)

r_neg_null_abs = np.abs(r_neg_null)
p_neg_twotail = np.sum(r_neg_null_abs >= np.abs(np.nanmean(r_neg_true)))/len(r_neg_null_abs)
#p_neg_twotail = np.nan

r_both_null_abs = np.abs(r_both_null)
p_both_twotail = np.sum(r_both_null_abs >= np.abs(np.nanmean(r_both_true)))/len(r_both_null_abs)

In [None]:
print(p_pos_onetail, p_neg_onetail, p_both_onetail)
print(p_pos_twotail, p_neg_twotail, p_both_twotail)

In [None]:
df_p = pd.DataFrame(columns=['#iter available in null', 'True Spearman r (mean)','True Spearman r (std)', 'one-tail p','two-tail p'],index=['pos','neg','both'])
df_p['#iter available in null'] = [len(r_pos_null),len(r_neg_null),len(r_both_null)]
df_p['True Spearman r (mean)'] = [np.nanmean(r_pos_true),np.nanmean(r_neg_true),np.nanmean(r_both_true)]
df_p['True Spearman r (std)'] = [np.nanstd(r_pos_true),np.nanstd(r_neg_true),np.nanstd(r_both_true)]
df_p['one-tail p'] = [p_pos_onetail, p_neg_onetail, p_both_onetail]
df_p['two-tail p'] = [p_pos_twotail, p_neg_twotail, p_both_twotail]

In [None]:
df_p

In [None]:
df_p.to_excel('{}/evaluation_with_perm.xlsx'.format(read_path))

In [None]:
np.nanmean(r_pos_true)

In [None]:
len(r_pos_null)

### Plot boxplot

In [None]:
df = pd.read_excel('{}/evaluation_with_perm.xlsx'.format(read_path),index_col=0)
df

In [None]:
with open('{}/r_pos.txt'.format(read_path),'r') as f:
    r_pos = f.read().splitlines()

with open('{}/r_neg.txt'.format(read_path),'r') as f:
    r_neg = f.read().splitlines()

with open('{}/r_both.txt'.format(read_path),'r') as f:
    r_both = f.read().splitlines()

r_pos = np.array(r_pos)
r_neg = np.array(r_neg)
r_both = np.array(r_both)
r_pos = r_pos.astype(np.float)
r_neg = r_neg.astype(np.float)
r_both = r_both.astype(np.float)

In [None]:
df_r = pd.DataFrame(columns=['r_pos','r_neg','r_both'],index=range(1,repeat+num_iter+1))
df_r['r_pos']=r_pos
df_r['r_neg']=r_neg
df_r['r_both']=r_both

In [None]:
df_r.to_excel('{}/df_r.xlsx'.format(read_path))

In [None]:
r_pos_null = r_pos[repeat:]
r_pos_true = r_pos[:repeat]
r_neg_null = r_neg[repeat:]
r_neg_true = r_neg[:repeat]
r_both_null = r_both[repeat:]
r_both_true = r_both[:repeat]

In [None]:
len(r_pos_true)

In [None]:
if np.isnan(r_neg).any():
    print('r_neg contains NaNs.')
    if np.isnan(r_neg_true).any():
        print('WARNING: NaNs detected in true behavioral data iterations (neg).')

if np.isnan(r_pos).any():
    print('r_pos contains NaNs.')
    if np.isnan(r_pos_true).any():
        print('WARNING: NaNs detected in true behavioral data iterations (pos).')

if np.isnan(r_both).any():
    print('r_both contains NaNs.')
    if np.isnan(r_both_true).any():
        print('WARNING: NaNs detected in true behavioral data iterations (both).')

In [None]:
r_neg_null = r_neg_null[~np.isnan(r_neg_null)]
r_pos_null = r_pos_null[~np.isnan(r_pos_null)]
r_both_null = r_both_null[~np.isnan(r_both_null)]

r_neg_true = r_neg_true[~np.isnan(r_neg_true)]
r_pos_true = r_pos_true[~np.isnan(r_pos_true)]
r_both_true = r_both_true[~np.isnan(r_both_true)]

print("Number of entries left in null model: pos {}, neg {}, both {}".format(len(r_pos_null),len(r_neg_null),len(r_both_null)))
print("Number of entries left in true model: pos {}, neg {}, both {}".format(len(r_pos_true),len(r_neg_true),len(r_both_true)))

In [None]:
df

In [None]:
df_boxplot = pd.DataFrame(index=['pos','neg','both'],columns=['null model size','true model size','r mean','one-tail p mean','r median','one-tail p median'])
df_boxplot['null model size'] = [len(r_pos_null),len(r_neg_null),len(r_both_null)]
df_boxplot['true model size'] = [len(r_pos_true),len(r_neg_true),len(r_both_true)]
df_boxplot['r mean'] = [np.mean(r_pos_true),np.mean(r_neg_true),np.mean(r_both_true)]
df_boxplot['r median'] = [np.median(r_pos_true),np.median(r_neg_true),np.median(r_both_true)]
df_boxplot

In [None]:
# One-tailed test
if np.mean(r_pos_true) >= 0:
    p_pos_onetail = np.sum(r_pos_null >= np.mean(r_pos_true))/len(r_pos_null)
else:
    print("WARNING: pos - correlation is not positive (this should not happen often)?")
    p_pos_onetail = np.sum(r_pos_null <= np.mean(r_pos_true))/len(r_pos_null)

if np.mean(r_neg_true) >= 0:
    p_neg_onetail = np.sum(r_neg_null >= np.mean(r_neg_true))/len(r_neg_null)
else:
    print("WARNING: neg - correlation is not positive (this should not happen often)?")
    p_neg_onetail = np.sum(r_neg_null <= np.mean(r_neg_true))/len(r_neg_null)
#p_neg_onetail = np.nan

if np.mean(r_both_true) >= 0:
    p_both_onetail = np.sum(r_both_null >= np.mean(r_both_true))/len(r_both_null)
else:
    print("WARNING: both - correlation is not positive (this should not happen often)?")
    p_both_onetail = np.sum(r_both_null <= np.mean(r_both_true))/len(r_both_null)

In [None]:
df_boxplot['one-tail p mean'] = [p_pos_onetail, p_neg_onetail, p_both_onetail]
df_boxplot

In [None]:
# One-tailed test
if np.median(r_pos_true) >= 0:
    p_pos_onetail = np.sum(r_pos_null >= np.median(r_pos_true))/len(r_pos_null)
else:
    print("WARNING: pos - correlation is not positive (this should not happen often)?")
    p_pos_onetail = np.sum(r_pos_null <= np.median(r_pos_true))/len(r_pos_null)

if np.median(r_neg_true) >= 0:
    p_neg_onetail = np.sum(r_neg_null >= np.median(r_neg_true))/len(r_neg_null)
else:
    print("WARNING: neg - correlation is not positive (this should not happen often)?")
    p_neg_onetail = np.sum(r_neg_null <= np.median(r_neg_true))/len(r_neg_null)
#p_neg_onetail = np.nan

if np.median(r_both_true) >= 0:
    p_both_onetail = np.sum(r_both_null >= np.median(r_both_true))/len(r_both_null)
else:
    print("WARNING: both - correlation is not positive (this should not happen often)?")
    p_both_onetail = np.sum(r_both_null <= np.median(r_both_true))/len(r_both_null)

In [None]:
df_boxplot['one-tail p median'] = [p_pos_onetail, p_neg_onetail, p_both_onetail]
df_boxplot

In [None]:
df_boxplot.to_excel('{}/df_boxplot.xlsx'.format(read_path))

In [None]:
all_data = [r_pos_true, r_neg_true, r_both_true]
labels = ['pos', 'neg', 'both']
lw=1.2
boxprops = dict(linestyle='-', linewidth=lw)
capprops = dict(linestyle='-', linewidth=lw)
whiskerprops = dict(linestyle='-', linewidth=lw)
medianprops = dict(linestyle='-', linewidth=lw, color='darkorange')
meanprops = dict(marker='o',markerfacecolor='lightgrey', markersize=5,linestyle='-',markeredgecolor='black',lw=lw)

fig, ax = plt.subplots(figsize=(8,4))
bplot = ax.boxplot(all_data,vert=True,whis=(0,100),labels=labels, patch_artist=True, showmeans=True,
                   boxprops=boxprops,capprops=capprops,whiskerprops=whiskerprops,medianprops=medianprops,meanprops=meanprops) 
# whiskers covers the whole range of the data
# box extends from the lower to upper quartile values of the data, with a line at the median
ax.set_title('Box plot of r_true',fontsize=16)
plt.xticks(fontsize=16)
plt.yticks(fontsize=14)
# fill with colors
colors = ['#91C4F2', '#8CA0D7', '#9D79BC', '']
for patch, color in zip(bplot['boxes'], colors):
    patch.set_facecolor(color)

# adding horizontal grid lines
ax.yaxis.grid(True)
ax.set_ylabel('Spearman $\it{rho}$',fontsize=16)
    

legend_elements = [Line2D([0], [0], color='darkorange', lw=lw, label='Median'),
                   Line2D([0], [0], marker='o', color='white', markerfacecolor='lightgrey',label='Mean',markersize=5,linestyle='-',markeredgecolor='black',lw=lw),
                   Patch(facecolor='lightgrey', edgecolor='black', lw=lw,
                         label='lower to upper quartile'),
                  Line2D([0], [0], color='black', lw=lw, label='Range',solid_capstyle='butt')]

ax.legend(handles=legend_elements, loc='lower left')


#plt.show()
plt.savefig('{}/boxplot.png'.format(read_path),dpi=300,bbox_inches="tight")

In [None]:
max(r_pos_true)