In [371]:
import numpy as np
import pandas as pd
import polars as pl
import os
#from file env.py in directory src (you need to change directory)
os.chdir('..')
os.chdir('src')
from env import BlackjackEnv


In [372]:
config = {
    "num_decks": 6,
    "red_card_position": 0.2,
    "bet_size": [1],
    "actions": ["stand", "hit"],
    "num_players": 1
}
# Create environment with 6 decks (standard casino configuration)
env = BlackjackEnv(config=config)


### Initialization of Q table

In [373]:
# Generate relevant states - focusing on decision points that matter
states = []
# For hard totals (no usable ace), only track 12-21
# Below 12, the optimal play is always hit
for player_sum in range(12, 22):
    for dealer_card in range(2, 12):
        states.append((player_sum, dealer_card, 0))  # Hard total

# For soft totals (with usable ace), track 12-21
# Soft totals below 12 are impossible (A+1 = 12)
for player_sum in range(12, 22):
    for dealer_card in range(2, 12):
        states.append((player_sum, dealer_card, 1))  # Soft total

In [374]:

# Initialize Q-table with strategic initial values
q_data = {
    'State': states,
    'Action 0 (Stand)': np.zeros(len(states)),
    'Action 1 (Hit)': np.zeros(len(states))
}

# Strategic initialization: Set high values for "stand" in 20-21, high values for "hit" in 4-11
for i, state in enumerate(states):
    player_sum, _, _ = state
    if player_sum >= 20:
        # For high player sums, initialize stand value higher
        q_data['Action 0 (Stand)'][i] = 0.5
        q_data['Action 1 (Hit)'][i] = -0.1
    elif player_sum < 12:
        # For low player sums, initialize hit value higher
        q_data['Action 0 (Stand)'][i] = -0.1
        q_data['Action 1 (Hit)'][i] = 0.5

Q = pl.DataFrame(q_data)

# Double Q-learning: Second Q-table for reducing bias
Q2 = Q.clone()

### Initialization of Count-Bet table

In [395]:
count_ranges = [[0], [1,2,3],[4]]

count_data = {
    'True Count' : count_ranges,
    'Low Bet (1)' : np.zeros(len(count_ranges)),
    'Mid Bet (2)' : np.zeros(len(count_ranges)),
    'High Bet (5)' : np.zeros(len(count_ranges))
}
# Initialize betting strategy for each count range, which are fixed in all matches
for i, count_range in enumerate(count_ranges):
    if count_range == [0]:
        count_data['Low Bet (1)'][i] = 0.5
        count_data['Mid Bet (2)'][i] = 0.1
        count_data['High Bet (5)'][i] = 0.1
    elif count_range == [1,2,3]:
        count_data['Low Bet (1)'][i] = 0.2
        count_data['Mid Bet (2)'][i] = 0.5
        count_data['High Bet (5)'][i] = 0.2
    else:
        count_data['Low Bet (1)'][i] = 0.2
        count_data['Mid Bet (2)'][i] = 0.3
        count_data['High Bet (5)'][i] = 0.5


count_df = pl.DataFrame(count_data)

# use a dictionary which associates each column name of the type of bet to the actual bet value
betting_strategy = {
    'Low Bet (1)': 1,
    'Mid Bet (2)': 2,
    'High Bet (5)': 5
}

# Track state-action visit counts for adaptive learning rates
visit_counts = {}
for state in states:
    visit_counts[(state, 0)] = 0  # Stand
    visit_counts[(state, 1)] = 0  # Hit

### Hyperparameters

In [376]:

# Improved hyperparameters
initial_lr = 0.1             # Learning rate
lr_decay_rate = 0.00005      # Gentler decay rate
gamma = 0.95                 # Higher discount factor - long-term rewards matter more
n_episodes = 200000         # More training episodes
initial_epsilon = 1.0        # Start with 100% exploration
epsilon_min = 0.01           # Minimum exploration rate
epsilon_decay = 0.99995      # Much slower decay rate

