In [42]:
!pip install hanabi_learning_environment 


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Experiment with the pyhanabi environment:

In [43]:
from hanabi_learning_environment import pyhanabi
def card_to_matrix(card):
  """One-hot encode a two-character card string as a 5x5 (color, rank) matrix.

  Args:
    card: Two-character string; the first character is a color letter
      understood by `color_to_int`, the second a rank digit from 1 to 5
      (for example "G3").

  Returns:
    A (5, 5) float array holding a single 1 at [color, rank - 1]. The array
    is all zeros when the color letter or the rank is not recognised
    (for example when either character is 'X').

  Raises:
    AssertionError: if `card` does not have exactly two characters.
  """
  assert len(card) == 2
  mat = np.zeros((5, 5))
  color = color_to_int(card[0])
  try:
    number = int(card[1]) - 1
  except (TypeError, ValueError):
    # Non-numeric rank character: nothing is known about this card.
    return mat
  # Check the bounds explicitly. An index of -1 would not raise; numpy would
  # wrap it around and silently mark the last color / the highest rank.
  if 0 <= color < 5 and 0 <= number < 5:
    mat[color, number] = 1
  return mat

def color_to_int(color):
  """Translate a color letter into its index.

  'R', 'Y', 'G', 'W' and 'B' map to 0, 1, 2, 3 and 4 respectively; any other
  value yields -1.
  """
  index_by_letter = {'R': 0, 'Y': 1, 'G': 2, 'W': 3, 'B': 4}
  return index_by_letter.get(color, -1)

def int_to_color(num):
  """Translate a color index (0-4) into its letter.

  0, 1, 2, 3 and 4 map to 'R', 'Y', 'G', 'W' and 'B'; any other value
  yields None.
  """
  letter_by_index = {0: 'R', 1: 'Y', 2: 'G', 3: 'W', 4: 'B'}
  return letter_by_index.get(num)

def update_hand_hints(hand_hints, hint):
  """Zero out the possibilities that one reveal move rules out, in place.

  Every field is read from a fixed character position of `hint`.
  NOTE(review): the offsets look like they match the text of a reveal history
  item such as "<(Reveal player +1 rank 3) by player 2 reveal 0,2>" - confirm
  against the pyhanabi string format.

  Args:
    hand_hints: Array indexed as [player, card, color, rank]; it is modified
      in place. The player row is (target offset + acting player) modulo 3
      and card slots 0-4 are considered.
    hint: Text of the reveal move.

  Returns:
    The same `hand_hints` array that was passed in.
  """
  by_rank = hint[19] == 'r'
  blank = np.zeros((5, ))
  target_offset = int(hint[17])
  if by_rank:
    mover = int(hint[37])
    hinted = int(hint[24]) - 1
    touched = [int(hint[pos]) for pos in range(46, len(hint), 2)]
  else:
    # A color hint is one character longer, so the later fields shift by one.
    mover = int(hint[38])
    hinted = color_to_int(hint[25])
    touched = [int(hint[pos]) for pos in range(47, len(hint) - 1, 2)]
  untouched = [slot for slot in range(5) if slot not in touched]
  others = list(range(5))
  others.remove(hinted)
  row = (target_offset + mover) % 3

  # Touched cards can only have the hinted value ...
  for slot in touched:
    if by_rank:
      hand_hints[row, slot, :, others] = blank
    else:
      hand_hints[row, slot, others, :] = blank
  # ... and every other card in that hand cannot have it.
  for slot in untouched:
    if by_rank:
      hand_hints[row, slot, :, hinted] = blank
    else:
      hand_hints[row, slot, hinted, :] = blank

  return hand_hints

