In [None]:
import numpy as np
from collections import defaultdict
import coup
from coup import CoupGame
from tqdm import tqdm

import enum
import random
import pyspiel

In [None]:
class CoupGamePlayground:
    """Console session in which a human plays the game against a trained strategy.

    The opponent samples its moves from the average strategy returned by
    ``solver.get_average_strategy()``. That strategy is read once, when the
    playground is constructed, so further training of ``solver`` afterwards is
    not reflected here.

    Args:
        game: Game object providing ``new_initial_state()``.
        solver: Object whose ``get_average_strategy()`` returns a pair
            ``(strategy_p0, strategy_p1)`` of dicts mapping an information
            state string to a vector of probabilities indexed by action id.
        show_ai_suggestion: When True, the strategy's action probabilities for
            the human's own seat are printed before each human move.
    """

    def __init__(self, game, solver, show_ai_suggestion=False):
        self.game = game
        self.solver = solver
        self.average_strategy_p0, self.average_strategy_p1 = solver.get_average_strategy()
        # Human-readable labels for the action ids used by the game.
        self.action_names = {
            0: "INCOME",
            1: "AID",
            2: "COUP",
            3: "TAX",
            4: "ASSASSINATE",
            5: "STEAL",
            6: "BLOCKSTEAL",
            7: "BLOCKAID",
            8: "BLOCKASS",
            9: "CHALLENGE",
            10: "SWAP1",
            11: "SWAP2",
            12: "FOLD1",
            13: "FOLD2",
            14: "NOCHALL",
        }
        self.show_ai_suggestion = show_ai_suggestion  # Toggle for AI suggestions

    def play(self):
        """Run one interactive game on stdin/stdout until a terminal state.

        The human first picks a seat (0 or 1); the other seat is played by the
        trained strategy. Chance nodes are resolved by sampling from
        ``state.chance_outcomes()``. At the end the result is announced as a
        win, a loss, or a draw when both players' returns are equal.
        """
        state = self.game.new_initial_state()

        # Allow the human player to choose their player number
        human_player = self._prompt_for_int(
            "Do you want to play as Player 0 or Player 1? Enter 0 or 1: ",
            allowed=[0, 1],
            rejected_message="Invalid choice. Please enter 0 or 1.",
            malformed_message="Invalid input. Please enter 0 or 1.",
        )
        ai_player = 1 - human_player

        print(f"\nWelcome to Coup! You are Player {human_player}.")
        print("Game start!")

        while not state.is_terminal():
            current_player = state.current_player()

            print("------------------------------------------------------")
            print("Current Action count: ", state.action_count)
            print("Player: ", current_player)

            if state.is_chance_node():
                # Chance node (e.g., card drawing). Sampling an index rather
                # than the action itself hands apply_action a native Python
                # int instead of a numpy scalar.
                actions, probs = zip(*state.chance_outcomes())
                action = actions[np.random.choice(len(actions), p=probs)]
            elif current_player == human_player:
                print("\nYour turn:")
                action = self._ask_human_for_action(state, human_player)
            else:
                print("\nAI's turn:")
                action = self.get_ai_action(state, ai_player)
                print(f"AI chooses action: {self._action_label(action)}")

            state.apply_action(action)

        # Game is over
        returns = state.returns()
        print("\nGame over.")
        print("original cards: ", state.original_cards)

        if returns[human_player] > returns[ai_player]:
            print("You win!")
        elif returns[human_player] < returns[ai_player]:
            print("You lose.")
        else:
            # Equal returns: neither side won, so do not report a loss.
            print("It's a draw.")
        self.display_state(state, human_player)

    def _ask_human_for_action(self, state, human_player):
        """Show the state and legal moves, then read a legal action id from stdin."""
        self.display_state(state, human_player)

        legal_actions = state.legal_actions(human_player)
        print("Legal actions:")
        for action in legal_actions:
            print(f"{action}: {self._action_label(action)}")

        # Show AI's suggested action distribution if the feature is enabled
        if self.show_ai_suggestion:
            action_distribution = self.get_ai_action_distribution(state, human_player, legal_actions)
            print("\nAI suggests the following action probabilities based on the trained strategy:")
            for action, prob in action_distribution.items():
                print(f"  {action}: {self._action_label(action)} - Probability: {prob:.2f}")

        return self._prompt_for_int(
            "Enter the action number you want to take: ",
            allowed=legal_actions,
            rejected_message="Invalid action. Please choose a legal action.",
            malformed_message="Invalid input. Please enter a number corresponding to a legal action.",
        )

    @staticmethod
    def _prompt_for_int(prompt, allowed, rejected_message, malformed_message):
        """Keep prompting until the user types an integer contained in ``allowed``."""
        while True:
            try:
                value = int(input(prompt))
            except ValueError:
                print(malformed_message)
                continue
            if value in allowed:
                return value
            print(rejected_message)

    def _action_label(self, action):
        """Return the display name of ``action``, or a placeholder for unknown ids."""
        return self.action_names.get(action, f"UNKNOWN({action})")

    def _legal_action_probs(self, state, player, legal_actions):
        """Return the strategy's probabilities over ``legal_actions`` as an array.

        The stored strategy for the player's current information state is
        restricted to the legal actions and renormalised. A uniform
        distribution is used when the information state is unknown or when the
        strategy puts no mass on any legal action.
        """
        infoset = state.information_state_string(player)
        strategy = self.get_average_strategy_for_player(player).get(infoset)

        if strategy is not None:
            probs = np.array([strategy[action] for action in legal_actions], dtype=float)
            total_prob = probs.sum()
            if total_prob > 0:
                return probs / total_prob

        num_actions = len(legal_actions)
        return np.ones(num_actions) / num_actions

    def get_ai_action_distribution(self, state, player, legal_actions):
        """
        Returns the trained strategy's action distribution for ``player``.

        Returns:
            Dict mapping each legal action id to its probability.
        """
        probs = self._legal_action_probs(state, player, legal_actions)
        return {action: float(prob) for action, prob in zip(legal_actions, probs)}

    def get_ai_action(self, state, player):
        """
        Returns an action for ``player`` sampled from the trained strategy.
        """
        legal_actions = state.legal_actions(player)
        action_probs = self._legal_action_probs(state, player, legal_actions)

        # Choose action according to the probabilities
        action_idx = np.random.choice(len(legal_actions), p=action_probs)
        return legal_actions[action_idx]

    def get_average_strategy_for_player(self, player):
        """
        Returns the average strategy dictionary for the specified player.

        Player 0 gets the first stored strategy; any other value gets the second.
        """
        if player == 0:
            return self.average_strategy_p0
        else:
            return self.average_strategy_p1

    def display_state(self, state, player):
        """Print both players' coins and the cards held by ``player``."""
        # Display coins and known information
        print(f"Your coins: {state.coins[player]}")
        print(f"AI's coins: {state.coins[1 - player]}")

        # Display your cards (since you know them)
        print(f"Your cards: {state.cards[player]}")

        # For simplicity, we won't display the AI's cards or any hidden information


