# Functions to import a trained MLP

In [10]:
import pref_voting.generate_utility_profiles
import pref_voting.utility_profiles
import pref_voting.utility_functions
import pref_voting.voting_methods
import pref_voting.profiles
import pref_voting.generate_profiles
import torch
import torch.nn as nn
import torch.nn.functional as F
import itertools

import matplotlib.pyplot as plt
import numpy as np

import math

import pickle
from tqdm.notebook import tqdm

import glob

import pandas as pd
import seaborn as sns

# Compute device used for every tensor and model below.  Apple's Metal backend
# ('mps') remains the first choice; CUDA and then CPU are fallbacks so the code
# still runs on machines where MPS is not available.
if torch.backends.mps.is_available():
    DEVICE = 'mps'
elif torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

In [11]:
def generate_permutations(n):
    """Return every ordering of the integers 0..n-1 as a list of tuples.

    The tuples appear in the order produced by ``itertools.permutations``.
    """
    return list(itertools.permutations(range(n)))

# permutations_of[k] lists every ranking of k candidates (k = 3..6);
# permutations_index_dict[k] maps each ranking tuple back to its position in
# permutations_of[k].
permutations_of = {k: generate_permutations(k) for k in (3, 4, 5, 6)}

permutations_index_dict = {
    k: {tuple(ranking): position for position, ranking in enumerate(rankings)}
    for k, rankings in permutations_of.items()
}

In [12]:
def to_linear_prof(uprof):
    """Build the profile of linear orders induced by a utility profile.

    Each utility function in ``uprof.utilities`` contributes one ranking: the
    elements of ``uprof.domain`` sorted from highest to lowest utility.
    """
    rankings = []
    for utility in uprof.utilities:
        rankings.append(sorted(uprof.domain, key=utility, reverse=True))
    return pref_voting.profiles.Profile(rankings)

def clone_voter(prof, manip_weight = 1):
    """
    Return a copy of prof in which the first ranking's count is increased by
    manip_weight - 1, i.e. the first voter appears manip_weight - 1 extra times.
    """
    rankings, counts = prof.rankings_counts

    ranking_tuples = [tuple(ranking) for ranking in rankings]

    # Only the first entry changes; every other count is carried over as-is.
    boosted_counts = list(counts)
    boosted_counts[0] = boosted_counts[0] + (manip_weight - 1)

    return pref_voting.profiles.Profile(ranking_tuples, rcounts=boosted_counts)

def apply_manipulation(prof_with_clones, new_ranking, manip_weight):
    """
    Return a new profile whose first manip_weight rankings are new_ranking and
    whose remaining rankings are copied from prof_with_clones.

    It is assumed that the first manip_weight rankings of prof_with_clones are
    the copies of the manipulating voter (voter 0).
    """
    manipulated = [new_ranking for _ in range(manip_weight)]
    untouched = prof_with_clones.rankings[manip_weight:]
    return pref_voting.profiles.Profile(manipulated + untouched)


In [13]:
class Agent(nn.Module):
    """MLP that maps a context vector to a distribution over rankings.

    Args:
        input_dim: width of the concatenated context fed to the first layer.
        output_dim: number of candidates; the network has ``output_dim!``
            outputs, one per action in ``[0, output_dim! - 1]``.
        classification: if True the final activation is a sigmoid, otherwise
            a softmax over the last dimension.
        layers: hidden layer widths (any sequence of ints).
    """

    def __init__(self, input_dim, output_dim, classification=False, layers=(128, 64, 32)):
        super().__init__()

        self.output_dim = math.factorial(output_dim)  # action space is [0, (output_dim)! - 1]

        # Immutable default + unpacking: no shared mutable default argument,
        # and lists and tuples of hidden widths are both accepted.
        widths = [input_dim, *layers, self.output_dim]

        module_list = []
        last_index = len(widths) - 1
        for i in range(1, len(widths)):
            module_list.append(nn.Linear(widths[i - 1], widths[i]))
            # Hidden layers get a LeakyReLU; the output layer gets the final
            # activation appended below instead.
            if i != last_index:
                module_list.append(nn.LeakyReLU())

        module_list.append(nn.Sigmoid() if classification else nn.Softmax(dim=-1))

        self.model = nn.Sequential(*module_list)

        print(self.model)

    def forward(self, manipulator_utilities, additional_contexts):
        """Return ``(action_probs, actions)`` for a batch of contexts.

        Args:
            manipulator_utilities: [bs, n_c] utilities of voter 0.
            additional_contexts: extra context features, concatenated to the
                utilities along the last dimension.

        Returns:
            action_probs: [bs, num_possible_actions] output of the network.
            actions: [bs,] actions sampled from a categorical distribution
                built from ``action_probs``.
        """
        context = torch.cat([manipulator_utilities, additional_contexts], dim=-1)

        action_probs = self.model(context)

        dist = torch.distributions.categorical.Categorical(probs=action_probs)
        actions = dist.sample()

        return action_probs, actions

