%load_ext autoreload
%autoreload 2

In [17]:
import requests
import aiohttp
import random
import json
import uuid
import asyncio

import numpy as np
from common.models.trial import Solution, Advisor, AdvisorSelection, WrittenStrategy, PostSurvey, Trial, TrialSaved, TrialError, SessionError
from utils.process import process_solution

In [18]:
# Register a fresh experiment configuration on the backend for this simulation.
max_participants = 20  # Number of parallel participants in simulation

baseurl = "http://0.0.0.0:5050"
experiment_type = 'sim_3states_8r_v4'

url = f"{baseurl}/admin/config"
headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Basic YWRtaW46YWRtaW4=',
}

# Keyword order is kept as-is so the serialized JSON body is unchanged.
payload = json.dumps(dict(
    active=True,
    created_at="2023-10-17T09:57:36.204000",
    redirect_url="https://app.prolific.co/submissions/complete",
    experiment_type=experiment_type,
    rewrite_previous_data=True,
    seed=1,
    n_generations=5,
    n_ai_players=3,
    networks_path="data/23_11_30",
    n_sessions_per_generation=16,
    n_advise_per_session=5,
    n_session_tree_replications=8,
    conditions=["w_ai", "wo_ai"],
    n_social_learning_blocks=1,
    n_social_learning_networks_per_block=4,
    n_practice_trials=2,
    n_demonstration_trials=4,
    simulate_humans=False,
    social_learning_trials=["observation", "repeat", "try_yourself"],
    main_only=True,
    session_timeout=0.5,
))

response = requests.post(url, headers=headers, data=payload)

# Echo the backend's reply.
print(response.text)

{"id":"65b99a3d926acf08142b859b","active":true,"created_at":"2024-01-31T00:54:21.024000","redirect_url":"https://app.prolific.co/submissions/complete","experiment_type":"sim_3states_8r_v3","rewrite_previous_data":true,"networks_path":"data/23_11_30","seed":1,"n_generations":5,"n_ai_players":3,"n_sessions_per_generation":16,"n_advise_per_session":5,"n_session_tree_replications":8,"conditions":["w_ai","wo_ai"],"n_social_learning_blocks":1,"n_social_learning_networks_per_block":4,"n_practice_trials":2,"n_demonstration_trials":4,"simulate_humans":false,"social_learning_trials":["observation","repeat","try_yourself"],"main_only":true,"session_timeout":0.5}


In [19]:
# Shared aiohttp client session used by get_trial() and post_trial() below.
# NOTE(review): nothing in the visible cells closes this session - confirm
# whether an explicit `await session.close()` is wanted at the end of the run.
session = aiohttp.ClientSession()

async def get_trial(prolific_id, experiment_type):
    """Fetch the current trial of a participant from the backend.

    Args:
        prolific_id: Participant identifier used in the session URL.
        experiment_type: Experiment type used in the session URL.

    Returns:
        A ``Trial`` built from the JSON response, or ``None`` when the
        backend does not answer with HTTP 200 or the body cannot be turned
        into a ``Trial``.
    """
    url = f"{baseurl}/session/{experiment_type}/{prolific_id}"
    headers = {'Content-Type': 'application/json'}

    async with session.get(url, headers=headers) as response:
        if response.status != 200:
            return None
        try:
            return Trial(**await response.json())
        except Exception:
            # Best effort: any decoding/validation problem means "no trial
            # yet". Catching Exception (not a bare except) lets
            # asyncio.CancelledError and KeyboardInterrupt propagate, so a
            # cancelled task actually stops instead of returning None.
            return None


