In [None]:
import pandas as pd
import numpy as np

from pathlib import Path

In [None]:
# Location of questionnaire.csv, resolved against the current working directory.
# The directory containing it is also where the processed data files are written.
# data_csv = Path("F:/") / "Research Project" / "data-analysis" / "r-project" / "data" / "questionnaire.csv"
data_csv = Path().resolve() / "r-project" / "data" / "questionnaire.csv"

# Read the raw answers and discard columns that hold no values at all
df = pd.read_csv(data_csv).dropna(how="all", axis=1)

# Number the rows 0..n-1 in a "sample_id" column and make that column the index
df.insert(0, "sample_id", list(range(len(df))))
df = df.set_index("sample_id")

# Registry mapping "<dyad_id>.<person_id>" keys to sequential integer ids.
# The "next_free_id" entry holds the id that will be handed out next.
id_database = {"next_free_id": 0}

def person_id_map(row):
    """Return a sequential integer id for the person described by ``row``.

    The person is identified by the combination of ``row["dyad_id"]`` and
    ``row["person_id"]``. The first time a combination is seen it receives the
    next free id from the module-level ``id_database``; later calls with the
    same combination return the same id.

    Parameters:
        row: mapping-like object (e.g. a DataFrame row) with "dyad_id" and
            "person_id" entries.

    Returns:
        The integer id registered for that dyad/person combination.
    """
    # Single quotes inside the f-string: reusing the enclosing double quotes
    # is a SyntaxError on Python versions before 3.12.
    uid = f"{row['dyad_id']}.{row['person_id']}"
    if uid not in id_database:
        id_database[uid] = id_database["next_free_id"]
        id_database["next_free_id"] += 1

    return id_database[uid]

def treatment_map(row):
    """Return the readable condition name for the treatment code in ``row``.

    ``row["treatment"]`` must be one of "A", "B" or "C"; any other code raises
    ``KeyError``.
    """
    condition_names = dict(A="Static Face", B="Eye Tracked", C="Full Tracked")
    code = row["treatment"]
    return condition_names[code]

# Replace person_id with an id derived from the (dyad_id, person_id) combination,
# so that every person gets a unique id
df["person_id"] = df.apply(person_id_map, axis=1)

# Put the readable condition name in the column directly after "treatment"
df.insert(
    loc=list(df.columns).index("treatment") + 1,
    column="treatment_name",
    value=df.apply(treatment_map, axis=1),
)

# Questionnaire columns belonging to each presence subscale, built from the
# inclusive (first, last) question numbers of that subscale.
questions = {
    subscale: [f"q{number}" for number in range(first, last + 1)]
    for subscale, (first, last) in (
        ("physical_presence", (1, 5)),
        ("social_presence", (6, 12)),
        ("self_presence", (13, 17)),
    )
}

# Line removing q11 from consideration; not in the final analysis, but part of a short analysis in the discussion
# q11 is the only social presence question that has a decreasing mean with higher facial realism (though not significantly decreasing)
# questions["social_presence"] = list(filter(lambda x: x not in ["q11"], questions["social_presence"]))

def calculate_physical_presence_score(df: pd.DataFrame):
    """Return ``df`` with a ``physical_presence_score`` column joined on.

    The score is the row-wise mean (not the sum) of the columns listed in
    ``questions["physical_presence"]``. The frame passed in is not modified.
    """
    item_columns = questions["physical_presence"]
    score = df[item_columns].mean(axis=1).rename("physical_presence_score")
    return df.join(score)

def calculate_social_presence_score(df: pd.DataFrame):
    """Return ``df`` with a ``social_presence_score`` column joined on.

    The score is the row-wise mean (not the sum) of the columns listed in
    ``questions["social_presence"]``. The frame passed in is not modified.
    """
    item_columns = questions["social_presence"]
    score = df[item_columns].mean(axis=1).rename("social_presence_score")
    return df.join(score)

def calculate_self_presence_score(df: pd.DataFrame):
    """Return ``df`` with a ``self_presence_score`` column joined on.

    The score is the row-wise mean (not the sum) of the columns listed in
    ``questions["self_presence"]``. The frame passed in is not modified.
    """
    item_columns = questions["self_presence"]
    score = df[item_columns].mean(axis=1).rename("self_presence_score")
    return df.join(score)

