# Step 1: Importing Libraries and Defining Constants

In this step, we import the libraries we need (pygame, numpy, pickle, and random) and define constants for the colors and the gridworld dimensions.



In [1]:
import pygame
import numpy as np
import pickle
import random

# Define colors
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
GREEN = (0, 150, 0)  # Darker shade of green
RED = (255, 0, 0)
BLUE = (0, 0, 255)

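# Gridworld dimensions
n = 5  # Number of rows
m = 5  # Number of columns
# Side length of one cell in pixels. The drawing functions below use cell_size,
# but the original cells never set it; 100 is an assumed value that makes the
# 5x5 grid plus the log area fit the 700-pixel window height.
cell_size = 100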

# Font for text display (using default font)
pygame.init()
font = pygame.font.Font(None, 20)  # Using the default font

pygame 2.5.2 (SDL 2.28.3, Python 3.12.3)
Hello from the pygame community. https://www.pygame.org/contribute.html


This step sets up the foundation for building the gridworld environment and defining its characteristics.

# Step 2: Initializing Pygame and Setting Up the Screen
Here, we initialize Pygame and set up the screen for our gridworld environment. We create a Pygame window with a specific size to accommodate the grid and log area. Additionally, we set the window caption.

In [2]:
# Initialize Pygame
pygame.init()
screen = pygame.display.set_mode((650, 700))  # Set screen size to accommodate log area and wider grid
pygame.display.set_caption("Gridworld Reinforcement Learning")

This step prepares the environment for rendering the gridworld and displaying it to the user.
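As a rough illustration of how the window size relates to the grid, the sketch below derives a (width, height) pair from the grid dimensions. The helper name window_size() and the parameters cell_size, log_area_height, and min_width are assumptions made for this example; the notebook itself simply hard-codes (650, 700).

```python
def window_size(n_rows, n_cols, cell_size=100, log_area_height=200, min_width=650):
    """Return (width, height) in pixels for a grid with a log strip below it.

    Every default value here is an illustrative assumption.
    """
    width = max(n_cols * cell_size, min_width)      # keep room for the log area
    height = n_rows * cell_size + log_area_height   # grid on top, log underneath
    return width, height


print(window_size(5, 5))  # (650, 700)
```

With these assumed defaults, a 5x5 grid reproduces the size passed to pygame.display.set_mode() above.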

# Step 4: Drawing the Gridworld
In this step, we define a function draw_gridworld() that draws the gridworld on the Pygame screen. It takes the grid state grid and, optionally, the state value table V, and visualizes the grid, rewards, values, and log entries. It also reads the global variables screen, font, cell_size, and log.

In [3]:
def draw_gridworld(grid, V=None):
    """
    Draws the gridworld on the screen with rewards, values, and log entries.
    """
    screen.fill(WHITE)  # Fill screen with white color
    for i in range(n):
        for j in range(m):
            # Draw border around each cell
            pygame.draw.rect(screen, BLACK, (j * cell_size, i * cell_size, cell_size, cell_size), 1)

            # Color cells based on grid values
            if grid[i, j] == 'X':  # Obstacle
                color = BLACK
                pygame.draw.rect(screen, color, (j * cell_size, i * cell_size, cell_size, cell_size))  # Draw obstacle background
            elif grid[i, j] == '1':  # Goal
                color = GREEN
                pygame.draw.rect(screen, GREEN, (j * cell_size + 1, i * cell_size + 1, cell_size - 2, cell_size - 2))  # Dark green for goal state
            else:
                color = WHITE

            # Pick a text color that contrasts with the cell background
            # (text drawn in the cell's own fill color would be invisible)
            text_color = WHITE if grid[i, j] == 'X' else BLACK

            # Draw the state value in the upper part of the cell
            if V is not None:
                value_text = font.render(str(round(V[i, j], 2)), True, text_color)
                text_rect = value_text.get_rect(center=(j * cell_size + cell_size // 2, i * cell_size + cell_size // 3))
                screen.blit(value_text, text_rect)

            # Draw the reward text in the lower part so it does not overlap the value
            reward_text = font.render(str(grid[i, j]), True, text_color)
            reward_text_rect = reward_text.get_rect(center=(j * cell_size + cell_size // 2, i * cell_size + 2 * cell_size // 3))
            screen.blit(reward_text, reward_text_rect)

    # Draw log text
    log_text = font.render("Log:", True, BLACK)
    screen.blit(log_text, (10, n * cell_size + 20))
    log_area_height = 150
    log_area = pygame.Rect(10, (n * cell_size + 50), 630, log_area_height)  # Define log area rectangle
    pygame.draw.rect(screen, WHITE, log_area)  # Draw log area background
    for index, entry in enumerate(log[-5:]):  # Display only the last 5 log entries
        log_entry = font.render(entry, True, BLACK)
        screen.blit(log_entry, (20, (n * cell_size + 70) + index * 25))


This function is responsible for rendering the gridworld environment on the screen, including obstacles, goal states, rewards, and log entries. It provides a visual representation of the gridworld state to the user.
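The pixel arithmetic in draw_gridworld() maps columns to x and rows to y. A minimal sketch of that mapping, using two hypothetical helpers (cell_rect() and cell_center() are not part of the notebook), might look like this:

```python
def cell_rect(row, col, cell_size):
    """Return (x, y, width, height) of a grid cell; columns map to x, rows to y."""
    return (col * cell_size, row * cell_size, cell_size, cell_size)


def cell_center(row, col, cell_size):
    """Return the pixel at the center of a grid cell."""
    x, y, width, height = cell_rect(row, col, cell_size)
    return (x + width // 2, y + height // 2)


print(cell_rect(1, 3, 100))    # (300, 100, 100, 100)
print(cell_center(1, 3, 100))  # (350, 150)
```

Keeping the row/column swap in one place makes it harder to transpose the grid by accident when new drawing code is added.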

# Step 5: Drawing the Agent
In this step, we define a function draw_agent() to draw the agent on the screen. The agent is represented as a red circle on the gridworld. Additionally, a small black dot is drawn at the center of the agent's cell for better visualization.

In [4]:
def draw_agent(agent_pos):
    """
    Draws the agent on the screen.
    """
    pygame.draw.circle(screen, RED, (agent_pos[1] * cell_size + cell_size // 2,
                                   agent_pos[0] * cell_size + cell_size // 2), cell_size // 3)
    # Draw a dot in the center of the agent's cell
    pygame.draw.circle(screen, BLACK, (agent_pos[1] * cell_size + cell_size // 2,
                                     agent_pos[0] * cell_size + cell_size // 2), 3)


This function takes the agent's position agent_pos as a (row, column) tuple and draws a red circle in the matching grid cell, with a small black dot at the cell's center to mark the exact position. It is called after draw_gridworld() on each frame so that the agent appears on top of the grid.

# Step 6: Determining Action Index
Here, we define a function get_action_index() to determine the index corresponding to a given action. This function is useful for mapping actions to indices, which can be helpful for various operations, such as policy extraction.

In [5]:
def get_action_index(action):
    """
    Returns the index corresponding to a given action.
    """
    actions = ['up', 'down', 'left', 'right']
    return actions.index(action)


The get_action_index() function takes an action as input and returns its corresponding index in the predefined list of actions. This index is used for various operations where actions need to be represented numerically, such as accessing elements in arrays or policy extraction.
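One possible variation, shown here only as a sketch, keeps the action list in a module-level tuple and precomputes the lookup table, so the list is not rebuilt on every call. The names ACTIONS, ACTION_INDEX, and action_to_index() are assumptions for this example.

```python
ACTIONS = ('up', 'down', 'left', 'right')
ACTION_INDEX = {action: index for index, action in enumerate(ACTIONS)}


def action_to_index(action):
    """Return the index of an action, with a clear error for unknown names."""
    try:
        return ACTION_INDEX[action]
    except KeyError:
        raise ValueError(f"Unknown action: {action!r}") from None


print(action_to_index('left'))            # 2
print(ACTIONS[action_to_index('right')])  # right
```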

# Step 7: Determining Valid Actions
In this step, we define a function get_valid_actions() to determine the list of valid actions from a given state, considering obstacles in the grid. This function checks for valid actions by considering the boundaries of the grid and obstacles present in the gridworld.

In [6]:
def get_valid_actions(grid, state):
    """
    Returns a list of valid actions from the given state (considering obstacles).
    """
    actions = ['up', 'down', 'left', 'right']
    valid_actions = []
    for action in actions:
        new_pos = get_next_state(grid, state, action)
        if new_pos is not None and grid[new_pos[0], new_pos[1]] != 'X':
            valid_actions.append(action)
    return valid_actions


The get_valid_actions() function takes the grid and a state as input and returns the list of actions that can be taken from that state without hitting an obstacle. It iterates over all four actions, computes the resulting position with get_next_state() (defined in Step 9), and keeps the action if that position is not an obstacle. Because get_next_state() clamps moves at the grid edge, an action that would leave the grid keeps the agent in place and still counts as valid.
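If actions that leave the agent in place should not count as valid, one alternative is to test the unclamped target cell directly. The sketch below is self-contained and uses a plain list-of-lists grid; moving_actions() and MOVES are hypothetical names, not part of the notebook.

```python
MOVES = {'up': (-1, 0), 'down': (1, 0), 'left': (0, -1), 'right': (0, 1)}


def moving_actions(grid, state):
    """Return actions that land on a different, in-bounds, non-obstacle cell."""
    rows, cols = len(grid), len(grid[0])
    row, col = state
    result = []
    for action, (d_row, d_col) in MOVES.items():
        new_row, new_col = row + d_row, col + d_col
        inside = 0 <= new_row < rows and 0 <= new_col < cols
        if inside and grid[new_row][new_col] != 'X':
            result.append(action)
    return result


toy_grid = [
    ['0', 'X', '0'],
    ['0', '0', '0'],
]
print(moving_actions(toy_grid, (0, 0)))  # ['down']
print(moving_actions(toy_grid, (1, 1)))  # ['left', 'right']
```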

# Step 8: Value Iteration
In this step, we implement the value_iteration() function, which runs the Value Iteration algorithm to compute the state value table V. It sweeps over all non-obstacle states, backs up each state's value from the rewards and transitions of its valid moves, and repeats until the largest change in a sweep falls below a small threshold.

In [7]:
def value_iteration(grid, start, goal, discount_factor=0.9):
    """
    Performs Value Iteration to calculate the state value table.
    Only valid moves are considered, so no transition ever enters an obstacle.
    `start` is accepted to keep the call signature but is not used.
    """
    n = grid.shape[0]
    m = grid.shape[1]
    V = np.zeros((n, m))

    while True:
        delta = 0
        for i in range(n):
            for j in range(m):
                if grid[i, j] == 'X':
                    V[i, j] = float('-inf')  # Set value to negative infinity for obstacle cells
                    continue

                valid_actions = get_valid_actions(grid, (i, j))
                if not valid_actions:
                    continue  # Fully enclosed cell: nothing to back up

                v_old = V[i, j]
                best_value = float('-inf')  # Best backed-up value found so far

                # Only valid actions are considered, so new_pos is never an obstacle
                for action in valid_actions:
                    new_pos = get_next_state(grid, (i, j), action)
                    reward = float(grid[new_pos[0], new_pos[1]]) if new_pos != goal else 10.0  # Goal state reward
                    reward += -1.0  # Step cost
                    total_reward = reward + discount_factor * V[new_pos[0], new_pos[1]]
                    best_value = max(best_value, total_reward)

                # No visit-count penalty is applied here: a penalty that grows on
                # every sweep keeps delta from shrinking, so the loop would never end.
                V[i, j] = best_value
                delta = max(delta, abs(v_old - V[i, j]))

        if delta < 1e-8:  # Epsilon for convergence criteria
            break

    return V


The value_iteration() function takes the grid, start state, goal state, and discount factor as input and returns the state value table V. It iterates over all states in the grid, updating their values based on the rewards, transitions, and the discount factor. The resulting table is what the policy is later read from: in each state, the preferred action is the one that leads to the highest backed-up value.
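To see the backup V(s) = max over actions of [reward + discount * V(next state)] on a problem small enough to check by hand, here is a sketch on a one-dimensional corridor. The setup is an assumption for illustration only: four cells, a terminal goal at the right end, a step cost of -1, and a bonus of 10 for entering the goal.

```python
def corridor_value_iteration(length=4, gamma=0.9, step_cost=-1.0,
                             goal_reward=10.0, tol=1e-8):
    """Value iteration on a 1-D corridor whose last cell is a terminal goal."""
    goal = length - 1
    V = [0.0] * length
    while True:
        delta = 0.0
        for s in range(goal):  # the terminal goal keeps value 0
            candidates = []
            for move in (-1, 1):  # left, right
                s_next = min(max(s + move, 0), goal)  # clamp at the walls
                reward = step_cost + (goal_reward if s_next == goal else 0.0)
                candidates.append(reward + gamma * V[s_next])
            new_value = max(candidates)
            delta = max(delta, abs(new_value - V[s]))
            V[s] = new_value
        if delta < tol:
            return V


values = corridor_value_iteration()
print([round(v, 2) for v in values])  # [5.39, 7.1, 9.0, 0.0]
```

Reading from the right: the cell next to the goal is worth 10 - 1 = 9, the next one -1 + 0.9 * 9 = 7.1, and the leftmost -1 + 0.9 * 7.1 = 5.39.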





# Step 9: Determining Next State
In this step, we define the get_next_state() function, which calculates the next state after taking an action from the current state. This function handles boundary conditions to ensure that the agent does not move out of the gridworld.

In [8]:
def get_next_state(grid, state, action):
    """
    Returns the next state after taking an action from the current state,
    handling boundary conditions.
    """
    row_index, col_index = state  # Unpack state tuple

    if action == 'up':
        new_row_index = max(row_index - 1, 0)
        new_col_index = col_index
    elif action == 'down':
        new_row_index = min(row_index + 1, n - 1)
        new_col_index = col_index
    elif action == 'left':
        new_row_index = row_index
        new_col_index = max(col_index - 1, 0)
    elif action == 'right':
        new_row_index = row_index
        new_col_index = min(col_index + 1, m - 1)
    else:
        raise ValueError(f"Unknown action: {action!r}")

    # The indices are clamped above, so the result is always inside the grid;
    # a move off the edge returns the current state unchanged.
    return (new_row_index, new_col_index)


The get_next_state() function takes the grid, current state, and action as input and returns the next state after applying the action. It clamps the new row and column to the grid bounds, so a move off the edge leaves the agent where it is. Note that it reads the global n and m rather than the shape of the grid argument. This function determines all of the agent's movement in the environment.
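The same clamping behavior can be written with a table of row/column offsets instead of an if/elif chain. This is only a sketch; DELTAS and next_state() are hypothetical names, and the grid size is passed in explicitly rather than read from globals.

```python
DELTAS = {'up': (-1, 0), 'down': (1, 0), 'left': (0, -1), 'right': (0, 1)}


def next_state(state, action, n_rows, n_cols):
    """Apply an action and clamp the result to the grid bounds."""
    d_row, d_col = DELTAS[action]
    row = min(max(state[0] + d_row, 0), n_rows - 1)
    col = min(max(state[1] + d_col, 0), n_cols - 1)
    return (row, col)


print(next_state((0, 0), 'up', 5, 5))     # (0, 0)  clamped at the top edge
print(next_state((2, 3), 'right', 5, 5))  # (2, 4)
print(next_state((4, 4), 'down', 5, 5))   # (4, 4)  clamped at the bottom edge
```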

# Step 10: Implementing Epsilon-Greedy Action Selection
In this step, we define the get_action_with_exploration() function, which implements epsilon-greedy exploration with Boltzmann exploration for better spread. This function selects an action for the agent based on the current state and a given policy, considering exploration with a specified epsilon value.

In [9]:
def get_action_with_exploration(state, policy, epsilon, temperature=1.0):
    """
    Implements Epsilon-greedy exploration with Boltzmann for better spread.
    Relies on the global `grid` and the global value table `V`.
    """
    valid_actions = get_valid_actions(grid, state)

    if random.random() < epsilon:
        # Score each valid action by the value of the state it leads to
        next_values = np.array([V[get_next_state(grid, state, a)] for a in valid_actions])
        # Subtract the maximum before exponentiating to avoid overflow
        action_probs = np.exp((next_values - np.max(next_values)) / temperature)
        action_probs /= np.sum(action_probs)
        return random.choices(valid_actions, k=1, weights=action_probs)[0]
    else:
        return policy.get(state, random.choice(valid_actions))  # Prefer policy, fallback to random


This function takes the current state, policy, epsilon value, and optional temperature parameter as input and returns the selected action for the agent. If the random number generated is less than epsilon, it performs exploration by selecting an action probabilistically based on Boltzmann exploration. Otherwise, it chooses the action according to the policy. This function is crucial for balancing exploration and exploitation in the agent's decision-making process.
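The Boltzmann step can be looked at in isolation. The sketch below uses only the standard library; boltzmann_probs() and choose_action() are hypothetical helpers, and the value list stands in for whatever scores the caller assigns to each action.

```python
import math
import random


def boltzmann_probs(values, temperature=1.0):
    """Turn action scores into probabilities with a softmax at a given temperature."""
    top = max(values)  # shifting by the maximum avoids overflow in exp()
    weights = [math.exp((v - top) / temperature) for v in values]
    total = sum(weights)
    return [w / total for w in weights]


def choose_action(actions, values, greedy_action, epsilon, temperature=1.0, rng=random):
    """With probability epsilon sample from the softmax, otherwise act greedily."""
    if rng.random() < epsilon:
        probs = boltzmann_probs(values, temperature)
        return rng.choices(actions, weights=probs, k=1)[0]
    return greedy_action


probs = boltzmann_probs([1.0, 2.0, 3.0])
print([round(p, 3) for p in probs])  # [0.09, 0.245, 0.665]
```

A higher temperature flattens the distribution toward uniform sampling, while a temperature near zero concentrates almost all probability on the highest-scoring action.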

# Step 11: Defining the Gridworld Environment
In this step, we initialize the gridworld environment by creating a placeholder grid represented as a NumPy array. This grid contains reward values and obstacles represented by specific symbols. The goal state is set to have a reward value of 1.

In [10]:
# Placeholder grid
grid = np.array([
    [0.2, 0.3, 0.8, 0.1, 0],
    [0.5, 'X', 0.7, 0.4, 0],
    [0.9, 0.1, 0.6, 'X', 0.2],
    [0.4, 'X', 0.3, 0.5, 0.8],
    [0.7, 0.6, 0.1, 0.9, 1]  # Setting the goal state to 1
])


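This placeholder grid serves as the environment in which the agent will navigate. It contains various reward values, obstacles represented by 'X', and a goal state with a reward value of 1. Because the array mixes numbers with the string 'X', NumPy stores every entry as a string (for example '0.2' and '1'), which is why the other functions compare cells against 'X' and '1' and convert rewards with float(). The grid's structure and reward values determine the state values computed in Step 8.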
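The effect of mixing numbers and strings in one array can be checked on a smaller example. The 2x2 array below is made up for illustration and is not part of the notebook's grid.

```python
import numpy as np

small = np.array([
    [0.2, 'X'],
    [0.5, 1],
])

print(small.dtype.kind)          # U  (a Unicode string array)
print(small[1, 1] == '1')        # True: the integer 1 was stored as the string '1'
print(float(small[0, 0]) + 1.0)  # 1.2: rewards must be converted back before arithmetic
```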

# Step 12: Main Loop Initialization
In this step, we initialize the main loop of our gridworld reinforcement learning environment. Several variables are defined to control the flow of the loop, including:
<ul>
<li>running: A boolean variable to control whether the main loop should continue running.</li>
<li>agent_pos: A tuple representing the initial position of the agent in the gridworld.</li>
<li>training_step_frequency: An integer defining how often the training step should be performed.</li>
<li>training_active: A boolean flag indicating whether the training step should be executed in the current iteration.</li>
<li>frame_count: An integer to keep track of the number of frames processed in the loop.</li>
</ul>

Additionally, we define variables related to exploration and exploitation, such as epsilon, decay_rate, and temperature.
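The notebook defines epsilon and decay_rate but does not show the decay being applied. One common way to use such a pair, given here as a sketch with an assumed floor value (epsilon_min and decay_epsilon() are hypothetical), is multiplicative decay:

```python
def decay_epsilon(epsilon, decay_rate=0.95, epsilon_min=0.05):
    """Shrink epsilon by a constant factor, but never below epsilon_min."""
    return max(epsilon_min, epsilon * decay_rate)


epsilon = 0.5
for _ in range(3):
    epsilon = decay_epsilon(epsilon)
print(round(epsilon, 4))  # 0.4287
```

Calling such a helper once per training step would move the agent gradually from exploration toward exploitation.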

In [11]:
# Main loop
running = True
agent_pos = (0, 0)  # Initial agent position
training_step_frequency = 10  # Train every 10 frames
training_active = True  # Flag for training
frame_count = 0

# Additional variables for saving the model
policy = {}  # Dictionary to store optimal actions for each state (learned from Value Iteration)
model_saved = False  # Flag to track if the model is saved

epsilon = 0.5  # Initial epsilon for exploration
decay_rate = 0.95  # Decay rate for epsilon
temperature = 1.0  # Temperature parameter for Boltzmann exploration (adjust as needed)

log = []  # Log entries shown by draw_gridworld() and appended to in the main loop

status_text = font.render("Training Status:", True, BLACK)


# Step 13: Main Loop Execution
In this step, we execute the main loop of our gridworld reinforcement learning environment. Within the loop, we handle events using Pygame's event system, such as checking for window close events to terminate the loop.

In [None]:
while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

    # Update logic (separate training and visualization)
    if training_active:
        # Perform Value Iteration training step
        V = value_iteration(grid.copy(), start=(0, 0), goal=(n - 1, m - 1))

        # Extract optimal policy from the state value table (V)
        policy = {}
        for i in range(n):
            for j in range(m):
                if grid[i, j] == 'X':
                    continue
                valid_actions = get_valid_actions(grid, (i, j))
                if not valid_actions:
                    continue  # No move available from this cell

                # Choose action with exploration (using temperature)
                best_action = get_action_with_exploration((i, j), policy, epsilon, temperature=temperature)

                # ... (Rest of the policy extraction logic, similar to previous code)
                # Here, update the policy dictionary with the chosen action for the current state
                policy[(i, j)] = best_action

    # Choose action based on the learned policy, falling back to a random move
    current_action = policy.get(agent_pos, random.choice(['up', 'down', 'left', 'right']))

    # Update agent position based on the action
    new_pos = get_next_state(grid, agent_pos, current_action)
    # Move only if the position changed and the target cell is not an obstacle
    if new_pos != agent_pos and grid[new_pos[0], new_pos[1]] != 'X':
        agent_pos = new_pos
        # Log action, reward, and current state
        reward = float(grid[agent_pos[0], agent_pos[1]])  # Reward of the cell just entered
        log.append(f"Action: {current_action}, Reward: {reward}, Current State: {agent_pos}")

    # Render the environment
    draw_gridworld(grid, V=V)  # Optional: Visualize the state-value table (V)
    draw_agent(agent_pos)
    pygame.display.flip()  # Update the entire display at once for better performance

    # Training frequency control
    if frame_count % training_step_frequency == 0:
        training_active = True  # Trigger training step (optional)
    else:
        training_active = False

    frame_count += 1


Inside the loop, we update the training logic and visualization separately. The training logic involves performing the Value Iteration step, extracting the optimal policy, and updating the agent's position based on the chosen action. The visualization part renders the gridworld environment and the agent's position on the screen using Pygame. Additionally, we control the frequency of training using the training_step_frequency variable.
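For comparison with the exploration-based extraction in the loop, a purely greedy policy can be read off a value table by pointing each cell at its best reachable neighbor. The sketch below is a simplification under stated assumptions: it ranks neighbors by their value alone (ignoring the immediate reward), uses plain nested lists, and extract_greedy_policy() is a hypothetical name.

```python
MOVES = {'up': (-1, 0), 'down': (1, 0), 'left': (0, -1), 'right': (0, 1)}


def extract_greedy_policy(grid, V):
    """Map each non-obstacle cell to the action leading to its highest-valued neighbor."""
    rows, cols = len(grid), len(grid[0])
    policy = {}
    for row in range(rows):
        for col in range(cols):
            if grid[row][col] == 'X':
                continue
            best_action, best_value = None, float('-inf')
            for action, (d_row, d_col) in MOVES.items():
                n_row, n_col = row + d_row, col + d_col
                inside = 0 <= n_row < rows and 0 <= n_col < cols
                if inside and grid[n_row][n_col] != 'X' and V[n_row][n_col] > best_value:
                    best_action, best_value = action, V[n_row][n_col]
            if best_action is not None:
                policy[(row, col)] = best_action
    return policy


toy_grid = [['0', '0', '1']]
toy_V = [[5.39, 7.1, 9.0]]
print(extract_greedy_policy(toy_grid, toy_V))
# {(0, 0): 'right', (0, 1): 'right', (0, 2): 'left'}
```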

# Step 14: Quitting Pygame and Cleaning Up
In this step, we handle the termination of the Pygame environment and perform cleanup operations before exiting the program. We use the pygame.quit() function to quit Pygame and release any resources used by the library.

In [None]:
# Quit pygame and cleanup
pygame.quit()

# Save the learned policy (optional)
with open("policy.pkl", "wb") as f:
    pickle.dump(policy, f)


This step ensures that all resources allocated by Pygame are properly released and the program exits gracefully. Additionally, we have an optional step to save the learned policy to a file using pickle. This allows us to persist the learned policy for later use without having to retrain the model every time.
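Loading the policy back is the mirror image of saving it. The round trip below is a small sketch that writes to a temporary directory so it does not touch the real policy.pkl; the example policy contents are made up.

```python
import os
import pickle
import tempfile

policy = {(0, 0): 'right', (0, 1): 'down'}  # made-up example policy

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "policy.pkl")
    with open(path, "wb") as f:
        pickle.dump(policy, f)
    with open(path, "rb") as f:
        restored = pickle.load(f)

print(restored == policy)  # True
```

Since unpickling can execute arbitrary code, pickle.load() should only be used on files from a trusted source.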