In [13]:
import torch
import itertools

In [14]:
# deviations are tuples, first index of deviation must always specify player (at least for now)

In [15]:
class InverseCorrelatedEquilibriumProblem:
    
    def __init__(self,
                 K,
                 player_action_dims,
                 observed_strategy,
                 payoff_features,
                 deviations_dim,
                 get_deviation_iter,
                 apply_deviation):
        self.num_players = len(player_action_dims)
        self.player_action_dims = player_action_dims
        self.observed_strategy = observed_strategy
        self.payoff_features_fn = payoff_features
        self.deviations_dim = deviations_dim
        self.get_deviation_iter = get_deviation_iter
        self.apply_deviation_fn = apply_deviation
        self.memoize_regrets_dict = {}
        assert self.deviations_dim[0] == self.num_players
        self.K = K
    
    def enumerate_joint_actions(self):
        return itertools.product(*[range(d) for d in self.player_action_dims])
    
    def predicted_strategy(self, theta):
        unnormalized_dist = torch.zeros(*self.player_action_dims)
        # dot product of each regret feat with each theta
        for joint_action in self.enumerate_joint_actions():
            action_regret_feats = self.compute_phi_regrets_for_action(torch.tensor(list(joint_action)))
            action_regret_scalars = torch.sum(action_regret_feats * theta, dim=len(theta.shape)-1)
            unnormalized_dist[joint_action] = torch.exp(-torch.sum(action_regret_scalars))
        Z = torch.sum(unnormalized_dist)
        return unnormalized_dist / Z

    def compute_phi_regrets_for_action(self, action_tens):
        key = tuple(action_tens.numpy())
        if key in self.memoize_regrets_dict:
            return self.memoize_regrets_dict[key]
        
        
    # these are the instantaneous regrets for all the specific deviations
        regret_feats = torch.zeros(*self.deviations_dim, self.K, requires_grad=False)
        dev_iter = self.get_deviation_iter(self.player_action_dims)
        for deviation in dev_iter():
            deviation_applied = self.apply_deviation_fn(action_tens, deviation)
            # get regrets for specific player only (player is specified by 0 of deviation)
            regret_feats[deviation] = self.payoff_features_fn(deviation_applied)[deviation[0]] - self.payoff_features_fn(action_tens)[deviation[0]]
        self.memoize_regrets_dict[key] = regret_feats
        return regret_feats
    
    def compute_expected_regret_feats(self, action_dist):
        total_regret_feats = torch.zeros(*self.deviations_dim, self.K, requires_grad=False)
        n = 0
        for joint_action in self.enumerate_joint_actions():
            n += 1
            total_regret_feats += action_dist[joint_action] * self.compute_phi_regrets_for_action(torch.tensor(list(joint_action)))
        return total_regret_feats / n
    
    
    def analytic_gradient(self, theta):
        dev_iter = self.get_deviation_iter(self.player_action_dims)
        regret_feats_observed = self.compute_expected_regret_feats(self.observed_strategy)
        regret_feats_predicted = self.compute_expected_regret_feats(self.predicted_strategy(theta))
        g = torch.zeros_like(theta, requires_grad=False)
        for deviation in dev_iter():
            this_deviation_theta = theta[deviation].view(*[1 for _ in deviation],-1) # unsqueeze to broadcast
            # sorry that is a really hacky way to do it, but i think it does what we want
            # i.e. add one empty dim for all dims of deviations, then -1 for the dim that is size K
            little_scalar_regrets = torch.sum(regret_feats_observed * this_deviation_theta, dim=len(theta.shape)-1)
            #  now argmax
            
            fstar_ind = little_scalar_regrets.argmax()
            
            g[deviation] = regret_feats_observed.view(-1,self.K)[fstar_ind] - regret_feats_predicted[deviation]
        return g
            

    def maxent_dual_objective(self, theta):
        bigZ = torch.tensor(0.0, requires_grad=True)
    
        # for each joint action in A
        for joint_action in self.enumerate_joint_actions():
            little_r_a_feats = self.compute_phi_regrets_for_action(torch.tensor(list(joint_action)))
            # scalar features for all deviations f with their own theta_fs
            little_r_a_scalar = torch.sum(little_r_a_feats * theta, dim=len(theta.shape)-1)
            # sum up, exp, add to Z
            bigZ = bigZ + torch.exp( -torch.sum(little_r_a_scalar))
        obj = torch.log(bigZ)
        # computing expected big regret for theta_f is max over phi_f of r_f(predicted | theta_f)
        # phi_f here is just the whole phi
        expected_er_feats = self.compute_expected_regret_feats(self.observed_strategy)

        # for each deviation
        dev_iter = self.get_deviation_iter(self.player_action_dims)
        for deviation in dev_iter():
            this_deviation_theta = theta[deviation].view(*[1 for _ in deviation],-1) # unsqueeze to broadcast
            # sorry that is a really hacky way to do it, but i think it does what we want
            # i.e. add one empty dim for all dims of deviations, then -1 for the dim that is size K
            little_scalar_regrets = torch.sum(expected_er_feats * this_deviation_theta, dim=len(theta.shape)-1)
            # little_scalar_regrets contains the regret for theta_f for all the different fs
            big_Regret = torch.max(little_scalar_regrets)
            obj = obj + big_Regret
        return obj