def calculate_presence_score(df: pd.DataFrame):
    """Return ``df`` with an overall ``presence_score`` column joined on.

    The score is the row-wise mean over the columns q1..q17. The column list is
    fixed here and does not depend on the ``questions`` dictionary. The frame
    passed in is not modified.
    """
    all_items = [f"q{number}" for number in range(1, 18)]
    overall = df[all_items].mean(axis=1)
    overall.name = "presence_score"
    return df.join(overall)

def calculate_scores(df: pd.DataFrame):
    """Return ``df`` extended with all subscale scores and the overall score.

    Applies, in order, the physical, social and self presence score
    calculations followed by the overall presence score, each of which joins
    one new column onto the frame it receives.
    """
    steps = (
        calculate_physical_presence_score,
        calculate_social_presence_score,
        calculate_self_presence_score,
        calculate_presence_score,
    )
    scored = df
    for add_score in steps:
        scored = add_score(scored)
    return scored

df = calculate_scores(df)

# (printed label, score column) for each subscale; labels are padded so the
# colons line up in the printed output
summary_rows = (
    ("Physical", "physical_presence_score"),
    ("Social  ", "social_presence_score"),
    ("Self    ", "self_presence_score"),
)

# Print mean and standard deviation of every subscale score per treatment.
# np.std is called with its default settings here.
for treatment, treatment_df in df.groupby("treatment"):
    print(f"=== Scenario: {treatment} ===")
    for label, column in summary_rows:
        scores = treatment_df[column].to_numpy()
        print(f"{label}: mean score = {np.mean(scores):.5f}, std = {np.std(scores):.5f}")
    print()

# print(questions["social_presence"])

In [None]:
# Show the complete dataframe (every row and column), including the calculated scores
with pd.option_context("display.max_rows", None, "display.max_columns", None):
    display(df)

In [None]:
# Optional step (currently disabled): drop the individual question answers,
# leaving only the calculated subscale scores
# data = data.drop(questions["physical_presence"], axis=1, errors="ignore")
# data = data.drop(questions["social_presence"], axis=1, errors="ignore")
# data = data.drop(questions["self_presence"], axis=1, errors="ignore")

# Show the complete dataframe that is about to be written
with pd.option_context("display.max_rows", None, "display.max_columns", None):
    display(df)

# Write the processed data next to questionnaire.csv
df.to_csv(data_csv.with_name("data_processed.csv"))

In [None]:
# Process recorded experiment times data
# NOTE(review): `parser` is not used anywhere in the visible code; the import is
# kept unchanged in case something outside this view relies on it.
from dateutil import parser

df_times = pd.read_csv(data_csv.parent / "experiment_times.csv")
df_times = df_times.set_index("sample_id")
# display(df_times)

# Both time columns hold "minutes:seconds" strings. The totals are collected
# positionally and assigned as one column afterwards instead of writing into
# df_times while iterating over it with iterrows(), which pandas documents as
# not guaranteed to work and which would overwrite rows sharing a sample_id.
total_seconds = []
for time_1, time_2 in zip(df_times["time_formatted_1"], df_times["time_formatted_2"]):
    time_1_minutes, time_1_seconds = map(np.double, time_1.split(":"))
    time_2_minutes, time_2_seconds = map(np.double, time_2.split(":"))
    total_seconds.append((time_1_minutes + time_2_minutes) * 60 + time_1_seconds + time_2_seconds)

df_times["time_seconds_total"] = np.array(total_seconds, dtype=np.double)

# Per-treatment mean and standard deviation of the total time, in minutes
for (treatment, treatment_df) in df_times.groupby("treatment"):
    time_data_mins = treatment_df["time_seconds_total"].to_numpy() / 60
    print(f"=== Scenario: {treatment} ===")
    print(f"mean time = {np.mean(time_data_mins):.5f}, std = {np.std(time_data_mins):.5f}")

# Writes back to the same file that was read above, now including the
# time_seconds_total column
df_times.to_csv(data_csv.parent / "experiment_times.csv")

In [None]:
# DEPRECATED



