In [1]:
import numpy as np
import pandas as pd
import os

In [2]:
# helper function (used with DataFrame.apply) that returns the base question label for a row of answers_df

def get_question_base_label(row):
    """Return the question part of a row's 'Question Label'.

    When the label contains an "A", everything from one character before the
    first "A" onwards is cut off (so "T1.Q1.A3" becomes "T1.Q1"); a label
    without an "A" is returned unchanged.
    """
    label = row['Question Label']
    if "A" in label:
        # The character just before the first "A" is dropped together with the suffix.
        return label[:label.index("A") - 1]
    return label
    

# functions for each question type that take in a row from parent_df and return
# the score for that response.

def _trailing_number(label):
    """Return the integer spelled by the run of digits at the end of ``label``.

    Raises ValueError when ``label`` does not end in a digit.
    """
    text = str(label)
    digits = text[len(text.rstrip("0123456789")):]
    return int(digits)


def ordinal_question(row, task_answers_df, num_answers, parent_questions):
    """Score an ordinal response by its distance from the consensus answer.

    Parameters:
        row: response row providing "article_number", "question_label" and
            "answer_label".
        task_answers_df: consensus answers with the columns "Article Number",
            "Question Base Label" and "Answer Label".
        num_answers: answer counts, parallel to ``parent_questions``.
        parent_questions: question labels, parallel to ``num_answers``.

    Returns:
        ``1 - |answer - correct| * (2 / n)`` where ``n`` is the answer count
        of the question. The value is not clamped, so it drops below 0 once
        the distance exceeds ``n / 2``.

    Raises:
        IndexError: no consensus row matches the article and question.
        ValueError: an answer label does not end in a digit, or the question
            is not listed in ``parent_questions``.
    """
    article_number = row["article_number"]
    question_label = row["question_label"]
    # Parse the whole trailing number rather than the last character only, so
    # that labels ending in two or more digits (e.g. "...A10") score correctly.
    answer_number = _trailing_number(row["answer_label"])

    matching_answers = task_answers_df.loc[
        (task_answers_df["Article Number"] == article_number)
        & (task_answers_df["Question Base Label"] == question_label)
    ]
    correct_number = _trailing_number(matching_answers["Answer Label"].iloc[0])

    question_num_answers = num_answers[parent_questions.index(question_label)]
    penalty = 1 / (question_num_answers / 2)

    return 1 - abs(answer_number - correct_number) * penalty

def select_one_question(row, task_answers_df):
    """Score a select-one response: 1 if it equals the consensus answer, else 0.

    The consensus answer is the "Answer Label" of the first row in
    ``task_answers_df`` whose "Article Number" and "Question Base Label"
    match the response; an IndexError is raised when no such row exists.
    """
    article, question, chosen = row["article_number"], row["question_label"], row["answer_label"]

    same_article = task_answers_df["Article Number"] == article
    same_question = task_answers_df["Question Base Label"] == question
    expected = task_answers_df.loc[same_article & same_question, "Answer Label"].iloc[0]

    return 1 if chosen == expected else 0
    
def select_all_question(row, task_answers_df, parent_df):
    """Score a select-all question once per task run.

    A select-all response spans several rows of ``parent_df`` (one per ticked
    answer), so the question is scored as a whole the first time one of its
    rows is seen and skipped afterwards.

    Parameters:
        row: response row providing "contributor_uuid", "quiz_taskrun_uuid",
            "article_number" and "question_label".
        task_answers_df: consensus answers with the columns "Article Number",
            "Question Base Label", "Question Label" and "Answer Label"
            (the string "1" for an answer that should be ticked, "0" for one
            that should not).
        parent_df: all parent-question responses of the file being scored.

    Returns:
        The fraction of consensus answers the user got right (ticked when the
        consensus is "1", left unticked when it is "0"), or -1 when this
        question of this task run is already recorded in the module-level
        ``user_dfs``.

    Raises:
        ZeroDivisionError: no consensus answers exist for the article/question.
    """
    user_id = row["contributor_uuid"]
    response_id = row["quiz_taskrun_uuid"]
    article_number = row["article_number"]
    question_label = row["question_label"]

    # Skip only when THIS question of this task run has been recorded. Matching
    # on the task run alone would also skip the question whenever a different
    # question from the same task run happened to be recorded first.
    recorded = user_dfs.get(user_id)
    if recorded is not None:
        already_scored = (
            (recorded["quiz_taskrun_uuid"] == response_id)
            & (recorded["question_label"] == question_label)
        ).any()
        if already_scored:
            return -1

    # Every answer label the user ticked during this task run.
    selected_labels = parent_df.loc[parent_df["quiz_taskrun_uuid"] == response_id, "answer_label"]
    question_answers_df = task_answers_df.loc[
        (task_answers_df["Article Number"] == article_number)
        & (task_answers_df["Question Base Label"] == question_label)
    ]

    correct_answer_counter = 0
    for _, consensus_row in question_answers_df.iterrows():
        was_selected = (selected_labels == consensus_row["Question Label"]).any()
        expected_flag = "1" if was_selected else "0"
        if consensus_row["Answer Label"] == expected_flag:
            correct_answer_counter += 1

    return correct_answer_counter / len(question_answers_df)

