### File to calculate and save the keyboard features for data analysis in the lab-study

In [None]:
import json
import operator
from itertools import groupby
import numpy as np
import pandas as pd

In [None]:
# import the dataset
# the encoding is named explicitly (JSON text is UTF-8) so that the read does not depend on the default encoding
# of the platform the notebook is executed on
with open("LabStudy_RawData.json", encoding="utf-8") as f:
    firebase_data = json.load(f)

#### ---- Keyboard Data Processing Helper Functions ----

In [None]:
# extract and clean the typing data from the task dataset
def get_typing_data(data):
    """Extract the cleaned, chronologically ordered key events from the logged task data.

    Only events whose "eventType" is "KeyDown" or "KeyUp" are considered (this excludes e.g. mouse data). Each
    returned KeyUp event is preceded by exactly one matching KeyDown event of the same key code.

    :param data: iterable of logged event dictionaries; every event needs a "time" entry and every key event
                 additionally needs a "code" entry
    :return: tuple (keyboard_events, artifacts, non_target_presses):
             keyboard_events - list of paired KeyDown/KeyUp events sorted by "time";
             artifacts - number of discarded KeyDown groups and unmatched KeyUp events;
             non_target_presses - number of returned events (KeyDown and KeyUp are both counted) whose code
             contains neither "Digit" nor "Backspace"
    """
    down_type, up_type = "KeyDown", "KeyUp"
    # two keydown events of the same key that are closer together than this (in ms) are treated as the
    # auto-repeat of a key that is held down
    hold_threshold = 1750

    chronological = sorted(data, key=lambda event: event["time"])

    # cleaned list of matched keydown/keyup events
    paired_events = []
    # key code -> list of keydown events that still wait for their keyup event
    pending_downs = {}
    # number of potential artifacts
    artifacts = 0

    for event in chronological:
        event_type = event.get("eventType")

        if event_type == down_type:
            code = event["code"]
            waiting = pending_downs.get(code)
            if waiting is None:
                # first keydown event of this key since its last keyup event
                pending_downs[code] = [event]
                continue

            gap = event["time"] - waiting[-1]["time"]
            if gap < hold_threshold:
                # repeated keydown event of a held key
                waiting.append(event)
            else:
                # the older keydown events never got a keyup event (e.g. if participants switched tabs during
                # typing): count them as a potential artifact and start over with the current keydown event
                print("Potential artifact", gap)
                print("Old:", waiting)
                print("New:", event)
                artifacts += 1
                pending_downs[code] = [event]

        elif event_type == up_type:
            # fetching the waiting keydown events also clears the entry for the next press of this key
            waiting = pending_downs.pop(event["code"], None)
            if waiting is None:
                # a keyup event without a keydown event is likely an artifact
                artifacts += 1
            else:
                # keep only the first keydown event (the start of the press) together with the keyup event
                paired_events.append(waiting[0])
                paired_events.append(event)

    # pairs were appended at the time of their keyup event, so restore the chronological order
    paired_events.sort(key=lambda event: event["time"])

    # count the events of keys other than the "allowed" digit keys and the backspace key
    non_target_presses = 0
    for event in paired_events:
        code = event["code"]
        if "Digit" not in code and "Backspace" not in code:
            non_target_presses += 1

    return paired_events, artifacts, non_target_presses


# separate the typing data into the typing trials
def get_trial_data(keyboard_data):
    """Split the cleaned keyboard events into one event list per typing trial.

    The experiment was programmed in a way that a trial ended as soon as the string in the input field matched
    the given string to retype. The trial therefore ended on a keydown event and the corresponding keyup event was
    tagged with the next trial number. Such keyup events (keyup events without a preceding keydown event of the
    same key in their trial) are moved to the end of the previous trial.

    :param keyboard_data: list of key event dictionaries with "taskNumber", "eventType" and "code" entries;
                          events of the same trial must be adjacent (groupby only groups consecutive events)
    :return: list with one list of events per trial; trials that are empty after the reordering are dropped
    """
    # split the keyboard data into runs of consecutive events with the same trial number
    keyboard_trial_data = [list(group) for _, group in groupby(keyboard_data, operator.itemgetter("taskNumber"))]

    # the first trial has no previous trial to hand events over to
    for trial_index in range(1, len(keyboard_trial_data)):
        pressed_codes = set()
        kept_events = []
        misplaced_keyups = []
        for event in keyboard_trial_data[trial_index]:
            if event["eventType"] == "KeyDown":
                pressed_codes.add(event["code"])
            elif event["eventType"] == "KeyUp":
                if event["code"] in pressed_codes:
                    pressed_codes.discard(event["code"])
                else:
                    # no keydown event in this trial --> the key was pressed in the previous trial
                    misplaced_keyups.append(event)
                    continue
            kept_events.append(event)

        # a later iteration only appends to this trial, so its own classification above is not affected
        keyboard_trial_data[trial_index - 1].extend(misplaced_keyups)
        keyboard_trial_data[trial_index] = kept_events

    # a trial can end up empty if it only consisted of keyup events that belong to the previous trial (e.g. the
    # keyup event of the very last keypress); empty trials carry no data and would break index-based access
    return [trial for trial in keyboard_trial_data if trial]