# This cell is for generating a hypothetical case where the sample size was 4 times bigger (28 dyads instead of 7)
# This generated data was obviously not used in primary data analysis,
# only for testing if a bigger sample size would improve significance with this data.

# Data was copied, dyad/person ids changed and data randomly fuzzed. New data was then appended to original data.
# This happens 3 times for a 4x sample size dataset

# display(df)

# df_1 = df.copy()
# dyads = df_1["dyad_id"].unique()
# print(dyads)

# # Shift new ids up to make room for generating new ones by adding 1 every time
# df_1.dyad_id *= 10

# for i in range(3):
#     df_1.dyad_id += 1
#     df_1.person_id += 13 + 1
    
#     df_1.social_presence_score = df.social_presence_score
#     df_1.physical_presence_score = df.physical_presence_score
#     df_1.self_presence_score = df.self_presence_score
    
#     score_diff = (np.random.random(len(df_1)) - 0.5) * (0.4)
    
#     df_1.social_presence_score += score_diff
#     df_1.physical_presence_score += score_diff
#     df_1.self_presence_score += score_diff
#     df_1.index = df_1.index + len(df_1)

#     df = pd.concat([df, df_1])

# df.to_csv(data_csv.parent / "data_calculated_x4.csv")
# display(df)

In [None]:
# Monte Carlo sampling to generate dataset with higher sample size.
# Randomly selected dyads have normally distributed noise added (with a standard deviation matching the relevant score and treatment)
# This code is likely horribly inefficient and was written in a time crunch as a more correct replacement of the other data generation code above
# This cell takes more than 1 hour to run.


# The simulations are slow, so they are opt-in: set this to True to run them.
# With False (the default) this cell only prints the dyads/treatments and
# computes the noise parameters.
RUN_SIMULATIONS = False

dyads = df["dyad_id"].unique()
treatments = df["treatment"].unique()
scales = ["physical_presence_score", "social_presence_score", "self_presence_score"]
print(dyads, treatments)

# Standard deviation of every score within every treatment; used as the spread
# of the noise added to resampled dyads. np.std is called with its defaults.
std = {
    treatment: {
        scale: np.std(df[df["treatment"] == treatment][scale])
        for scale in scales
    }
    for treatment in treatments
}

# Each simulated dataset contains `times` times as many dyads as the original
times = 10
sample_size = len(dyads) * times

max_simulations = 100

if RUN_SIMULATIONS:
    # Create the output directory once, up front. exist_ok avoids the need for
    # a separate existence check.
    out_dir = data_csv.parent / "generated"
    out_dir.mkdir(parents=True, exist_ok=True)

    for simulation in range(max_simulations):
        # Collect the resampled dyads in a list and concatenate once at the
        # end; growing a DataFrame with concat inside the loop is quadratic.
        generated_dyads = []

        for next_dyad_id in range(sample_size):
            # Draw a dyad with replacement and give the copy a fresh dyad id
            selected_dyad_id = np.random.choice(dyads)
            new_dyad = df[df.dyad_id == selected_dyad_id].copy()
            new_dyad["dyad_id"] = next_dyad_id

            for treatment in treatments:
                rows = new_dyad.treatment == treatment
                for scale in scales:
                    # One noise value per selected row, however many there are
                    noise = np.random.normal(0, std[treatment][scale], size=int(rows.sum()))
                    noisy_scores = new_dyad.loc[rows, scale] + noise
                    # Clamp to range [1, 7] before writing the scores back
                    new_dyad.loc[rows, scale] = noisy_scores.clip(lower=1, upper=7)

            generated_dyads.append(new_dyad)

        generated_data = pd.concat(generated_dyads)

        # Re-number person ids so every person in the generated data is unique.
        # person_id_map reads the module-level id_database, so resetting it here
        # restarts the numbering at 0 for this simulation.
        id_database = {"next_free_id": 0}
        generated_data["person_id"] = generated_data.apply(person_id_map, axis=1)

        # with pd.option_context('display.max_rows', None, 'display.max_columns', None):
            # display(generated_data)

        # One file per simulation, written once after all dyads are generated
        generated_data.to_csv(out_dir / f"generated_data_simulation_{simulation}.csv")
