In [1]:
import gym 
from gym import spaces
from enum import Enum
import numpy as np
import random

class Actions(Enum):
    HIT = 0
    STAND = 1
    SPLIT = 2
    DOUBLE = 3
        

In [195]:
class BlackjackEnv(gym.Env):
    def __init__(self, number_deck):
        super().__init__()
        self.number_deck = number_deck
        self.current_player_index = 0
        self.current_hand_index = 0
        self.wallet = 1000
        self.number_players = 6
        self.total_rewards = 0
        self.dealer = []
        self.hand_players = {f'player_{i}': \
            {'hands': [[]], 'value': 0, 'nb_ace': 0, 'split': False,
             'bet': 50, 'current_player': self.current_player_index == i, 'hand_playing': 0, 'reward':0, 'blackjack': False} for i in range(self.number_players)}
        self.deck = [2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10, 11] * 4 * self.number_deck 
        self.len_deck = 13 * 4 * number_deck
        self.dealer_playing = self.current_player_index == self.number_players
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Dict({
            "dealer":  spaces.Dict({
                'value': spaces.Box(low=0, high=31, shape=(1,), dtype=np.int32),
                #hand represent card hand 
                'hand': spaces.Box(low=0, high=11, shape=(10,), dtype=np.int32)
                }),
            "len_deck": spaces.Discrete(self.len_deck + 1),  # Nombre de cartes restantes dans le deck
            "info": spaces.Dict({  # Comptage des cartes restantes
                card: spaces.Discrete(52 * self.number_deck + 1) for card in set(self.deck)
            }),
            "players": spaces.Dict({  # Informations détaillées pour chaque joueur
                f'player_{i}': spaces.Dict({
                    "hands": spaces.Box(low=0, high=31, shape=(5,), dtype=np.int32),  # Valeur de chaque main (on suppose jusqu'à 5 mains max)
                    "value": spaces.Box(low=0, high=31, shape=(1,), dtype=np.int32),  # Valeur de la main active
                    "nb_ace": spaces.Discrete(10),  # Nombre d'as (jusqu'à 4 ou 5 possibles par main)
                    "split": spaces.Discrete(2),  # 1 si la main est splittée, sinon 0
                    "bet": spaces.Box(low=0, high=500, shape=(1,), dtype=np.int32),  # Montant du pari (peut être ajusté selon les limites)
                    "current_player": spaces.Discrete(2),  # Indique si c'est le joueur actif
                    "hand_playing": spaces.Discrete(2),  # Index de la main active (jusqu'à 2)
                    "blackjack": spaces.Discrete(2)  # 1 si le joueur a un blackjack, sinon 0
                }) for i in range(6) # Prend en charge jusqu'à 6 joueurs
            }),
            "wallet": spaces.Box(low=0, high=1e6, shape=(1,), dtype=np.float32)
        })
    def shuffle_deck(self):
            """
            Mélange le deck de cartes.
            """
            random.shuffle(self.deck)
            return self.deck
    def time_to_shuffle(self):
        return len(self.deck) < self.len_deck // 2
    
 
    
    def value_hands(self, hands):
        """
        Calculates the value of a hand taking into account aces.
        
        Parameters:
        - hands: list of cards in the hand.
        
        Returns:
        - value: value of the hand.
        """
        value = 0
        num_aces = 0
        
        for card in hands:
            if card == 11:  
                num_aces += 1
                value += 11
            else:
                value += card
    
        while value > 21 and num_aces > 0:
            value -= 10 
            num_aces -= 1
        
        return value

    def play_dealer_hand(self):
        """
        Plays the dealer's hand according to the rules.
        """
        self.dealer_value = self.value_hands(self.dealer)
        while self.dealer_value < 17:
            self.dealer.append(self.deck.pop())
            self.dealer_value = self.value_hands(self.dealer)
            print(self.dealer)
       
    def initialize_new_game(self):    
        """
        Initializes a new game by shuffling the deck, dealing the initial hands, and setting the initial player.
        """
        self.current_player_index = 0
        
        self.dealer = []
        self.hand_players = {f'player_{i}': \
            {'hands': [[]], 'value': 0, 'nb_ace': 0, 'split': False,
             'bet': 5, 'current_player': self.current_player_index == i, 'hand_playing': 0, 'reward':0, 'blackjack': False} 
            for i in range(self.number_players) }
        
        if self.time_to_shuffle(): 
            self.shuffle_deck()
        
        for _ in range(2):
            for player in self.hand_players:
                self.hand_players[player]['hands'][self.current_hand_index].append(self.deck.pop())
            self.dealer.append(self.deck.pop())
        
        for i in range(self.number_players):
            blackjack = (self.value_hands(self.hand_players[f'player_{i}']['hands'][self.current_hand_index]) == 21)
            self.hand_players[player]['blackjack'] = blackjack
            self.update_player_info(i)
        
        return self._get_obs(), 0, False, False

    def reset(self): 
       return self.initialize_new_game()
        
    def _get_obs(self):          
        player_hands = [] 
        value_hands = []
        hands_split = []
        bets = []
        normalized_wallet = self.wallet / 1e6  # Exemple de normalisation
        
        dealer = None
        # Collecte des informations des joueurs
        for i in range(self.number_players):
            player_hands.append(self.hand_players[f'player_{i}']['hands'])
            value_hands.append(self.hand_players[f'player_{i}']['value'])
            hands_split.append(1 if self.hand_players[f'player_{i}']['split'] else 0)
            bets.append(self.hand_players[f'player_{i}']['bet'])
        
        # Gestion de la main du croupier selon le tour
        if self.current_player_index == self.number_players:
            # C'est le tour du croupier, on montre toute sa main
            dealer = self.dealer
        else:
            # Ce n'est pas encore le tour du croupier, on montre seulement la première carte
            dealer = [self.dealer[0]]
        
        return {
            "dealer": {
                "dealer_hand": dealer
            },
            "len_deck": len(self.deck),
            "info": list({card: self.deck.count(card) for card in set(self.deck)}.values()),  # Liste des cartes restantes dans le deck
            "player_hands": player_hands,
            "current_player_index": self.current_player_index,
            "value_hands": value_hands,
            "hands_split": hands_split,
            "bets": bets,
            "normalized_wallet": normalized_wallet,
        }