### Helper functions

In [377]:

# Modified state representation - focusing on meaningful game states
def get_state_features(full_state):
    # Extract just player sum, dealer card, and usable ace
    player_sum = full_state[0]
    dealer_card = full_state[1]
    usable_ace = full_state[2]
    return (player_sum, dealer_card, usable_ace)


In [378]:
def get_adaptive_lr(state, action, base_lr):
    """Get state-action specific learning rate based on visit count"""
    key = (state, action)
    count = visit_counts.get(key, 0) + 1
    # Decay learning rate based on visit count, but maintain a minimum rate
    return max(base_lr / (1 + 0.005 * count), base_lr * 0.1)

In [379]:
def get_q_values(state_features, q_table=Q):
    """Get Q-values for a given state"""
    # Filter the DataFrame for the specific state
    state_row = q_table.filter(pl.col('State') == state_features)
    
    if len(state_row) == 0:
        # Return default values based on player sum
        player_sum = state_features[0]
        if player_sum < 12:
            return np.array([-0.1, 0.5])  # Default to hit for low sums
        elif player_sum >= 20:
            return np.array([0.5, -0.1])  # Default to stand for high sums
        else:
            return np.array([0.0, 0.0])  # Neutral for middle sums
            
    # Extract Q-values from the DataFrame
    stand_val = state_row.select('Action 0 (Stand)').item()
    hit_val = state_row.select('Action 1 (Hit)').item()
    return np.array([stand_val, hit_val])


In [None]:
def update_q_value(state_features, action, reward, next_state_features, lr, q_table=Q, q_table_target=Q2):
    """Update Q-value for state-action pair using Double Q-learning"""
    # Check if state exists in our table
    state_row = q_table.filter(pl.col('State') == state_features)
    if len(state_row) == 0:
        return # State not in our table
    
    # Determine which action column to update
    action_col = 'Action 1 (Hit)' if action == 1 else 'Action 0 (Stand)'
    
    # Current Q-value in the DataFrame
    current_q = state_row.select(action_col).item()
    
    # If next_state_features is None, this is a terminal state
    if next_state_features is None:
        # Terminal state - no future rewards
        new_q = current_q + lr * (reward - current_q)
    else:
        # Get the next state's best action from current Q-table
        next_q_values = get_q_values(next_state_features, q_table)
        best_next_action = np.argmax(next_q_values)
        
        # Get Q-value for best action from target Q-table
        next_q_values_target = get_q_values(next_state_features, q_table_target)
        max_next_q = next_q_values_target[best_next_action]
        
        # Q-learning update formula with future rewards
        new_q = current_q + lr * (reward + gamma * max_next_q - current_q)
    
    # Update the Q-table entry in the DataFrame
    # Create a temporary mask for the state we want to update
    mask = pl.col('State') == state_features
    
    # Use the when/then/otherwise pattern to update values
    q_table = q_table.with_columns(
        pl.when(mask)
        .then(pl.lit(new_q))
        .otherwise(pl.col(action_col))
        .alias(action_col)
    )
    
    # Track visit counts
    visit_counts[(state_features, action)] = visit_counts.get((state_features, action), 0) + 1
    
    return q_table