# Module-level cross-entropy criterion.  Nothing in this section references it;
# presumably the training code (not shown here) does -- TODO confirm.
cross_entropy_loss = nn.CrossEntropyLoss()


In [14]:
def generate_score_context(profs, scoring_rule = 'plurality', device=DEVICE):
    """Return the candidates' scores in each profile as a [bs, n_c] tensor.

    Args:
        profs: list of profiles exposing ``plurality_scores()`` /
            ``borda_scores()``, each returning a dict keyed by candidate.
        scoring_rule: ``'plurality'`` or ``'borda'``.
        device: device on which the tensor is created.

    Returns:
        Tensor with one row per profile; within a row the scores are ordered
        by sorted candidate key.

    Raises:
        ValueError: if ``scoring_rule`` is not one of the supported rules.
    """
    if scoring_rule == 'plurality':
        scores = [prof.plurality_scores() for prof in profs]  # list of dicts
    elif scoring_rule == 'borda':
        scores = [prof.borda_scores() for prof in profs]  # list of dicts
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise ValueError(
            f"Unknown scoring_rule {scoring_rule!r}; expected 'plurality' or 'borda'"
        )

    final_scores = [
        [score[cand_index] for cand_index in sorted(score)]
        for score in scores
    ]

    return torch.tensor(final_scores, device=device)  # [bs, n_c]


def generate_majority_contexts(profs, num_cands, device=DEVICE):
    """Encode each profile's majority relation as a flattened matrix.

    For every ordered pair of candidates (a, b) the entry is 1.0 when
    ``prof.majority_prefers(a, b)``, -1.0 when ``prof.majority_prefers(b, a)``,
    and 0.0 otherwise.

    Args:
        profs: list of profiles.
        num_cands: number of candidates (side length of the matrix).
        device: device on which the tensor is created.

    Returns:
        Tensor of shape [bs, num_cands * num_cands].
    """
    majority_matrix = torch.zeros((len(profs), num_cands, num_cands), device=device)

    for row, prof in enumerate(profs):
        for a, b in itertools.product(prof.candidates, repeat=2):
            if prof.majority_prefers(a, b):
                majority_matrix[row, a, b] = 1.0
            elif prof.majority_prefers(b, a):
                majority_matrix[row, a, b] = -1.0
            # Otherwise the entry keeps the 0.0 it was initialised with.

    return majority_matrix.flatten(start_dim=1)  # [bs, num_cands * num_cands]


def generate_sincere_winners_contexts(profs, num_cands, vm, device=DEVICE):
    """Return a [bs, num_cands] indicator tensor of each profile's winners.

    Entry ``[p, c]`` is 1.0 when candidate ``c`` is among ``vm(profs[p])`` and
    0.0 otherwise.
    """
    winner_flags = torch.zeros((len(profs), num_cands), device=device)

    for row, prof in enumerate(profs):
        for winner in vm(prof):
            winner_flags[row, winner] = 1.0

    return winner_flags


In [15]:
def _mean_winner_utilities(utility_fns, winners_batch):
    """Average each utility function over its winning set; float32 [BS,] tensor on DEVICE."""
    means = torch.tensor([
        np.average([utility_fn(w) for w in winners])
        for utility_fn, winners in zip(utility_fns, winners_batch)
    ]).float()  # cast before moving to DEVICE, as the original code does
    return means.to(DEVICE)