def encode_observation(observation, memory=True):
  """Encode an observation as the dictionary of arrays fed to the network.

  Args:
    observation: Object providing life_tokens(), information_tokens(),
      fireworks(), last_moves() and observed_hands().
    memory: Currently not read by this function.

  Returns:
    A dict with three entries, each carrying a leading batch axis of size 1:
      "life": (1, 4) one-hot of the life token count.
      "info": (1, 9) one-hot of the information token count.
      "hand": (1, 3, 5, 5, 5, 3) array indexed
        [batch, player, card, color, rank, channel]. Channel 0 is the card
        matrix from `card_to_matrix`, channel 1 is the fireworks matrix
        (written only for cards present in the observed hands) and channel 2
        is the hint mask built from recent reveal moves.
  """
  # If time, add an option to add all observation features into state space (less reliant on memory)
  lives_one_hot = np.zeros((4, ))
  lives_one_hot[observation.life_tokens()] = 1
  info_one_hot = np.zeros((9, ))
  info_one_hot[observation.information_tokens()] = 1

  # Mark fireworks[color, height] for every stack that has not reached 5.
  fireworks = np.zeros((5, 5))
  for color_idx, height in enumerate(observation.fireworks()):
    if height != 5:
      fireworks[color_idx, height] = 1

  # Start from "everything possible" and apply the reveals found among the
  # first three moves whose text is longer than 15 characters.
  hint_mask = np.ones((3, 5, 5, 5))
  counted = 0
  for item in observation.last_moves():
    if counted >= 3:
      break
    text = str(item)
    if len(text) <= 15:
      continue
    counted += 1
    if text[2] == 'R':
      hint_mask = update_hand_hints(hint_mask, text)

  hands = np.zeros((3, 5, 5, 5, 3))
  for player_idx, hand in enumerate(observation.observed_hands()):
    for slot, card in enumerate(hand):
      hands[player_idx, slot, :, :, 0] = card_to_matrix(str(card))
      hands[player_idx, slot, :, :, 1] = fireworks
  hands[:, :, :, :, 2] = hint_mask

  return {
      "life": lives_one_hot.reshape(1, 4),
      "info": info_one_hot.reshape(1, 9),
      "hand": hands.reshape(1, 3, 5, 5, 5, 3),
  }

def encode_possible_moves(possible_moves):
  """Build a 30-element 0/1 vector marking which action slots are available.

  Slot layout, derived from the text of each move:
    0-4   discard of card 0-4
    5-9   play of card 0-4
    10+   reveal: 10 + 10 * (target offset - 1) + 5 * is_color + value,
          where value is rank - 1 for a rank hint or the `color_to_int`
          index for a color hint.
  Moves whose text starts with any other kind are ignored.

  Args:
    possible_moves: Sequence of moves; only their `str()` form is used.

  Returns:
    A (30,) float array with 1 at each available slot.
  """
  encoding = np.zeros((30,))
  for candidate in possible_moves:
    text = str(candidate)
    kind = text[1]
    if kind == 'D':
      slot = int(text[9])
    elif kind == 'P':
      slot = 5 + int(text[6])
    elif kind == 'R':
      target = int(text[16]) - 1
      by_color = 1 if text[18] == 'c' else 0
      value = color_to_int(text[24]) if by_color else int(text[23]) - 1
      slot = 10 + 10 * target + 5 * by_color + value
    else:
      continue
    encoding[slot] = 1
  return encoding

def possible_move_from_index(possible_moves, index):
  """Find the move in `possible_moves` that corresponds to an action slot.

  This is the inverse of the slot layout used by `encode_possible_moves`:
  indices below 5 select a discard, indices 5-9 a play, and the remainder is
  split into (target player, color-or-rank flag, value) for reveal moves.

  Args:
    possible_moves: Sequence of moves; they are matched via their `str()` form.
    index: Integer action slot.

  Returns:
    The first matching element of `possible_moves`, or None if none matches.
  """
  want_discard = index < 5
  want_play = (not want_discard) and index < 10
  if want_discard:
    card_slot = index % 5
  elif want_play:
    card_slot = (index - 5) % 5
  else:
    card_slot = 0
  # Reveal slots start at 10: ten per target player, five per hint type.
  want_player, remainder = divmod(index - 10, 10)
  want_color, want_value = divmod(remainder, 5)

  for candidate in possible_moves:
    text = str(candidate)
    kind = text[1]
    if kind == 'D':
      if want_discard and card_slot == int(text[9]):
        return candidate
    elif kind == 'P':
      if want_play and card_slot == int(text[6]):
        return candidate
    elif kind == 'R':
      found_player = int(text[16]) - 1
      found_color = 1 if "color" in text else 0
      if found_color == 0:
        found_value = int(text[23]) - 1
      else:
        found_value = color_to_int(text[24])
      if (found_player, found_color, found_value) == (want_player, want_color, want_value):
        return candidate
  return None