# get the longest pause between consecutive keyboard data events to potentially filter bad cases
def get_longest_pause(keyboard_data):
    """Return the longest time difference between consecutive keyboard events in seconds.

    :param keyboard_data: list of event dictionaries with a "time" entry in milliseconds, in the order in which
                          the pauses should be measured (chronological order for meaningful results)
    :return: longest pause in seconds rounded to 4 decimals; 0.0 if there are fewer than two events, because no
             pause can be measured in that case
    """
    timestamps = [event["time"] for event in keyboard_data]

    # np.diff of fewer than two values is empty and has no maximum
    if len(timestamps) < 2:
        return 0.0

    # calculate the maximum time difference between consecutive keyboard data events
    longest_pause = np.max(np.diff(timestamps))

    # return the value in seconds
    return np.round(longest_pause / 1000, 4)

#### --- Keyboard Feature Calculation Functions ---

A separate function is used for each feature (or group of related features) to increase the readability of the code. From a performance point of view, it might be better not to loop over the keyboard data separately for the calculation of each feature; this might need refinement for large datasets.

In [None]:
# get the number of times, the backspace key was pressed during the task
def get_backspace_presses(keyboard_data):
    """Count how often the backspace key was pressed.

    :param keyboard_data: iterable of key event dictionaries with "eventType" and "code" entries
    :return: dictionary {"Backspace_presses": number of KeyDown events with the code "Backspace"}
    """
    press_count = 0
    for event in keyboard_data:
        # only the keydown event is counted, so every press is counted once
        if event["eventType"] == "KeyDown" and event["code"] == "Backspace":
            press_count += 1

    return {"Backspace_presses": press_count}


# get the writing time (time difference between last and first keyboard event in the dataset) as well as the writing
# time in relation to the number of pushed keys
def get_writing_time(keyboard_data):
    """Calculate the writing time and the writing time per keystroke.

    The writing time is the difference between the latest and the earliest event timestamp, converted from
    milliseconds to seconds. The time per keystroke is the writing time divided by the number of KeyDown events.

    :param keyboard_data: list of key event dictionaries with "time" (in ms) and "eventType" entries
    :return: dictionary with the keys "Writing_Time" and "Time_per_Keystroke"; a value that cannot be calculated
             (no events at all or no KeyDown event) is returned as NaN instead of raising an error
    """
    not_available = float("nan")

    timestamps = [event["time"] for event in keyboard_data]
    if not timestamps:
        # without any event there is neither a time span nor a keystroke
        return {"Writing_Time": not_available, "Time_per_Keystroke": not_available}

    # max/min are used instead of the last/first entry so that the result does not depend on the data being
    # chronologically ordered; the division by 1000 converts the time to seconds
    writing_time = (max(timestamps) - min(timestamps)) / 1000

    # get the number of pressed keys during the keyboard task
    number_of_keypresses = sum(1 for event in keyboard_data if event["eventType"] == "KeyDown")

    # calculate the time per keystroke (not defined if no key was pressed)
    if number_of_keypresses:
        time_per_keystroke = writing_time / number_of_keypresses
    else:
        time_per_keystroke = not_available

    return {"Writing_Time": writing_time, "Time_per_Keystroke": time_per_keystroke}


# calculate the time difference between the last timestamp of one trial and the first timestamp of the next trial
# calculate the time per trial
def get_trial_onset_and_trial_times(trial_data, task_start_time):
    """Calculate mean and standard deviation of the trial onset times and of the trial durations.

    The onset time of a trial is the difference between its first timestamp and the last timestamp of the previous
    trial; for the first trial the task start timestamp is used as the reference. The trial time is the difference
    between the last and the first timestamp of a trial.

    :param trial_data: list of trials, each a non-empty list of event dictionaries with a "time" entry
    :param task_start_time: timestamp of the task start, in the same unit as the event timestamps
    :return: dictionary with the keys "Trial_Onset_Mean", "Trial_Onset_SD", "Trial_Time_Mean" and
             "Trial_Time_SD" (each rounded to 3 decimals)
    """
    onsets = []
    durations = []

    # timestamp from which the onset of the current trial is measured
    previous_end = task_start_time
    for trial in trial_data:
        first_time, last_time = trial[0]["time"], trial[-1]["time"]
        onsets.append(first_time - previous_end)
        durations.append(last_time - first_time)
        previous_end = last_time

    # mean and standard deviation of both measures
    stats = {}
    for label, values in (("Trial_Onset", onsets), ("Trial_Time", durations)):
        stats[label + "_Mean"] = np.round(np.mean(values), 3)
        stats[label + "_SD"] = np.round(np.std(values), 3)

    return stats