async def post_trial(prolific_id, trial_id, body):
    """Submit a participant's answer for one trial.

    Args:
        prolific_id: Participant identifier used in the session URL.
        trial_id: Identifier of the trial being answered.
        body: Request payload, or ``None`` to post without a payload.

    Returns:
        ``True`` when the backend answered with HTTP 200, otherwise ``False``.
    """
    request_kwargs = {'headers': {'Content-Type': 'application/json'}}
    # Only attach a payload when there is one, as the two-branch version did.
    if body is not None:
        request_kwargs['data'] = body

    endpoint = f"{baseurl}/session/{prolific_id}/{trial_id}"
    async with session.post(endpoint, **request_kwargs) as response:
        return response.status == 200


In [20]:
from pathlib import Path
from common.utils.utils import estimate_solution_score, estimate_average_player_score
from common.models.network import Network


# Load the networks and the pre-computed solutions of each strategy.
# Path.read_text() opens and closes the file itself, so no file handle is
# left open (the earlier json.load(open(...)) form never closed its files).
networks_path = Path("../data/23_11_30")
network_data = json.loads((networks_path / "networks.json").read_text())
solutions_myopic = json.loads((networks_path / "solution__myopic.json").read_text())
solutions_m1 = json.loads((networks_path / "solutions" / "0.json").read_text())
solutions_random = json.loads((networks_path / "solution__random.json").read_text())

# Index everything by network id for constant-time lookup.
networks_by_id = {n["network_id"]: n for n in network_data}
solutions_myopic_by_id = {s["network_id"]: s for s in solutions_myopic}
solutions_m1_by_id = {s["network_id"]: s for s in solutions_m1}
solutions_random_by_id = {s["network_id"]: s for s in solutions_random}


def _get_solution(network_id, solution_type):
    """Return the stored solution of the given type for a network.

    Args:
        network_id: Key into ``networks_by_id`` and the solution tables.
        solution_type: One of ``"myopic"``, ``"machine"`` or ``"random"``.

    Returns:
        A ``Solution`` built from the stored record, whose first move has
        been overwritten with the network's starting node.

    Raises:
        ValueError: If ``solution_type`` is not one of the known types.
    """
    network = networks_by_id[network_id]

    solution_tables = {
        "myopic": solutions_myopic_by_id,
        "machine": solutions_m1_by_id,
        "random": solutions_random_by_id,
    }
    if solution_type not in solution_tables:
        raise ValueError("Invalid solution type")
    solution = solution_tables[solution_type][network_id]

    # The stored record is updated in place: its first move becomes the
    # network's starting node.
    moves = solution['moves']
    moves[0] = network['starting_node']

    score = estimate_solution_score(Network(**network), moves, 10)
    assert score > 0, f"Invalid solution score: {score}"

    return Solution(**solution)


def get_solution(network_id, state):
    """Sample a strategy from ``state`` and return its solution for a network.

    Args:
        network_id: Network whose solution is requested.
        state: Probability vector over the strategies
            ``['random', 'myopic', 'machine']``; it must sum to 1.

    Returns:
        The ``Solution`` produced by ``_get_solution`` for the sampled type.
    """
    deviation = np.absolute(state.sum() - 1)
    assert deviation < 0.0001, f"Invalid state: {state}"

    strategy_names = ['random', "myopic", "machine"]
    positions = list(range(len(state)))
    picked = np.random.choice(positions, p=state)
    return _get_solution(network_id, strategy_names[picked])


def get_solution_evaluation(solution: Solution, network_id):
    """Evaluate ``solution`` on the network stored under ``network_id``.

    Returns whatever ``process_solution`` returns for the network record
    and the solution converted to a dict.
    """
    network_record = networks_by_id[network_id]
    return process_solution(network_record, solution.dict())
    
    

In [21]:
# Parameters of the simulated learning model. A participant's state is a
# probability vector ordered [random, myopic, machine] (see init_state).

# init_state() draws the initial myopic probability as
# random.random() * p_mypopic_init * 2.
p_mypopic_init = 1.
# Upper bound applied by scale_state() to the machine and myopic entries.
max_p = 1.
# run_participant() compares random.random() against drop_rate / 2 before and
# after each trial to decide whether the participant drops out.
drop_rate = 0.0