# MCCFR

In [None]:
import numpy as np
from collections import defaultdict
from tqdm import tqdm

class MCCFR:
    """Sampling-based counterfactual-regret trainer with per-infoset tables.

    Tables are keyed by ``(player, information_state_string)``:

    * ``regret_sums``   - accumulated regret per action id,
    * ``strategy_sums`` - accumulated regret-matching strategy per action id,
    * ``node_visits``   - number of times the infoset was updated.

    Args:
        game: Game object providing ``new_initial_state()``,
            ``num_distinct_actions()`` and ``num_players()``.
        iterations: Number of training iterations performed by ``train``.
    """

    def __init__(self, game, iterations=10000):
        self.game = game
        self.iterations = iterations

        # Every unseen key starts with a zero vector over all action ids.
        def zero_vector():
            return np.zeros(game.num_distinct_actions())

        self.regret_sums = defaultdict(zero_vector)
        self.strategy_sums = defaultdict(zero_vector)
        self.node_visits = defaultdict(float)

    def regret_matching(self, regrets):
        """Turn a regret vector into a strategy.

        Positive regrets are normalised to sum to one; if there is no positive
        regret the result is uniform over all entries.
        """
        clipped = np.maximum(regrets, 0)
        mass = clipped.sum()
        if not mass > 0:
            size = len(regrets)
            return np.ones(size) / size
        return clipped / mass

    def cfr(self, state, player):
        """Traverse from ``state`` and update the tables for ``player``.

        Chance outcomes are sampled. At ``player``'s own nodes every legal
        action is explored on a clone of the state; at the other player's
        nodes a single action is sampled uniformly at random. ``state`` itself
        is advanced in place on sampled branches.

        Returns:
            The value of ``state`` for ``player`` along the sampled traversal.
        """
        if state.is_terminal():
            return state.returns()[player]

        if state.is_chance_node():
            chance_actions, chance_probs = zip(*state.chance_outcomes())
            sampled = np.random.choice(len(chance_actions), p=chance_probs)
            state.apply_action(chance_actions[sampled])
            return self.cfr(state, player)

        acting = state.current_player()
        infoset = state.information_state_string(acting)
        legal = state.legal_actions(acting)
        n_legal = len(legal)

        if acting != player:
            # Sample opponent's action according to a fixed uniform random strategy
            uniform = np.ones(n_legal) / n_legal
            pick = np.random.choice(n_legal, p=uniform)
            state.apply_action(legal[pick])
            return self.cfr(state, player)

        table_key = (acting, infoset)
        legal_idx = np.array(legal)

        # Current strategy over the legal actions, from accumulated regrets.
        sigma = self.regret_matching(self.regret_sums[table_key][legal_idx])

        # Scatter it into a vector over all action ids before accumulating.
        sigma_full = np.zeros(self.game.num_distinct_actions())
        sigma_full[legal_idx] = sigma
        self.strategy_sums[table_key] += sigma_full
        self.node_visits[table_key] += 1

        action_values = np.zeros(n_legal)
        expected_value = 0.0
        for slot, action in enumerate(legal):
            child = state.clone()
            child.apply_action(action)
            action_values[slot] = self.cfr(child, player)
            expected_value += sigma[slot] * action_values[slot]

        # Regret of each action relative to the value of the current strategy.
        self.regret_sums[table_key][legal_idx] += action_values - expected_value
        return expected_value

    def train(self, verbose=False):
        """Run ``self.iterations`` iterations, one traversal per player each.

        The progress bar is shown only when ``verbose`` is True.
        """
        progress = tqdm(range(self.iterations), disable=not verbose)
        for _ in progress:
            for player in range(self.game.num_players()):
                self.cfr(self.game.new_initial_state(), player)

    def get_average_strategy(self):
        """Return ``(strategy_p0, strategy_p1)`` built from the accumulated sums.

        Each dict maps an information state string to a normalised probability
        vector over all action ids. Entries whose accumulated strategy has no
        mass (or no recorded visits) become uniform. Keys whose player is not
        0 are placed in the second dict.
        """
        strategy_p0, strategy_p1 = {}, {}

        def uniform(size):
            return np.ones(size) / size

        for key in self.strategy_sums:
            player, infoset = key
            accumulated = self.strategy_sums[key]
            visit_count = self.node_visits[key]

            if visit_count > 0:
                averaged = accumulated / visit_count
                mass = averaged.sum()
                if mass > 0:
                    averaged /= mass
                else:
                    averaged = uniform(len(averaged))
            else:
                averaged = uniform(len(accumulated))

            destination = strategy_p0 if player == 0 else strategy_p1
            destination[infoset] = averaged

        return strategy_p0, strategy_p1


