In [1]:
def reconstruct_game_state(encoded_obs):
    """Decode a flat Hanabi observation vector into a human-readable report.

    The layout is hard-coded for 2 players, 5 colors, 5 ranks and 5-card
    hands. Sections are consumed in this order: other players' hands,
    missing-card flags, deck size, fireworks, information tokens, life
    tokens, discards, last action (skipped, not decoded) and card knowledge.
    With these parameters the sections add up to 658 values.

    Args:
        encoded_obs: Sequence of 0/1 values (ints or floats) that supports
            slicing and whose slices support ``.index`` (e.g. a ``list``).

    Returns:
        A multi-line string listing the token counts, fireworks, deck size,
        discards, the observer's own hand as ``<known> || <plausible>`` and
        the other player's visible hand.

    Raises:
        IndexError: If ``encoded_obs`` ends before the discard section or
            the missing-card flags are complete.
        ValueError: If a non-empty card or firework slice contains no
            value equal to 1.
    """
    # Game parameters
    num_colors = 5   # R, Y, G, W, B
    num_ranks = 5    # 1 to 5
    num_players = 2
    hand_size = 5
    bits_per_card = num_colors * num_ranks  # 25
    max_deck_size = 50
    max_information_tokens = 8
    max_life_tokens = 3

    colors = ['R', 'Y', 'G', 'W', 'B']
    ranks = ['1', '2', '3', '4', '5']

    def get_color_rank_from_index(index):
        # Card index = color * num_ranks + rank
        return divmod(index, num_ranks)

    def count_set_bits(bits):
        # The bits usually arrive as floats; report counts as integers so the
        # output reads "Deck size: 40" rather than "Deck size: 40.0".
        return int(sum(bits))

    offset = 0

    # --- Hands Section ---
    hands = []
    hand_sizes = [hand_size] * num_players

    # Other players' hands: one one-hot card slot after another. The offset
    # advances per card so each player reads their own bits.
    for _ in range(1, num_players):
        player_hand = []
        for _ in range(hand_size):
            card_bits = encoded_obs[offset:offset + bits_per_card]
            offset += bits_per_card
            if any(card_bits):
                color_idx, rank_idx = get_color_rank_from_index(card_bits.index(1))
                card_str = f"{colors[color_idx]}{ranks[rank_idx]}"
            else:
                # Empty slot: no card encoded at this position
                card_str = "XX"
            player_hand.append(card_str)
        hands.append(player_hand)

    # Missing card indicators (one flag per player)
    missing_cards = [bool(bit) for bit in encoded_obs[offset:offset + num_players]]
    offset += num_players

    # A set flag is treated as exactly one card missing from that hand
    for idx, missing in enumerate(missing_cards):
        if missing:
            hand_sizes[idx] -= 1

    # --- Board Section ---
    # Deck size (thermometer encoding)
    deck_bits_length = max_deck_size - num_players * hand_size
    deck_size = count_set_bits(encoded_obs[offset:offset + deck_bits_length])
    offset += deck_bits_length

    # Fireworks: one slice of num_ranks bits per color
    fireworks = []
    for color in colors:
        firework_bits = encoded_obs[offset:offset + num_ranks]
        offset += num_ranks
        # Ranks are 1-indexed; 0 means nothing played for this color
        highest_rank = firework_bits.index(1) + 1 if any(firework_bits) else 0
        fireworks.append(f"{color}{highest_rank}")

    # Information tokens (thermometer encoding)
    info_tokens = count_set_bits(encoded_obs[offset:offset + max_information_tokens])
    offset += max_information_tokens

    # Life tokens (thermometer encoding)
    life_tokens = count_set_bits(encoded_obs[offset:offset + max_life_tokens])
    offset += max_life_tokens

    # --- Discards Section ---
    discard_bits = encoded_obs[offset:offset + max_deck_size]
    offset += max_deck_size

    # Each (color, rank) owns as many consecutive bits as there are copies
    # of that card: three 1s, two each of 2-4, one 5.
    discards = []
    idx_in_discard = 0
    for color in colors:
        for rank_idx, rank in enumerate(ranks):
            if rank_idx == 0:
                num_instances = 3
            elif rank_idx == 4:
                num_instances = 1
            else:
                num_instances = 2
            for _ in range(num_instances):
                if discard_bits[idx_in_discard] == 1:
                    discards.append(f"{color}{rank}")
                idx_in_discard += 1

    # --- Last Action Section (skipped, not decoded) ---
    offset += (
        num_players + 4 + num_players + num_colors + num_ranks +
        hand_size + hand_size + bits_per_card + 2
    )

    # --- Card Knowledge Section ---
    knowledge_bits_per_card = bits_per_card + num_colors + num_ranks
    card_knowledge = []
    for player in range(num_players):
        player_card_knowledge = []
        for card_idx in range(hand_size):
            if card_idx >= hand_sizes[player]:
                # Slot of a missing card: skip its bits, emit no entry
                offset += knowledge_bits_per_card
                continue

            plausible_cards_bits = encoded_obs[offset:offset + bits_per_card]
            offset += bits_per_card
            revealed_color_bits = encoded_obs[offset:offset + num_colors]
            offset += num_colors
            revealed_rank_bits = encoded_obs[offset:offset + num_ranks]
            offset += num_ranks

            # Project the plausible-card bitmap onto colors and ranks
            plausible_colors = set()
            plausible_ranks = set()
            for idx_pc, bit in enumerate(plausible_cards_bits):
                if bit == 1:
                    color_idx, rank_idx = get_color_rank_from_index(idx_pc)
                    plausible_colors.add(colors[color_idx])
                    plausible_ranks.add(ranks[rank_idx])

            player_card_knowledge.append({
                'plausible_colors': plausible_colors,
                'plausible_ranks': plausible_ranks,
                'revealed_colors': [colors[i] for i, bit in enumerate(revealed_color_bits) if bit == 1],
                'revealed_ranks': [ranks[i] for i, bit in enumerate(revealed_rank_bits) if bit == 1],
            })
        card_knowledge.append(player_card_knowledge)

    # --- Construct Output String ---
    output = [
        f"Information Tokens: {info_tokens}",
        f"Life Tokens: {life_tokens}",
        "Fireworks: " + ' '.join(fireworks),
        f"Deck size: {deck_size}",
        "Discards:",
        ' '.join(discards) if discards else "None",
        "\nHands:",
        "Your hand:",
    ]

    # Own hand (player 0): "<known color><known rank> || <plausible set>"
    for entry in card_knowledge[0]:
        # A color/rank counts as known only if exactly one was revealed
        known_color = entry['revealed_colors'][0] if len(entry['revealed_colors']) == 1 else 'X'
        known_rank = entry['revealed_ranks'][0] if len(entry['revealed_ranks']) == 1 else 'X'

        # Keep the canonical color/rank order in the plausible strings
        plausible_colors_str = ''.join(c for c in colors if c in entry['plausible_colors'])
        plausible_ranks_str = ''.join(r for r in ranks if r in entry['plausible_ranks'])

        # If no plausible colors or ranks, show all
        if not plausible_colors_str:
            plausible_colors_str = ''.join(colors)
        if not plausible_ranks_str:
            plausible_ranks_str = ''.join(ranks)

        output.append(f"{known_color}{known_rank} || {plausible_colors_str}{plausible_ranks_str}")

    # Other player's hand (the only other player in the 2-player layout)
    output.append("\nOther player's hand:")
    other_hand_str = ' '.join(hands[0])
    if missing_cards[1]:
        other_hand_str += " (missing cards)"
    output.append(other_hand_str)

    return '\n'.join(output)

