In [163]:
import random
import numpy as np
from copy import deepcopy
from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import seaborn as sns

In [165]:
# Import necessary modules
import sys
import os

# Add the 'src' directory to the system path if it's not already
sys.path.append(os.path.abspath(os.path.join('..', 'environment')))

import random
from environment import BlackjackGame  # Adjust if your environment file is in a different location

In [137]:
class TemporalDifference:
    def __init__(self, env, alpha=0.1, gamma=0.9, epsilon=0.1, lambd=0.9):
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.lambd = lambd

        # Q-table as a dictionary
        self.Q = {}  # Key: state (tuple), Value: [Q-value for action 0, Q-value for action 1]
        self.E = {}  # Eligibility trace dictionary

        print("Dynamic Q-table initialized with dictionary structure.")

    def get_q_values(self, state):
        """
        Get the Q-values for a given state. Default to [0, 0] if not present.
        """
        return self.Q.get(tuple(state), [0.0, 0.0])

    def set_q_value(self, state, action, q_value):
        """
        Set the Q-value for a specific state-action pair.
        """
        # Ensure the state exists in the Q-table
        if tuple(state) not in self.Q:
            self.Q[tuple(state)] = [0.0, 0.0]
        self.Q[tuple(state)][action] = q_value

    def get_e_trace(self, state):
        """
        Get the eligibility trace for a given state. Default to [0, 0] if not present.
        """
        return self.E.get(tuple(state), [0.0, 0.0])

    def set_e_trace(self, state, action, value):
        """
        Set the eligibility trace for a specific state-action pair.
        """
        # Ensure the state exists in the eligibility trace
        if tuple(state) not in self.E:
            self.E[tuple(state)] = [0.0, 0.0]
        self.E[tuple(state)][action] = value

    def epsilon_greedy_policy(self, state):
        """
        Epsilon-greedy policy for action selection.
        Randomly selects an action if the state is not in Q-table.
        """
        if random.random() < self.epsilon:
            return random.choice(self.env.actions)
        else:
            q_values = self.get_q_values(state)
            return np.argmax(q_values)

    def train(self, num_episodes, true_count=0, on_policy=True):
        """
        Train the agent using Temporal Difference learning with eligibility traces.
        """
        self.env.reset(true_count=true_count)
        for _ in tqdm(range(num_episodes)):
            self.E.clear()  # Reset eligibility trace

            reward, state, winner = self.env.new_game()
            while winner:
                reward, state, winner = self.env.new_game()  # Skip initial game if it's over immediately

            action = self.epsilon_greedy_policy(state)
            while not self.env.winner:
                reward, next_state, winner = self.env.step(action)
                next_action = self.epsilon_greedy_policy(next_state)

                # Compute target
                next_q_values = self.get_q_values(next_state)
                if winner:
                    target = reward
                else:
                    if on_policy:
                        target = reward + self.gamma * next_q_values[next_action]
                    else:
                        target = reward + self.gamma * max(next_q_values)

                # Update Q-value and eligibility trace
                current_q_value = self.get_q_values(state)[action]
                delta = target - current_q_value

                # Update eligibility trace
                e_trace = self.get_e_trace(state)
                e_trace[action] += 1
                self.set_e_trace(state, action, e_trace[action])

                # Update Q-values and decay traces
                for s, trace in self.E.items():
                    q_values = self.get_q_values(s)
                    for a in range(len(q_values)):
                        updated_q_value = q_values[a] + self.alpha * delta * trace[a]
                        self.set_q_value(s, a, updated_q_value)
                        self.set_e_trace(s, a, trace[a] * self.gamma * self.lambd)

                if winner:
                    break  # End of episode

                state, action = next_state, next_action

    def get_best_action(self, state):
        """
        Returns the best action for a given state based on the trained Q-values.
        For unseen states, it chooses a random action.
        """
        q_values = self.get_q_values(state)  # Retrieve Q-values for the state
        if q_values == [0.0, 0.0]:  # If unseen state, Q-values will be default
            return random.choice(self.env.actions)
        return int(np.argmax(q_values))

    def print_q_table(self):
        """
        Print the current Q-table for debugging purposes.
        """
        print("Current Q-table:")
        for state, q_values in self.Q.items():
            print(f"State: {state}, Q-values: {q_values}")


In [139]:
env = BlackjackGame()
agent = TemporalDifference(
    env=env,
    alpha=0.1,    # Learning rate
    gamma=0.9,    # Discount factor
    epsilon=0.1,  # Exploration rate
    lambd=1     # Trace decay
)

Dynamic Q-table initialized with dictionary structure.


In [147]:
num_episodes = 10000000  # Define the number of episodes to train
agent.train(num_episodes=num_episodes)

100%|███████████████████████████████████████████████████████████████████| 10000000/10000000 [05:28<00:00, 30433.63it/s]


In [149]:
len(agent.Q)

13347

In [159]:
num_test_games = 1000
wins = 0
losses = 0
ties = 0
blackjack_count = 0
total_reward = 0
env = BlackjackGame()

for _ in tqdm(range(num_test_games), desc="Testing Games"):
    env.reset(true_count=2)
    game_reward, state, winner = env.new_game()  # Start a new game

    while not winner:
        # Get the best action based on the trained model
        action = agent.get_best_action(state)
        
        # Take the action in the environment
        reward, next_state, winner = env.step(action)
        
        # Accumulate reward for the current game
        game_reward += reward
        
        # Update the state
        state = next_state

    # Track total reward
    total_reward += game_reward

    # Step 3: Record the outcome
    if winner == 'player':
        wins += 1
    elif winner == 'blackjack':
        wins += 1
        blackjack_count += 1
    elif winner == 'dealer':
        losses += 1
    else:  # Tie
        ties += 1

# Step 4: Compute performance metrics
win_rate = wins / num_test_games
loss_rate = losses / num_test_games
tie_rate = ties / num_test_games
average_reward = total_reward / num_test_games
blackjack_rate = blackjack_count / num_test_games

# Step 5: Print results
print(f"Results after {num_test_games} test games:")
print(f"Win Rate: {win_rate:.2%}")
print(f"Loss Rate: {loss_rate:.2%}")
print(f"Tie Rate: {tie_rate:.2%}")
print(f"Blackjack Rate: {blackjack_rate:.2%}")
print(f"Total Reward: {total_reward}")
print(f"Average Reward: {average_reward:.2f}")

Testing Games: 100%|█████████████████████████████████████████████████████████████| 1000/1000 [00:00<00:00, 3004.46it/s]

Results after 1000 test games:
Win Rate: 27.90%
Loss Rate: 70.10%
Tie Rate: 2.00%
Blackjack Rate: 6.40%
Total Reward: -7800.0
Average Reward: -7.80