# def encode_state(state):
#   obs = state.observation(state.cur_player())
#   life_input = np.zeros((4, ))
#   life_input[obs.life_tokens()] = 1
#   info_input = np.zeros((9, ))
#   info_input[obs.information_tokens()] = 1
#   hand_information = None#encode_observation(obs)
#   return [life_input, info_input, hand_information]

In [44]:
# #Environment
# from hanabi_learning_environment import pyhanabi

# # Copyright 2018 Google LLC
# #
# # Licensed under the Apache License, Version 2.0 (the "License");
# # you may not use this file except in compliance with the License.
# # You may obtain a copy of the License at
# #
# #    https://www.apache.org/licenses/LICENSE-2.0
# #
# # Unless required by applicable law or agreed to in writing, software
# # distributed under the License is distributed on an "AS IS" BASIS,
# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# # See the License for the specific language governing permissions and
# # limitations under the License.

# """Example code demonstrating the Python Hanabi interface."""


import numpy as np
from hanabi_learning_environment import pyhanabi


def run_game(game_parameters):
  """Play one game with random legal moves while smoke-testing the encoders.

  On every turn the observation is passed through `encode_observation`, the
  legal moves are encoded with `encode_possible_moves`, and every set slot is
  decoded again with `possible_move_from_index`. If a slot fails to decode,
  a diagnostic is printed and the function returns early. Otherwise a random
  legal move is applied until the game ends, and the terminal state and score
  are printed.

  Args:
    game_parameters: Parameters passed to `pyhanabi.HanabiGame`.
  """

  def print_state(state):
    """Print some basic information about the state."""
    print("")
    print("Current player: {}".format(state.cur_player()))
    print(state)

    # Example of more queries to provide more about this state. For
    # example, bots could use these methods to get information
    # about the state in order to act accordingly.
    print("### Information about the state retrieved separately ###")
    print("### Information tokens: {}".format(state.information_tokens()))
    print("### Life tokens: {}".format(state.life_tokens()))
    print("### Fireworks: {}".format(state.fireworks()))
    print("### Deck size: {}".format(state.deck_size()))
    print("### Discard pile: {}".format(str(state.discard_pile())))
    print("### Player hands: {}".format(str(state.player_hands())))
    print("")

  def print_observation(observation):
    """Print some basic information about an agent observation."""
    print("--- Observation ---")
    print(observation)

    print("### Information about the observation retrieved separately ###")
    print("### Current player, relative to self: {}".format(
        observation.cur_player_offset()))
    print("### Observed hands: {}".format(observation.observed_hands()))
    print("### Card knowledge: {}".format(observation.card_knowledge()))
    print("### Discard pile: {}".format(observation.discard_pile()))
    print("### Fireworks: {}".format(observation.fireworks()))
    print("### Deck size: {}".format(observation.deck_size()))
    move_string = "### Last moves:"
    for move_tuple in observation.last_moves():
      move_string += " {}".format(move_tuple)
    print(move_string)
    print("### Information tokens: {}".format(observation.information_tokens()))
    print("### Life tokens: {}".format(observation.life_tokens()))
    print("### Legal moves: {}".format(observation.legal_moves()))
    print("--- EndObservation ---")

  def print_encoded_observations(encoder, state, num_players):
    """Print the encoder's observation encoding for every player."""
    print("--- EncodedObservations ---")
    print("Observation encoding shape: {}".format(encoder.shape()))
    print("Current actual player: {}".format(state.cur_player()))
    for i in range(num_players):
      print("Encoded observation for player {}: {}".format(
          i, encoder.encode(state.observation(i))))
    print("--- EndEncodedObservations ---")

  game = pyhanabi.HanabiGame(game_parameters)
  # print(game.parameter_string(), end="")
  # Only needed by the (currently disabled) print_encoded_observations call below.
  obs_encoder = pyhanabi.ObservationEncoder(game, enc_type=pyhanabi.ObservationEncoderType.CANONICAL)

  state = game.new_initial_state()
  while not state.is_terminal():
    if state.cur_player() == pyhanabi.CHANCE_PLAYER_ID:
      state.deal_random_card()
      continue

    # print_state(state)

    observation = state.observation(state.cur_player())
    # Called only to check that encoding works; the result is not used here.
    encode_observation(observation)

    # print_observation(observation)
    # print_encoded_observations(obs_encoder, state, game.num_players())

    legal_moves = state.legal_moves()
    moves = encode_possible_moves(legal_moves)

    # Round-trip check: every encoded slot must decode back to a legal move.
    decoded_moves = []
    for i in range(len(moves)):
      if moves[i] == 0:
        continue
      move = possible_move_from_index(legal_moves, i)
      if move is None:
        print("Action index {} could not be decoded back into a legal move; "
              "stopping. Legal moves: {}".format(i, legal_moves))
        return
      decoded_moves.append(move)

    move = np.random.choice(legal_moves)
    state.apply_move(move)

  print("")
  print("Game done. Terminal state:")
  print("")
  print(state)
  print("")
  print("score: {}".format(state.score()))


