In [1]:
import numpy as np
import torch
import sys
sys.path.append('../')
from voting_games.werewolf_env_v0 import plurality_env, plurality_Phase, plurality_Role
import random
import copy
from typing import Any, Generator, Optional, Tuple
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


Done
Done


In [14]:
# Build a 10-agent game with 2 werewolves and reset it before the policies below use it.
env = plurality_env(num_agents=10, werewolves=2)
env.reset()

def random_coordinated_wolf(env):
    """Have every living wolf vote for the same randomly chosen living villager.

    Reads ``env.world_state`` ("villagers", "werewolves", "alive") and returns a
    dict mapping each living wolf to the numeric id parsed from the target's
    name (the part after the last underscore).

    Raises IndexError (from ``random.choice``) when no villager is alive.
    """
    state = env.world_state
    living_villagers = set(state["villagers"]) & set(state['alive'])
    living_wolves = set(state["werewolves"]) & set(state['alive'])

    # one shared victim for the whole pack
    victim = random.choice(list(living_villagers))
    victim_id = int(victim.split("_")[-1])
    return dict.fromkeys(living_wolves, victim_id)

def random_wolfs(env):
    """Give each living wolf an independent sample from its own action space.

    Returns a dict mapping every wolf that is listed in both
    ``env.world_state["werewolves"]`` and ``env.world_state["alive"]`` to
    ``env.action_space(wolf).sample()``.
    """
    living_wolves = set(env.world_state["werewolves"]) & set(env.world_state['alive'])

    sampled = {}
    for wolf in living_wolves:
        sampled[wolf] = env.action_space(wolf).sample()
    return sampled

def revenge_coordinated_wolf(env, actions = None):
    """Build one coordinated vote vector per living wolf.

    Every living wolf receives its own list with one entry per agent in
    ``env.possible_agents``: the randomly chosen living villager is marked
    with -1, every living wolf with 1, and everyone else with 0.

    Arguments:
        env: environment exposing ``world_state`` ("villagers", "werewolves",
            "alive") and ``possible_agents``.
        actions: optional dict to fill in place; a new dict is created when
            omitted (the previous ``None`` default crashed on first assignment).

    Returns:
        The ``actions`` dict (the same object that was passed in, if any).

    Raises:
        IndexError: from ``random.choice`` when no villager is alive.
    """
    if actions is None:
        actions = {}

    villagers_remaining = set(env.world_state["villagers"]) & set(env.world_state['alive'])
    wolves_remaining = set(env.world_state["werewolves"]) & set(env.world_state['alive'])

    # TODO: the "revenge" part (who tried to vote out a wolf last time?) is not
    # implemented yet, so the target is still picked uniformly at random.
    target = random.choice(list(villagers_remaining))

    # Agent names end in "_<index>"; parse the indices once instead of per wolf.
    target_id = int(target.split("_")[-1])
    wolf_ids = [int(wolf.split("_")[-1]) for wolf in wolves_remaining]

    for wolf in wolves_remaining:
        # each wolf gets its own list so later edits to one do not leak into another
        votes = [0] * len(env.possible_agents)
        votes[target_id] = -1
        for wolf_id in wolf_ids:
            votes[wolf_id] = 1
        actions[wolf] = votes

    return actions

def random_single_target_villager(env, agent):
    """Pick a random living agent other than ``agent`` and return its numeric id.

    The id is the integer after the last underscore of the chosen agent's name.
    Raises IndexError (from ``random.choice``) when nobody else is alive.
    """
    candidates = set(env.world_state["alive"]) - {agent}
    chosen = random.choice(list(candidates))
    # "player_7" -> "7"; a name without an underscore is used as-is
    _, _, suffix = chosen.rpartition("_")
    return int(suffix)

# random_coordinated_wolf(env)
def random_agent_action(env, agent):
    """Return one sample drawn from ``agent``'s action space in ``env``."""
    space = env.action_space(agent)
    return space.sample()


