In [2]:
from enum import Enum
from typing import NamedTuple, List
import csv
from matplotlib import pyplot
import math
import pandas as pd
import statsmodels.api as sm
from scipy import stats
import numpy as np

## Data Definitions

In [3]:
class ItemNumber(Enum):
    # interp. the total number of items shown in a visual search trial (always three, six, or twelve)
    # examples are redundant for enumerations
    Three = 1
    Six = 2
    Twelve = 3

# template based on enumeration (3 cases)
# Skeleton only, not meant to be called: copy it and replace every `...` (including the return
# annotation) when writing a real function over ItemNumber. There is one branch per enumeration case.
def fn_for_item_number(i: ItemNumber) -> ...:
    if i == ItemNumber.Three:
        return ...
    elif i == ItemNumber.Six:
        return ...
    elif i == ItemNumber.Twelve:
        return ...


class TrialData(NamedTuple):
    subject: int               # in range [1,...)
    target_is_long: bool       # True if the target is a long line, False if it is a short line
    block: int                 # in range [1,5]
    trial_num: int             # in range [1,30]
    time: int                  # in range [0,...); response time in ms
    responded_present: bool    # True if the subject responded that the target was present
    target_present: bool       # True if the target was present in the trial
    total_items: ItemNumber    # target + distractors
# interp. data from one trial of a visual search experiment: the subject number, the kind of target line,
# the block and trial numbers, the time (in ms) taken to respond, what the subject responded, whether the
# target really was present, and the total number of items on screen.
TD1 = TrialData(subject=1, target_is_long=True, block=1, trial_num=1, time=1200,
                responded_present=True, target_present=True, total_items=ItemNumber.Three)
TD2 = TrialData(subject=6, target_is_long=False, block=3, trial_num=24, time=2450,
                responded_present=False, target_present=True, total_items=ItemNumber.Twelve)
TD3 = TrialData(subject=20, target_is_long=True, block=5, trial_num=13, time=1860,
                responded_present=False, target_present=False, total_items=ItemNumber.Six)
TD4 = TrialData(subject=12, target_is_long=False, block=2, trial_num=10, time=1100,
                responded_present=True, target_present=False, total_items=ItemNumber.Three)

# template based on compound (8 fields) and the reference rule (once)
# Skeleton only, not meant to be called: copy it and replace every `...` when writing a real function
# over TrialData. total_items is passed through fn_for_item_number because it is itself an ItemNumber.
def fn_for_trial_data(td: TrialData) -> ...:
    return ...(td.subject,
               td.target_is_long,
               td.block,
               td.trial_num,
               td.time,
               td.responded_present,
               td.target_present,
               fn_for_item_number(td.total_items))
               

# List[TrialData]
# interp. a list of TrialData

# examples: an empty list, a two-trial list, and a list holding all four example trials
LOTD0 = []
LOTD1 = [TD1, TD2]
LOTD2 = [TD1, TD2, TD3, TD4]

# Skeleton only, not meant to be called: copy it and replace every `...` when writing a real function
# over a list of TrialData.
def fn_for_lotd(lotd: List[TrialData]) -> ...:
    # template based on arbitrary-sized and reference rule
    # description of the accumulator
    acc = ...      # type: ...
    for td in lotd:
        # each element is handled by the TrialData template (reference rule)
        acc = ...(fn_for_trial_data(td), acc)

    return ...(acc)


### Read data files, produce...

In [10]:
###########
# Functions

def main(filename: str) -> None:
    """
    Reads the trial data stored in the file called filename and draws the response-time plot for it.

    filename is passed to read(), so the file must use the column layout read() expects. The data is taken
    from that file, not from any module-level variable. Returns None (the value plot_response_lines returns).
    """
    # return None  # stub
    # Template from HtDAP, based on function composition
    return plot_response_lines(read(filename))
    
    
# Read function and helper functions