if __name__ == "__main__":
  # The native pyhanabi bindings must be loaded before a game can be created.
  assert pyhanabi.cdef_loaded(), "cdef failed to load"
  assert pyhanabi.lib_loaded(), "lib failed to load"
  # Three players, with the starting player chosen at random.
  run_game(dict(players=3, random_start_player=True))


Game done. Terminal state:

Life tokens: 0
Info tokens: 0
Fireworks: R1 Y0 G0 W1 B0 
Hands:
G4 || G4|G4
B3 || X3|RYWB3
G1 || G1|G1
Y3 || XX|RYWB1235
Y4 || X4|RYGWB4
-----
R2 || RX|R2345
B1 || X1|WB1
G1 || X1|RYGWB1
B5 || XX|RYGWB12345
-----
Y2 || XX|RYG2345
R5 || XX|RYG2345
B2 || BX|B2345
Y1 || X1|RYGWB1
W5 || XX|RYGWB2345
Deck size: 26
Discards: W4 B3 R4 R3 Y4 W4 R1 G4

score: 0


In [45]:
!pip install keras

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [46]:
import random
import numpy as np
import keras
from collections import deque
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten, Conv2D, MaxPooling2D, Lambda, Concatenate
from keras.optimizers import Adam
import numpy.ma as ma

In [47]:
# Scratch check of slice semantics: the end index is exclusive.
a = [1, 2, 3, 4]
print(a[:3])

[1, 2, 3]


