<a href="https://colab.research.google.com/github/ExCaLBBR/Demos/blob/main/demoIAT_SingleParticipantAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Gorilla Info: <br>
Project: RaciallyBiasedDecisions_APF <br>
Account: Roberto Vargas (robertov@andrew.cmu.edu)

In [2]:
#@title Import libraries
#Load relevant libraries and install dependencies
import pandas as pd
import numpy as np
import re
import os # for joining paths and filenames sensibly
import glob # for finding csv data files
import platform # paths use different dividers on linux vs windows, so we need to test for this
import string

In [3]:
#@title Define Utility functions
#Compute adjusted mean
def adjustedmean(RTs,corrs,penalty):
    """Mean RT of a block with every error trial scored as (mean correct RT + penalty).

    Replacing each error RT by ``cor_mean + penalty`` and averaging over all
    trials simplifies to ``cor_mean + n_errors * penalty / n``, which is what
    is returned.

    Parameters
    ----------
    RTs : sequence of numbers
        Reaction times, one per trial.
    corrs : sequence of 0/1 (or bool)
        Correctness flag for each trial, aligned with ``RTs``.
    penalty : number
        Amount added for each error trial, in the same units as ``RTs``.

    Returns
    -------
    float
        The adjusted mean, or NaN when there are no trials or no correct
        trials (the mean of correct RTs is undefined in that case).
    """
    rts = np.asarray(RTs, dtype=float)
    correct = np.asarray(corrs, dtype=float)

    n = correct.size #trials
    n_correct = correct.sum()
    if n == 0 or n_correct == 0:
        # Nothing to average over; report "undefined" instead of dividing by zero.
        return float("nan")

    n_errors = int(n - n_correct) #errors
    # Multiplying by the 0/1 flags zeroes out the RTs of error trials.
    cor_mean = (correct * rts).sum() / n_correct

    #mean with errors replaced with penalty value
    return cor_mean + (n_errors * penalty) / n


#Remove timed out trials
def exclude_slows(RTs,corrs,slowRT_limit):
    """Keep only the trials whose RT is strictly below ``slowRT_limit``.

    ``RTs`` and ``corrs`` are index-aligned; a trial is dropped from both when
    its RT is at or above the limit (NaN RTs fail the comparison and are
    dropped too).

    Returns a tuple ``(kept_RTs, kept_corrs)`` of two plain lists.
    """
    # Positions of the trials that are fast enough to keep.
    kept_positions = [pos for pos in range(len(RTs)) if RTs[pos] < slowRT_limit]

    kept_rts = [RTs[pos] for pos in kept_positions]
    kept_corrs = [corrs[pos] for pos in kept_positions]
    return (kept_rts, kept_corrs)