# (probability, change) pairs sampled by sample_change() after a trial solved
# individually; the change vector is added to the state.
individual_learning_factors = (
    (0.01, [0,0,1]),
    (0.2, [0,0,0]),
    (0.79, [0,0,0])
)

# Same (probability, change) format, keyed by the outcome that
# handle_repeat_trial() derives from evaluating the advisor's solution.
social_learning_factors = {
    'optimal': (
        (0.5, [0,0,1]),
        (0.5, [0,0,0])
    ),
    'myopic': (
        (0.5, [0,0.5,0]),
        (0.5, [0,0,0])
    )
}

# Advisor choice rule read by handle_social_learning_selection_trial():
# "best" (highest score) or "random".
social_learning_strategy = "best"


def scale_state(state, change):
    """Apply ``change`` to ``state`` and renormalise it.

    Entry 2 is capped at ``max_p``, entry 1 is capped at the smaller of
    ``max_p`` and the mass left by entry 2, and entry 0 takes the remainder
    so that the vector sums to 1.

    Returns:
        A new array; the caller's ``state`` is not assigned into.
    """
    updated = state + change
    updated[2] = np.minimum(updated[2], max_p)
    # Entry 1 may use at most what entry 2 leaves over, and never exceed max_p.
    entry1_cap = np.minimum(1 - updated[2], max_p)
    updated[1] = np.minimum(updated[1], entry1_cap)
    updated[0] = 1 - updated[2] - updated[1]
    total_error = np.absolute(updated.sum() - 1)
    assert total_error < 0.0001, f"Invalid state: {updated}"
    return updated


def sample_change(options):
    """Pick one change vector at random.

    Args:
        options: Sequence of entries whose first item is a probability and
            whose second item is the associated change vector.

    Returns:
        The change vector (as a numpy array row) of the sampled entry.
    """
    weights = np.array([entry[0] for entry in options])
    changes = np.array([entry[1] for entry in options])
    picked = np.random.choice(range(len(weights)), p=weights)
    return changes[picked]


def individual_learning(state):
    """Return ``state`` after one sampled individual-learning update."""
    sampled_change = sample_change(individual_learning_factors)
    return scale_state(state, sampled_change)


def social_learning(state, strategy):
    """Return ``state`` after one sampled social-learning update.

    ``strategy`` selects the entry of ``social_learning_factors`` to sample
    the change from.
    """
    sampled_change = sample_change(social_learning_factors[strategy])
    return scale_state(state, sampled_change)

def init_state():
    """Draw a participant's initial strategy distribution.

    Returns:
        A numpy array ordered [random, myopic, machine] that sums to 1 and
        has no negative entries; the machine probability starts at 0.
    """
    # The raw draw spans [0, 2 * p_mypopic_init), so it can exceed 1. Without
    # clamping, the "random" entry (1 - p_mypopic) turns negative and
    # np.random.choice in get_solution() rejects the vector.
    p_mypopic = random.random() * p_mypopic_init * 2
    p_mypopic = min(max(p_mypopic, 0.0), 1.0)
    state = np.array([1 - p_mypopic, p_mypopic, 0])  # [random, myopic, machine]
    return state

def handle_instruction_trial(trial, state):
    """Instruction trials need no payload and leave the state untouched."""
    return None, state

def handle_individual_trial(trial, state):
    """Solve the trial's network from the current state, then learn.

    Returns:
        ``(body, state)`` with the JSON-encoded solution and the state after
        one individual-learning update.
    """
    chosen_solution = get_solution(trial.network.network_id, state)
    next_state = individual_learning(state)
    return chosen_solution.json(), next_state

def handle_written_strategy_trial(trial, state):
    """Answer a written-strategy trial with an empty strategy text."""
    empty_strategy = WrittenStrategy(strategy='')
    return empty_strategy.json(), state

def handle_demonstration_trial(trial, state):
    """Solve the trial's network from the current state without learning."""
    chosen_solution = get_solution(trial.network.network_id, state)
    body = chosen_solution.json()
    return body, state

