In [None]:
import numpy as np
import pandas as pd
import pymc3 as pm

from matplotlib import pyplot as plt
#import time
#from tqdm.notebook import tqdm
from theano import tensor as tt
from IPython.core.pylabtools import figsize
import os
from datetime import datetime
from sklearn.preprocessing import StandardScaler

In [None]:
def save_fig(fig_id,IMAGES_PATH, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

class MMS():
    def __init__(self,LB,UB): 
        self.LB = LB
        self.UB = UB
    def fit(self,x): return (x-self.LB)/(self.UB-self.LB)

Outputpath = os.path.join("Results",datetime.strftime(datetime.now(),"%y%m%d"))
os.makedirs(Outputpath,exist_ok=True)

In [None]:
rawdata = pd.read_csv("RawData.csv")
data = rawdata.copy()

TestScoreMax = data["TestScore"].max()
TestScoreMin = data["TestScore"].min()
coutWord = 0
CogVarList = []
for n in list(data.columns):
    if "WordCat" in n: 
        coutWord += 1
        CogVarList.append(n)

Cogobdata = data[CogVarList].values.astype("int32")


data.describe()

In [None]:
from scipy.stats import pearsonr
print("Correlation: {:.2f}\
    , p-value: {:.4f}".format(pearsonr(\
        data["TestScore"]\
            ,data[CogVarList].sum(axis=1)/38\
                )[0]\
                    ,pearsonr(data["TestScore"]\
                        ,data[CogVarList].sum(axis=1)/38\
                            )[1]\
                                )\
                                    )
figsize(10,10)
plt.scatter(data["TestScore"],data[CogVarList].sum(axis=1)/38)

In [None]:
print("Median: {:.2f}, Mean: {:.2f}".format(np.median(data["TestScore"]),np.mean(data["TestScore"])))
figsize(10,10)
histdata = plt.hist(data["TestScore"],color="grey")
for v,k in zip(histdata[0],histdata[1]):
    plt.text(k+0.5,v,round(v),fontdict={"size":15})
plt.savefig(os.path.join(Outputpath,"KnDis.png"))

In [None]:
from collections import Counter
histdata = plt.hist((data[CogVarList].sum(axis=0)/200).values,color = "grey")


for v,k in zip(histdata[0],histdata[1]):
    plt.text(k+0.01,v,v,fontdict={"size":15})

plt.savefig(os.path.join(Outputpath,"ChoiceDistr.png"))
plt.show()
plt.close()

In [None]:
with pm.Model() as WordCatModel:
    TurnP = pm.DiscreteUniform("TurnP",lower=TestScoreMin,upper= TestScoreMax)
    Groupmu = pm.Normal("Groupmu",mu=0,sd=10,shape = 2)
    GroupFlag = (data["TestScore"].values >= TurnP)

    realmu = pm.math.switch(GroupFlag,Groupmu[0],Groupmu[1])
    realmu = tt.reshape(realmu,(data.shape[0],1))

    groupSigma = pm.HalfCauchy("groupSigma",beta = 10)
    CogP = pm.Normal("CogP",mu=realmu,sd=groupSigma,shape = (data.shape[0],1))

    
    sigmas = pm.HalfCauchy("sigmas",beta = 10,shape= (1,coutWord))
    WordCat = pm.Normal("WordCat",mu = CogP,sigma = sigmas,shape=data[CogVarList].shape)
    
    diff = pm.Deterministic("diff",Groupmu[0]-Groupmu[1])


    probs = pm.Deterministic("probs",pm.math.sigmoid(WordCat))
    res = pm.Bernoulli("res",p = probs,observed = Cogobdata)

In [None]:
InputPath = ""
InferFlag = True # False#
if InferFlag:
    with WordCatModel:
        WordCattrace = pm.sample(nuts={'target_accept':0.95},chains=10,return_inferencedata=False,random_seed=220626)
        pm.save_trace(WordCattrace,os.path.join(Outputpath,"WordCattrace"),overwrite=True)
else:
    with WordCatModel:
        WordCattrace = pm.load_trace(os.path.join(InputPath,"WordCattrace"))


In [None]:
from sklearn.metrics import roc_curve,recall_score,f1_score
with WordCatModel:
    spp = pm.sample_posterior_predictive(WordCattrace)
ytrue,yhat = Cogobdata.flatten(),spp["res"].mean(axis=0).flatten()
print("recall: {:.3f}".format(recall_score(ytrue,yhat>0.5)))
print("Fscore: {:.3f}".format(f1_score(ytrue,yhat>0.5)))
fpr, tpr, thresholds = roc_curve(ytrue,yhat)
figsize(10,10)
plt.plot(fpr, tpr,lw=3,color="royalblue")
plt.plot([0,1],[0,1],lw=3,color = "grey",linestyle="--",alpha=0.75)
plt.plot([0,0],[0,1],lw=3,color = "brown",linestyle="--",alpha=0.75)
plt.plot([0,1],[1,1],lw=3,color = "brown",linestyle="--",alpha=0.75)

plt.xlabel('FPR: False positive rate')
plt.ylabel('TPR: True positive rate')
plt.grid()
plt.savefig(os.path.join(Outputpath,"CogPat_Metric.png"))
plt.show()
plt.close()

In [None]:
import seaborn as sns
sns.set_style("white")
Var = "Groupmu"
DiffVar = "diff"
CI = 0.95
trace= WordCattrace
BurnRatio = 0.75


BurnedN = int(len(trace[Var])*BurnRatio)

mu = np.mean(trace[DiffVar][BurnedN:])
lb = np.quantile(trace[DiffVar][BurnedN:],(1-CI)/2)
ub = np.quantile(trace[DiffVar][BurnedN:],CI+(1-CI)/2)
print("Difference mean:{:.3f}, {:.1f}% CI: {:.3f}~{:.3f}".format(mu,CI*100,lb,ub))

if len(trace[Var].shape) == 1: variable = np.expand_dims(trace["diff"],axis=1).transpose()
else: variable = trace[Var].transpose()

ColorList = ["brown","royalblue"]
LabelList = ["High-score group","Low-score group"]
figsize(10,10)
for i in range(variable.shape[0]):
    sns.distplot(variable[i][BurnedN:],color=ColorList[i],label=LabelList[i],kde=True)
    mu = np.mean(variable[i][BurnedN:])
    lb = np.quantile(variable[i][BurnedN:],(1-CI)/2)
    ub = np.quantile(variable[i][BurnedN:],CI+(1-CI)/2)
    print("{} mean:{:.3f}, {:.1f}% CI: {:.3f}~{:.3f}".format(Var+"_"+str(i),mu,CI*100,lb,ub))
    
plt.legend(fontsize = 15)
plt.savefig(os.path.join(Outputpath,"CogDiff.png"))
plt.show()
plt.close()

In [None]:
from collections import Counter
trace= WordCattrace
BurnRatio = 0.75
BurnedN = int(len(trace["diff"])*BurnRatio)
bardata = Counter(trace["TurnP"][BurnedN:])
plt.bar(list(bardata.keys()),[i/(len(trace["diff"])-BurnedN) for i in list(bardata.values())],color = "grey")
plt.xticks(list(bardata.keys()),list(bardata.keys()))

for k,v in zip(list(bardata.keys()),[i/(len(trace["diff"])-BurnedN) for i in list(bardata.values())]):
    plt.text(k-0.2,v,np.round(v,3),fontdict={"size":15})

plt.savefig(os.path.join(Outputpath,"KnThreshold.png"))
plt.show()
plt.close()

In [None]:
from scipy.stats import ttest_ind

print(ttest_ind(data.loc[data["Gender"]==1,"TestScore"]\
    ,data.loc[data["Gender"]==2,"TestScore"]))
data.groupby("Gender")["TestScore"].mean()

In [None]:
from scipy.stats import pearsonr
pearsonr(data["Age"],data["TestScore"])

In [None]:
ControlList = ["Age","Gender"]
Xvar ="TestScore"
with pm.Model() as FullModel:
    TurnP = pm.DiscreteUniform("TurnP",lower=TestScoreMin,upper= TestScoreMax)
    constants = pm.Normal("constants",mu=0,sd=100,shape = 2)

    GroupFlag = (data[Xvar].values >= TurnP)

    realbetas = pm.Normal("realbetas",mu = 0, sd = 100)
    realconstants = pm.Deterministic("realconstants",pm.math.switch(GroupFlag,constants[0],constants[1])) 
    
    controlbetas = pm.Normal("controlbetas",mu = 0, sd = 100, shape=len(ControlList))

    mu = realconstants + realbetas*data[Xvar].values
    
    for i, cv in enumerate(ControlList):
        mu += controlbetas[i] * data[cv].values

    mu = tt.reshape(mu,(data.shape[0],1))
    
    sigmas = pm.HalfCauchy("sigmas",beta = 10,shape= (1,coutWord))
    WordCat = pm.Normal("WordCat",mu = mu,sigma = sigmas,shape=data[CogVarList].shape)
    
    constantdiff = pm.Deterministic("contantdiff",constants[0]-constants[1])

    probs = pm.math.sigmoid(WordCat)
    res = pm.Bernoulli("res",p = probs,observed = Cogobdata)

In [None]:
InputPath = ""
InferFlag = True #  False# 
if InferFlag:
    with FullModel:
        Fulltrace = pm.sample(nuts={'target_accept':0.95},chains=10,return_inferencedata=False,random_seed=220626)
        pm.save_trace(Fulltrace,os.path.join(Outputpath,"Fulltrace"),overwrite=True)
else:
    with FullModel:
        Fulltrace = pm.load_trace(os.path.join(InputPath,"Fulltrace"))

In [None]:
from sklearn.metrics import roc_curve,recall_score,f1_score
with FullModel:
    spp = pm.sample_posterior_predictive(Fulltrace)
ytrue,yhat = Cogobdata.flatten(),spp["res"].mean(axis=0).flatten()
print("recall: {:.3f}".format(recall_score(ytrue,yhat>0.5)))
print("Fscore: {:.3f}".format(f1_score(ytrue,yhat>0.5)))
fpr, tpr, thresholds = roc_curve(ytrue,yhat)
figsize(10,10)
plt.plot(fpr, tpr,lw=3,color="royalblue")
plt.plot([0,1],[0,1],lw=3,color = "grey",linestyle="--",alpha=0.75)
plt.plot([0,0],[0,1],lw=3,color = "brown",linestyle="--",alpha=0.75)
plt.plot([0,1],[1,1],lw=3,color = "brown",linestyle="--",alpha=0.75)

plt.xlabel('FPR: False positive rate')
plt.ylabel('TPR: True positive rate')
plt.grid()
plt.savefig(os.path.join(Outputpath,"FullModel_Metric.png"))
plt.show()
plt.close()

In [None]:
def sigmoid(x):
    return 1/(1+ np.exp(-x))

def predict(ConVar,trace,CI=0.95):
    yhat = np.zeros((21,3))
    for i, ts in enumerate(np.arange(0,21)) :
        Flag = ts < trace["TurnP"][BurnedN:]
        constant = np.array([c[int(f)] for f, c in zip(Flag,trace["constants"][BurnedN:])])
        mu = trace["realbetas"][BurnedN:]*ts
        mu+=(trace["controlbetas"][BurnedN:]*ConVar).sum(axis=1)
        mu+=constant
        yhat[i,:] = np.quantile(sigmoid(mu),[(1-CI)/2,0.5,CI+(1-CI)/2]) 
    return yhat

CI = 0.95
yhat1 = predict(np.array([np.mean(data["Age"]),1]),Fulltrace)
yhat2 = predict(np.array([np.mean(data["Age"]),2]),Fulltrace)

In [None]:
figsize(10,10)
plt.plot(yhat1.transpose()[1,:],lw=3,linestyle="--",color="brown",label="male")
plt.fill_between(np.arange(0,21),yhat1.transpose()[0,:],yhat1.transpose()[2,:],color="grey",alpha=0.5,label="{}% CI".format(CI*100))
plt.plot(yhat2.transpose()[1,:],lw=3,linestyle="--",color="royalblue",label="female")
plt.fill_between(np.arange(0,21),yhat2.transpose()[0,:],yhat2.transpose()[2,:],color="grey",alpha=0.5)
plt.legend()
plt.savefig(os.path.join(Outputpath,"yhat_SI.png"))
plt.show()
plt.close()