#Compute IAT bias rating
def iat_analyze(congr_rts_raw, congr_corr_raw, incon_rts_raw, incon_corr_raw, df_name, verbose=False):
    """Compute an IAT D-style score from one congruent and one incongruent block.

    The thresholds ``fastRT_limit``, ``fast_prop_limit``, ``slowRT_limit`` and
    ``penalty`` are read from module-level variables at call time.

    Parameters
    ----------
    congr_rts_raw, incon_rts_raw : sequence of numbers
        Reaction times of the congruent / incongruent block.
    congr_corr_raw, incon_corr_raw : sequence of 0/1
        Correctness flags aligned with the corresponding RT sequence.
    df_name : str
        Label used in printed messages.
    verbose : bool, optional
        When True, print the score and the uncorrected mean difference.
        Defaults to False, so the default call prints nothing on success.

    Returns
    -------
    float or None
        ``(adjusted congruent mean - adjusted incongruent mean) / pooled SD``,
        i.e. negative when the congruent block is faster. ``None`` when the
        data are rejected because too many responses are faster than
        ``fastRT_limit``.
    """
    #1 discard subject if too many fast responses
    all_rts = np.concatenate((np.asarray(congr_rts_raw), np.asarray(incon_rts_raw)))
    n_fast = np.count_nonzero(all_rts < fastRT_limit)
    if n_fast > all_rts.size * fast_prop_limit:
        print("excluding subject for " + df_name + " because too many fast responses")
        return None

    #2 Eliminate trials at or above slowRT_limit
    congr_rts, congr_corr = exclude_slows(congr_rts_raw, congr_corr_raw, slowRT_limit)
    incon_rts, incon_corr = exclude_slows(incon_rts_raw, incon_corr_raw, slowRT_limit)

    #3 Calculate pooled std over all remaining RTs from both blocks, correct
    #  and incorrect. numpy.std is the population std (divides by N, not N-1).
    pooled = list(congr_rts) + list(incon_rts)
    pooled_std = np.std(pooled)

    #4 Calculate adjusted means, including the error penalty
    congr_adjmean = adjustedmean(congr_rts, congr_corr, penalty)
    incon_adjmean = adjustedmean(incon_rts, incon_corr, penalty)

    #5 Calculate the IAT, so that pro-stereotype RTs are a -ve score
    IAT = (congr_adjmean - incon_adjmean) / pooled_std

    if verbose:
        # Plain difference of block means, without penalty or scaling.
        simpleIAT = sum(congr_rts) / len(congr_rts) - sum(incon_rts) / len(incon_rts)
        print("IAT for " + df_name + " is : {:+.3f}".format(IAT))
        print("Mean difference (uncorrected) " + df_name + " is {:+.3f}".format(simpleIAT) + " (in the units of the input RTs)")

    return IAT

# Unwrap list-valued cells (e.g. pivot-table values) into plain floats
def convert_to_float(x):
    """Return ``float(x[0])`` when ``x`` is a list; return ``x`` unchanged otherwise.

    Only the first element of a list is used, on the assumption that the list
    holds a single value.
    """
    return float(x[0]) if isinstance(x, list) else x

In [None]:
#@title Load Data:

#Implicit Association Test (IAT)
# Raw IAT task export (CSV) stored in the ExCaLBBR_Projects GitHub repository.
url = 'https://github.com/ExCaLBBR/ExCaLBBR_Projects/raw/main/RaciallyBiasedDecisions/RaciallyBiasedDecisions-Intersectionality_SURG/data/raw/raw_task_IAT_BM.csv'
# pandas downloads and parses the file directly from the URL, so this cell needs network access.
df_IAT = pd.read_csv(url)


In [None]:
#@title Specify thresholds
# All time values below are in milliseconds, the scale implied by the 300 / 10000
# RT limits (assumes 'Absolute Reaction Time' is exported in ms - confirm).
# The penalty was previously 0.600 (seconds), which on a millisecond scale left
# error trials effectively unpenalised.
penalty=600 #penalty - in milliseconds - for incorrect responses
slowRT_limit=10000 #threshold at which slow RTs are discarded
fastRT_limit=300 #threshold which defines responses which are "too fast"
fast_prop_limit=0.1 # threshold proportion of "too fast" responses which defines exclusion of ppt

In [None]:
#@title Isolate relevant columns
#Implicit Association Test (IAT)

# Columns needed by the analysis; every other column of the export is discarded.
include = ['Participant Private ID', 'Absolute Reaction Time', 'Correct', 'Component Name', 'allocator-k3xu', 'Spreadsheet: ImageLeft', 'Spreadsheet: ImageRight', 'Spreadsheet: TextLeft', 'Spreadsheet: TextRight', 'Spreadsheet: metadata', 'Response']
df_IAT = df_IAT.loc[:, df_IAT.columns.isin(include)]

# Rows produced by the keyboard-response component. na=True means rows with a
# missing component name pass this test as well.
keyboard_rows = df_IAT['Component Name'].str.contains('Keyboard Response', na=True)
# Rows whose response contains BEGIN or END; na=True also flags missing responses.
marker_rows = df_IAT['Response'].str.contains('BEGIN', na=True) | df_IAT['Response'].str.contains('END', na=True)