def reward_function(
    actions, 
    utility_fns, 
    profs, 
    vm, 
    num_cands, 
    manip_weight, 
    metric_op='normalized_subtract'
    ):
    """Score a batch of manipulations by the manipulator's change in utility.

    For each profile the first voter is cloned ``manip_weight`` times, the
    winners under ``vm`` are computed, the clones' ranking is replaced by the
    ranking indexed by the chosen action, and the winners are recomputed.
    Utilities are averaged over the winning set in both cases.

    Args:
        actions: [BS,] indices into ``permutations_of[num_cands]``.
        utility_fns: one manipulator utility function per profile.
        profs: list of profiles.
        vm: voting method; called on a profile, returns the winners.
        num_cands: number of candidates.
        manip_weight: number of copies of the manipulating voter.
        metric_op: ``'subtract'`` (new - old), ``'divide'`` (new / old) or
            ``'normalized_subtract'`` ((new - old) / (max - min utility)).

    Returns:
        [BS,] float tensor of rewards on DEVICE.

    Raises:
        ValueError: if ``metric_op`` is not one of the supported operations.
    """
    # Validate first so a typo fails immediately rather than as an unbound
    # `reward` after every election has already been evaluated.
    if metric_op not in ('subtract', 'divide', 'normalized_subtract'):
        raise ValueError(
            f"Unknown metric_op {metric_op!r}; expected 'subtract', 'divide' "
            "or 'normalized_subtract'"
        )

    profs_with_clones = [clone_voter(prof, manip_weight=manip_weight) for prof in profs]

    # Utility of the outcome when the manipulator votes sincerely.
    ws_batch = [vm(prof) for prof in profs_with_clones]
    exp_util_ws_batch = _mean_winner_utilities(utility_fns, ws_batch)  # [BS,]

    # Utility of the outcome after the manipulator submits the chosen ranking.
    new_profs = [
        apply_manipulation(prof, permutations_of[num_cands][action], manip_weight)
        for prof, action in zip(profs_with_clones, actions)
    ]
    new_ws_batch = [vm(new_prof) for new_prof in new_profs]
    new_exp_util_ws_batch = _mean_winner_utilities(utility_fns, new_ws_batch)  # [BS,]

    if metric_op == 'subtract':
        return new_exp_util_ws_batch - exp_util_ws_batch

    if metric_op == 'divide':
        # NOTE(review): no guard against a sincere utility of zero.
        return new_exp_util_ws_batch / exp_util_ws_batch

    # metric_op == 'normalized_subtract': scale by the manipulator's utility range.
    cands_batch = [prof.candidates for prof in profs_with_clones]

    max_util_batch = torch.tensor([
        np.max([utility_fn(c) for c in cands])
        for utility_fn, cands in zip(utility_fns, cands_batch)
    ]).float().to(DEVICE)  # [BS,]

    min_util_batch = torch.tensor([
        np.min([utility_fn(c) for c in cands])
        for utility_fn, cands in zip(utility_fns, cands_batch)
    ]).float().to(DEVICE)  # [BS,]

    # NOTE(review): no guard against max == min (all candidates equally valued).
    return (new_exp_util_ws_batch - exp_util_ws_batch) / (max_util_batch - min_util_batch)