In [9]:
def play_static_wolf_game(env, wolf_policy, villager_agent, num_times=100) -> int:
    """Play ``num_times`` full games with scripted policies and count villager wins.

    Arguments:
        env: environment exposing ``reset()``, ``step(actions)``, ``agents`` and
            ``world_state`` ("villagers", "phase", "winners").
        wolf_policy: callable ``(env) -> dict`` with the actions of the living wolves.
        villager_agent: callable ``(env, villager) -> action`` for one villager.
        num_times: number of games to play.

    Returns:
        The number of games whose ``world_state['winners']`` equals
        ``plurality_Role.VILLAGER``. The running tally is also shown in the
        progress-bar description.
    """
    villager_wins = 0
    loop = tqdm(range(num_times))

    for _ in loop:
        env.reset()
        while env.agents:
            actions = {}

            # villagers only submit actions outside of the night phase
            if env.world_state["phase"] != plurality_Phase.NIGHT:
                villagers = set(env.agents) & set(env.world_state["villagers"])
                for villager in villagers:
                    actions[villager] = villager_agent(env, villager)

            # the wolf policy is queried on every step, day or night
            actions = actions | wolf_policy(env)

            env.step(actions)

        if env.world_state['winners'] == plurality_Role.VILLAGER:
            villager_wins += 1

        loop.set_description(f"Villagers won {villager_wins} out of a total of {num_times} games")

    return villager_wins

# Baseline win rates: every scripted wolf policy against every scripted villager policy.
env = plurality_env(num_agents=10, werewolves=2)
env.reset()

_wolf_matchups = (
    ("Random Coordinated Wolves", random_coordinated_wolf),
    ("Random Wolves", random_wolfs),
)
_villager_matchups = (
    ("\t vs. Single Target Random Villagers", random_single_target_villager),
    ("\t vs. Random Villagers", random_agent_action),
)

for _wolf_label, _wolf_policy in _wolf_matchups:
    print(_wolf_label)
    for _villager_label, _villager_policy in _villager_matchups:
        print(_villager_label)
        play_static_wolf_game(env, _wolf_policy, _villager_policy, num_times=1000)
    print("------------------------------------\n")

Random Coordinated Wolves
	 vs. Single Target Random Villagers


Villagers won 105 out of a total of 1000 games: 100%|██████████| 1000/1000 [00:05<00:00, 176.66it/s]


	 vs. Random Villagers


Villagers won 51 out of a total of 1000 games: 100%|██████████| 1000/1000 [00:04<00:00, 200.31it/s]


------------------------------------

Random Wolves
	 vs. Single Target Random Villagers


Villagers won 689 out of a total of 1000 games: 100%|██████████| 1000/1000 [00:04<00:00, 200.93it/s]


	 vs. Random Villagers


Villagers won 589 out of a total of 1000 games: 100%|██████████| 1000/1000 [00:05<00:00, 175.50it/s]

------------------------------------






In [None]:
class PluralityAgent(torch.nn.Module):
    """Feed-forward actor-critic network with separate actor and critic MLPs.

    Both heads are two hidden layers of width 64 with Tanh activations. The
    critic outputs a single value; the actor outputs ``num_actions`` logits for
    a categorical policy.
    """

    def __init__(self, num_actions, obs_size=None):
        """
        Arguments:
            num_actions: number of discrete actions (size of the actor output).
            obs_size: size of the last dimension of the observation tensor.

        Raises:
            ValueError: if ``obs_size`` is not given.
        """
        if obs_size is None:
            raise ValueError("obs_size is required to build the network input layers")

        # Module.__init__ has to run before any submodule is assigned,
        # otherwise torch refuses the assignment with an AttributeError.
        super().__init__()

        self.critic = torch.nn.Sequential(
            self._layer_init(torch.nn.Linear(obs_size, 64)),
            torch.nn.Tanh(),
            self._layer_init(torch.nn.Linear(64,64)),
            torch.nn.Tanh(),
            self._layer_init(torch.nn.Linear(64,1), std=1.0),
        )

        self.actor = torch.nn.Sequential(
            self._layer_init(torch.nn.Linear(obs_size, 64)),
            torch.nn.Tanh(),
            self._layer_init(torch.nn.Linear(64,64)),
            torch.nn.Tanh(),
            # small gain on the policy head keeps the initial logits close to zero
            self._layer_init(torch.nn.Linear(64, num_actions), std=0.01),
        )
    
    def _layer_init(self, layer, std=np.sqrt(2), bias_const=0.0):
        """Orthogonal weight init with gain ``std`` and constant bias; returns ``layer``."""
        torch.nn.init.orthogonal_(layer.weight, std)
        torch.nn.init.constant_(layer.bias, bias_const)
        return layer

    def get_value(self, x):
        """Return the critic output for observation tensor ``x``."""
        return self.critic(x)
    
    def get_action_and_value(self, x, action=None):
        """Sample (or score) an action and evaluate the state.

        Arguments:
            x: observation tensor whose last dimension is ``obs_size``.
            action: optional action tensor; when given it is scored instead of
                sampling a new one.

        Returns:
            Tuple ``(action, log_prob, entropy, value)``.
        """
        logits = self.actor(x)

        probs = torch.distributions.categorical.Categorical(logits=logits)
        if action is None:
            action = probs.sample()
        return action, probs.log_prob(action), probs.entropy(), self.critic(x)