# Calculate the time between pressing a key and releasing a key --> Keypress Dwell Time
def get_keypress_dwell_time(keyboard_data):
    """Calculate mean and standard deviation of the keypress dwell time (time between pressing and releasing a key).

    Events whose "key" entry is "Shift" are ignored. Each keyup event is matched with the earliest keydown event
    of the same "code" that has not been matched yet.

    :param keyboard_data: iterable of key event dictionaries with "eventType", "key", "code" and "time" entries,
                          in chronological order
    :return: dictionary with the keys "Dwelltime_Mean" and "Dwelltime_SD" (each rounded to 3 decimals)
    """
    # individual dwell times
    dwell_times = []
    # keydown events that still wait for their keyup event, in the order in which they occurred
    waiting_downs = []

    for event in keyboard_data:
        event_type = event["eventType"]
        # skip everything that is not a key event as well as the shift key
        if event_type not in ("KeyDown", "KeyUp") or event["key"] == "Shift":
            continue

        if event_type == "KeyDown":
            waiting_downs.append(event)
            continue

        # keyup event: position of the oldest waiting keydown event of the same key (None if there is none)
        match_position = next(
            (position for position, down in enumerate(waiting_downs) if event["code"] == down["code"]),
            None,
        )
        if match_position is not None:
            # remove the matched keydown event so that it cannot be matched a second time
            matched_down = waiting_downs.pop(match_position)
            dwell_times.append(event["time"] - matched_down["time"])

    # calculate the mean and standard deviation of the keypress dwell times
    return {"Dwelltime_Mean": np.round(np.mean(dwell_times), 3),
            "Dwelltime_SD": np.round(np.std(dwell_times), 3)}


# Calculate the time between releasing a key and pressing the next key (can contain negative values if a key is pressed
# before the previous key is released)
def get_keypress_latency(trial_data):
    """Calculate mean and standard deviation of the keypress latency.

    The latency is the time between releasing a key and pressing the next key. It is negative if the next key is
    pressed before the previous key is released. The latency is calculated per trial to exclude the time between
    releasing the last key of one trial and pressing the first key of the next trial (i.e. the trial onset time).

    :param trial_data: list of trials, each a list of key event dictionaries with "eventType", "key", "code" and
                       "time" entries in chronological order
    :return: dictionary with the keys "Latency_Mean" and "Latency_SD" (each rounded to 3 decimals)
    """

    # latency calculation for one list of keyboard events (here the events of one trial, but it would also work
    # with all keyboard data when ignoring the trial structure)
    def _calc_latency(data):
        latency_times = []

        # the position comes from enumerate instead of data.index(): index() searches by equality, which is slow
        # and returns the wrong position if an equal event occurs earlier in the list
        for position, datapoint in enumerate(data):
            # only keydown events that are not a shift key start a latency measurement
            if datapoint["eventType"] != "KeyDown" or datapoint["key"] == "Shift":
                continue

            following_events = data[position + 1:]

            # time of the keyup event that releases the current key
            key_up_time = next(
                (event["time"] for event in following_events
                 if event["eventType"] == "KeyUp" and event["code"] == datapoint["code"]),
                None,
            )
            # time of the next keydown event (ignoring the shift key)
            next_key_down_time = next(
                (event["time"] for event in following_events
                 if event["eventType"] == "KeyDown" and event["key"] != "Shift"),
                None,
            )

            # a latency only exists if the key is released and another key is pressed afterwards in this list
            if key_up_time is not None and next_key_down_time is not None:
                latency_times.append(next_key_down_time - key_up_time)

        return latency_times

    # calculate the latency times within each trial and merge them into one list
    all_latency_times = []
    for trial in trial_data:
        all_latency_times.extend(_calc_latency(trial))

    # calculate the mean and standard deviation of the keypress latency time
    mean_latency = np.round(np.mean(all_latency_times), 3)
    sd_latency = np.round(np.std(all_latency_times), 3)

    return {"Latency_Mean": mean_latency, "Latency_SD": sd_latency}

#### --- Main Function to calculate the keyboard features for each participant and save it in a dataframe ---