# Keep keyboard rows that are not markers, then drop the two helper columns
# and renumber the rows from zero.
df_IAT = (
    df_IAT[keyboard_rows & ~marker_rows]
    .drop(columns=['Component Name', 'Response'])
    .reset_index(drop=True)
)

In [None]:
#@title Data Extraction
# Give the Gorilla columns analysis-friendly names and drop the image columns.
# rename() ignores missing keys and errors="ignore" does the same for drop(),
# so running this cell a second time is harmless.
df_IAT = (
    df_IAT
    .rename(columns={"Spreadsheet: metadata": "Congruence", "allocator-k3xu": "Group Type"})
    .drop(columns=["Spreadsheet: ImageLeft", "Spreadsheet: ImageRight"], errors="ignore")
)


def _attribute_trials(df, label):
    """Return the non-practice rows of `df` that show `label` on the left or the right."""
    shows_label = (df["Spreadsheet: TextLeft"] == label) | (df["Spreadsheet: TextRight"] == label)
    # na=False: rows without a Congruence value count as non-practice here; they
    # match neither "congruent" nor "incongruent" later on, so they drop out anyway.
    is_practice = df["Congruence"].str.startswith("practice", na=False)
    return df[shows_label & ~is_practice]


def _condition_lists(df, condition):
    """Return aligned (participant IDs, correctness, RTs) lists for one Congruence value.

    A row is dropped as a whole when any of the three values is missing, so the
    three lists always have the same length and stay index-aligned.
    """
    columns = ["Participant Private ID", "Correct", "Absolute Reaction Time"]
    rows = df.loc[df["Congruence"] == condition, columns].dropna()
    return tuple(rows[column].tolist() for column in columns)


#Isolate rows based on stereotype category which are not part of the practice blocks
df_IAT_STR = _attribute_trials(df_IAT, "Strong")
df_IAT_INT = _attribute_trials(df_IAT, "Intelligence")
# Previously this matched "Dirty" on the left but "Clean" on the right; it now
# checks the same label on both sides, like the other two categories.
df_IAT_DIR = _attribute_trials(df_IAT, "Dirty")

# NOTE (RV): the polarity of the condition was mislabeled in the original data.
# No relabeling happens in this cell - presumably the sign flip applied to the
# group means in the summary cell compensates for it; confirm.
# STR lists
congr_STR_ID, congr_corr_STR, congr_rts_STR = _condition_lists(df_IAT_STR, "congruent")
incon_STR_ID, incon_corr_STR, incon_rts_STR = _condition_lists(df_IAT_STR, "incongruent")
# INT lists
congr_INT_ID, congr_corr_INT, congr_rts_INT = _condition_lists(df_IAT_INT, "congruent")
incon_INT_ID, incon_corr_INT, incon_rts_INT = _condition_lists(df_IAT_INT, "incongruent")
# DIR lists
congr_DIR_ID, congr_corr_DIR, congr_rts_DIR = _condition_lists(df_IAT_DIR, "congruent")
incon_DIR_ID, incon_corr_DIR, incon_rts_DIR = _condition_lists(df_IAT_DIR, "incongruent")


In [None]:
#@title Filter participants who are too fast
#Find and remove participants who are too fast
# One entry per stereotype category, checked in this order:
# (message label, congruent IDs, congruent RTs, incongruent IDs, incongruent RTs)
_fast_checks = [
    ("WM STR", congr_STR_ID, congr_rts_STR, incon_STR_ID, incon_rts_STR),  # Strength
    ("WM INT", congr_INT_ID, congr_rts_INT, incon_INT_ID, incon_rts_INT),  # Intel
    ("WM DIR", congr_DIR_ID, congr_rts_DIR, incon_DIR_ID, incon_rts_DIR),  # Dirt
]