# old read function, for testing only
def read(filename: str) -> List[TrialData]:
    """
    Reads the CSV file called filename and returns one TrialData per data row, in file order.

    The first row is treated as a header and skipped; a file with no rows at all yields an empty list.
    Columns are read by position:
      0 = target line type (see parse_line_type), 1 = subject, 2 = block, 3 = trial number,
      4 = response time, 5 = responded-present flag, 6 = target-present flag,
      8 = item-number code (see parse_item_number). Column 7 is not used.
    """
    #return []  #stub
    # Template from HtDAP
    # lotd contains the trials read so far
    lotd = []  # type: List[TrialData]

    # newline='' is what the csv module asks for on files it is going to parse
    with open(filename, newline='') as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # skip header line (tolerates a completely empty file)

        for row in reader:
            # the two flags are kept as ints, exactly as read; assumes the file codes them 0/1 -- TODO confirm
            lotd.append(TrialData(int(row[1]),
                                  parse_line_type(row[0]),
                                  int(row[2]),
                                  int(row[3]),
                                  int(row[4]),
                                  int(row[5]),
                                  int(row[6]),
                                  parse_item_number(row[8])))

    return lotd


def read_long(filename: str) -> List[TrialData]:
    """
    Reads a CSV file of trials and returns one TrialData per data row, in file order, each marked as a
    long-target trial (target_is_long is True). The column layout is described on _read_single_target_file.
    """
    #return []  #stub
    return _read_single_target_file(filename, True)


def read_short(filename: str) -> List[TrialData]:
    """
    Reads a CSV file of trials and returns one TrialData per data row, in file order, each marked as a
    short-target trial (target_is_long is False). The column layout is described on _read_single_target_file.
    """
    #return []  #stub
    return _read_single_target_file(filename, False)


def _read_single_target_file(filename: str, target_is_long: bool) -> List[TrialData]:
    """
    Shared reader behind read_long and read_short: reads the CSV file called filename and gives every
    row the supplied target_is_long value.

    The first row is treated as a header and skipped; a file with no rows at all yields an empty list.
    Columns are read by position:
      0 = subject, 1 = block, 2 = trial number, 3 = response time, 4 = responded-present flag,
      5 = target-present flag, 7 = item-number code (see parse_item_number). Column 6 is not used.
    """
    # Template from HtDAP
    # lotd contains the trials read so far
    lotd = []  # type: List[TrialData]

    # newline='' is what the csv module asks for on files it is going to parse
    with open(filename, newline='') as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # skip header line (tolerates a completely empty file)

        for row in reader:
            # the two flags are kept as ints, exactly as read; assumes the file codes them 0/1 -- TODO confirm
            lotd.append(TrialData(int(row[0]),
                                  target_is_long,
                                  int(row[1]),
                                  int(row[2]),
                                  int(row[3]),
                                  int(row[4]),
                                  int(row[5]),
                                  parse_item_number(row[7])))

    return lotd


def parse_line_type(s: str) -> bool:
    """
    Converts a target line-length label into a flag: True for 'Long', False for anything else.
    The label is expected to be either 'Long' or 'Short'
    """
    # return False  # stub
    # template from atomic non-distinct
    return True if s == 'Long' else False

# start_testing()

# # Examples and tests for parse_line_type
# expect(parse_line_type('Short'), False)
# expect(parse_line_type('Long'), True)

# summary()


def parse_item_number(s: str) -> ItemNumber:
    """
    Takes the item-number code from the data file and returns the matching ItemNumber: '1' corresponds to
    3 items, '2' to 6 items, and '3' to 12 items. Whitespace around the code is ignored.

    Raises ValueError for any other input, instead of silently returning None and letting the trial fall
    out of every item-number group later on.
    """
    # return ItemNumber.Three  # stub
    # template from atomic non-distinct
    codes = {'1': ItemNumber.Three,
             '2': ItemNumber.Six,
             '3': ItemNumber.Twelve}
    code = s.strip()
    if code not in codes:
        raise ValueError("item number code must be '1', '2', or '3', got " + repr(s))
    return codes[code]

# start_testing()

# # Examples and tests for parse_item_number
# expect(parse_item_number('1'), ItemNumber.Three)
# expect(parse_item_number('2'), ItemNumber.Six)
# expect(parse_item_number('3'), ItemNumber.Twelve)