In [16]:
def external_enumerator(player_action_dims):
    def e():
        for i in range(len(player_action_dims)):
            for j in range(player_action_dims[i]):
                yield (i, j)
    return e

In [17]:
def switch_enumerator(player_action_dims):
    def e():
        for i in range(len(player_action_dims)):
            for j in range(player_action_dims[i]):
                for k in range(player_action_dims[i]):
                    yield (i, j, k)
    return e

In [18]:
def apply_external_deviation(action_tens, deviation):
    new_action_tens = torch.clone(action_tens)
    player, action = deviation
    new_action_tens[player] = action
    return new_action_tens

In [19]:
def apply_switch_deviation(action_tens, deviation):
    new_action_tens = torch.clone(action_tens)
    player, actionx, actiony = deviation
    if new_action_tens[player] == actionx:
        new_action_tens[player] = actiony
    return new_action_tens

In [20]:
def optimize_analytic(prob_obj, theta, epochs=100, lr=0.1):
    for i in range(epochs):
        g = prob_obj.analytic_gradient(theta)
        theta -= lr*g
    return theta

# chicken

In [21]:
def chicken_feats(action_tuple):
    p1, p2 = action_tuple
    # 0 is drive, 1 is swerve
    # for utility vectors first dim is crash, second dim is look cool, third dim is look like a wimp
    if p1 == 0:
        if p2 == 0:
            return torch.tensor([[1.0,0.0,0.0], [1.0,0.0,0.0]])
        if p2 == 1:
            return torch.tensor([[0.0,1.0,0.0], [0.0,0.0,1.0]])
    elif p1 == 1:
        if p2 == 0:
            return torch.tensor([[0.0,0.0,1.0], [0.0,1.0,0.0]])
        if p2 == 1:
            return torch.tensor([[0.0,0.0,1.0], [0.0,0.0,1.0]])
        
# explicit payoffs for util vector [-5.0, 1.0, 0.0], for reference
chicken_payoffs = torch.tensor([
    [[-5.0, 1.0],[0.0,0.0]],
    [[-5.0, 0.0],[1.0,0.0]]
])

# pure nash equilibrium

for some reason, the output trying to recover a pure Nash equilibrium by using external deviations doesn't work. Admittedly a pure Nash EQ is very low entropy, but the result seems to be a correlated equilibrium and not a Nash equilibrium.

In [217]:
pure_nash_chicken = torch.tensor([[0.0,1.0],[0.0,0.0]])
pure_nash_chicken

tensor([[0., 1.],
        [0., 0.]])

In [218]:
chicken_obj_ext = InverseCorrelatedEquilibriumProblem(3, (2,2), pure_nash_chicken, chicken_feats, (2,2), external_enumerator, apply_external_deviation)

In [219]:
chicken_theta =  torch.zeros(2,2,3)

In [220]:
chicken_obj_ext.predicted_strategy(chicken_theta)

tensor([[0.2500, 0.2500],
        [0.2500, 0.2500]])

In [221]:
optimize_analytic(chicken_obj_ext,  chicken_theta, epochs=10000,  lr=0.01)

tensor([[[-0.5921,  1.1846, -0.5925],
         [-1.1846,  0.5921,  0.5925]],

        [[-0.5921,  1.1846, -0.5925],
         [-1.1846,  0.5921,  0.5925]]])

