In [1]:
import glob
import math
import os

import matplotlib.pyplot as plt
import pandas as pd
from IPython.display import clear_output
from scipy.io import loadmat

In [7]:
import json
import time
import random
import numpy as np
import scipy.stats as st

class DecisionModeler:
    """Builds lookup tables of model probabilities and fits them to observed data.

    The class keeps no instance state; the methods are grouped here so that a
    probability function such as ``p_cc2a`` can be passed to
    ``generate_2_param_model`` and the resulting JSON table searched with
    ``search_dp``.
    """

    def __init__(self):
        pass

    @staticmethod
    def _report_progress(done, total, reported_tenths):
        """Print one progress line whenever a new tenth of the work is finished.

        Integer arithmetic is used so the 10% steps cannot drift the way a
        repeatedly incremented float does.

        Args:
            done: Number of items processed so far (1-based).
            total: Total number of items; must be positive.
            reported_tenths: Highest tenth already announced.

        Returns:
            The updated highest tenth that has been announced.
        """
        tenths = (10 * done) // total
        if tenths > reported_tenths:
            print(f'{float(10 * tenths)}% evaluated.')
            return tenths
        return reported_tenths

    def p_cc2a(self, dp, k1, k2):
        """Compute the ``(pAA, pBB, pAB, pBA)`` tuple for one parameter triple.

        Args:
            dp: d' value; the normal-CDF arguments are shifted by ``+dp/2``
                for the "given a" terms and by ``-dp/2`` for the "given b" terms.
            k1: Offset used in the lower-tail terms ``cdf(k1 +/- dp/2)``.
            k2: Offset used in the upper-tail terms ``1 - cdf(k2 +/- dp/2)``.

        Returns:
            Tuple ``(pAA, pBB, pAB, pBA)`` built from products of those terms.
        """
        a_given_a = st.norm.cdf(k1 + dp/2)
        b_given_a = 1 - st.norm.cdf(k2 + dp/2)
        a_given_b = st.norm.cdf(k1 - dp/2)
        b_given_b = 1 - st.norm.cdf(k2 - dp/2)

        pAA = a_given_a*b_given_a + b_given_a*a_given_a
        pBB = a_given_b*b_given_b + b_given_b*a_given_b
        pAB = a_given_a*b_given_b + b_given_a*a_given_b
        pBA = a_given_b*b_given_a + b_given_b*a_given_a

        return pAA, pBB, pAB, pBA

    def generate_2_param_model(self, dp_vals, k_vals, p_function, filepath):
        """Tabulate ``p_function`` over a parameter grid and save it as JSON.

        For every d' in ``dp_vals`` the function is evaluated at each pair
        ``(k1, k2)`` taken from ``k_vals`` with ``k2 >= k1``. The results are
        collected in a dict keyed by d' and written to ``filepath``.

        Args:
            dp_vals: Sized, re-iterable collection of d' values.
            k_vals: Re-iterable collection of k values (iterated once per k1).
            p_function: Callable ``(dp, k1, k2) -> tuple`` of probabilities.
            filepath: Destination of the JSON file (overwritten if present).
        """
        model = dict()
        total = len(dp_vals)
        reported_tenths = 0

        # Calculate p-value tuples for each d' value
        for counter, dp in enumerate(dp_vals, start=1):
            model[dp] = [
                p_function(dp, k1, k2)
                for k1 in k_vals
                for k2 in k_vals
                if k2 >= k1
            ]
            reported_tenths = self._report_progress(counter, total, reported_tenths)

        # Write data to json file
        with open(filepath, 'w') as file:
            json.dump(model, file)
        print('Successfully written data to file.')

    def get_distance(self, pHuman, pModel):
        """Return the sum of squared differences between paired probabilities."""
        return sum((pH - pM)**2 for pH, pM in zip(pHuman, pModel))

    def get_chi_squared(self, pHuman, pModel, n_trials=18):
        """Return the chi-squared statistic comparing observed and model probabilities.

        Each pair contributes ``n*(pH-pM)**2/pM + n*((1-pH)-(1-pM))**2/(1-pM)``
        (see Collett equation 2.18).

        Args:
            pHuman: Observed probabilities.
            pModel: Model probabilities, paired element-wise with ``pHuman``.
            n_trials: Number of trials in each block (18 by default).

        Returns:
            The summed statistic. ``np.inf`` is returned when a model
            probability of exactly 0 or 1 disagrees with the observation,
            because the statistic is unbounded there; a pair that agrees
            exactly at 0 or 1 contributes nothing.
        """
        chi_squared = 0
        for pH, pM in zip(pHuman, pModel):
            if pM <= 0 or pM >= 1:
                # Degenerate model probability: avoid dividing by zero
                if pH == pM:
                    continue
                return np.inf
            chi_squared += (n_trials*(pH-pM)**2/pM
                            + n_trials*((1-pH)-(1-pM))**2/(1-pM))

        return chi_squared

    def search_dp(self, pHuman_vals, model_input, output_prefix="sample", default_df=None):
        """Find the d' whose model table lies closest to the observed points.

        For every d' in the JSON table each observed point is matched with the
        model tuple of smallest squared distance; the smallest chi-squared
        over all model tuples is tracked separately per point. The d' with the
        smallest summed distance wins.

        Side effects:
            Writes ``{output_prefix}_fit.csv`` (one row per d', sorted by
            distance) and ``{output_prefix}_points.csv`` (closest model tuple
            for each observed point at the best d').

        Args:
            pHuman_vals: Sequence of observed 4-tuples, ordered like the model
                tuples.
            model_input: Path to a JSON file as written by
                ``generate_2_param_model``.
            output_prefix: Path prefix for the two CSV files.
            default_df: Degrees of freedom before subtracting the number of
                points whose chi-squared is infinite; defaults to the number
                of observed points.

        Returns:
            Tuple ``(best_dp, chi_squared)`` where ``chi_squared`` is the sum
            of the per-point chi-squared values at ``best_dp``.

        Raises:
            ValueError: If no d' in the table yields a finite total distance
                (for example when the table is empty).
        """
        num_points = len(pHuman_vals)
        if default_df is None:
            default_df = num_points

        # Read the whole table first so the file is not held open during the search
        with open(model_input, 'rb') as file:
            model = json.load(file)

        # Variables used to track best fit d' with distance, chi-squared, and closest points
        min_dist = np.inf
        min_chi_squared = None
        best_dp = 0
        best_pts = None

        fit_rows = []
        total = len(model)
        reported_tenths = 0

        # Try fit with each available d' value
        for counter, (dp, p_tuples) in enumerate(model.items(), start=1):
            # Fresh arrays per d' so the best-fit arrays kept below are never
            # overwritten by a later iteration
            dist_arr = np.full(num_points, np.inf)
            chi_squared_arr = np.full(num_points, np.inf)
            best_dp_pts = np.full((num_points, 4), np.nan)

            for pModel in p_tuples:
                # Try p-value tuple fit with each human data point
                for index, pHuman in enumerate(pHuman_vals):
                    dist = self.get_distance(pHuman, pModel)
                    chi_squared = self.get_chi_squared(pHuman, pModel)

                    # Minimize distance and chi-square for current pModel, pHuman pair
                    if dist < dist_arr[index]:
                        dist_arr[index] = dist
                        best_dp_pts[index] = pModel
                    chi_squared_arr[index] = min(chi_squared_arr[index], chi_squared)

            # Summarise the current d'; points with infinite chi-squared are
            # left out of the statistic and cost one degree of freedom each
            is_inf = chi_squared_arr == np.inf
            total_dist = sum(dist_arr)
            curr_chi_square = sum(chi_squared_arr[~is_inf])
            curr_df = default_df - np.count_nonzero(is_inf)
            # Survival function keeps precision for very small p-values
            p_value = st.chi2.sf(curr_chi_square, curr_df)
            fit_rows.append([dp, total_dist, curr_chi_square, curr_df, p_value])

            if total_dist < min_dist:
                # Update best fit variables if distance is new minimum
                best_pts = best_dp_pts
                min_dist = total_dist
                min_chi_squared = chi_squared_arr
                best_dp = float(dp)

            reported_tenths = self._report_progress(counter, total, reported_tenths)

        # Write data to files
        df = pd.DataFrame(fit_rows, columns=['dp', 'distance', 'chi-square', 'df', 'p-value'])
        df = df.sort_values(by=['distance'])
        df = df.reset_index(drop=True)
        df.to_csv(f'{output_prefix}_fit.csv')

        if best_pts is None:
            raise ValueError(
                f"No d' value in {model_input!r} produced a finite distance to the data."
            )

        pts_df = pd.DataFrame(best_pts, columns=['pAA', 'pBB', 'pAB', 'pBA'])
        pts_df.to_csv(f'{output_prefix}_points.csv')

        return best_dp, sum(min_chi_squared)
                                       