In [3]:
# setup
# Consensus answers that the responses are scored against.
answers_df = pd.read_csv("convergence/Answer_Consensus.csv")

# Each task is described by three parallel lists, where index n refers to the
# same "parent" question in all three:
#   <task>_parents[n]        label of the parent question
#   <task>_parents_types[n]  its question type ("select_all", "select_one" or "ordinal")
#   <task>_num_answers[n]    its number of answers, used for scoring ordinal questions

# language
language_parents = ["T1.Q1", "T1.Q12"]
language_parents_types = ["select_all", "ordinal"]
language_num_answers = [13, 4]

# probability
probability_parents = ["T1.Q1", "T1.Q5", "T1.Q6", "T1.Q11"]
probability_parents_types = ["ordinal", "ordinal", "select_one", "ordinal"]
probability_num_answers = [3, 3, 3, 4]

# reasoning
reasoning_parents = ["T1.Q1"]
reasoning_parents_types = ["select_all"]
reasoning_num_answers = [6]

# evidence
evidence_parents = ["T1.Q1", "T1.Q12"]
evidence_parents_types = ["select_one", "ordinal"]
evidence_num_answers = [3, 4]

# contributor_uuid -> DataFrame of that user's scored responses (filled in by single_file)
user_dfs = {}

In [4]:
# user rep score for single file
def single_file(datahunt_file):
    """Score the parent-question responses of one datahunt CSV into ``user_dfs``.

    The task type is inferred from the file name: the first of "language",
    "probability" or "reasoning" found in the lower-cased path wins, and any
    other name is treated as "evidence". Each response to a parent question of
    that task is scored against the matching rows of the module-level
    ``answers_df`` and appended to the contributor's frame in the module-level
    ``user_dfs``.

    Parameters:
        datahunt_file: path of the datahunt CSV. It must contain the columns
            "contributor_uuid", "quiz_task_uuid", "article_number",
            "question_label", "answer_label", "answer_text",
            "quiz_taskrun_uuid" and "finish_time".

    Returns:
        None; results are stored in ``user_dfs``.
    """
    # One entry per task type: (value of the consensus "Task File" column,
    # parent question labels, their question types, their answer counts).
    task_settings = {
        "language": ("Language", language_parents, language_parents_types, language_num_answers),
        "probability": ("Probability", probability_parents, probability_parents_types,
                        probability_num_answers),
        "reasoning": ("Reasoning", reasoning_parents, reasoning_parents_types, reasoning_num_answers),
        "evidence": ("Evidence", evidence_parents, evidence_parents_types, evidence_num_answers),
    }

    # assigns a task type based on the name of the inputted datahunt csv
    lowered_name = datahunt_file.lower()
    task_type = "evidence"
    for candidate in ("language", "probability", "reasoning"):
        if candidate in lowered_name:
            task_type = candidate
            break
    task_file, parent_questions, parent_types, num_answers = task_settings[task_type]

    df = pd.read_csv(datahunt_file)
    df = df[["contributor_uuid", "quiz_task_uuid", "article_number", "question_label",
             "answer_label", "answer_text", "quiz_taskrun_uuid", "finish_time"]]

    # cuts the consensus answers down to the rows of the relevant task type
    file_answers_df = answers_df.copy()
    file_answers_df["Question Base Label"] = file_answers_df.apply(get_question_base_label, axis=1)
    task_answers_df = file_answers_df.loc[file_answers_df["Task File"] == task_file]

    # keeps only responses to parent questions; copied so that adding a column
    # below never touches the frame read from disk
    parent_df = df.loc[df["question_label"].isin(parent_questions)].copy()

    # adds a column with the question_type of the question_label
    question_types = [parent_types[parent_questions.index(label)]
                      for label in parent_df["question_label"]]
    parent_df.insert(2, "question_type", question_types, True)

    score_columns = ['quiz_task_uuid', 'quiz_taskrun_uuid', 'question_label', 'question_type',
                     'answer_score', 'time_stamp', 'task_type']

    for _, row in parent_df.iterrows():
        user_id = row['contributor_uuid']
        if user_id not in user_dfs:
            # 'task_type' is part of the frame from the start so that filtering
            # on it works even before the first score is recorded.
            user_dfs[user_id] = pd.DataFrame(columns=score_columns)

        if row['question_type'] == "select_all":
            score = select_all_question(row, task_answers_df, parent_df)
        elif row['question_type'] == "select_one":
            score = select_one_question(row, task_answers_df)
        else:
            score = ordinal_question(row, task_answers_df, num_answers, parent_questions)

        # -1 marks a select_all response that has already been scored
        if score == -1:
            continue

        new_record = pd.DataFrame([{
            'quiz_task_uuid': row['quiz_task_uuid'],
            'quiz_taskrun_uuid': row['quiz_taskrun_uuid'],
            'question_label': row['question_label'],
            'question_type': row['question_type'],
            'answer_score': score,
            'time_stamp': row['finish_time'],
            'task_type': task_type,
        }], columns=score_columns)

        # DataFrame.append was removed in pandas 2.0, so rows are added with
        # pd.concat. The frame is updated after every row (not once at the end)
        # because select_all_question reads user_dfs to detect scored responses.
        recorded = user_dfs[user_id]
        if recorded.empty:
            user_dfs[user_id] = new_record
        else:
            user_dfs[user_id] = pd.concat([recorded, new_record], ignore_index=True)
    