# summary()




# analysis function
def plot_response_lines(lotd: List[TrialData]) -> None:
    """
    Takes a list of trial data, and returns None. Also draws a graph of two lines, one for long-target
    trials and one for short-target trials, showing the average response time in trials with 3, 6, and 12
    total items.

    The averages come from avg_by_item_number, which (through condition_filter and data_filter) keeps only
    correct, target-present trials and averages their log10-transformed times. Each average is therefore
    back-transformed with 10 ** mean before plotting, so the plotted values are in milliseconds as the
    y-axis label states.
    """
    # return None  # stub
    # template from data visualization
    item_number = [3, 6, 12]
    # avg_by_item_number returns means of log10(ms); undo the log so the axis really is in ms
    long_target_avgs = [10 ** avg for avg in avg_by_item_number(lotd, True)]    # long target trials
    short_target_avgs = [10 ** avg for avg in avg_by_item_number(lotd, False)]  # short target trials

    pyplot.title('Mean Reaction Time by Item Number')
    pyplot.xlabel('Total Number of Items')
    pyplot.ylabel('Response Time (ms)')
    pyplot.xticks([3,6,9,12])
    pyplot.plot(item_number, long_target_avgs, color='r', linewidth=2.0, marker="o", label="Long Line")
    pyplot.plot(item_number, short_target_avgs, color='b', linewidth=2.0, marker="+", label="Short Line")
    pyplot.legend(loc='lower right')
    pyplot.show()
    

def data_filter(lotd: List[TrialData]) -> List[TrialData]:
    """
    Keeps only the correct trials, then only the target-present ones among them, and returns those trials
    with their times log10-transformed
    """
    # template from composition
    correct_only = correct_trials(lotd)
    present_only = present_filter(correct_only)
    return log_list(present_only)


def condition_filter(lotd: List[TrialData], is_long_line: bool, item_number: ItemNumber) -> List[TrialData]:
    """
    Runs the list through data_filter, then returns only the trials of one condition: the requested target
    type (is_long_line == True selects long targets, False selects short targets) and the requested
    item number
    """
    # return []  # stub
    # template from composition
    cleaned = data_filter(lotd)
    same_target = list_by_line_type(cleaned, is_long_line)
    return list_by_item_number(same_target, item_number)


def avg_by_item_number(lotd: List[TrialData], is_long_line: bool) -> List[float]:
    """
    Returns three averages, in the order 3 items, 6 items, 12 items, for trials with the given target line
    type (is_long_line == True specifies long lines, False specifies short).

    Each average is avg_response_time applied to what condition_filter returns for that item number, so it
    is computed on the filtered, log10-transformed trials rather than on the raw times
    """
    # return []  # stub
    # template based on List[TrialData] with additional parameter
    return [avg_response_time(condition_filter(lotd, is_long_line, count))
            for count in (ItemNumber.Three, ItemNumber.Six, ItemNumber.Twelve)]


def avg_response_time(lotd: List[TrialData]) -> float:
    """
    Returns the mean of the time field over the given trials. The list must contain at least one trial
    """
    # return 0.0  # stub
    # template from List[TrialData]
    trial_count = len(lotd)
    # total is the sum of the times of the trials visited so far
    total = 0.0  # type: float
    for idx in range(trial_count):
        total += lotd[idx].time
    return total / trial_count


def participant_filter(lotd: List[TrialData], subject: int) -> List[TrialData]:
    """
    Returns, in their original order, only the trials whose subject number equals the given subject
    """
    # return []  # stub
    # template from List[TrialData] with additional parameter
    return [trial for trial in lotd if trial.subject == subject]