def handle_debriefing_trial(trial, state):
    """Debriefing trials need no payload and leave the state untouched."""
    return None, state

def handle_social_learning_selection_trial(trial, state):
    """Choose an advisor according to ``social_learning_strategy``.

    Returns:
        ``(body, state)`` where ``body`` is the JSON-encoded ``Advisor``
        selection; the state is returned unchanged.

    Raises:
        ValueError: If ``social_learning_strategy`` is neither ``"best"``
            nor ``"random"``.
    """
    selection_info = trial.advisor_selection
    candidate_ids = selection_info.advisor_ids
    candidate_scores = selection_info.scores

    if social_learning_strategy == "best":
        # Advisor with the highest score.
        chosen_id = candidate_ids[np.argmax(candidate_scores)]
    elif social_learning_strategy == "random":
        # Uniformly random advisor.
        chosen_id = random.choice(candidate_ids)
    else:
        raise ValueError(f"Invalid social learning strategy: {social_learning_strategy}")

    return Advisor(advisor_id=chosen_id).json(), state

def handle_observation_trial(trial, state):
    """Handle an observation trial: no payload, state unchanged.

    The advisor's solution is still passed through
    ``get_solution_evaluation``; its result is not used here.
    """
    get_solution_evaluation(trial.advisor.solution, trial.network.network_id)
    return None, state

def handle_repeat_trial(trial, state):
    """Replay the advisor's solution and learn socially from its quality.

    The advisor's solution is evaluated; an ``'optimal'`` value of 10
    triggers the 'optimal' social-learning update, otherwise a positive
    ``'myopic'`` value triggers the 'myopic' one.

    Returns:
        ``(body, state)`` with the advisor's solution as JSON and the
        possibly updated state.
    """
    advisor_solution = trial.advisor.solution
    evaluation = get_solution_evaluation(advisor_solution, trial.network.network_id)

    if evaluation['optimal'] == 10:
        outcome = 'optimal'
    elif evaluation['myopic'] > 0:
        outcome = 'myopic'
    else:
        outcome = None

    if outcome is not None:
        state = social_learning(state, outcome)

    return advisor_solution.json(), state

def handle_try_yourself_trial(trial, state):
    """Solve the trial's network from the current state, then learn.

    Returns:
        ``(body, state)`` with the JSON-encoded solution and the state after
        one individual-learning update.
    """
    own_solution = get_solution(trial.network.network_id, state)
    next_state = individual_learning(state)
    return own_solution.json(), next_state


def handle_trial(trial, state):
    """Dispatch a trial to the handler registered for its ``trial_type``.

    Returns:
        The ``(body, state)`` pair produced by the matching handler.

    Raises:
        ValueError: If ``trial.trial_type`` matches none of the known types.
    """
    # Ordered (type, handler) pairs; matched with == like an if/elif chain.
    dispatch = (
        ("instruction", handle_instruction_trial),
        ("individual", handle_individual_trial),
        ("written_strategy", handle_written_strategy_trial),
        ("demonstration", handle_demonstration_trial),
        ("debriefing", handle_debriefing_trial),
        ("social_learning_selection", handle_social_learning_selection_trial),
        ("observation", handle_observation_trial),
        ("repeat", handle_repeat_trial),
        ("try_yourself", handle_try_yourself_trial),
    )
    for type_name, handler in dispatch:
        if trial.trial_type == type_name:
            return handler(trial, state)
    raise ValueError(f"{trial.trial_type} is an invalid trial type")