## Self-play


In [None]:
def simulate_self_play(game, average_strategy):
    """Play one game in which every player follows ``average_strategy``.

    Args:
        game: Game object providing ``new_initial_state()``.
        average_strategy: Either a single dict shared by all players, mapping
            an information state string to a probability vector indexed by
            action id, or a tuple/list of such dicts indexed by player (the
            shape returned by ``MCCFR.get_average_strategy()``).

    Returns:
        ``state.returns()`` of the terminal state that was reached.

    At each decision node the stored vector is restricted to the legal actions
    and renormalised. A uniform distribution over the legal actions is used
    when the information state is unknown or carries no mass on legal actions.
    """
    # A tuple/list is a per-player collection; anything else is one shared table.
    per_player = isinstance(average_strategy, (tuple, list))

    state = game.new_initial_state()
    while not state.is_terminal():
        current_player = state.current_player()

        if state.is_chance_node():
            # Sample an index so apply_action receives a native Python int.
            actions, probs = zip(*state.chance_outcomes())
            state.apply_action(actions[np.random.choice(len(actions), p=probs)])
            continue

        infoset = state.information_state_string(current_player)
        legal_actions = state.legal_actions(current_player)
        num_actions = len(legal_actions)

        table = average_strategy[current_player] if per_player else average_strategy
        strategy_full = table.get(infoset)

        strategy = None
        if strategy_full is not None:
            # Fancy indexing copies, so the stored strategy is never modified.
            candidate = np.asarray(strategy_full, dtype=float)[np.array(legal_actions)]
            strategy_sum = candidate.sum()
            if strategy_sum > 0:
                strategy = candidate / strategy_sum
        if strategy is None:
            strategy = np.ones(num_actions) / num_actions

        # Choose action according to strategy
        action_idx = np.random.choice(num_actions, p=strategy)
        state.apply_action(legal_actions[action_idx])

    return state.returns()