In [397]:
def update_count_table(current_count, reward, count_df=count_df):
    """Update the value associated with the move of betting a certain amount based on the reward received"""
    print(count_df)
    # Check if reward is positive or negative
    if reward > 0:
        # Positive reward - increase the value of betting a higher amount and decrease the value of betting a lower amount for the row corresponding to the current count
        # Find the index of the current count range
        index = None
        if current_count <= 0:
            index = 0
        elif current_count <= 3:
            index = 1
        else:
            index = 2
        # update the row corresponding to the current count
        if index is None:
            raise ValueError("Current count not found in count ranges")
        # Update the values in the count_df DataFrame
        count_df = count_df.with_columns(
            pl.when(pl.col('True Count') == count_ranges[index])
            .then(pl.lit(count_df['Low Bet (1)'][index] * 0.9))
            .otherwise(pl.col('Low Bet (1)'))
            .alias('Low Bet (1)')
        )
        count_df = count_df.with_columns(
            pl.when(pl.col('True Count') == count_ranges[index])
            .then(pl.lit(count_df['Mid Bet (2)'][index] * 0.95))
            .otherwise(pl.col('Mid Bet (2)'))
            .alias('Mid Bet (2)')
        )
        count_df = count_df.with_columns(
            pl.when(pl.col('True Count') == count_ranges[index])
            .then(pl.lit(count_df['High Bet (5)'][index] * 1.05))
            .otherwise(pl.col('High Bet (5)'))
            .alias('High Bet (5)')
        )
    elif reward < 0:
        # Negative reward - decrease the value of betting a higher amount and increase the value of betting a lower amount for the row corresponding to the current count
        # Find the index of the current count range
        index = None
        if current_count <= 0:
            index = 0
        elif current_count <= 3:
            index = 1
        else:
            index = 2
        # update the row corresponding to the current count
        if index is None:
            raise ValueError("Current count not found in count ranges")
        # Update the values in the count_df DataFrame
        count_df = count_df.with_columns(
            pl.when(pl.col('True Count') == count_ranges[index])
            .then(pl.lit(count_df['Low Bet (1)'][index] * 1.05))
            .otherwise(pl.col('Low Bet (1)'))
            .alias('Low Bet (1)')
        )
        count_df = count_df.with_columns(
            pl.when(pl.col('True Count') == count_ranges[index])
            .then(pl.lit(count_df['Mid Bet (2)'][index] * 0.95))
            .otherwise(pl.col('Mid Bet (2)'))
            .alias('Mid Bet (2)')
        )
        count_df = count_df.with_columns(
            pl.when(pl.col('True Count') == count_ranges[index])
            .then(pl.lit(count_df['High Bet (5)'][index] * 0.9))
            .otherwise(pl.col('High Bet (5)'))
            .alias('High Bet (5)')
        )
    else:
        #do nothing
        #ALTERNATIVE: update something
        pass
    

    
    return count_df

### Training Q-table

In [382]:
# Training loop with convergence check
print("Starting improved training...")
wins = 0
draws = 0
losses = 0
epsilon = initial_epsilon
lr = initial_lr
money_won = 0
money_lost = 0

# Parameters for convergence
n_episodes = 200000  # Number of episodes for training
convergence_threshold = 0.001  # Lower threshold for better stability
convergence_check_interval = 10000  # Check for convergence every N episodes
convergence_required_count = 3  # Number of consecutive checks below threshold to confirm convergence
max_episodes = n_episodes  # Maximum episodes as a fallback