# return np.array([
#             dealer,
#             len(self.deck),
#             list(self.info.values()),  # Assurez-vous que self.info est une liste plate
#             player_hands,
#             self.current_player_index,
#             value_hands,
#             hands_split,
#             bets,
#             normalized_wallet,
#         ])         

    def update_player_info(self, player_index):
        """
        Met à jour les informations du joueur, y compris la valeur de la main et le nombre d'as.
        """
        player = self.hand_players[f'player_{player_index}']
        hand = player['hands'][self.current_hand_index]
        player['value'] = self.value_hands(hand)
        player['nb_ace'] = hand.count(11)  # Nombre d'as dans la main
        player['blackjack'] = (player['value'] == 21 and len(hand) == 2)  # Vérifie le blackjack
        
    def play_single_hand(self, action):
        reward = 0
        next_player = False

        if not (self.current_player_index == self.number_players):
            
            hand = self.hand_players[f'player_{self.current_player_index}']['hands'][self.current_hand_index]
            bet = self.hand_players[f'player_{self.current_player_index}']['bet']
            if action == Actions.HIT.value:
                hand.append(self.deck.pop())
                self.update_player_info(self.current_player_index)
                value = self.hand_players[f'player_{self.current_player_index}']['value']
                
                if value < 21:
                    reward = (21 - value) / 21
                elif value == 21:
                    reward = 1  # Bonus pour avoir 21
                    next_player = True
                elif value > 21:
                    # Récompense normalisée pour un dépassement de 21
                    reward = -bet / self.wallet
                    next_player = True
            
            elif action == Actions.STAND.value:
                next_player = True

            elif action == Actions.SPLIT.value and len(hand) == 2 and hand[0] == hand[1]:
                if not self.hand_players[f'player_{self.current_player_index}']['split']:
                    self.handle_split() 
                    self.update_player_info(self.current_player_index)
                else:
                    reward = -bet / self.wallet  # Pénalité normalisée pour un split invalide

            elif action == Actions.DOUBLE.value:
                hand.append(self.deck.pop())
                self.update_player_info(self.current_player_index)
                value = self.hand_players[f'player_{self.current_player_index}']['value']
                reward = (21 - value) / 21 if value <= 21 else -bet / self.wallet
                next_player = True

            # Passe au joueur suivant si nécessaire
            self.advance_hand_or_player(next_player)
            return self._get_obs(), reward, False, False
        else:
            # Gérer le tour du croupier après tous les joueurs
            self.play_dealer_hand()
            for i in range(self.number_players):
                player = f'player_{i}'
                if self.hand_players[player]['split']:
                    for hand in self.hand_players[player]['hands']:
                        reward = self.player_vs_dealer(reward, self.value_hands(hand), self.dealer_value, i)
                        self.total_rewards += reward
                else:
                    hand = self.hand_players[player]['hands'][0]
                    reward = self.player_vs_dealer(reward, self.value_hands(hand), self.dealer_value, i)
                    self.total_rewards += reward
            return self._get_obs(), self.total_rewards, True, False

    def player_vs_dealer(self, reward, player_value, dealer_value, index):
        """
        Calcule la récompense pour un joueur en fonction de la comparaison de sa main avec celle du croupier.
        """
        
        bet = self.hand_players[f'player_{index}']['bet']
        
        if player_value > 21:
            reward = -bet / self.wallet  # Pénalité normalisée
        elif dealer_value > 21 or player_value > dealer_value:
            reward = bet / self.wallet  # Gain normalisé
        elif player_value == dealer_value:
            reward = 0
        elif player_value == 21 and self.hand_players[f'player_{index}']['blackjack']:
            reward = (1.5 * bet) / self.wallet  # Blackjack naturel avec gain de 1.5x
        else:
            reward = -bet / self.wallet  # Perte normalisée si le croupier gagne
        return reward

    def advance_hand_or_player(self, next_player):
        """
        Moves to the next hand for the current player or to the next player if all hands of the player have been played.
        """
        if next_player:
            self.current_hand_index = 0  # Reset the hand index for the next player
            self.current_player_index += 1
        elif self.current_hand_index < len(self.hand_players[f'player_{self.current_player_index}']['hands']) - 1:
            self.current_hand_index += 1  # Move to the next hand of the same player
       


    def handle_split(self):
        current_player = f'player_{self.current_player_index}'
        hand = self.hand_players[current_player]['hands'][self.current_hand_index]
        
        card1, card2 = hand
        new_hand1 = [card1, self.deck.pop()]
        new_hand2 = [card2, self.deck.pop()]
        
        self.hand_players[current_player]['hands'][self.current_hand_index] = new_hand1
        self.hand_players[current_player]['hands'].insert(self.current_hand_index + 1, new_hand2)
        self.hand_players[current_player]['split'] = True
        
        # Met à jour les informations pour les nouvelles mains
        self.update_player_info(self.current_player_index)
        
    def get_action_mask(self):
        """
        Crée un masque binaire pour les actions valides et invalides en fonction de l’état actuel de la main du joueur.
        """
        mask = np.ones(self.action_space.n)  # Par défaut, toutes les actions sont valides
        
        # Récupérer la main actuelle du joueur
        if self.current_player_index == self.number_players:
            return np.zeros(self.action_space.n) 
        current_hand = self.hand_players[f'player_{self.current_player_index}']['hands'][self.current_hand_index]   
        value = self.value_hands(current_hand)
        
        # Vérifier les conditions d’invalidité pour chaque action
        if value >= 21:
            # Si la main est 21 ou plus, "HIT" et "DOUBLE" sont invalides
            mask[Actions.HIT.value] = 0
            mask[Actions.DOUBLE.value] = 0

        # "SPLIT" est invalide si les cartes sont différentes ou si le joueur a déjà splitté
        if len(current_hand) != 2 or current_hand[0] != current_hand[1] or self.hand_players[f'player_{self.current_player_index}']['split']:
            mask[Actions.SPLIT.value] = 0

        # "DOUBLE" peut être restreint aux cas où le joueur n’a pas encore tiré de cartes supplémentaires
        if len(current_hand) > 2:
            mask[Actions.DOUBLE.value] = 0  
            
        return mask

    def step(self, action):
        """
        Exécute une action valide et retourne les résultats.
        """
        mask = self.get_action_mask()

        # # Vérifier si l’action choisie est valide
        # if mask[action] == 0:
        #     # Si l'action est invalide, appliquer une pénalité ou ignorer l'action
        #     reward = -1000  # Pénalité pour l’action invalide
        #     return self._get_obs(), reward, False, False

        # Si l'action est valide, exécute la main normalement
        new_state, reward, done, truncated = self.play_single_hand(
            action
            )

        return new_state, reward, done, truncated