In [None]:
def get_winrates(solver, game, num_games=100):
    """Estimate player 0's win rate when both players follow the solver's average strategy.

    Args:
        solver: Object whose ``get_average_strategy()`` returns the pair
            ``(strategy_p0, strategy_p1)``.
        game: Game object the strategies were trained on.
        num_games: Number of simulated games.

    Returns:
        Fraction of games in which player 0's return exceeded player 1's.
    """
    # get_average_strategy() returns one table per player, so each seat must
    # be driven by its own table rather than by the pair as a whole.
    strategy_p0, strategy_p1 = solver.get_average_strategy()
    return simulate_strategy_vs_strategy(game, strategy_p0, strategy_p1, num_games=num_games)


#get_winrates(solver, game, 100)

### Best response to a given strategy

In [None]:
# pruning

def compute_best_response(game, opponent_strategy, best_response_player=0, threshold=0.02):
    """
    Computes a pruned best-response value for best_response_player against the
    opponent's strategy by exhaustive recursion over the game tree.

    Args:
        game: The game object.
        opponent_strategy: A dictionary mapping information states to numpy arrays of
            action probabilities indexed by action id.
        best_response_player: The player index (0 or 1) for whom to compute the best response.
        threshold: Chance outcomes and opponent actions with probability below this
            value are pruned.

    Returns:
        The expected return (as reported by ``state.returns()``) of the
        best_response_player at the root.

    Notes:
        * After pruning, the surviving branch probabilities are renormalised so
          that a pruned branch does not count as a branch of utility 0. If no
          branch reaches the threshold, nothing is pruned at that node.
        * At the best-response player's nodes the maximum is taken separately
          for every state, not once per information set, so the result is the
          value of a player who can tell apart states inside an information set.
    """
    print("Pruned, Computing BR for: player", best_response_player)

    root_state = game.new_initial_state()

    def surviving_branches(weighted_actions):
        """Drop branches below the threshold and renormalise what is left."""
        weighted_actions = list(weighted_actions)
        kept = [(action, prob) for action, prob in weighted_actions if prob >= threshold]
        if not kept:
            # Everything is below the threshold: pruning would discard the node.
            kept = weighted_actions
        mass = sum(prob for _, prob in kept)
        if mass <= 0:
            return []
        return [(action, prob / mass) for action, prob in kept]

    def opponent_probabilities(state, current_player, legal_actions):
        """Opponent strategy restricted to the legal actions (uniform fallback)."""
        num_actions = len(legal_actions)
        infoset = state.information_state_string(current_player)
        strategy = opponent_strategy.get(infoset, None)
        if strategy is not None:
            # Fancy indexing copies, so the caller's strategy is not modified.
            probs = np.asarray(strategy, dtype=float)[legal_actions]
            strategy_sum = probs.sum()
            if strategy_sum > 0:
                return probs / strategy_sum
        # Unknown infoset or no mass on legal actions: assume uniform random.
        return np.ones(num_actions) / num_actions

    def child_value(state, action):
        """Value of the state reached by taking ``action`` from ``state``."""
        next_state = state.clone()
        next_state.apply_action(action)
        return best_response_value(next_state)

    def best_response_value(state):
        if state.is_terminal():
            return state.returns()[best_response_player]

        if state.is_chance_node():
            weighted_actions = state.chance_outcomes()
        else:
            current_player = state.current_player()
            legal_actions = state.legal_actions()

            if current_player == best_response_player:
                # The best responder simply takes the most valuable action.
                return max(
                    (child_value(state, action) for action in legal_actions),
                    default=float('-inf'),
                )

            weighted_actions = zip(
                legal_actions,
                opponent_probabilities(state, current_player, legal_actions),
            )

        return sum(
            (prob * child_value(state, action)
             for action, prob in surviving_branches(weighted_actions)),
            0.0,
        )

    # Start traversal from the root state
    expected_utility = best_response_value(root_state)
    return expected_utility