# The candidate participants are those with congruent Strength trials.
ID = np.unique(congr_STR_ID)
ID_filt = ID
for _label, _c_ids, _c_rts, _i_ids, _i_rts in _fast_checks:
    # Convert once per category rather than once per participant.
    _c_ids, _c_rts = np.asarray(_c_ids), np.asarray(_c_rts)
    _i_ids, _i_rts = np.asarray(_i_ids), np.asarray(_i_rts)

    _keep = np.ones(len(ID_filt), dtype=bool)
    for _pos, _pid in enumerate(ID_filt):
        # All of this participant's RTs in the category, both conditions pooled.
        _rts = np.concatenate((_c_rts[_c_ids == _pid], _i_rts[_i_ids == _pid]))
        # Exclude when more than fast_prop_limit of the trials are faster than fastRT_limit.
        if np.count_nonzero(_rts < fastRT_limit) > _rts.size * fast_prop_limit:
            print("Excluding subject", _pid, "for " + _label + " because too many fast responses")
            _keep[_pos] = False

    # Participants excluded in one category are not checked in the later ones.
    ID_filt = ID_filt[_keep]

In [None]:
#@title Compute IAT bias
def _participant_iat_scores(ids, congr_ids, congr_corr, congr_rts, incon_ids, incon_corr, incon_rts, label):
    """Return one iat_analyze() result per participant in `ids`, in the same order.

    The six trial-level sequences are pooled over participants; each
    participant's trials are selected by matching the ID sequences.
    """
    # Convert once up front rather than on every loop iteration.
    congr_ids, congr_corr, congr_rts = (np.asarray(seq) for seq in (congr_ids, congr_corr, congr_rts))
    incon_ids, incon_corr, incon_rts = (np.asarray(seq) for seq in (incon_ids, incon_corr, incon_rts))

    scores = []
    for pid in ids:
        in_congr = congr_ids == pid
        in_incon = incon_ids == pid
        scores.append(
            iat_analyze(congr_rts[in_congr], congr_corr[in_congr],
                        incon_rts[in_incon], incon_corr[in_incon], label)
        )
    return scores


# Strength
IAT_STR = _participant_iat_scores(ID_filt, congr_STR_ID, congr_corr_STR, congr_rts_STR,
                                  incon_STR_ID, incon_corr_STR, incon_rts_STR, "WM STR")

# Intel
IAT_INT = _participant_iat_scores(ID_filt, congr_INT_ID, congr_corr_INT, congr_rts_INT,
                                  incon_INT_ID, incon_corr_INT, incon_rts_INT, "WM INT")

# Dirt
IAT_DIR = _participant_iat_scores(ID_filt, congr_DIR_ID, congr_corr_DIR, congr_rts_DIR,
                                  incon_DIR_ID, incon_corr_DIR, incon_rts_DIR, "WM DIR")

In [None]:
#@title Participant-wise analysis of IAT bias by Stereotype category

#Group means per stereotype category, with the sign flipped (multiplied by -1)
IAT_STR_Avg, IAT_INT_Avg, IAT_DIR_Avg = (
    -1 * np.mean(scores) for scores in (IAT_STR, IAT_INT, IAT_DIR)
)

#Across-participant standard deviation per category (numpy's default population SD)
IAT_STR_SD, IAT_INT_SD, IAT_DIR_SD = (
    np.std(scores) for scores in (IAT_STR, IAT_INT, IAT_DIR)
)

#Print output: one line per category
for headline, avg, sd in (
    ("Strength Stereotype IAT bias =", IAT_STR_Avg, IAT_STR_SD),
    ("Intelligence Stereotype IAT bias =", IAT_INT_Avg, IAT_INT_SD),
    ("Dirtiness Stereotype IAT bias =", IAT_DIR_Avg, IAT_DIR_SD),
):
    print(headline, avg, "(SD = ", sd, ")")