# Imports

In [None]:
from itertools import permutations
from collections import Counter
from copy import deepcopy
import os
import csv
import random

# Generating Profiles

In [None]:
# generate all possible partial preference profiles for one agent
def generate_preferences(house_amt):
    return list(permutations(range(house_amt)))

# generate partial preference profile
def generate_preference(house_amt):
    return random.sample(range(house_amt), house_amt)

# generate preference profile
def generate_profile(agent_amt, house_amt):
    profile = []
    for agent in range(agent_amt):
        profile.append(generate_preference(house_amt))
    return profile

# generates a hash from some profile, useful for preventing non-halting best-response learning
def hash_profile(profile):
    return hash(tuple(tuple(agent) for agent in profile))

# Probabilistic Serial Rule

In [None]:
# calculate probability matrix using the probabilistic serial rule
def probability_matrix(profile):
    agent_amt = len(profile)
    house_amt = len(profile[0])
    # init data of how much of each house is remaining and who ate how much
    houses = [1] * house_amt
    agents = [[0] * house_amt for _ in range(agent_amt)]

    # probabilistic serial rule
    while max(houses) != 0:
        # for each player, find out which house they're eating
        houses_selected = [next(i for i in preference if houses[i] != 0) for preference in profile]

        # how long does it take to eat each house
        house_counts = Counter(houses_selected)
        total_rate_eaten = [house_counts.get(i, 0) for i in range(house_amt)]
        time_to_eat = [None if total_rate_eaten[i] == 0 else houses[i] / total_rate_eaten[i] # Condition to avoid division by zero
                 for i in range(house_amt)]

        # take the lowest time to eat and step forward that amount of time
        t = min(t for t in time_to_eat if t is not None) # poor second condition to avoid floating point errors
        for agent in range(agent_amt):
            house = houses_selected[agent]
            houses[house] -= t
            agents[agent][house] += t
        houses = [house if house > 1e-9 else 0 for house in houses] # 
    return agents

# calculate expected utility for one player from applying Borda scores of a profile to a probability matrix
# Borda score starts at 0
def expected_utility(matrix, profile, agent):
    agent_amt = len(profile)
    house_amt = len(profile[agent])
    utility = 0
    for house in range(house_amt):
        house_value = house_amt - 1 - profile[agent].index(house)
        utility += matrix[agent][house] * house_value
    return utility

# calculate expected utility for all players, see def expected_utility for detail
def expected_utilities(matrix, profile):
    agent_amt = len(profile)
    return [expected_utility(matrix, profile, agent) for agent in range(agent_amt)]

# Best-Response Learning

In [None]:
# Given a certain true profile, and a certain player, find a best response for that player and its utility
def find_best_response(true_profile, profile, agent):
    house_amt = len(profile[agent])
    profile = deepcopy(profile)
    # Prioritize the current action, if supplied
    best_eu = -1
    best_a = profile[agent]
    if profile[agent] != None:
        matrix = probability_matrix(profile)
        best_eu = expected_utility(matrix, true_profile, agent)

    # Loop over the agent's action space
    for action in generate_preferences(house_amt):
        profile[agent] = action
        matrix = probability_matrix(profile)
        eu = expected_utility(matrix, true_profile, agent)
        if eu > best_eu:
            best_eu = eu
            best_a = action
    return [list(best_a), best_eu]

# Check if the current profile is a PNE
def is_PNE(true_profile, profile):
    agent_amt = len(profile)
    matrix = probability_matrix(profile)
    eu = expected_utilities(matrix, true_profile)
    for agent in range(agent_amt):
        _, br_eu = find_best_response(true_profile, profile, agent)
        if br_eu > eu[agent]:
            # reported preference is not a best response, so this is not a PNE
            return False
    return True


# Performs best-response learning
# Returns the PNE and the amount of steps to reach it if a PNE is found
# Returns None and the amount of steps to reach a loop and the length of the loop if a loop is found
def best_response_learning(true_profile, profile, visited=None, depth = 0):
    # print("Current profile: ", profile)
    if visited is None:
        visited = []
    # Loop detected, no PNE found here
    h = hash_profile(profile)
    if h in visited:
        return [None, visited.index(h), depth - visited.index(h)]
    
    visited.append(h)
    agent_amt = len(profile)
    house_amt = len(profile[0])
    
    matrix = probability_matrix(profile)
    eu = expected_utilities(matrix, true_profile)
    # print("Current EUs: ", eu)
    for agent in range(agent_amt):
        br, br_eu = find_best_response(true_profile, profile, agent)
        if br_eu > eu[agent]:
            # print(agent, br_eu, eu)
            profile = deepcopy(profile)
            profile[agent] = br
            return best_response_learning(true_profile, profile, visited, depth + 1)
    return [profile, depth]
        

