# 6 qui Prend - The Card Game

In [79]:
import os
import numpy as np
import random
from collections import deque

In [80]:
import torch
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cpu


In [81]:
# Constants
NB_TURNS = 10
NB_CARDS = 104
NB_ROWS = 4
CARDS_PER_ROWS = 5
NB_PLAYERS = 2

In [82]:
class Card:
    def __init__(self, value):
        assert 1 <= value <= NB_CARDS
        self.value = value
        if value % 55 == 0:
            self.bullheads = 7
        elif value % 11 == 0:
            self.bullheads = 5
        elif value % 10 == 0:
            self.bullheads = 3
        elif value % 10 == 5:
            self.bullheads = 2
        else:
            self.bullheads = 1
    
    def __str__(self):
        return f" |{self.value:3d}(*{self.bullheads}*)| "

class Deck:
    def __init__(self):
        self.cards = [Card(i) for i in range(1, NB_CARDS + 1)]
        np.random.shuffle(self.cards)

    def draw(self):
        assert len(self.cards) > 0
        return self.cards.pop()

In [83]:
class Player:
    def __init__(self, name):
        self.name = name
        self.hand = []
        self.bullheads = 0
    
    def choose_card(self, gameboard, all_played_cards):
        print(self)
        try:
            card = int(input(f"{self.name}, choose a card: "))
            assert card in [c.value for c in self.hand], "Card not in hand"
        except:
            print("Please choose a valid card")
            return self.choose_card(gameboard, all_played_cards)
        return Card(card)
    
    def choose_row(self, gameboard, all_played_cards):
        try:
            row = int(input(f"{self.name}, choose a row: "))
            assert 1 <= row <= NB_ROWS, "Row not in range"
        except:
            print("Please choose a valid row")
            return self.choose_row(gameboard, all_played_cards)
        return row

    def __str__(self):
        return f"{self.name}: {','.join(str(card) for card in self.hand)}"

In [84]:
class Gameboard:
    def __init__(self, deck):
        self.deck = deck
        self.board = [[deck.draw()] for _ in range(NB_ROWS)]
    
    def clear_row(self, row):
        bullheads = sum([card.bullheads for card in self.board[row][:-1]])
        self.board[row] = [self.board[row][-1]]
        return bullheads
    
    def can_play_card(self, card):
        return any([card.value > row[-1].value for row in self.board])
    
    def play_card(self, card):
        assert self.can_play_card(card)
        row = max((i for i in range(NB_ROWS) if self.board[i][-1].value < card.value), key=lambda i: self.board[i][-1].value)
        self.board[row].append(card)
        bullheads = 0
        if len(self.board[row]) > CARDS_PER_ROWS:
            bullheads = self.clear_row(row)
        return bullheads

    def replace_row(self, card, row):
        assert not self.can_play_card(card)
        self.board[row].append(card)
        bullheads = self.clear_row(row)
        return bullheads

    def __str__(self):
        return "\n".join([
            "=-----------=" * CARDS_PER_ROWS + "\n" + \
            " ".join([str(card) for card in row]) for row in self.board]) \
            + "\n" + "=-----------=" * CARDS_PER_ROWS

In [85]:
class Game:
    def __init__(self, players, display=True):
        self.display = display
        # Initialize the deck and gameboard
        self.deck = Deck()
        self.gameboard = Gameboard(self.deck)
        self.players = players
        self.all_played_cards = []
        # Initialize the players
        self.init_players()

    def init_players(self):
        for player in self.players:
            player.hand = []
            player.bullheads = 0
            for _ in range(NB_TURNS):
                player.hand.append(self.deck.draw())
            player.hand.sort(key=lambda card: card.value)
    
    def get_cards(self):
        chosen_cards = []
        for player in self.players:
            card = player.choose_card(self.gameboard, self.all_played_cards)
            chosen_cards.append((player, card))
            player.hand.remove(card)
        return chosen_cards    
    
    def play_card(self, player, card):
        row = 0
        if self.gameboard.can_play_card(card):
            bullheads = self.gameboard.play_card(card)
            player.bullheads += bullheads
        else:
            if self.display:
                print(f"{player.name} choose a row to replace")
            row = player.choose_row(self.gameboard, self.all_played_cards)
            bullheads = self.gameboard.replace_row(card, row)
            player.bullheads += bullheads
        if self.display:
            print(f"{player.name} got {bullheads} bullheads")
        return row

    def turn(self):      
        # Get the cards played by each player
        actions = self.get_cards()
        actions.sort(key=lambda x: x[1].value)            
        
        # Replace a row if the lowest card cannot be placed next in the row
        for player, card in actions.copy():
            row = self.play_card(player, card)
            actions[actions.index((player, card))] = (player, card, row)
        
        # Add the played cards to the list of all played cards
        self.all_played_cards.extend([card for _, card, _ in actions])
        
        return actions


    def play(self):
        for _ in range(NB_TURNS):
            if self.display:
                print(self.gameboard)
            self.turn()
        
        if self.display:
            # Determine the winner (player with the fewest bullheads)
            winner = min(self.players, key=lambda player: player.bullheads)
            print(f"The winner is {winner.name} with {winner.bullheads} bullheads")
            
            # Print the bullheads for each player
            for player in self.players:
                print(f"{player.name} got {player.bullheads} bullheads")