def avg_by_participant(lotd: List[TrialData], is_long_line: bool, item_number: ItemNumber) -> List[float]:
    """
    Takes a list of trial data and returns a list of the average response times of trials by participant,
    with the specified target line type (is_long_line == True specifies long lines, False
    specifies short) and item number.

    There is one average per subject number found in lotd, in order of first appearance (see
    subject_number). The averages are taken over what condition_filter returns, so they are computed on the
    filtered, log10-transformed trials. Every subject in lotd must have at least one trial left in the
    requested condition, because avg_response_time does not accept an empty list.
    """
    # return []  # stub
    # template based on List[TrialData] with additional parameter
    # The condition does not depend on the subject, so it is filtered once rather than once per subject.
    condition_trials = condition_filter(lotd, is_long_line, item_number)
    return [avg_response_time(participant_filter(condition_trials, s))
            for s in subject_number(lotd)]


def subject_number(lotd: List[TrialData]) -> List[int]:
    """
    Takes a list of trial data and returns every distinct subject number in it exactly once, in order of
    first appearance
    """
    # return []  # stub
    # dict keys are unique and keep insertion order, so this de-duplicates in one pass
    return list(dict.fromkeys(td.subject for td in lotd))


def check_item_number(td: TrialData, item_number: ItemNumber) -> bool:
    """
    Tells whether the trial's total number of items equals item_number: True if so, False otherwise
    """
    # return False  # stub
    # template from TrialData with additional parameter
    trial_items = td.total_items
    return trial_items == item_number


def list_by_item_number(lotd: List[TrialData], item_number: ItemNumber) -> List[TrialData]:
    """
    Returns, in their original order, only the trials that check_item_number accepts for item_number
    """
    # return []  # stub
    # template from List[TrialData] with additional parameter
    return [trial for trial in lotd if check_item_number(trial, item_number)]


def check_line_type(td: TrialData, is_long_line: bool) -> bool:
    """
    Tells whether the trial's target line type matches is_long_line (True means long line, False means
    short line): True if they match, False otherwise
    """
    # return False  # stub
    # template from TrialData with additional parameter
    trial_is_long = td.target_is_long
    return trial_is_long == is_long_line


def list_by_line_type(lotd: List[TrialData], is_long_line: bool) -> List[TrialData]:
    """
    Returns, in their original order, only the trials that check_line_type accepts for is_long_line
    (True selects long line targets, False selects short line targets)
    """
    # return []  # stub
    # template from List[TrialData] with additional parameter
    return [trial for trial in lotd if check_line_type(trial, is_long_line)]


def correct_response(td: TrialData) -> bool:
    """
    Tells whether the subject's response matched the trial: True when responded_present equals
    target_present, False otherwise
    """
    # return False  # stub
    # template from TrialData
    responded, actual = td.responded_present, td.target_present
    return responded == actual


def correct_trials(lotd: List[TrialData]) -> List[TrialData]:
    """
    Returns, in their original order, only the trials that correct_response accepts (the subject's
    response matched the trial)
    """
    # return []  # stub
    # template from List[TrialData]
    return [trial for trial in lotd if correct_response(trial)]

In [19]:
def present_filter(lotd: List[TrialData]) -> List[TrialData]:
    """
    Returns, in their original order, only the trials whose target_present field is truthy
    """
    # template from List[TrialData]
    return [trial for trial in lotd if trial.target_present]

In [20]:
def log_list(lotd: List[TrialData]) -> List[TrialData]:
    """
    Returns a new list holding log_time(td) for every trial td of lotd, in the same order
    """
    # template from List[TrialData]
    return [log_time(trial) for trial in lotd]

In [21]:
def log_time(td: TrialData) -> TrialData:
    """
    Returns a copy of the trial whose time field is the base-10 logarithm of the original time; every
    other field is carried over unchanged
    """
    # template from TrialData
    return td._replace(time=math.log10(td.time))

In [22]:
def slope(time1, time2, time3):
    """
    returns the slope of the least-squares line (with intercept) fitted to the mean response times for
    3, 6, and 12 items.

    time1, time2, and time3 are the response times for 3, 6, and 12 items respectively. They may be single
    numbers, giving one slope, or equal-length sequences (for example dataframe columns), giving an array
    with one slope per position.
    """
    item_counts = [3, 6, 12]
    times = [time1, time2, time3]
    # np.polyfit fits every column of a 2-D y against the shared x values in one call;
    # for degree 1 the first coefficient is the slope and the second is the intercept
    coefficients = np.polyfit(item_counts, times, 1)
    return coefficients[0]