In [9]:
class PluralityRecurrentAgent(torch.nn.Module):
    """Actor-critic network with an (experimental) LSTM feature path.

    The actor and critic heads are the same MLPs as in the feed-forward agent.
    In addition, ``fc1`` feeds a single-layer LSTM; ``fc2`` is defined for a
    recurrent policy head.

    NOTE(review): the LSTM output, ``fc2`` and the updated recurrent state are
    not yet used for the returned logits/value or handed back to the caller.
    Wiring them in changes the return signature and is left for a follow-up.
    """

    def __init__(self, num_actions, obs_size=None, hidden_state_size=64):
        """
        Arguments:
            num_actions: number of discrete actions.
            obs_size: size of the last dimension of the observation tensor.
            hidden_state_size: hidden size of the LSTM.

        Raises:
            ValueError: if ``obs_size`` is not given.
        """
        if obs_size is None:
            raise ValueError("obs_size is required to build the network input layers")

        # Module.__init__ has to run before any submodule is assigned.
        super().__init__()

        # The LSTM consumes the output of fc1, so its input size must be fc1's
        # width (64), not obs_size.
        self.recurrent_layer = self._rec_layer_init(torch.nn.LSTM(64, hidden_state_size, batch_first=True))
        self.fc1 = self._layer_init(torch.nn.Linear(obs_size,64))
        self.fc2 = self._layer_init(torch.nn.Linear(64,num_actions), std=0.01)

        self.critic = torch.nn.Sequential(
            self._layer_init(torch.nn.Linear(obs_size, 64)),
            torch.nn.Tanh(),
            self._layer_init(torch.nn.Linear(64,64)),
            torch.nn.Tanh(),
            self._layer_init(torch.nn.Linear(64,1), std=1.0),
        )

        self.actor = torch.nn.Sequential(
            self._layer_init(torch.nn.Linear(obs_size, 64)),
            torch.nn.Tanh(),
            self._layer_init(torch.nn.Linear(64,64)),
            torch.nn.Tanh(),
            self._layer_init(torch.nn.Linear(64, num_actions), std=0.01),
        )
    
    def _layer_init(self, layer, std=np.sqrt(2), bias_const=0.0):
        """Orthogonal weight init with gain ``std`` and constant bias; returns ``layer``."""
        torch.nn.init.orthogonal_(layer.weight, std)
        torch.nn.init.constant_(layer.bias, bias_const)
        return layer

    @staticmethod
    def _rec_layer_init(layer, std=np.sqrt(2), bias_const=0.0):
        """Initialise every parameter of a recurrent layer by name.

        Parameters whose name contains "bias" are set to ``bias_const``; those
        whose name contains "weight" get an orthogonal init with gain ``std``.
        Declared static so that ``self._rec_layer_init(layer)`` passes ``layer``
        (and not ``self``) as the first argument.
        """
        for name, param in layer.named_parameters():
            if "bias" in name:
                torch.nn.init.constant_(param, bias_const)
            if "weight" in name:
                torch.nn.init.orthogonal_(param, std)
        return layer

    def get_value(self, x):
        """Return the critic output for observation tensor ``x``."""
        return self.critic(x)
    
    def get_action_and_value(self, x, recurrent_cell: Optional[Tuple[torch.Tensor, torch.Tensor]], action=None):
        """Sample (or score) an action and evaluate the state.

        Arguments:
            x: observation tensor whose last dimension is ``obs_size``; with
                ``batch_first=True`` a 3-D input is read as (batch, seq, obs_size).
            recurrent_cell: ``(h_0, c_0)`` tuple for the LSTM, or ``None`` to
                start from zeros.
            action: optional action tensor to score instead of sampling.

        Returns:
            Tuple ``(action, log_prob, entropy, value)``.
        """
        # torch.tanh applies the activation; torch.nn.Tanh(...) would try to
        # construct a module from the tensor and fail.
        h = torch.tanh(self.fc1(x))
        # NOTE(review): h / recurrent_cell are computed but not consumed below.
        h, recurrent_cell = self.recurrent_layer(h, recurrent_cell)
        
        logits = self.actor(x)

        probs = torch.distributions.categorical.Categorical(logits=logits)
        if action is None:
            action = probs.sample()
        return action, probs.log_prob(action), probs.entropy(), self.critic(x)