In [16]:
def _build_agent_context(agent_info, profiles, vm, num_cands, num_voters):
    """Return the context tensor for a single ``agent_info`` kind (expected shape [bs, x])."""
    if agent_info == 'full':
        return generate_full_knowledge_contexts(
            profiles,
            num_cands=num_cands,
            num_voters=num_voters,
            device=DEVICE
        )
    if agent_info == 'anon_prof':
        return generate_anon_prof_contexts(
            profiles,
            num_cands=num_cands,
            device=DEVICE,
        )
    if agent_info == 'plurality_scores':
        return generate_score_context(
            profiles,
            scoring_rule='plurality',
            device=DEVICE,
        ).float()
    if agent_info == 'plurality_ranking':
        return generate_score_ranking_context(
            profiles,
            scoring_rule='plurality',
            device=DEVICE,
        ).float()
    if agent_info == 'borda_scores':
        return generate_score_context(
            profiles,
            scoring_rule='borda',
            device=DEVICE,
        ).float()
    if agent_info == 'margin':
        return generate_margin_contexts(
            profiles,
            num_cands=num_cands,
            device=DEVICE
        )
    if agent_info == 'qual_margin':
        return generate_qual_margin_contexts(
            profiles,
            num_cands=num_cands,
            device=DEVICE
        )
    if agent_info == 'majority':
        return generate_majority_contexts(
            profiles,
            num_cands=num_cands,
            device=DEVICE
        )
    if agent_info == 'sincere_winners':
        return generate_sincere_winners_contexts(
            profiles,
            num_cands=num_cands,
            vm=vm,
            device=DEVICE
        )
    # Previously an unknown kind produced None and crashed later in torch.cat.
    raise ValueError(f"Unknown agent_info {agent_info!r}")


def get_profits_actions(
    agent, 
    batch_size, 
    vm, 
    num_cands, 
    num_voters, 
    manip_weight, 
    elections, 
    decision_rule='argmax', 
    metric_op="normalized_subtract", 
    agent_infos=('plurality_scores',)):
    """Run the agent on a batch of elections and score its decisions.

    Args:
        agent: callable taking ``(manipulator_utilities, additional_contexts)``
            and returning ``(action_probs, sampled_actions)``.
        batch_size: number of elections; sizes the accumulator used by the
            ``'expectation'`` rule.
        vm: voting method passed through to ``reward_function``.
        num_cands: number of candidates.
        num_voters: number of voters (only used by the ``'full'`` context).
        manip_weight: number of copies of the manipulating voter.
        elections: pair ``(manipulator_utility_fns, profiles)``.
        decision_rule:
            ``'argmax'`` -- score the most probable action;
            ``'distribution'`` -- score the action sampled by the agent;
            ``'expectation'`` -- probability-weighted reward over all actions.
        metric_op: reward definition, forwarded to ``reward_function``.
        agent_infos: kinds of context concatenated (in order) and fed to the
            agent alongside the manipulator's utilities.

    Returns:
        ``(eval_result, actions_batch)``: the rewards and the actions they
        refer to.  Under ``'expectation'`` no single action is scored, so the
        agent's sampled actions are returned.

    Raises:
        ValueError: for an unknown ``decision_rule`` or ``agent_info``.
    """
    # Validate before any expensive work (context building, forward pass).
    if decision_rule not in ('expectation', 'argmax', 'distribution'):
        raise ValueError(
            f"Unknown decision_rule {decision_rule!r}; pick one of "
            "'expectation', 'argmax', 'distribution'"
        )

    manipulator_utility_fns, profiles = elections

    manipulator_utilities = torch.tensor(
        [
            [m_util_fn(i) for i in range(num_cands)]
            for m_util_fn in manipulator_utility_fns
        ],
    ).float().to(DEVICE)

    # Each entry is expected to be of shape [bs, x]; they are joined feature-wise.
    additional_contexts = torch.cat(
        [
            _build_agent_context(agent_info, profiles, vm, num_cands, num_voters)
            for agent_info in agent_infos
        ],
        dim=-1,
    )

    action_probs_batch, actions_batch = agent(manipulator_utilities, additional_contexts)

    def reward_of(actions):
        return reward_function(
            actions=actions,
            utility_fns=manipulator_utility_fns,
            profs=profiles,
            vm=vm,
            num_cands=num_cands,
            manip_weight=manip_weight,
            metric_op=metric_op,
        )

    if decision_rule == 'expectation':
        eval_result = torch.zeros((batch_size,)).to(DEVICE)

        for i in range(action_probs_batch.shape[-1]):
            # Use a separate tensor so the sampled `actions_batch` that is
            # returned is not overwritten by the enumeration index.
            forced_actions = torch.ones_like(actions_batch) * i
            eval_result += reward_of(forced_actions) * action_probs_batch[:, i]
    else:
        if decision_rule == 'argmax':
            actions_batch = torch.argmax(action_probs_batch, dim=-1)
        # 'distribution' keeps the actions sampled by the agent.
        eval_result = reward_of(actions_batch)

    return eval_result, actions_batch