In [None]:
# Fit the cc2a model table to every subject's data found under raw/8/.
modeler = DecisionModeler()

types = ['AA', 'BB', 'AB', 'BA']
dp_vals = np.linspace(0, 2.5, 101)
p_data = pd.read_csv('p_data.csv', index_col="Subject")

# search_dp writes '<prefix>_fit.csv' and '<prefix>_points.csv'; make sure the
# output folder exists so the first write does not fail on a fresh checkout.
os.makedirs('cc2a', exist_ok=True)

files = glob.glob('raw/8/*.mat')
#files = ['raw/8/CW.mat']
for f in files:
    dat = loadmat(f)
    # Subject ID is the file name without folder or extension
    # (e.g. 'raw/8/CW.mat' -> 'CW'), independent of the folder's length.
    subject = os.path.splitext(os.path.basename(f))[0]

    print(f'Currently checking subject {subject}.')

    # Count the PF1..PF5 columns whose value is at most 0.5 for this subject
    num_ratings = sum(int(p_data.loc[subject, f'PF{i}'] <= 0.5) for i in range(1, 6))

    print(f'Number of ratings to be used: {num_ratings}')

    # NOTE(review): the count printed above is discarded here and 3 is always
    # used. Kept as in the original analysis; confirm whether this override is
    # intentional or a leftover from debugging.
    num_ratings = 3

    # One row per response type, then transpose so each row is a
    # (pAA, pBB, pAB, pBA) point as expected by search_dp.
    points = []
    for t in types:
        points.append(np.concatenate([dat[f'p_{t}_{n+1}Diff'][0] for n in range(num_ratings)]))
    points = np.array(points).T

    modeler.search_dp(points, model_input='cc2a_small.json', output_prefix=f'cc2a/{subject}', default_df=2)
    clear_output()

Currently checking subject GJ.
Number of ratings to be used: 4
10.0% evaluated.
20.0% evaluated.
30.0% evaluated.
40.0% evaluated.
50.0% evaluated.


In [173]:
# Build a small lookup table (11 d' values x 101-point k grid) and save it as JSON.
modeler = DecisionModeler()

dp_vals = np.linspace(start=0, stop=2.5, num=11)
k_vals = np.linspace(start=-3, stop=3, num=101)
modeler.generate_2_param_model(
    dp_vals=dp_vals,
    k_vals=k_vals,
    p_function=modeler.p_cc2a,
    filepath='cc2a_xs.json',
)

10.0% evaluated.
20.0% evaluated.
30.0% evaluated.
40.0% evaluated.
50.0% evaluated.
60.0% evaluated.
70.0% evaluated.
80.0% evaluated.
90.0% evaluated.
100.0% evaluated.
Successfully written data to file.