In [10]:
# Environment with default settings; keep the reset() outputs for the cells below.
env = plurality_env()
observations, rewards, terminations, truncations, infos = env.reset()

def _rec_layer_init(layer, std=np.sqrt(2), bias_const=0.0):
    for name, param in layer.named_parameters():
        if "bias" in name:
            torch.nn.init.constant_(param, bias_const)
        if "weight" in name:
            torch.nn.init.orthogonal_(param, std)
    return layer
# Shape of player_0's converted observation; its first dimension is the LSTM input size below.
shape = env.convert_obs(observations["player_0"]["observation"]).shape

# Throwaway LSTM with hidden size 2, initialised with the helper above.
lstm = _rec_layer_init(torch.nn.LSTM(shape[0], 2, batch_first=True))

In [5]:
# Initial hidden and cell states for one unbatched sequence. The shape is taken
# from the LSTM itself so the states always match its configuration; the
# previously hard-coded sizes (18 / 32) do not match the hidden size of `lstm`.
hxs = torch.zeros(lstm.num_layers, lstm.hidden_size, dtype=torch.float32)
cxs = torch.zeros(lstm.num_layers, lstm.hidden_size, dtype=torch.float32)

obs = torch.tensor(env.convert_obs(observations["player_0"]["observation"]), dtype=torch.float32)

# unsqueeze(0) adds a leading sequence axis of length 1
# (assumes the converted observation is 1-D -- TODO confirm).
out, (hxsi, cxsi) = lstm(torch.unsqueeze(obs, 0), (hxs, cxs))


AttributeError: 'numpy.ndarray' object has no attribute 'dim'

In [17]:
class RolloutBuffer():
    """Collects per-agent game trajectories plus their GAE advantages and returns.

    Every field is a list with one entry per trajectory added through
    ``add_replay``; each entry is itself a per-step sequence.
    """
    
    def __init__(self, 
                 buffer_size: int, 
                 gamma: float, 
                 gae_lambda: float,
                 is_recurrent: bool,
                 recurrent_size: int = None,
                 ):
        '''
            @buffer_size: This is the number of trajectories
            @gamma: discount factor used for the advantage estimate
            @gae_lambda: GAE smoothing parameter
            @is_recurrent: whether recurrent states are expected to be stored
            @recurrent_size: size of the recurrent state (only kept when is_recurrent)
        '''

        self.rewards = None
        self.actions = None
        self.dones = None
        self.observations = None

        # do we want these for both actor and critic?
        self.actor_hxs = None 
        self.actor_cxs = None
        self.critic_hxs = None 
        self.critic_cxs = None


        self.log_probs = None
        self.values = None
        self.advantages = None

        self.buffer_size = buffer_size
        self.gamma = gamma 
        self.gae_lambda = gae_lambda
        self.is_recurrent = is_recurrent
        # always defined, so reading it on a non-recurrent buffer cannot fail
        self.recurrent_size = recurrent_size if self.is_recurrent else None
        
        self.reset()

    def reset(self):
        """Drop every stored trajectory and start with empty lists."""
        self.rewards = []
        self.actions = []
        self.dones = []
        self.observations = []

        # do we want these for both actor and critic?
        self.hxs = [] 
        self.cxs = []

        self.log_probs = []

        # values come from the game; advantages/returns from _calculate_advantages
        self.values = []
        self.advantages = []
        self.returns = []

    def add_replay(self, game) -> bool:
        """Store one trajectory together with its advantages and returns.

        Arguments:
            game: dict with the per-step sequences "rewards", "actions",
                "terms", "obs", "logprobs" and "values".

        Returns:
            True once the trajectory has been stored.

        Raises:
            ValueError: if "rewards", "values" and "terms" differ in length.
        """
        # computed first so a malformed trajectory leaves the buffer untouched
        advantages, values, returns = self._calculate_advantages(game)

        self.rewards.append(game['rewards'])
        self.actions.append(game['actions'])
        self.dones.append(game["terms"])
        self.observations.append(game["obs"])
        self.log_probs.append(game["logprobs"])
        self.values.append(game["values"])
        self.advantages.append(advantages)
        self.returns.append(returns)

        return True
    
    def _calculate_advantages(self, game):
        """Generalized advantage estimation (GAE) for a single trajectory.

            Uses ``self.gamma`` as discount factor and ``self.gae_lambda`` as
            the GAE parameter. No value is available past the last stored step,
            so the bootstrap value there is 0; a step flagged in ``game["terms"]``
            also cuts the bootstrap from the following step.

            Arguments:
                game {dict} -- trajectory with "rewards", "values" and "terms"
            Returns:
                (advantages, values, returns) as lists of floats, where
                returns[t] = advantages[t] + values[t]
            Raises:
                ValueError -- if the three sequences differ in length
        """
        rewards = [float(reward) for reward in game["rewards"]]
        values = [float(value) for value in game["values"]]
        dones = [bool(done) for done in game["terms"]]

        if not (len(rewards) == len(values) == len(dones)):
            raise ValueError(
                "rewards, values and terms must have the same length, got "
                f"{len(rewards)}, {len(values)} and {len(dones)}"
            )

        advantages = [0.0] * len(rewards)
        next_value = 0.0
        next_advantage = 0.0
        for t in reversed(range(len(rewards))):
            if dones[t]:
                # nothing to bootstrap from after a terminal step
                next_value = 0.0
                next_advantage = 0.0
            delta = rewards[t] + self.gamma * next_value - values[t]
            next_advantage = delta + self.gamma * self.gae_lambda * next_advantage
            advantages[t] = next_advantage
            next_value = values[t]

        returns = [advantage + value for advantage, value in zip(advantages, values)]
        return advantages, values, returns