In [86]:
# Hyperparameters
GAMMA = 0.99
LR = 0.001
BATCH_SIZE = 32
MEMORY_SIZE = 5000
CLIP_EPSILON = 0.2

In [87]:
class PPONetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(PPONetwork, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_size, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, output_size),
            nn.Softmax(dim=-1)
        ).to(device)
        
    def forward(self, x):
        return self.fc(x)

In [88]:
class PPOAgent(Player):
    def __init__(self, name):
        super(PPOAgent, self).__init__(name)
        self.state_size = NB_TURNS + NB_ROWS * CARDS_PER_ROWS + NB_PLAYERS * NB_TURNS  # Hand + Gameboard + All played cards
        self.action_size = NB_CARDS + NB_ROWS # Choose a card + Choose a row
        self.model = PPONetwork(self.state_size, self.action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=LR)
        self.memory = []
        self.gamma = GAMMA  # Discount factor
        self.clip_epsilon = CLIP_EPSILON  # PPO clip parameter
        self.batch_size = BATCH_SIZE  # Batch size for training
    
    def player_state(self):
        return np.array([card.value for card in self.hand] + [0 for _ in range(NB_TURNS - len(self.hand))], dtype=np.float32)
    
    def game_state(self, gameboard, all_played_cards):
        board = np.zeros((NB_ROWS, CARDS_PER_ROWS), dtype=np.float32)
        for i, row in enumerate(gameboard.board):
            for j, card in enumerate(row):
                board[i, j] = card.value
        played_cards = np.zeros((NB_PLAYERS, NB_TURNS), dtype=np.float32)
        for i, card in enumerate(all_played_cards):
            played_cards[i // NB_TURNS, i % NB_TURNS] = card.value
        return np.concatenate([board.flatten(), played_cards.flatten()])
        
    def get_state(self, gameboard, all_played_cards):
        return np.concatenate([self.player_state(), self.game_state(gameboard, all_played_cards)])
    
    def get_action_mask(self, action_type):
        mask = np.zeros(self.action_size, dtype=np.float32)
        if action_type == "card":
            for card in self.hand:
                mask[card.value - 1] = 1
        else:
            mask[-NB_ROWS:] = 1
        return mask

    def choose_card(self, gameboard, all_played_cards):
        state = self.get_state(gameboard, all_played_cards)
        state = torch.tensor(state, dtype=torch.float32).to(device)
        probs = self.model(state)
        mask = self.get_action_mask("card")
        print("0:", probs)
        probs = probs * torch.tensor(mask, dtype=torch.float32).to(device) # Apply the action mask
        print("1:", probs)
        probs = probs / probs.sum() # Normalize the probabilities
        print("2:", probs)
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return next(card for card in self.hand if card.value == action.item() + 1)

    def choose_row(self, gameboard, all_played_cards):
        state = self.get_state(gameboard, all_played_cards)
        state = torch.tensor(state, dtype=torch.float32).to(device)
        probs = self.model(state)
        mask = self.get_action_mask("row")
        probs = probs * torch.tensor(mask, dtype=torch.float32).to(device)
        probs = probs / probs.sum()
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        return action.item() - NB_CARDS
    
    def remember(self, state, action, reward, next_state, done, action_mask):
        self.memory.append((state, action, reward, next_state, done, action_mask))
        
    def train(self):
        if len(self.memory) < self.batch_size:
            return  # Not enough samples for training

        # Sample a batch from memory
        batch = self.memory[-self.batch_size:]
        states, actions, rewards, next_states, dones, action_masks = zip(*batch)

        # Convert to tensors
        states = torch.tensor(np.array(states), dtype=torch.float32).to(device)
        actions = torch.tensor(np.array(actions), dtype=torch.long).to(device)
        rewards = torch.tensor(np.array(rewards), dtype=torch.float32).to(device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(device)
        dones = torch.tensor(np.array(dones), dtype=torch.float32).to(device)
        action_masks = torch.tensor(np.array(action_masks), dtype=torch.float32).to(device)

        # Compute discounted rewards
        discounted_rewards = []
        cumulative_reward = 0
        for reward, done in zip(reversed(rewards), reversed(dones)):
            cumulative_reward = reward + self.gamma * cumulative_reward * (1 - done)
            discounted_rewards.insert(0, cumulative_reward)
        discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float32).to(device)

        # Normalize discounted rewards
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-8)

        # Compute old log probabilities
        old_probs = self.model(states)
        old_probs = old_probs * action_masks
        old_probs = old_probs / old_probs.sum(dim=1, keepdim=True)
        old_log_probs = torch.log(old_probs.gather(1, actions.unsqueeze(1))).squeeze()

        # Compute new log probabilities
        new_probs = self.model(states)
        new_probs = new_probs * action_masks
        new_probs = new_probs / new_probs.sum(dim=1, keepdim=True)
        new_log_probs = torch.log(new_probs.gather(1, actions.unsqueeze(1))).squeeze()

        # Compute PPO loss
        ratios = torch.exp(new_log_probs - old_log_probs)
        advantages = discounted_rewards
        surr1 = ratios * advantages
        surr2 = torch.clamp(ratios, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
        loss = -torch.min(surr1, surr2).mean()

        # Update model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Clear memory
        self.memory = []

In [89]:
class GameEnv(Game):
    def __init__(self, players):
        super(GameEnv, self).__init__(players, False)
    
    def play(self):
        for _ in range(NB_TURNS):
            states = [player.get_state(self.gameboard, self.all_played_cards) for player in self.players]
            bullheads = [player.bullheads for player in self.players]
            actions = self.turn()
            next_states = [player.get_state(self.gameboard, self.all_played_cards) for player in self.players]
            rewards = [player.bullheads - bullheads[i] for i, player in enumerate(self.players)]
            for player, card, row in actions:
                player.remember(states[self.players.index(player)], card.value, rewards[self.players.index(player)], next_states[self.players.index(player)], False, player.get_action_mask("card"))
                if row != 0:
                    player.remember(states[self.players.index(player)], row, rewards[self.players.index(player)], next_states[self.players.index(player)], False, player.get_action_mask("row"))

In [90]:
def train_ppo(n_episodes):
    players = [PPOAgent(f"Agent {i}") for i in range(NB_PLAYERS)]
    for episode in range(n_episodes):
        env = GameEnv(players)
        env.play()
        for player in players:
            player.train()
        if episode % 100 == 0:
            print(f"Episode {episode}")

In [91]:
train_ppo(1000)

0: tensor([1.7496e-04, 3.7021e-07, 7.5787e-06, 4.2629e-05, 5.2723e-03, 1.9451e-07,
        2.9216e-06, 4.1024e-07, 1.6594e-07, 1.9671e-07, 1.9755e-03, 8.6463e-05,
        2.7430e-04, 2.0320e-03, 3.0540e-07, 2.5803e-04, 2.4180e-03, 1.0991e-05,
        5.3662e-06, 8.0344e-07, 9.2636e-07, 1.2781e-04, 1.2470e-08, 2.9908e-05,
        6.1007e-02, 1.2642e-04, 5.1102e-08, 4.9297e-04, 4.7633e-06, 1.7296e-03,
        1.1360e-06, 3.9098e-06, 7.7695e-06, 1.8882e-06, 2.8061e-05, 8.3174e-05,
        2.9675e-05, 3.7882e-05, 3.3937e-04, 2.5997e-03, 3.6445e-07, 3.6240e-04,
        5.2439e-05, 1.4564e-06, 7.6146e-04, 2.2373e-04, 1.2343e-03, 8.4141e-06,
        3.1248e-05, 1.3113e-04, 5.8650e-05, 1.4900e-03, 6.8802e-07, 8.4441e-04,
        2.9790e-07, 3.2422e-05, 8.1946e-03, 3.6119e-04, 3.5325e-06, 1.7984e-05,
        3.8096e-06, 4.6745e-06, 2.0851e-04, 6.1408e-04, 9.0314e-07, 2.0936e-06,
        5.0646e-05, 8.7171e-06, 7.5699e-06, 5.3841e-07, 2.8749e-07, 9.3426e-08,
        5.8583e-06, 4.7293e-02, 1.223

ValueError: Expected parameter probs (Tensor of shape (108,)) of distribution Categorical(probs: torch.Size([108])) to satisfy the constraint Simplex(), but found invalid values:
tensor([nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan,
        nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan,
        nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan,
        nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan,
        nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan, nan],
       grad_fn=<DivBackward0>)