# Keep a copy of the previous Q-table for comparison
previous_q = Q.clone()
convergence_count = 0
converged = False
episode = 0
#first training phase only for the Q-table with fixed betting strategy
while episode < max_episodes and not converged:

    env.reset()
    bet_index = env.bet_space.sample()  # Sample bet index from the environment
    bet_amount = env.bets[bet_index]  # Sample bet amount from the environment
    # print(env.step(bet_index, action_type="bet"))
    state, reward, done = env.step(bet_index, action_type="bet")  # Place bet
    if done:
        if reward > 0:
            wins += 1
            money_won += reward * bet_amount
        elif reward == 0:
            draws += 1
        else:
            losses += 1
            money_lost += abs(reward) * bet_amount
    # print(bet_amount)
    state_features = get_state_features(state)

    # Training episode
    while not done:
        
        if state_features[0] < 12:
        # Always hit this state as it's not relevant for our training
            next_state, _, _ = env.step(1, action_type="move")
            next_state_features = get_state_features(next_state) if not done else None
            state = next_state
            state_features = next_state_features if next_state is not None else None
            continue
        
        # Epsilon-greedy action selection
        elif np.random.rand() < epsilon:
            action = env.move_space.sample()  # Random action
        else:
            q_values = get_q_values(state_features)
            action = np.argmax(q_values)  # Greedy action
        
        # Take action
        next_state, reward, done = env.step(action, action_type="move")
        next_state_features = get_state_features(next_state) if not done else None

        # Get adaptive learning rate for this state-action pair
        adaptive_lr = get_adaptive_lr(state_features, action, lr)

        # Randomly decide which Q-table to update (Double Q-learning)
        # print(f"State: {state_features}, Action: {action}, Done: {done}, Reward: {reward}, Next State: {next_state_features}")
        if np.random.rand() < 0.5:
            # print("Updating Q-table 1")
            Q = update_q_value(state_features, action, reward*bet_amount, next_state_features, adaptive_lr, Q, Q2)
        else:
            # print("Updating Q-table 2")
            Q2 = update_q_value(state_features, action, reward*bet_amount, next_state_features, adaptive_lr, Q2, Q)
            
        # Track outcomes
        if done:
            if reward > 0:
                wins += 1
                money_won += reward * bet_amount
            elif reward == 0:
                draws += 1
            else:
                losses += 1
                money_lost += abs(reward) * bet_amount
        
        state = next_state
        state_features = next_state_features if next_state is not None else None
        
        if state_features is None:
            # print(f"Entered break condition with done being {done}")
            break
    
    # Decay epsilon and learning rate
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    lr = initial_lr / (1 + lr_decay_rate * episode)
    
    # Check for convergence periodically
    if episode % convergence_check_interval == 0 and episode > 0:
        # Calculate the maximum absolute difference between current and previous Q-values
        diff_stand = (Q.select('Action 0 (Stand)').to_numpy() - 
                     previous_q.select('Action 0 (Stand)').to_numpy())
        diff_hit = (Q.select('Action 1 (Hit)').to_numpy() - 
                   previous_q.select('Action 1 (Hit)').to_numpy())
        
        max_diff_stand = np.max(np.abs(diff_stand))
        max_diff_hit = np.max(np.abs(diff_hit))
        max_diff = max(max_diff_stand, max_diff_hit)
        
        if max_diff < convergence_threshold:
            convergence_count += 1
            print(f"Episode {episode}, max Q-value change: {max_diff:.6f} (convergence count: {convergence_count}/{convergence_required_count})")
            if convergence_count >= convergence_required_count:
                print(f"Converged after {episode} episodes (max Q-value change: {max_diff:.6f})")
                converged = True
        else:
            convergence_count = 0
            print(f"Episode {episode}, max Q-value change: {max_diff:.6f}")
        
        # Store current Q-values for next comparison
        previous_q = Q.clone()
    
    episode += 1

# Final statistics
total_episodes = episode
print(f"Training complete after {total_episodes} episodes.")
print(f"Win rate: {wins/total_episodes:.4f}")
print(f"Draw rate: {draws/total_episodes:.4f}")
print(f"Loss rate: {losses/total_episodes:.4f}")

Starting improved training...
Episode 10000, max Q-value change: 0.790221
Episode 20000, max Q-value change: 0.464655
Episode 30000, max Q-value change: 0.285216
Episode 40000, max Q-value change: 0.196771
Episode 50000, max Q-value change: 0.200968
Episode 60000, max Q-value change: 0.123454
Episode 70000, max Q-value change: 0.108336
Episode 80000, max Q-value change: 0.093450
Episode 90000, max Q-value change: 0.109359
Episode 100000, max Q-value change: 0.084005
Episode 110000, max Q-value change: 0.073266
Episode 120000, max Q-value change: 0.046212
Episode 130000, max Q-value change: 0.049739
Episode 140000, max Q-value change: 0.042723
Episode 150000, max Q-value change: 0.033059
Episode 160000, max Q-value change: 0.034446
Episode 170000, max Q-value change: 0.040687
Episode 180000, max Q-value change: 0.030728
Episode 190000, max Q-value change: 0.030142
Training complete after 200000 episodes.
Win rate: 0.4072
Draw rate: 0.0625
Loss rate: 0.5303