In [44]:
def inv_log10(x):
    """
    undoes a base-10 logarithm: returns 10 raised to the power x
    """
    return pow(10, x)

In [23]:
# Load both raw files into one list: all long-target trials first, then all short-target trials
data = [*read_long('VSa49_long.csv'), *read_short('VSa49_short.csv')]

In [45]:
# one series of per-participant means for each condition (target type x item number), PRESENT trials only;
# the means are of log10-transformed times because avg_by_participant works on the filtered data
long3, long6, long12 = (
    pd.Series(avg_by_participant(data, True, count))
    for count in (ItemNumber.Three, ItemNumber.Six, ItemNumber.Twelve)
)
short3, short6, short12 = (
    pd.Series(avg_by_participant(data, False, count))
    for count in (ItemNumber.Three, ItemNumber.Six, ItemNumber.Twelve)
)

In [46]:
# assemble the six per-condition series into a dataframe, one column per condition
present_data_log = pd.DataFrame(dict(long3=long3, long6=long6, long12=long12,
                                     short3=short3, short6=short6, short12=short12))

In [47]:
# back-transform the log10 values (10 ** value) for every cell at once; the whole-frame power replaces
# the element-by-element DataFrame.applymap call, which newer pandas releases deprecate
present_data = 10 ** present_data_log

In [48]:
# add columns for slopes and slope ratio
# slope() is given whole columns here; the table displayed below shows a separate slope on every row
present_data['long_slope'] = slope(present_data['long3'], present_data['long6'], present_data['long12'])
present_data['short_slope'] = slope(present_data['short3'], present_data['short6'], present_data['short12'])
# ratio of the short-target slope to the long-target slope, row by row
present_data['slope_ratio'] = present_data['short_slope']/present_data['long_slope']

In [49]:
# square root of each slope ratio; the "Backtransformed present ratio" cells average this column and
# square the result
present_data['slope_ratio_sqrt'] = np.sqrt(present_data['slope_ratio'])

In [50]:
# display the table of condition means, slopes, and slope ratios
present_data

Unnamed: 0,long3,long6,long12,short3,short6,short12,long_slope,short_slope,slope_ratio,slope_ratio_sqrt
0,650.561664,754.291511,916.117543,700.521272,799.345348,936.741506,29.144037,25.768502,0.884178,0.940307
1,999.620304,1291.43132,1605.373156,1470.443988,1798.2571,1952.877599,65.165553,49.627499,0.76156,0.872674
2,1213.001352,1104.164645,1408.875238,1421.45348,1430.914521,1881.170424,25.90967,54.502945,2.103575,1.450371
3,641.632757,751.847109,808.529986,768.773345,976.697578,1126.791973,17.244567,37.670688,2.184496,1.478004
4,1248.816509,1362.249674,1654.968642,1188.585076,1314.871045,1814.766199,45.650655,71.538563,1.567087,1.251834
5,1155.517288,1221.257581,1512.602018,1089.715615,1361.840083,1809.805828,40.944842,79.245871,1.93543,1.391197
6,1180.19629,1190.859692,1529.758815,1102.161457,1456.993058,1822.378428,41.360696,77.291744,1.868724,1.367013
7,1018.07375,1178.668927,1502.999215,1591.207118,1842.445637,2211.381971,53.905527,67.84847,1.258655,1.121898
8,2254.976651,2340.386801,2639.427153,1344.391166,1493.656938,1673.436789,43.734342,35.618151,0.814421,0.902453
9,977.265742,1072.796389,1304.476036,949.621369,1176.057735,1684.234634,36.679067,82.062618,2.237315,1.495766


In [51]:
# summary statistics (count, mean, std, quartiles, min, max) for every column
present_data.describe()