In [None]:
def simulate_strategy_vs_strategy(game, strategy_player0, strategy_player1, num_games=1000):
    """Estimate player 0's win rate when each player follows its own strategy.

    Args:
        game: Game object providing ``new_initial_state()`` and
            ``num_distinct_actions()``.
        strategy_player0: Dict mapping information state strings to
            probability vectors indexed by action id, used when player 0 acts.
        strategy_player1: Same, used when any other player acts.
        num_games: Number of games to simulate.

    Returns:
        Fraction of games in which player 0's return was strictly greater
        than player 1's.
    """
    player0_wins = 0

    for _ in range(num_games):
        state = game.new_initial_state()

        while not state.is_terminal():
            mover = state.current_player()

            if state.is_chance_node():
                # Resolve chance by sampling one of the listed outcomes.
                chance_actions, chance_probs = zip(*state.chance_outcomes())
                state.apply_action(np.random.choice(chance_actions, p=chance_probs))
                continue

            infoset = state.information_state_string(mover)
            legal = state.legal_actions(mover)
            legal_count = len(legal)

            table = strategy_player0 if mover == 0 else strategy_player1
            full_vector = table.get(infoset)
            if full_vector is None:
                # Unknown infoset: start from uniform over all action ids.
                total_actions = game.num_distinct_actions()
                full_vector = np.ones(total_actions) / total_actions

            # Restrict to the legal actions, then renormalise.
            weights = np.array([full_vector[action] for action in legal])
            mass = weights.sum()
            if mass > 0:
                weights /= mass
            else:
                weights = np.ones(legal_count) / legal_count

            chosen = np.random.choice(legal_count, p=weights)
            state.apply_action(legal[chosen])

        final_returns = state.returns()
        if final_returns[0] > final_returns[1]:
            player0_wins += 1

    return player0_wins / num_games


# training results

### Nash gap

In [None]:
# Training configuration.
max_count = 10  # passed to CoupGame as max_action_count

N = 1000      # MCCFR iterations per episode
Episode = 20  # number of train -> evaluate rounds


N_simulate = 1000 # number of games to evaluate the strategy

game = CoupGame(max_action_count=max_count, favorite_pl=0)
solver = MCCFR(game, iterations = N)

# Best-response value of each player after every episode (plotted below).
# NOTE(review): the plot labels these as win rates; that is only accurate if
# the game's returns are on a 0..1 scale - confirm against CoupGame.
edge_0 = [0] * Episode
edge_1 = [0] * Episode

for epi in tqdm(range(Episode)):

    # The solver keeps its tables between calls, so training is cumulative.
    solver.train(verbose=True)

    average_strategy_p0, average_strategy_p1 = solver.get_average_strategy()

    # Each player best-responds to the *other* player's average strategy.
    br_p0 = compute_best_response(game, average_strategy_p1, best_response_player=0)
    br_p1 = compute_best_response(game, average_strategy_p0, best_response_player=1)

    edge_0[epi] = br_p0
    edge_1[epi] = br_p1


print("COMPLETE!")
    


In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick

# Best-response value of each player per training episode.
episodes = range(1, Episode + 1)

fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(episodes, edge_0, label='P0 is BR')
ax.plot(episodes, edge_1, label='P1 is BR')

ax.set_xlabel('Episode')
ax.set_ylabel('Win Rate')
ax.set_title('Win Rate Over Training')
ax.legend()
ax.grid(True)
ax.set_ylim(0, 1.1)  # Leave a little headroom above 100%
ax.yaxis.set_major_formatter(mtick.PercentFormatter(1.0))  # Show a value of 1.0 as "100%"
plt.show()


# simulate and play!

In [None]:
# Start an interactive game against the trained solver, with the strategy's
# suggested action probabilities shown on each human turn.
playground = CoupGamePlayground(
    game,
    solver,
    show_ai_suggestion=True,
)
playground.play()