In [5]:
def task_rep_score(user_df, task_type, num_recent=30):
    """Return a user's reputation score for one task type.

    The score is the mean "answer_score" over the rows of ``task_type`` whose
    "time_stamp" is among the ``num_recent`` largest distinct time stamps of
    that task type.

    Parameters:
        user_df: scored responses with the columns "task_type", "time_stamp"
            and "answer_score".
        task_type: the task type to score, e.g. "language".
        num_recent: how many distinct time stamps to keep (default 30).

    Returns:
        The mean score, or 0 when ``user_df`` has no rows for ``task_type``.
    """
    # A frame without any rows has nothing to score (and may lack the columns).
    if user_df.empty:
        return 0

    tt_user_df = user_df.loc[user_df["task_type"] == task_type]
    if len(tt_user_df) == 0:
        return 0

    # Time stamps are ordered by plain comparison of the stored values, largest first.
    # NOTE(review): if they are strings this is lexicographic order - confirm the format sorts chronologically.
    recent_stamps = sorted(tt_user_df["time_stamp"].unique(), reverse=True)[:num_recent]

    return tt_user_df.loc[tt_user_df["time_stamp"].isin(recent_stamps)].answer_score.mean()
    

In [6]:
directory = r'testing-format'
# os.listdir returns names in arbitrary order; sorting them makes every run
# process the datahunt files in the same sequence.
for filename in sorted(os.listdir(directory)):
    if filename.endswith(".csv"):
        single_file(os.path.join(directory, filename))

In [7]:
# NOTE(review): user_rep_scores is never filled in by this cell - confirm whether anything else relies on it.
user_rep_scores = {}

# DataFrame.to_csv does not create missing directories, so make sure the
# output folder exists before writing the first file.
os.makedirs("userRepScore-csvs", exist_ok=True)

for user in user_dfs:
    # one reputation score per task type; a task type without scored rows contributes 0
    l_urs = task_rep_score(user_dfs[user], "language")
    p_urs = task_rep_score(user_dfs[user], "probability")
    r_urs = task_rep_score(user_dfs[user], "reasoning")
    e_urs = task_rep_score(user_dfs[user], "evidence")
    # the total is the plain mean of the four task scores
    total_urs = np.mean([l_urs, p_urs, r_urs, e_urs])

    data = {'task-type':  ['Language', 'Probability', 'Reasoning', 'Evidence', 'Total User Rep Score'],
        'rep-score': [l_urs, p_urs, r_urs, e_urs, total_urs]}

    urs_df = pd.DataFrame (data, columns = ['task-type', 'rep-score'])

    # one file per user, named after the contributor id
    urs_df.to_csv("userRepScore-csvs/" + user)
    