In [2]:
import re
import ast

# Parsed records: (encoded observation, MCTS policy, value, root policy)
data = []

# Description of each action, listed in the order the policy vectors are read.
# NOTE(review): assumed to match the environment's action indexing for a
# 2-player game with 5-card hands — confirm against the data producer.
moves = [
    {'action_type': 'DISCARD', 'card_index': 0},
    {'action_type': 'DISCARD', 'card_index': 1},
    {'action_type': 'DISCARD', 'card_index': 2},
    {'action_type': 'DISCARD', 'card_index': 3},
    {'action_type': 'DISCARD', 'card_index': 4},
    {'action_type': 'PLAY', 'card_index': 0},
    {'action_type': 'PLAY', 'card_index': 1},
    {'action_type': 'PLAY', 'card_index': 2},
    {'action_type': 'PLAY', 'card_index': 3},
    {'action_type': 'PLAY', 'card_index': 4},
    {'action_type': 'REVEAL_COLOR', 'target_offset': 1, 'color': 'R'},
    {'action_type': 'REVEAL_COLOR', 'target_offset': 1, 'color': 'Y'},
    {'action_type': 'REVEAL_COLOR', 'target_offset': 1, 'color': 'G'},
    {'action_type': 'REVEAL_COLOR', 'target_offset': 1, 'color': 'W'},
    {'action_type': 'REVEAL_COLOR', 'target_offset': 1, 'color': 'B'},
    {'action_type': 'REVEAL_RANK', 'target_offset': 1, 'rank': 0},
    {'action_type': 'REVEAL_RANK', 'target_offset': 1, 'rank': 1},
    {'action_type': 'REVEAL_RANK', 'target_offset': 1, 'rank': 2},
    {'action_type': 'REVEAL_RANK', 'target_offset': 1, 'rank': 3},
    {'action_type': 'REVEAL_RANK', 'target_offset': 1, 'rank': 4}
]