### Testing Q-table win rate

In [387]:

# Evaluate the final policy with more episodes
print("\nFinal policy evaluation...")
eval_wins = 0
eval_draws = 0
eval_loss = 0
money_won = 0
money_lost = 0
eval_episodes = 10000

for _ in range(eval_episodes):
    env.reset()
    bet_index = env.bet_space.sample()  # Sample bet index from the environment
    bet_amount = env.bets[bet_index]  # Sample bet amount from the environment
    # print(env.step(bet_index, action_type="bet"))
    state, reward, done = env.step(bet_index, action_type="bet")  # Place bet
    if done:
        if reward > 0:
            eval_wins += 1
            money_won += reward * bet_amount
        elif reward == 0:
            eval_draws += 1
        else:
            eval_loss += 1
            money_lost += abs(reward) * bet_amount
    # print(bet_amount)
    state_features = get_state_features(state)

    # Training episode
    while not done:
        # Always choose the best action according to average of both Q-tables
        q_values1 = get_q_values(state_features, Q)
        q_values2 = get_q_values(state_features, Q2)
        avg_q_values = (q_values1 + q_values2) / 2
        action = np.argmax(avg_q_values)
        
        next_state, reward, done = env.step(action, action_type="move")
        
        if done and reward > 0:
            eval_wins += 1
            money_won += reward * bet_amount
        elif done and reward == 0:
            eval_draws += 1
        elif done and reward < 0:
            eval_loss += 1
            money_lost += abs(reward) * bet_amount
        else:
            pass
        
        state = next_state
        state_features = get_state_features(state)

        if done:
            break

print(f"Final evaluation complete.")
print(f"Win rate: {eval_wins/eval_episodes:.4f}")
print(f"Draw rate: {eval_draws/eval_episodes:.4f}")
print(f"Loss rate: {eval_loss/eval_episodes:.4f}")
print(f"Money won: {money_won}")
print(f"Money lost: {money_lost}")


Final policy evaluation...
Final evaluation complete.
Win rate: 0.4336
Draw rate: 0.0871
Loss rate: 0.4793
Money won: 4550.0
Money lost: 4793


###  Training Count-Bet table (Work in progress)

In [None]:
# Training loop with convergence check
print("Starting improved training...")
wins = 0
draws = 0
losses = 0
money_won = 0
money_lost = 0

# Parameters for convergence
convergence_threshold = 0.001  # Lower threshold for better stability
convergence_check_interval = 10000  # Check for convergence every N episodes
convergence_required_count = 3  # Number of consecutive checks below threshold to confirm convergence
max_episodes = n_episodes  # Maximum episodes as a fallback

# Keep a copy of the previous Q-table for comparison
previous_count = count_df.clone()
convergence_count = 0
converged = False
episode = 0

