In [None]:
!pip install gymnasium stable-baselines3 shimmy gymnasium[box2d] tqdm

In [3]:
# Run `pip install "gymnasium[classic-control]"` for this example.
import gymnasium as gym

# Create our training environment - a cart with a pole that needs balancing.
# render_mode="human" asks Gymnasium to display the episode while it runs.
env = gym.make("CartPole-v1", render_mode="human")

# Everything after creation runs inside try/finally so the environment is
# closed even if a step raises or the cell is interrupted mid-episode.
try:
    # Reset environment to start a new episode
    observation, info = env.reset()
    # observation: what the agent can "see" - cart position, velocity, pole angle, etc.
    # info: extra debugging information (usually not needed for basic learning)

    print(f"Starting observation: {observation}")
    # Example output: [ 0.01234567 -0.00987654  0.02345678  0.01456789]
    # [cart_position, cart_velocity, pole_angle, pole_angular_velocity]

    episode_over = False
    total_reward = 0

    while not episode_over:
        # Choose an action: 0 = push cart left, 1 = push cart right
        action = env.action_space.sample()  # Random action for now - real agents will be smarter!

        # Take the action and see what happens
        observation, reward, terminated, truncated, info = env.step(action)

        # reward: +1 for each step the pole stays upright
        # terminated: True if pole falls too far (agent failed)
        # truncated: True if we hit the time limit (500 steps)

        total_reward += reward
        # Either flag ends the episode; they are kept separate because only
        # `terminated` means the agent actually failed.
        episode_over = terminated or truncated

    print(f"Episode finished! Total reward: {total_reward}")
finally:
    # Always release the environment's resources, success or failure.
    env.close()

Starting observation: [ 0.00613935  0.04930774  0.0210934  -0.00365053]
Episode finished! Total reward: 24.0


In [6]:
import gymnasium as gym
from gymnasium.wrappers import FlattenObservation

# Start with an environment whose observation space is multi-dimensional.
# The shape displayed below has three axes (presumably height x width x colour
# channels of an image frame - confirm against the CarRacing documentation).
env = gym.make("CarRacing-v3")
# Bare last expression so the notebook displays the observation shape.
env.observation_space.shape

(96, 96, 3)

In [7]:
# Wrap the environment so its observation space is flattened into a 1D array.
# `env` is the CarRacing environment created in the previous cell.
wrapped_env = FlattenObservation(env)
# Bare last expression: displays the flattened shape. The single axis has
# 96 * 96 * 3 = 27648 entries, matching the (96, 96, 3) shape shown earlier.
wrapped_env.observation_space.shape

# Flattening is useful when an algorithm expects a 1D input vector rather
# than a multi-dimensional observation.

(27648,)

# Building an Agent that actually learns stuff

In [8]:
from collections import defaultdict
import gymnasium as gym
import numpy as np

class BlackJackAgent:
    """Tabular Q-learning agent that acts epsilon-greedily.

    The agent keeps one row of action values per observed state in
    ``q_values``, nudges those values after every transition via
    ``update``, and lowers its exploration rate linearly through
    ``decay_epsilon``.
    """

    def __init__(
        self,
        env: gym.Env,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Set up an agent with an empty Q-table.

        Args:
            env: The environment; its ``action_space`` is used both to size
                Q-table rows and to draw random exploratory actions.
            learning_rate: Step size applied to each Q-value correction.
            initial_epsilon: Exploration rate at the start of training.
            epsilon_decay: Amount subtracted from epsilon per ``decay_epsilon`` call.
            final_epsilon: Lower bound that epsilon never drops below.
            discount_factor: Weight given to the estimated value of the next state.
        """

        def _blank_action_values():
            # Called lazily by the defaultdict the first time a state is
            # looked up, so the action count is read at that moment.
            return np.zeros(env.action_space.n)

        self.env = env

        # Learning parameters
        self.lr = learning_rate
        self.discount_factor = discount_factor

        # Exploration schedule
        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        # Q-table: state -> array of per-action value estimates (zeros for
        # states that have not been seen before).
        self.q_values = defaultdict(_blank_action_values)

        # One temporal-difference error is recorded per update() call.
        self.training_error = []

    def get_action(self, obs: tuple[int, int, bool]) -> int:
        """Pick an action for ``obs`` using the epsilon-greedy rule.

        Returns 0 (stand) or 1 (hit).
        """
        explore = np.random.random() < self.epsilon
        if explore:
            # Exploration: let the environment's action space choose at random.
            return self.env.action_space.sample()

        # Exploitation: take the action with the highest current estimate.
        greedy_action = np.argmax(self.q_values[obs])
        return int(greedy_action)

    def update(
        self,
        obs: tuple[int, int, bool],
        action: int,
        reward: float,
        terminated: bool,
        next_obs: tuple[int, int, bool],
    ):
        """Apply one Q-learning step for the transition
        (obs, action, reward, next_obs).

        The temporal-difference error of the step is appended to
        ``training_error``.
        """
        # Best estimate available from the successor state; multiplying by
        # the boolean zeroes it out when the episode has terminated.
        keep_future = not terminated
        future_q_value = keep_future * np.max(self.q_values[next_obs])

        # Bellman target and how far the current estimate is from it.
        target = reward + self.discount_factor * future_q_value
        temporal_difference = target - self.q_values[obs][action]

        # Move the estimate a fraction (the learning rate) of the way
        # toward the target.
        self.q_values[obs][action] += self.lr * temporal_difference

        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Lower epsilon by one decay step, never going below ``final_epsilon``."""
        lowered = self.epsilon - self.epsilon_decay
        self.epsilon = lowered if lowered > self.final_epsilon else self.final_epsilon


In [9]:
# Training hyperparameters (consumed by the agent constructor and the training loop)
learning_rate = 0.01        # Step size of each Q-value update (higher = faster but less stable)
n_episodes = 100_000        # Number of hands (episodes) to practice
start_epsilon = 1.0         # Start with 100% random actions
# Per-episode decrement: sized so epsilon would fall from start_epsilon to 0
# over the first half of training; the agent clamps it at final_epsilon instead.
epsilon_decay = start_epsilon / (n_episodes / 2)  # Reduce exploration over time
final_epsilon = 0.1         # Floor for epsilon: always keep some exploration

# Create environment and agent
# NOTE(review): sab=False is passed straight to the environment; see the
# Blackjack-v1 documentation for the rule variant it selects.
env = gym.make("Blackjack-v1", sab=False)
# Wrap the env to record per-episode statistics; buffer_length is set to
# n_episodes, presumably so stats for every training episode are retained - confirm.
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=n_episodes)

# discount_factor is not passed, so the agent uses its default (0.95).
agent = BlackJackAgent(
    env=env,
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)

In [11]:
from tqdm import tqdm  # Progress bar

for episode in tqdm(range(n_episodes)):
    # Begin a new episode and get its first observation.
    obs, info = env.reset()
    done = False

    while not done:
        # Act epsilon-greedily, then apply the action to the environment.
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)

        # Learn from the transition just observed. Only `terminated` is
        # handed to the agent; `truncated` merely ends the loop.
        agent.update(obs, action, reward, terminated, next_obs)

        # Advance to the next state and stop on either end-of-episode flag.
        obs, done = next_obs, (terminated or truncated)

    # Explore a little less in the next episode.
    agent.decay_epsilon()

100%|██████████| 100000/100000 [00:17<00:00, 5625.12it/s]