In [None]:
def create_keyboard_dataset(data):
    """Calculate the keyboard features of both typing tasks for every participant who finished the study.

    :param data: dictionary that maps a participant id to the logged data of that participant; used entries are
                 "phaseFinishedTimestamps" and the event data of the tasks "HS_PatternTyping" and
                 "LS_PatternTyping"
    :return: pandas DataFrame with one row per participant and condition (row label: first three characters of
             the task name followed by the participant id) and one column per feature
    """
    # task name -> (phase whose finish timestamp marks the start of the typing task, printed condition label)
    task_setup = {
        "HS_PatternTyping": ("HS_Mental_Arithmetic_PatternTyping", "High-Stress Condition"),
        "LS_PatternTyping": ("LS_Mental_Arithmetic_PatternTyping", "Low-Stress Condition"),
    }

    keyboard_params = {}

    # participants are numbered from 1 in the order of the dataset (skipped participants keep their number)
    for par_num, participant in enumerate(data, start=1):
        participant_data = data[participant]

        # skip participants who did not finish the study
        if "phaseFinishedTimestamps" not in participant_data:
            continue
        phase_timestamps = participant_data["phaseFinishedTimestamps"]
        if "BfiNeuroticism" not in phase_timestamps:
            continue

        print("Processing participant " + participant)

        for task, (start_phase, condition_label) in task_setup.items():
            # get the cleaned typing data and its separation into trials
            typing_data, artifacts, non_valid_presses = get_typing_data(participant_data[task])
            trial_data = get_trial_data(typing_data)

            # "visualize" the typing data to check if the task was done correctly
            # this was also done to filter out participants, who did not finish the task in time
            # unfortunately, it was not logged by the experimental app, if the task countdown finished
            # the data therefore had to be scanned by eye (this did not reveal any suspicious cases)
            print([event["code"] for event in typing_data if event["eventType"] == "KeyUp"])

            # get the start time of the typing task
            task_start = phase_timestamps[start_phase]
            print(condition_label)

            # information about the participant and the condition followed by the keyboard features
            row = {"par_Num": par_num, "condition": "HS" if "HS" in task else "LS"}
            row.update(get_backspace_presses(typing_data))
            row.update(get_writing_time(typing_data))
            row.update(get_keypress_dwell_time(typing_data))
            row.update(get_trial_onset_and_trial_times(trial_data, task_start))
            row.update(get_keypress_latency(trial_data))

            # additional task data to filter potential bad cases
            row["Artifacts"] = artifacts
            # keydown and keyup events are both counted --> halve the number to get the number of presses
            row["Invalid_Presses"] = non_valid_presses / 2
            row["Longest_Pause"] = get_longest_pause(typing_data)
            row["Task_Start_Time"] = typing_data[0]["time"] - task_start
            row["Task_Time"] = typing_data[-1]["time"] - task_start

            keyboard_params[task[:3] + participant] = row

    # convert the dictionary with all features per participant per condition into a pandas Dataframe
    return pd.DataFrame(keyboard_params).T

#### --- Get the keyboard Features Dataframe ---

In [None]:
# create the keyboard parameter dataframe from the loaded raw data
keyboard_features_df = create_keyboard_dataset(firebase_data)

# number of rows of the dataframe, i.e. the number of datasets before any filtering
print("Number of all datasets", len(keyboard_features_df))

#### --- Filter out potential bad cases in the typing dataset ---

In [None]:
# Find the datasets with too many non-valid keypresses
# (the participant did not do the task properly: 10 presses are chosen as the cut-off at and above which "bad key
# presses" do not represent honest mistakes anymore, but misbehavior)
# only the row labels are collected here; the rows are dropped in a later step
bad_keypresses = keyboard_features_df.loc[(keyboard_features_df["Invalid_Presses"] >= 10)].index

print("Number of datasets with too many bad keypresses", len(bad_keypresses))

In [None]:
# Find the datasets of participants who took a too long break during the task --> the task was not done properly
# anymore (the stress manipulation likely lost its effect if participants took a break during the task, e.g. by
# switching tabs or doing something different in between): a pause of 10 seconds or more between consecutive
# keyboard events is considered too long
# only the row labels are collected here; the rows are dropped in a later step

pause_too_long = keyboard_features_df.loc[(keyboard_features_df["Longest_Pause"] >= 10)].index

print("Number of datasets with too long pauses", len(pause_too_long))

In [None]:
# merge the row labels of both filters; the set removes labels that were flagged by both filters
bad_participants = list(set(np.concatenate((bad_keypresses, pause_too_long))))

print("Total number of filtered out datasets:", len(bad_participants))

In [None]:
# drop the filtered out cases from the dataset (drop returns a new dataframe, the unfiltered dataframe is kept)

filtered_keyboard_features_df = keyboard_features_df.drop(bad_participants)

#### --- Save the final filtered keyboard feature dataframe---

In [None]:
# save the dataframe as a csv file for the data analysis
# NOTE: the columns are separated by tabs (sep="\t") although the file has a .csv extension

filtered_keyboard_features_df.to_csv("Labstudy_Keyboard_Features.csv", sep="\t", encoding="utf-8")