#training phase for the betting strategy
while episode < max_episodes and not converged:

    obs = env.reset()
    current_count = obs[3]
    bet_index = env.bet_space.sample()
    bet_amount = env.bets[bet_index]
    state, reward, done = env.step(bet_index, action_type="bet") 

    # Find the index of the current count range
    if current_count <= 0:
        index = 0
    elif current_count <= 3:
        index = 1
    else:
        index = 2
    # Get the row for current count
    count_row = count_df.filter(pl.col('True Count') == count_ranges[index])

    # Get values for each bet type
    low_bet_val = count_row.select('Low Bet (1)').item()
    mid_bet_val = count_row.select('Mid Bet (2)').item()
    high_bet_val = count_row.select('High Bet (5)').item()

    # Find column with maximum value
    if low_bet_val >= mid_bet_val and low_bet_val >= high_bet_val:
        max_bet_col = 'Low Bet (1)'
    elif mid_bet_val >= low_bet_val and mid_bet_val >= high_bet_val:
        max_bet_col = 'Mid Bet (2)'
    else:
        max_bet_col = 'High Bet (5)'

    # Get the bet amount
    bet_amount = betting_strategy[max_bet_col]

    if done:
        if reward > 0:
            wins += 1
            money_won += reward * bet_amount
        elif reward == 0:
            draws += 1
        else:
            losses += 1
            money_lost += abs(reward) * bet_amount
    
    state_features = get_state_features(state)

    # Training episode
    while not done:

        if state_features[0] < 12:
        # Always hit this state as it's not relevant for our training
            next_state, _, _ = env.step(1, action_type="move")
            next_state_features = get_state_features(next_state) if not done else None
            state = next_state
            state_features = next_state_features if next_state is not None else None
            continue
        
        q_values1 = get_q_values(state_features, Q)
        q_values2 = get_q_values(state_features, Q2)
        avg_q_values = (q_values1 + q_values2) / 2
        action = np.argmax(avg_q_values)
        
        # Take action
        next_state, reward, done = env.step(action, action_type="move")
        next_state_features = get_state_features(next_state) if not done else None
        
        #update the count_df DataFrame based on the reward received
        print(current_count)
        count_df = update_count_table(current_count, reward*bet_amount, count_df)
            
        # Track outcomes
        if done:
            if reward > 0:
                wins += 1
                money_won += reward * bet_amount
            elif reward == 0:
                draws += 1
            else:
                losses += 1
                money_lost += abs(reward) * bet_amount
        
        state = next_state
        state_features = next_state_features if next_state is not None else None
        
        if state_features is None:
            break
    #check for convergence of the count_df DataFrame
    if episode % convergence_check_interval == 0 and episode > 0:
        # Calculate the maximum absolute difference between current and previous Q-values
        diff_stand = (count_df.select('Low Bet (1)').to_numpy() - 
                     previous_count.select('Low Bet (1)').to_numpy())
        diff_hit = (count_df.select('Mid Bet (2)').to_numpy() - 
                   previous_count.select('Mid Bet (2)').to_numpy())
        
        max_diff_stand = np.max(np.abs(diff_stand))
        max_diff_hit = np.max(np.abs(diff_hit))
        max_diff = max(max_diff_stand, max_diff_hit)
        
        if max_diff < convergence_threshold:
            convergence_count += 1
            print(f"Episode {episode}, max Q-value change: {max_diff:.6f} (convergence count: {convergence_count}/{convergence_required_count})")
            if convergence_count >= convergence_required_count:
                print(f"Converged after {episode} episodes (max Q-value change: {max_diff:.6f})")
                converged = True
        else:
            convergence_count = 0
            print(f"Episode {episode}, max value change: {max_diff:.6f}")
        
        # Store current Q-values for next comparison
        previous_count = count_df.clone()
    
    episode += 1

# Final statistics
total_episodes = episode
print(f"Training complete after {total_episodes} episodes.")
print(f"Win rate: {wins/total_episodes:.4f}")
print(f"Draw rate: {draws/total_episodes:.4f}")
print(f"Loss rate: {losses/total_episodes:.4f}")

### Testing Q + Count-bet

### Extra

In [388]:
# Calculate average Q-values (ensemble approach)
Qp = Q.to_pandas()
Q2p = Q2.to_pandas()
avg_Q = Qp.copy()
avg_Q['Action 0 (Stand)'] = (Qp['Action 0 (Stand)'] + Q2p['Action 0 (Stand)']) / 2
avg_Q['Action 1 (Hit)'] = (Qp['Action 1 (Hit)'] + Q2p['Action 1 (Hit)']) / 2
avg_Q['Best Action'] = avg_Q.apply(
    lambda row: "Stand" if row['Action 0 (Stand)'] > row['Action 1 (Hit)'] else "Hit", 
    axis=1
)

In [None]:

# Close environment
env.close()