# Data collection

We produce two files for each combination of amount of agents and amount of houses that we want to test.
The first file contains only data about a true preference.
The second file contains data for true/reported preference pairs.
The amount of samples in second file, as well as the amount of agents and houses, is mostly limited by computational cost, as the game matrix grows exponentially with amount of agents and by the factorial of the number of houses.
We collect the following data for arbitrarily generated profiles:
- The content of the preference profile by agent
- Whether the true preference profile is a PNE
- What portion of possible reported profiles are a PNE (this is estimated by sampling as testing the entire matrix may be infeasible)
- Whether performing deterministic best-response learning from an arbitrary position results in a PNE
- If a PNE is reached, how many iterations it took to get there
- If a loop is reached, how many steps it took to enter the loop
- If a loop is reached, how many steps long the loop is

In [None]:
# FOR DATA COLLECTION: produce identifier data from a true profile
def true_id_data(id, profile):
    agent_amt = len(profile)
    data = {}
    data["ID"] = id
    for agent in range(agent_amt):
        data[f"P_{agent}"] = "-".join(map(str, profile[agent]))
    return data

# FOR DATA COLLECTION: produce identifier data from a sampled profile
def sampled_id_data(true_id, sample_id, true_profile, profile):
    agent_amt = len(profile)
    data = true_id_data(true_id, true_profile)
    data["sample ID"] = sample_id
    for agent in range(agent_amt):
        data[f"a_{agent}"] = "-".join(map(str, profile[agent]))
    return data

# FOR DATA COLLECTION: write data to file
def write_csv(name, data):
    folder = "data"
    os.makedirs(folder, exist_ok=True)
    with open(os.path.join(folder, name + ".csv"), mode="w", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)


SAMPLES = 10 # samples per category
RANDOM_TESTS = 10 # for each sample we want to do a number of tests from arbitrary positions
MAX_AGENTS = 5
MAX_HOUSES = 4
for house_amt in range(2, MAX_HOUSES + 1):
    for agent_amt in range(2, MAX_AGENTS + 1):
        pref_data = []
        sampled_data = []
        for i in range(SAMPLES):
            true_profile = generate_profile(agent_amt, house_amt)
            # data that applies to one true profile
            pref_datapoint = true_id_data(i, true_profile)
            pref_datapoint["is_PNE"] = "yes" if is_PNE(true_profile, true_profile) else "no"
            pref_datapoint["est. PNEs%"] = 0 # gets filled in within the sampling loop

            for j in range(RANDOM_TESTS):
                profile = generate_profile(agent_amt, house_amt)
                if is_PNE(true_profile, profile):
                    pref_datapoint["est. PNEs%"] += 100 / RANDOM_TESTS
                
                # data that applies to individual samples
                sampled_datapoint = sampled_id_data(i, j, true_profile, profile)
                brl = best_response_learning(true_profile, profile)
                if brl[0] is not None:
                    sampled_datapoint["PNE reached"] = "yes"
                    sampled_datapoint["steps to PNE"] = brl[1]
                    # not applicable data should still get a key
                    sampled_datapoint["Steps to loop"] = "-"
                    sampled_datapoint["Loop length"] = "-"
                else:
                    sampled_datapoint["PNE reached"] = "no"
                    sampled_datapoint["Steps to loop"] = brl[1]
                    sampled_datapoint["Loop length"] = brl[2]
                    # not applicable data should still get a key
                    sampled_datapoint["steps to PNE"] = "-"
                sampled_data.append(sampled_datapoint)
            pref_data.append(pref_datapoint)
        write_csv(f"dat1_agents={agent_amt}_houses={house_amt}", pref_data)
        write_csv(f"dat2_agents={agent_amt}_houses={house_amt}", sampled_data)


# For testing code

In [None]:
True_profile = generate_profile(5, 5)
Profile = generate_profile(5, 5)
print(best_response_learning(True_profile, Profile))