def _parse_float_list(text):
    """Parse the comma-separated body of a printed tensor into floats.

    Empty tokens (for instance from a trailing comma) are skipped instead of
    raising ``ValueError`` from ``float('')``.
    """
    return [float(token) for token in text.split(',') if token.strip()]


# Read and parse the data from the 'alphazero_data.txt' text file
with open('alphazero_data.txt', 'r') as file:
    content = file.read()

# Each record is a printed 4-tuple of tensors:
# (observation, MCTS policy, value, root policy)
tuple_strings = re.findall(
    r'\(tensor\(\[([\s\S]*?)\]\),\s*'    # Encoded observation
    r'tensor\(\[([\s\S]*?)\]\),\s*'      # Policy
    r'tensor\(([\s\S]*?)\),\s*'          # Value
    r'tensor\(\[([\s\S]*?)\]\)\)',       # Root policy
    content, re.DOTALL
)

# Convert the captured text of every record to numbers
for encoded_obs_str, policy_str, value_str, root_policy_str in tuple_strings:
    encoded_obs_list = _parse_float_list(encoded_obs_str)
    policy_list = _parse_float_list(policy_str)
    value_outcome = float(value_str.strip())
    root_policy_list = _parse_float_list(root_policy_str)

    data.append((encoded_obs_list, policy_list, value_outcome, root_policy_list))