In [48]:
class DQN_Agent:
    """DQN agent whose network also emits a "self talk" vector.

    The network has `action_size + self_talk_size` outputs. The first
    `action_size` outputs are used as action values; the remaining ones are
    returned by `act` as a talk vector, which the caller can feed back through
    the "talk" input on a later step.

    NOTE(review): the output layer uses a sigmoid, so predictions lie in
    (0, 1), while `replay` regresses them toward `reward + gamma * max`.
    Confirm that the reward scale used by the caller is compatible.
    """

    #
    # Initializes attributes and constructs CNN model and target_model
    #
    def __init__(self, action_size, self_talk_size, uncertain=False):
        """Create the agent and its online/target networks.

        Args:
            action_size: Number of action outputs.
            self_talk_size: Number of talk outputs (and talk inputs).
            uncertain: If True, `act` scales the talk vector by a factor
                derived from the spread of the five largest action values.
        """
        self.action_size = action_size
        self.self_talk_size = self_talk_size
        self.memory = deque(maxlen=5000)
        self.uncertain = uncertain

        # Hyperparameters
        self.gamma = 0.9            # Discount rate
        self.epsilon = 1.0          # Exploration rate
        self.epsilon_min = 0.1      # Minimal exploration rate (epsilon-greedy)
        self.epsilon_decay = 0.9    # Decay rate for epsilon (applied once per replay call)
        # self.update_rate = 1     # Number of steps until updating the target network

        # Construct DQN models
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.target_model.set_weights(self.model.get_weights())

    #
    # Constructs CNN
    #
    def _build_model(self):
        """Build and compile the network.

        Inputs are "life" (4,), "info" (9,), "hand" (3, 5, 5, 5, 3) and
        "talk" (self_talk_size,). Each of the 3 x 5 card slices of "hand" goes
        through its own Conv2D -> Flatten -> Dense stack; the per-hand results
        are concatenated with the other inputs and passed through two Dense
        layers to a sigmoid output of size action_size + self_talk_size.

        Returns:
            The compiled keras.Model (MSE loss, Adam optimizer).
        """
        life_input = keras.Input(shape=(4,), name="life")
        info_input = keras.Input(shape=(9,), name="info")
        hand_input = keras.Input(shape=(3, 5, 5, 5, 3), name="hand")
        talk_input = keras.Input(shape=(self.self_talk_size,), name="talk")

        hand_outputs = []
        for i in range(3):
            card_outputs = []
            for j in range(5):
                # Bind i and j as default arguments. A plain closure would look
                # the loop variables up whenever the layer is invoked again
                # after the loops have finished, so every branch would slice
                # the last player and last card.
                out = Lambda(lambda x, p=i, c=j: x[:, p, c, :, :, :])(hand_input)
                out = Conv2D(filters=1, kernel_size=3, activation='relu')(out)
                out = Flatten()(out)
                out = Dense(32, activation='relu')(out)
                card_outputs.append(out)
            out = Concatenate()(card_outputs)
            out = Dense(32, activation='relu')(out)
            hand_outputs.append(out)
        hand_outputs.append(life_input)
        hand_outputs.append(info_input)
        hand_outputs.append(talk_input)
        out = Concatenate()(hand_outputs)
        out = Dense(64, activation='relu')(out)
        out = Dense(64, activation='relu')(out)
        actions_out = Dense(self.action_size + self.self_talk_size, activation='sigmoid')(out)
        # talk_out = Dense(self.self_talk_size, activation = 'sigmoid')(out)
        model = keras.Model(
            inputs=[life_input, info_input, hand_input, talk_input],
            outputs=[actions_out],
        )
        model.compile(loss='mse', optimizer=Adam())
        return model

    #
    # Stores experience in replay memory
    #
    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory (bounded deque)."""
        self.memory.append((state, action, reward, next_state, done))

    def get_epsilon(self):
        """Return the current exploration rate."""
        return self.epsilon

    #
    # Chooses action based on epsilon-greedy policy
    #
    def act(self, state, enc_legal_moves):
        """Pick an action index and produce the talk vector.

        Args:
            state: Model input (as accepted by `self.model.predict`).
            enc_legal_moves: Array with non-zero entries at legal action
                indices (length `action_size`).

        Returns:
            A two-element list `[action_index, talk_vector]`. When exploring,
            the action is a uniformly random legal index and the talk vector
            is all zeros. Otherwise the action is the legal index with the
            highest predicted value and the talk vector is the talk part of
            the prediction, multiplied by the uncertainty factor when
            `self.uncertain` is set.
        """
        # Random exploration
        if np.random.rand() <= self.epsilon:
            idxs = enc_legal_moves.nonzero()
            return [random.choice(idxs[0]), np.zeros(self.self_talk_size)]

        act_talk_values = self.model.predict(state, verbose=0)
        act_values = act_talk_values[0][0:self.action_size]

        # Values of illegal moves are zeroed for the uncertainty estimate.
        legal_values = act_values * enc_legal_moves
        uncertainty = 1
        if self.uncertain:
            # 1 + standard deviation of the five largest (masked) values.
            maxes = legal_values[np.argpartition(legal_values, -5)[-5:]]
            mean = sum(maxes) / len(maxes)
            uncertainty = (1 + (sum([((x - mean) ** 2) for x in maxes]) / len(maxes)) ** 0.5)

        # For the choice itself, illegal moves are set to -inf rather than 0,
        # so a legal move still wins when every legal value is exactly 0.
        scores = np.array(act_values, dtype=float)
        scores[np.asarray(enc_legal_moves) == 0] = -np.inf
        return [np.argmax(scores), act_talk_values[0][self.action_size:] * uncertainty]  # Returns action using policy

    #
    # Trains the model using randomly selected experiences in the replay memory
    #
    def replay(self, batch_size):
        """Fit the online model on `batch_size` randomly sampled transitions.

        Each transition is trained on separately. After the batch, epsilon is
        multiplied by `epsilon_decay` if it is still above `epsilon_min`.

        Raises:
            ValueError: from `random.sample` if `batch_size` exceeds the
                number of stored transitions.
        """
        minibatch = random.sample(self.memory, batch_size)

        for state, action, reward, next_state, done in minibatch:
            if not done:
                next_values = self.target_model.predict(next_state, verbose=0)
                # Bootstrap from the action values only: the trailing
                # self_talk_size outputs are not action values and must not
                # take part in the max.
                target = reward + self.gamma * np.amax(next_values[0][:self.action_size])
            else:
                target = reward

            # Construct the target vector as follows:
            # 1. Use the current model to output the Q-value predictions
            target_f = self.model.predict(state, verbose=0)

            # 2. Rewrite the chosen action value with the computed target
            #    (action is the [index, talk] pair returned by act)
            target_f[0][action[0]] = target

            # 3. Use vectors in the objective computation
            self.model.fit(state, target_f, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    #
    # Sets the target model parameters to the current model parameters
    #
    def update_target_model(self):
        """Copy the online model's weights into the target model."""
        self.target_model.set_weights(self.model.get_weights())

    #
    # Loads a saved model
    #
    def load(self, name):
        """Load weights for the online model from `name`."""
        self.model.load_weights(name)

    #
    # Saves parameters of a trained model
    #
    def save(self, name):
        """Save the online model's weights to `name`."""
        self.model.save_weights(name)