In [222]:
chicken_obj_ext.predicted_strategy(chicken_theta)

tensor([[0.0139, 0.4861],
        [0.4861, 0.0139]])

# mixed Nash equilibrium

In [188]:
mixed_nash_chicken = torch.tensor([0.1667,.8333]).view(-1,1) @ torch.tensor([.1667, .8333]).view(1,-1)

In [189]:
mixed_nash_chicken

tensor([[0.0278, 0.1389],
        [0.1389, 0.6944]])

In [190]:
chicken_obj_ext = InverseCorrelatedEquilibriumProblem(3, (2,2), mixed_nash_chicken, chicken_feats, (2,2), external_enumerator, apply_external_deviation)

In [191]:
chicken_analytic =  torch.zeros(2,2,3)

In [192]:
chicken_obj_ext.predicted_strategy(chicken_analytic)

tensor([[0.2500, 0.2500],
        [0.2500, 0.2500]])

In [193]:
optimize_analytic(chicken_obj_ext,  chicken_analytic, epochs=10000,  lr=0.1)

tensor([[[ 0.3250, -0.1992, -0.1258],
         [-0.6867, -0.1625,  0.8492]],

        [[ 0.3250, -0.1992, -0.1258],
         [-0.6867, -0.1625,  0.8492]]])

In [194]:
chicken_obj_ext.predicted_strategy(chicken_analytic)

tensor([[0.0278, 0.1389],
        [0.1389, 0.6944]])

# correlated equilibrium

In [195]:
# this is another correlated equilibrium per John's slides. sometimes both people are cowardly in this one.
corr_chicken = torch.tensor([[0.0,0.4],[0.4,0.2]])

In [196]:
correq_analytic  = torch.zeros(2,2,2,3, requires_grad=False)

In [197]:
chicken_obj_int = InverseCorrelatedEquilibriumProblem(3, (2,2), corr_chicken, chicken_feats, (2,2,2), switch_enumerator, apply_switch_deviation)

In [198]:
chicken_obj_int.predicted_strategy(correq_analytic)

tensor([[0.2500, 0.2500],
        [0.2500, 0.2500]])

In [199]:
optimize_analytic(chicken_obj_int, correq_analytic,  epochs=10000)

tensor([[[[ 0.0000,  0.0000,  0.0000],
          [-1.8500,  0.8394,  1.0106]],

         [[-0.2194,  0.2888, -0.0694],
          [ 0.0000,  0.0000,  0.0000]]],


        [[[ 0.0000,  0.0000,  0.0000],
          [-1.8500,  0.8394,  1.0106]],

         [[-0.2194,  0.2888, -0.0694],
          [ 0.0000,  0.0000,  0.0000]]]])

In [200]:
chicken_obj_int.predicted_strategy(correq_analytic)

tensor([[0.0013, 0.3996],
        [0.3996, 0.1994]])

In [201]:
# this is a social-welfare-maximizing correlated equilibrium. Computed by Kevin's code, or it just makes sense.
corr_chicken = torch.tensor([[0.0,0.5], [0.5,0.0]])
corr_chicken_approx = torch.tensor([[0.0,0.46], [0.54,0.0]])

correq_theta = torch.ones(2,2,2,3)
chicken_obj_int = InverseCorrelatedEquilibriumProblem(3, (2,2), corr_chicken_approx, chicken_feats, (2,2,2), switch_enumerator, apply_switch_deviation)

In [202]:
chicken_obj_int.predicted_strategy(correq_theta)

tensor([[0.2500, 0.2500],
        [0.2500, 0.2500]])

In [203]:
optimize_analytic(chicken_obj_int,  correq_theta, epochs=10000)

tensor([[[[ 1.0000,  1.0000,  1.0000],
          [-0.9752,  1.9861,  1.9886]],

         [[ 0.0139,  2.9751,  0.0110],
          [ 1.0000,  1.0000,  1.0000]]],


        [[[ 1.0000,  1.0000,  1.0000],
          [-0.9752,  1.9861,  1.9886]],

         [[ 0.0139,  2.9751,  0.0110],
          [ 1.0000,  1.0000,  1.0000]]]])

In [204]:
chicken_obj_int.predicted_strategy(correq_theta)

tensor([[0.0013, 0.4987],
        [0.4987, 0.0013]])