# Open the output file 'interpret_data' for writing
with open('interpret_data', 'w') as output_file:
    for idx, (encoded_obs_list, policy_list, value_outcome, root_policy_list) in enumerate(data):
        # Reconstruct the game state
        game_state_str = reconstruct_game_state(encoded_obs_list)
        value = value_outcome

        # Index/indices of the action(s) with the highest MCTS probability
        max_prob = max(policy_list)
        max_prob_indices = [i for i, prob in enumerate(policy_list) if prob == max_prob]

        # Index/indices of the action(s) with the highest root policy probability
        max_root_prob = max(root_policy_list)
        max_root_prob_indices = [i for i, prob in enumerate(root_policy_list) if prob == max_root_prob]

        # Section header
        output_file.write(f"{'='*76}\n")
        output_file.write(f"Game State {idx+1}\n")
        output_file.write(f"{'='*76}\n\n")

        output_file.write("Game State Details:\n")
        output_file.write(f"{'-'*76}\n")
        output_file.write(game_state_str + '\n\n')

        # Table of moves with both probabilities side by side
        output_file.write("Moves and Policy Probabilities:\n")
        output_file.write(f"{'-'*76}\n")
        output_file.write(f"{'Move Description':<50} {'MCTS Prob':>12} {'Root Policy':>12}\n")
        output_file.write(f"{'-'*50}{'-'*13}{'-'*13}\n")
        # zip stops at the shortest of the three sequences
        for i, (move, prob, root_prob) in enumerate(zip(moves, policy_list, root_policy_list)):
            # Format the move description
            if move['action_type'] == 'DISCARD' or move['action_type'] == 'PLAY':
                move_desc = f"{move['action_type']} card at index {move['card_index']}"
            elif move['action_type'] == 'REVEAL_COLOR':
                move_desc = f"REVEAL_COLOR {move['color']} to player {move['target_offset']}"
            elif move['action_type'] == 'REVEAL_RANK':
                # Ranks are stored 0-based and displayed 1-based
                move_desc = f"REVEAL_RANK {move['rank'] + 1} to player {move['target_offset']}"
            else:
                move_desc = f"Unknown move type: {move['action_type']}"

            # '*' marks the MCTS argmax, '^' the root-policy argmax,
            # '*^' a move that is both
            marker = '  '
            if i in max_prob_indices:
                marker = '* '
            if i in max_root_prob_indices:
                if marker.strip():
                    marker = '*^'
                else:
                    marker = '^ '

            output_file.write(f"{marker}{move_desc:<48} {prob:>12.4f} {root_prob:>12.4f}\n")
        output_file.write('\n')
        output_file.write(f"Value Outcome: {value:.4f}\n")
        output_file.write(f"{'='*76}\n\n\n")

print("Data interpretation complete. Results saved in 'interpret_data' file.")

Data interpretation complete. Results saved in 'interpret_data' file.


Most of the probability mass is on a single action; every other action gets 0.
MCTS without rules, 1000 rollouts => average score of 8.3 over 10 episodes.

In [7]:
import re

# Parsed records: (encoded observation, policy A, policy B, value)
data = []

# One entry per action, in the order the policy vectors are read:
# 5 discards, 5 plays, 5 color hints and 5 rank hints to the other player.
moves = (
    [{'action_type': 'DISCARD', 'card_index': slot} for slot in range(5)]
    + [{'action_type': 'PLAY', 'card_index': slot} for slot in range(5)]
    + [{'action_type': 'REVEAL_COLOR', 'target_offset': 1, 'color': color} for color in 'RYGWB']
    + [{'action_type': 'REVEAL_RANK', 'target_offset': 1, 'rank': rank} for rank in range(5)]
)

# Load the raw dump from '../mcts/mcts_data.txt'
with open('../mcts/mcts_data.txt', 'r') as file:
    content = file.read()

# One match per record: a printed 4-tuple of tensors
entry_pattern = re.compile(
    r'\(tensor\(\[([\s\S]*?)\]\),\s*'     # Encoded observation
    r'tensor\(\[([\s\S]*?)\]\),\s*'       # Policy A
    r'tensor\(\[([\s\S]*?)\]\),\s*'       # Policy B
    r'tensor\(([\s\S]*?)\)\)\s*',         # Value
    re.DOTALL
)

entries = entry_pattern.findall(content)


def _to_floats(raw):
    """Turn the comma-separated body of a printed tensor into floats, skipping empty tokens."""
    pieces = (piece.strip() for piece in re.split(r',\s*', raw.strip()))
    return [float(piece) for piece in pieces if piece]


def _describe_move(move):
    """Return the table label for one entry of ``moves``."""
    kind = move['action_type']
    if kind in ('DISCARD', 'PLAY'):
        return f"{kind} card at index {move['card_index']}"
    if kind == 'REVEAL_COLOR':
        return f"REVEAL_COLOR {move['color']} to player {move['target_offset']}"
    if kind == 'REVEAL_RANK':
        # Ranks are stored 0-based and displayed 1-based
        return f"REVEAL_RANK {move['rank'] + 1} to player {move['target_offset']}"
    return f"Unknown move type: {kind}"