In [22]:
def fill_buffer(env, wolf_policy, villager_policy, num_times=10) -> RolloutBuffer:
    """Play ``num_times`` games and store one trajectory per tracked agent.

    Agents whose ``env.agent_roles`` entry is falsy are tracked. On every
    non-night step each living villager queries
    ``villager_policy(obs, villager, env)`` and its observation, action,
    log-probability and value are recorded. After the step the reward and
    termination flag are appended, unless the last history entry is a night
    phase, in which case the reward is added to the previous entry and the
    termination flag overwrites it.

    Returns the filled ``RolloutBuffer``.
    """
    trajectory_fields = ('obs', 'rewards', 'actions', 'logprobs', 'values', 'terms')

    buffer = RolloutBuffer(buffer_size=10, gamma=0.90, gae_lambda=0.90, is_recurrent=True)
    buffer.reset()

    for _ in range(num_times):
        ## Play the game
        next_observations, rewards, terminations, truncations, infos = env.reset()

        magent_obs = {}
        for agent in env.agents:
            if env.agent_roles[agent]:
                continue
            magent_obs[agent] = {field: [] for field in trajectory_fields}

        while env.agents:
            observations = copy.deepcopy(next_observations)
            actions = {}

            villagers = set(env.agents) & set(env.world_state["villagers"])
            wolves = set(env.agents) & set(env.world_state["werewolves"])

            # villagers only act outside of the night phase
            if env.world_state["phase"] != plurality_Phase.NIGHT:
                for villager in villagers:
                    obs = torch.Tensor(env.convert_obs(observations[villager]['observation']))

                    # TODO: Testing this, we may need a better way to pass in villagers
                    ml_action, logprobs, _, value = villager_policy(obs, villager, env)
                    actions[villager] = ml_action

                    # remember what the villager saw and did on this step
                    record = magent_obs[villager]
                    record["obs"].append(obs)
                    record["actions"].append(ml_action)
                    record["logprobs"].append(logprobs)
                    record["values"].append(value)

            # wolf steps
            actions = actions | wolf_policy(env)

            next_observations, rewards, terminations, truncations, infos = env.step(actions)

            for villager in villagers:
                if env.history[-1]["phase"] != plurality_Phase.NIGHT:
                    magent_obs[villager]["rewards"].append(rewards[villager])
                    magent_obs[villager]["terms"].append(terminations[villager])
                else:
                    # night outcome is folded into the villager's latest recorded step
                    magent_obs[villager]["rewards"][-1] += rewards[villager]
                    magent_obs[villager]["terms"][-1] = terminations[villager]

        ## Fill bigger buffer, keeping in mind sequence
        for trajectory in magent_obs.values():
            buffer.add_replay(trajectory)

    return buffer
        

# 10-agent game with 2 werewolves, reset before it is handed to fill_buffer below.
env = plurality_env(num_agents=10, werewolves=2)
env.reset()

def test_policy(obs, agent=None, env=None):
   #
   return env.action_space(agent).sample(), np.random.uniform(0,1), None, np.random.randn()

def test_recurrent_policy(obs, agent=None, env=None):
    # we need to return the hx and cx from the model, chyou know? and also have an initial one of 0 to feed the model the first time
    


# Collect 10 games of coordinated random wolves against the random stand-in villager policy.
buff = fill_buffer(env, random_coordinated_wolf, test_policy, num_times=10)


In [25]:
# Number of stored trajectories (add_replay appends one rewards entry per call).
len(buff.rewards)

80