In [49]:
import copy
def train_agent(game_parameters, episodes, uncertain=False):
  """Train a DQN_Agent through self-play and return the per-episode scores.

  One agent acts for every player. The talk vector returned by `agent.act`
  is stored as the "talk" input of the next observation.

  Args:
    game_parameters: Parameters passed to `pyhanabi.HanabiGame`.
    episodes: Number of games to play.
    uncertain: Forwarded to `DQN_Agent`; defaults to False, which matches the
      previous behavior of this function.

  Returns:
    A list with one entry per finished episode: the highest value of
    `state.score()` seen during that game.

  Raises:
    RuntimeError: if the action chosen by the agent cannot be decoded into
      one of the current legal moves.
  """
  action_size = 30
  self_talk_size = 4
  agent = DQN_Agent(action_size, self_talk_size, uncertain=uncertain)

  batch_size = 8
  total_time = 0   # Counter for total number of steps taken
  game = pyhanabi.HanabiGame(game_parameters)

  game_scores = [] #has length episodes
  for e in range(episodes):
    print(e)
    print(agent.get_epsilon())

    state = game.new_initial_state()
    enc_obs = None
    game_score = 0

    while not state.is_terminal():
      if state.cur_player() == pyhanabi.CHANCE_PLAYER_ID:
        state.deal_random_card()
        continue

      total_time += 1
      if enc_obs is None:
        # First decision of the episode: no talk has been produced yet.
        enc_obs = encode_observation(state.observation(state.cur_player()))
        enc_obs["talk"] = np.zeros((1, self_talk_size))

      # The target network is synchronised on every step.
      agent.update_target_model()
      legal_moves = state.legal_moves()
      enc_legal_moves = encode_possible_moves(legal_moves)

      # Transition Dynamics
      action = agent.act(enc_obs, enc_legal_moves)
      move = possible_move_from_index(legal_moves, action[0])
      if move is None:
        # Fail with a clear message instead of handing None to apply_move.
        raise RuntimeError(
            "Action index {} does not correspond to a legal move: {}".format(
                action[0], legal_moves))

      state.apply_move(move)
      done = state.is_terminal()

      # Resolve pending card deals so the next observation belongs to a player.
      while state.cur_player() == pyhanabi.CHANCE_PLAYER_ID:
        state.deal_random_card()

      # Track the best score seen so far; it is paid out as the reward only on
      # the final step of the game.
      game_score = max(game_score, state.score())
      reward = game_score if done else 0

      new_enc_obs = encode_observation(state.observation(state.cur_player()))
      new_enc_obs["talk"] = action[1].reshape((1, self_talk_size))
      # Store sequence in replay memory
      agent.remember(enc_obs, action, reward, new_enc_obs, done)
      enc_obs = new_enc_obs

      if done:
        game_scores.append(game_score)

        recent_scores = game_scores[-10:]
        moving_avg_score = sum(recent_scores) / len(recent_scores)
        print("episode: {}/{}, game score: {}, moving avg reward: {}, total time: {}"
              .format(e+1, episodes, game_score, moving_avg_score, total_time))

        break

      if len(agent.memory) > batch_size:
        agent.replay(batch_size)
  return game_scores