# Marker keyed by (is argmax of Policy A, is argmax of Policy B)
_MARKERS = {
    (False, False): '  ',
    (True, False): '* ',
    (False, True): '^ ',
    (True, True): '*^',
}

# Convert the captured text of every record to numbers
for encoded_obs_str, policy_a_str, policy_b_str, value_str in entries:
    encoded_obs_list = _to_floats(encoded_obs_str)
    policy_a_list = _to_floats(policy_a_str)
    policy_b_list = _to_floats(policy_b_str)
    value_outcome = float(value_str.strip())
    data.append((encoded_obs_list, policy_a_list, policy_b_list, value_outcome))

# Write one report section per record to 'interpret_mcts_data'
with open('interpret_mcts_data', 'w') as output_file:
    for idx, (encoded_obs_list, policy_a_list, policy_b_list, value_outcome) in enumerate(data):
        game_state_str = reconstruct_game_state(encoded_obs_list)
        value = value_outcome

        # Every index tied for the maximum is marked, not just the first
        max_prob_a = max(policy_a_list)
        max_prob_a_indices = [i for i, prob in enumerate(policy_a_list) if prob == max_prob_a]
        max_prob_b = max(policy_b_list)
        max_prob_b_indices = [i for i, prob in enumerate(policy_b_list) if prob == max_prob_b]

        # Header, decoded state and table heading
        section = [
            f"{'='*76}\n",
            f"Game State {idx+1}\n",
            f"{'='*76}\n\n",
            "Game State Details:\n",
            f"{'-'*76}\n",
            game_state_str + '\n\n',
            "Moves and Policy Probabilities:\n",
            f"{'-'*76}\n",
            f"{'Move Description':<50} {'Policy A':>12} {'Policy B':>12}\n",
            f"{'-'*50}{'-'*13}{'-'*13}\n",
        ]

        # One row per move; zip stops at the shortest of the three sequences
        for i, (move, prob_a, prob_b) in enumerate(zip(moves, policy_a_list, policy_b_list)):
            move_desc = _describe_move(move)
            marker = _MARKERS[(i in max_prob_a_indices, i in max_prob_b_indices)]
            section.append(f"{marker}{move_desc:<48} {prob_a:>12.4f} {prob_b:>12.4f}\n")

        # Footer
        section.append('\n')
        section.append(f"Value Outcome: {value:.4f}\n")
        section.append(f"{'='*76}\n\n\n")

        output_file.writelines(section)

print("Data interpretation complete. Results saved in 'interpret_mcts_data' file.")

Data interpretation complete. Results saved in 'interpret_mcts_data' file.


(hanabi) kaanucar@MacBook-Pro-de-Kaan hanabi_epfl % python rl_env_example.py --players 2 --num_episodes 10 --agent AlphaZero_Agent --agents AlphaZero_Agent --mcts_types 00
Running Episodes:   0%|                                                                                         | 0/10 [00:00<?, ?episode/s, Avg Score=N/A, Score=N/A, Avg Loss=N/A, Loss=N/A]/Users/kaanucar/Desktop/hanabi_epfl/agents/alphazero/alphazero_network.py:172: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).
  root_policy = torch.tensor(root_policy, dtype=torch.float32)
Running Episodes: 100%|████████████████████████████████████████████████████████████████████████| 10/10 [21:39<00:00, 129.92s/episode, Avg Score=17.10, Score=17, Avg Loss=2.9694, Loss=2.9340]

Losses:  ['3.03', '3.01', '2.99', '2.98', '2.97', '2.93', '2.94', '2.95', '2.93']
Average Loss:  2.969351159201728

Scores: [18, 19, 17, 17, 13, 17, 16, 17, 20, 17]
Average Score: 17.1
Standard Deviation: 1.7578395831246947
Standard Error: 0.5558776843874919
Errors: 0