Unnamed: 0,long3,long6,long12,short3,short6,short12,long_slope,short_slope,slope_ratio,slope_ratio_sqrt
count,16.0,16.0,16.0,16.0,16.0,16.0,16.0,16.0,16.0,16.0
mean,1078.389953,1173.045567,1429.007851,1077.014317,1271.190413,1579.046109,39.486521,55.142449,1.493288,1.202318
std,390.124965,390.852559,440.745238,280.072214,303.582922,352.580337,11.995233,16.853765,0.528478,0.225613
min,641.632757,729.736363,808.529986,700.498557,799.345348,936.741506,17.244567,25.768502,0.76156,0.872674
25%,830.293962,915.430366,1125.552073,848.260689,1013.504553,1320.422044,31.850005,42.500221,0.878878,0.937472
50%,1026.427798,1141.416786,1466.31737,1056.805582,1284.680142,1678.835712,39.323378,53.436842,1.581636,1.257618
75%,1214.561712,1309.135908,1575.413242,1273.012289,1437.434155,1816.669256,45.801704,68.770993,1.885401,1.373059
max,2254.976651,2340.386801,2639.427153,1591.207118,1842.445637,2211.381971,65.165553,82.062618,2.237315,1.495766


In [37]:
# paired (related-samples) t-test of the short-target slopes against the long-target slopes, row by row
stats.ttest_rel(present_data['short_slope'],present_data['long_slope'])

Ttest_relResult(statistic=3.3268286835113656, pvalue=0.004598801806368277)

In [153]:
# Backtransformed present ratio: the mean of the square-rooted slope ratios, squared again
(present_data['slope_ratio_sqrt'].mean())**2

1.4455677616762936

In [148]:
# copy of the table without the rows labelled 1 and 3
# NOTE(review): in the table displayed above these two rows hold the largest and the smallest long_slope,
# so this looks like an outlier exclusion -- confirm and record the exclusion criterion
present_drop = present_data.drop([1,3], axis=0)

In [152]:
# summary statistics for every column after dropping rows 1 and 3
present_drop.describe()

Unnamed: 0,long3,long6,long12,short3,short6,short12,long_slope,short_slope,slope_ratio,slope_ratio_sqrt
count,14.0,14.0,14.0,14.0,14.0,14.0,14.0,14.0,14.0,14.0
mean,1115.213299,1194.675046,1460.730177,1070.92941,1254.577995,1584.647727,39.241015,56.784357,1.496182,1.206172
std,398.749719,401.297208,437.09352,266.932563,279.291676,341.936828,8.78514,17.292332,0.494277,0.210976
min,650.561664,729.736363,916.117543,700.498557,799.345348,936.741506,25.90967,25.768502,0.814421,0.902453
25%,885.850535,958.917318,1187.264068,876.186877,1058.825966,1341.412002,32.875569,44.592376,0.977797,0.985705
50%,1095.149567,1141.416786,1466.31737,1056.805582,1284.680142,1678.835712,39.323378,55.746578,1.581636,1.257618
75%,1217.682432,1327.001651,1556.509656,1234.060766,1414.139456,1813.526106,45.171577,70.61604,1.8553,1.362067
max,2254.976651,2340.386801,2639.427153,1591.207118,1842.445637,2211.381971,53.905527,82.062618,2.237315,1.495766


In [150]:
# the same paired t-test of short- against long-target slopes, on the table without rows 1 and 3
stats.ttest_rel(present_drop['short_slope'],present_drop['long_slope'])

Ttest_relResult(statistic=3.6221490740020754, pvalue=0.0030978549307114356)

In [154]:
# Backtransformed present ratio for the table without rows 1 and 3: the mean of the square-rooted
# slope ratios, squared again
(present_drop['slope_ratio_sqrt'].mean())**2

1.4548502539367265

Notes: 
        
    - Order functions/clean up
    
    - Function tests?
    
    - response_time() necessary?
    
    - Need log transform/backtransform
    
    - Error check
    
    - Way of excluding participants; list of exclusions?
    
    - Outlier tests
    
    - Absent trials?
    
    - Descriptives: mean, sd, se?, histogram of means?
    
    - Verify t-test
    
    - Verify slopes?