In [50]:
import copy
def train_agent_uncertain_talk(game_parameters, episodes):
  """Train a DQN_Agent with `uncertain=True` through self-play.

  One agent acts for every player. The talk vector returned by `agent.act`
  (scaled by the agent's uncertainty factor) is stored as the "talk" input of
  the next observation.

  Args:
    game_parameters: Parameters passed to `pyhanabi.HanabiGame`.
    episodes: Number of games to play.

  Returns:
    A list with one entry per finished episode: the highest value of
    `state.score()` seen during that game.

  Raises:
    RuntimeError: if the action chosen by the agent cannot be decoded into
      one of the current legal moves.
  """
  action_size = 30
  self_talk_size = 4
  agent = DQN_Agent(action_size, self_talk_size, uncertain=True)

  batch_size = 8
  total_time = 0   # Counter for total number of steps taken
  game = pyhanabi.HanabiGame(game_parameters)

  game_scores = [] #has length episodes
  for e in range(episodes):
    print(e)
    print(agent.get_epsilon())

    state = game.new_initial_state()
    enc_obs = None
    game_score = 0

    while not state.is_terminal():
      if state.cur_player() == pyhanabi.CHANCE_PLAYER_ID:
        state.deal_random_card()
        continue

      total_time += 1
      if enc_obs is None:
        # First decision of the episode: no talk has been produced yet.
        enc_obs = encode_observation(state.observation(state.cur_player()))
        enc_obs["talk"] = np.zeros((1, self_talk_size))

      # The target network is synchronised on every step.
      agent.update_target_model()
      legal_moves = state.legal_moves()
      enc_legal_moves = encode_possible_moves(legal_moves)

      # Transition Dynamics
      action = agent.act(enc_obs, enc_legal_moves)
      move = possible_move_from_index(legal_moves, action[0])
      if move is None:
        # Fail with a clear message instead of handing None to apply_move.
        raise RuntimeError(
            "Action index {} does not correspond to a legal move: {}".format(
                action[0], legal_moves))

      state.apply_move(move)
      done = state.is_terminal()

      # Resolve pending card deals so the next observation belongs to a player.
      while state.cur_player() == pyhanabi.CHANCE_PLAYER_ID:
        state.deal_random_card()

      # Track the best score seen so far; it is paid out as the reward only on
      # the final step of the game.
      game_score = max(game_score, state.score())
      reward = game_score if done else 0

      new_enc_obs = encode_observation(state.observation(state.cur_player()))
      new_enc_obs["talk"] = action[1].reshape((1, self_talk_size))
      # Store sequence in replay memory
      agent.remember(enc_obs, action, reward, new_enc_obs, done)
      enc_obs = new_enc_obs

      if done:
        game_scores.append(game_score)

        recent_scores = game_scores[-10:]
        moving_avg_score = sum(recent_scores) / len(recent_scores)
        print("episode: {}/{}, game score: {}, moving avg reward: {}, total time: {}"
              .format(e+1, episodes, game_score, moving_avg_score, total_time))

        break

      if len(agent.memory) > batch_size:
        agent.replay(batch_size)
  return game_scores

In [2]:
# Number of training games; later cells read both `episodes` and `game_scores`.
episodes = 200
game_scores = []
if __name__ == "__main__":
  # The native pyhanabi bindings must be loaded before a game can be created.
  assert pyhanabi.cdef_loaded(), "cdef failed to load"
  assert pyhanabi.lib_loaded(), "lib failed to load"
  game_scores = train_agent_uncertain_talk(
      dict(players=3, random_start_player=True), episodes)

NameError: ignored

In [53]:
# 10-episode moving average of the recorded scores. The number of windows is
# derived from len(game_scores) so that the final full window is included and
# no short window is ever divided by 10; fewer than 10 scores give an empty list.
moving_avg_game_scores = [
  sum(game_scores[start:start + 10]) / 10
  for start in range(len(game_scores) - 10 + 1)
]

In [1]:


# plt is not imported by any of the visible cells, so bring it in here.
import matplotlib.pyplot as plt

# Raw per-episode scores, overlaid with their moving average.
plt.plot(game_scores)
plt.plot(moving_avg_game_scores)
plt.xlabel("Episodes")
plt.ylabel("Average Score")
plt.title("DQN with Uncertainty Memory")
plt.savefig("UNc", dpi = 1000)

NameError: ignored