In [22]:
async def run_participant():
    """Simulate one participant from the first trial to debriefing.

    The participant polls the backend for its current trial, answers it via
    ``handle_trial`` and posts the answer. It stops when the debriefing
    trial is served a second time, or when a random drop-out check (before
    or after a trial) fires.

    Returns:
        A list of ``{'trial': ..., 'prolific_id': ...}`` records; every
        record additionally carries ``'dropped': True`` if the participant
        dropped out.

    Raises:
        ValueError: If a non-debriefing trial is served twice in a row.
    """
    records = []
    participant_id = "sim_" + uuid.uuid4().hex[:8]
    state = init_state()
    last_trial_id = None

    def drops_out():
        # Each check uses half of drop_rate; there are two checks per trial.
        return random.random() < drop_rate / 2

    def flag_dropped(items):
        return [{**item, 'dropped': True} for item in items]

    while True:
        trial = await get_trial(participant_id, experiment_type)
        if trial is None:
            # Nothing available yet: wait and poll again.
            await asyncio.sleep(5)
            continue

        if trial.id == last_trial_id:
            if trial.trial_type != "debriefing":
                raise ValueError(f"Trial {trial.id} is a duplicate")
            break

        if drops_out():
            records = flag_dropped(records)
            break

        last_trial_id = trial.id
        body, state = handle_trial(trial, state)
        # The success flag returned by post_trial is not inspected.
        await post_trial(participant_id, trial.id, body)
        records.append({
            'trial': json.loads(trial.json()),
            'prolific_id': participant_id,
        })

        if drops_out():
            records = flag_dropped(records)
            break

    return records

In [24]:

def check_unfinished(experiment_type):
    """Return how many unfinished sessions the backend lists for an experiment.

    Args:
        experiment_type: Experiment type passed as a query parameter.

    Returns:
        The length of the JSON payload returned by the sessions endpoint.
    """
    backend_url = 'http://localhost:5050'
    credentials = ('admin', 'admin')
    finished = False

    results_url = f'{backend_url}/results'
    reply = requests.get(
        f'{results_url}/sessions?experiment_type={experiment_type}&finished={finished}',
        headers={'Accept': 'application/json'},
        auth=credentials,
    )
    return len(reply.json())


async def run_with_limit(semaphore, trials):
    """Run one participant under ``semaphore`` and collect its trials.

    Args:
        semaphore: Async context manager limiting concurrent participants.
        trials: Shared list that is extended in place with the participant's
            trial records (left untouched when there are none).
    """
    async with semaphore:
        participant_trials = await run_participant()
        if not participant_trials:
            return
        trials.extend(participant_trials)
            

async def main(max_concurrent_tasks):
    """Run one simulated participant per unfinished session and collect trials.

    Args:
        max_concurrent_tasks: Maximum number of participant tasks kept alive
            at once; also the size of the semaphore given to
            ``run_with_limit``.

    Returns:
        The shared list that the participant tasks extended with their
        trial records.
    """
    trials = []
    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    tasks = []

    # One participant is started per session the backend reports as unfinished.
    # NOTE(review): check_unfinished() issues a synchronous `requests` call,
    # so the event loop is blocked while it runs - confirm this is acceptable.
    n_sessions = check_unfinished(experiment_type)
    n_finished = 0
    n_started = 0

    # Keep going until as many tasks have completed as there were sessions.
    while n_finished < n_sessions:
        # Top up the pool of running tasks to the concurrency limit.
        while len(tasks) < max_concurrent_tasks and n_started < n_sessions:
            task = asyncio.create_task(run_with_limit(semaphore, trials))
            n_started += 1
            tasks.append(task)
        
        # Wait for one of the tasks to complete
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

        # NOTE(review): completed tasks are only counted; their result or
        # exception is never read, so a participant that failed with an
        # exception is counted as finished without being reported.
        n_finished += len(done)

        # Remove the completed tasks
        tasks = [t for t in tasks if not t.done()]
        
        

    # Wait for the remaining tasks to complete
    if tasks:
        await asyncio.wait(tasks)

    return trials

# Run the main coroutine with the desired maximum number of concurrent tasks

trials = await main(max_participants)

# Save trials as json
with open('trials.json', 'w') as f:
    json.dump(trials, f, indent=4)


CancelledError: 