In [196]:
env = BlackjackEnv(6)

epochs = 10
env.initialize_new_game()
for epoch in range(epochs):
    while True: 
        state, reward, done, _ = env.step(1)
        print(env._get_obs(), reward, done)
        if done: 
            print(state, reward, done)
            break
    env.reset()

{'dealer': {'dealer_hand': [8]}, 'len_deck': 298, 'info': [23, 23, 23, 23, 23, 23, 23, 23, 92, 22], 'player_hands': [[[11, 7]], [[10, 6]], [[10, 5]], [[10, 4]], [[10, 3]], [[9, 2]]], 'current_player_index': 1, 'value_hands': [18, 16, 15, 14, 13, 11], 'hands_split': [0, 0, 0, 0, 0, 0], 'bets': [5, 5, 5, 5, 5, 5], 'normalized_wallet': 0.001} 0 False
{'dealer': {'dealer_hand': [8]}, 'len_deck': 298, 'info': [23, 23, 23, 23, 23, 23, 23, 23, 92, 22], 'player_hands': [[[11, 7]], [[10, 6]], [[10, 5]], [[10, 4]], [[10, 3]], [[9, 2]]], 'current_player_index': 2, 'value_hands': [18, 16, 15, 14, 13, 11], 'hands_split': [0, 0, 0, 0, 0, 0], 'bets': [5, 5, 5, 5, 5, 5], 'normalized_wallet': 0.001} 0 False
{'dealer': {'dealer_hand': [8]}, 'len_deck': 298, 'info': [23, 23, 23, 23, 23, 23, 23, 23, 92, 22], 'player_hands': [[[11, 7]], [[10, 6]], [[10, 5]], [[10, 4]], [[10, 3]], [[9, 2]]], 'current_player_index': 3, 'value_hands': [18, 16, 15, 14, 13, 11], 'hands_split': [0, 0, 0, 0, 0, 0], 'bets': [5, 5,

In [176]:
print(env._get_obs())

main du deaelr [2]
{'dealer': {'dealer_hand': [2]}, 'len_deck': 149, 'info': [12, 12, 12, 12, 12, 12, 11, 11, 44, 11], 'player_hands': [[[8, 11]], [[7, 10]], [[6, 10]], [[5, 10]], [[4, 10]], [[3, 9]]], 'current_player_index': 0, 'value_hands': [19, 17, 16, 15, 14, 12], 'hands_split': [0, 0, 0, 0, 0, 0], 'bets': [5, 5, 5, 5, 5, 5], 'normalized_wallet': 0.001}


[2, 8]