In [17]:
def generate_utility_profile(
        num_cands, 
        num_voters, 
        probmodel = 'uniform', 
        num_profiles = 1
        ): 
    """Sample utility profile(s) from the named probability model.

    Args:
        num_cands: number of candidates.
        num_voters: number of voters.
        probmodel: ``'uniform'`` or ``'spatial_2dim'``.
        num_profiles: number of profiles requested from the generator.

    Returns:
        For ``'uniform'``, whatever ``generate_utility_profile_uniform``
        returns; for ``'spatial_2dim'``, a list of utility profiles converted
        from the generated spatial profiles.

    Raises:
        ValueError: if ``probmodel`` is not a supported model (previously the
            function fell through and returned None).
    """
    if probmodel == 'uniform': 
        return pref_voting.generate_utility_profiles.generate_utility_profile_uniform(num_cands, num_voters, num_profiles = num_profiles)

    if probmodel == 'spatial_2dim':
        # This submodule is not imported at the top of the file.  A from-import
        # is used so `pref_voting` does not become a local name and break the
        # 'uniform' branch above.
        from pref_voting.generate_spatial_profiles import generate_spatial_profile

        ndims = 2
        sprofs = generate_spatial_profile(num_cands, num_voters, ndims, num_profiles = num_profiles)
        # NOTE(review): this iteration assumes the generator returns an
        # iterable of profiles -- confirm that also holds for num_profiles=1.
        return [sprof.to_utility_profile() for sprof in sprofs]

    raise ValueError(
        f"Unknown probmodel {probmodel!r}; expected 'uniform' or 'spatial_2dim'"
    )

In [18]:
def get_agent(
    model_size,
    num_cands, 
    num_voters, 
    agent_infos, 
    vm, 
    probmodel='uniform', 
    labeling='satisfice', 
    gen=1,
    manip_weight=1
    ):
    """Load a trained agent of the requested size and plot its training losses.

    The pickle is expected to hold a dict whose keys carry the model size at
    index 2 and whose values are ``(agent, losses, ...)`` sequences.

    Args:
        model_size: value compared against ``key[2]`` to select the agent.
        vm: voting method; ``vm.name`` is used in the plot title.
        num_cands, num_voters, agent_infos, probmodel, labeling, gen,
        manip_weight: currently unused by the lookup (see note below).

    Returns:
        The selected agent moved to DEVICE, or None (after printing an error)
        when the models file or a matching model size is not found.
    """
    # NOTE(review): the path is hard-coded to one example file, so the election
    # parameters above do not influence which models are loaded -- confirm
    # whether it should be built from them instead.
    models_pattern = "example_trained_models/Borda_1_6_10_uniform_('majority',)_1_08-18-2023_09-10-18.pickle"

    agents = None

    # load the appropriate models dictionary
    for file in glob.glob(models_pattern):
        print(f"Loading: {file}")
        # SECURITY: pickle.load can execute arbitrary code; only load model
        # files from a trusted source.
        with open(file, "rb") as models_file:
            agents = pickle.load(models_file)

    if agents is None: 
        # Report the pattern that was actually searched.
        print("ERROR: models file not found:", models_pattern)
        return None

    # find the agent with the key[2] (the model_size) equal to the model_size;
    # if several keys match, the last one wins
    agent = None
    losses = None
    for key in agents.keys(): 
        if key[2] == model_size:
            agent = agents[key][0].to(DEVICE)
            losses = agents[key][1]

    if agent is None:
        # Previously this case crashed with an unbound `losses` below.
        print(f"ERROR: no model of size {model_size} found in the loaded models")
        return None

    # plot the losses for the model
    plt.plot(losses)
    plt.title(f"model size {model_size} for {vm.name} losses")
    plt.show()
    
    return agent
