In [5]:
# dynamic_programming.py

import numpy as np
from tqdm import tqdm


class DPAgent:
    """
    Agent that uses Dynamic Programming for action_policy computation.
    """

    def __init__(self, environment):
        self.environment = environment
        self.all_states = self._enumerate_all_states()
        self.state_map = {state: idx for idx, state in enumerate(self.all_states)}
        self.num_all_states = len(self.all_states)
        self.n_actions = environment.action_space.n
        self.v_func = np.zeros(self.num_all_states)
        self.action_policy = np.zeros(self.num_all_states, dtype=int)

    def _enumerate_all_states(self):
        # Generate all possible all_states
        all_states = []
        positions = [
            (i, j) for i in range(self.environment.grid_height) for j in range(self.environment.grid_width)
        ]
        for player_pos in positions:
            for box_pos in positions:
                for target_pos in positions:
                    if player_pos != box_pos and box_pos != target_pos and player_pos != target_pos:
                        all_states.append((player_pos, box_pos, target_pos))
        return all_states

    def val_iterate(self, gamma=0.99, convergence_limit=1e-6, max_iters=1000):
        # Perform value iteration
        for _ in tqdm(range(max_iters), desc="Value Iteration Progress"):
            delta = 0
            for idx, state in enumerate(self.all_states):
                v = self.v_func[idx]
                act_vals = []
                for action in range(self.n_actions):
                    new_state, return_val, terminal = self._simulate_step(state, action)
                    if new_state in self.state_map:
                        next_idx = self.state_map[new_state]
                        action_value = return_val + gamma * self.v_func[next_idx] * (not terminal)
                    else:
                        action_value = return_val
                    act_vals.append(action_value)
                self.v_func[idx] = max(act_vals)
                delta = max(delta, abs(v - self.v_func[idx]))
            if delta < convergence_limit:
                break
        self._derive_action_policy(gamma)

    def _simulate_step(self, state, action):
        # Simulate the environmentironment step without modifying the actual environmentironment
        self.environment.reset()
        self.environment.player_position, self.environment.box_position, self.environment.target_position = state
        self.environment.grid[self.environment.player_position[0], self.environment.player_position[1]] = self.environment.PLAYER
        self.environment.grid[self.environment.box_position[0], self.environment.box_position[1]] = self.environment.BOX
        self.environment.grid[self.environment.target_position[0], self.environment.target_position[1]] = self.environment.TARGET

        _, return_val, terminal, _, _ = self.environment.step(action)
        new_state = self.environment.get_state()

        return new_state, return_val, terminal

    def _derive_action_policy(self, gamma):
        # Extract action_policy from the computed value function
        for idx, state in enumerate(self.all_states):
            act_vals = []
            for action in range(self.n_actions):
                new_state, return_val, terminal = self._simulate_step(state, action)
                if new_state in self.state_map:
                    next_idx = self.state_map[new_state]
                    action_value = return_val + gamma * self.v_func[next_idx] * (not terminal)
                else:
                    action_value = return_val
                act_vals.append(action_value)
            self.action_policy[idx] = np.argmax(act_vals)

    def select_action(self, state):
        # Select action based on the derived action_policy
        if state in self.state_map:
            idx = self.state_map[state]
            return self.action_policy[idx]
        else:
            return self.environment.action_space.sample()

In [6]:
# monte_carlo.py

from collections import defaultdict


class MCAgent:
    """
    Agent that uses Monte Carlo methods for action_policy computation.
    """

    def __init__(self, environment):
        self.environment = environment
        self.Q_values = defaultdict(lambda: np.zeros(environment.action_space.n))
        self.returns = defaultdict(lambda: defaultdict(list))
        self.action_policy = {}
        self.initial_epsilon = 1.0
        self.minimum_epsilon = 0.01
        self.epsilon_decay_rate = 0.995
        self.gamma = 0.99

    def _epsilon_greedy(self, state, epsilon):
        # Epsilon-greedy action selection
        if np.random.rand() < epsilon:
            return self.environment.action_space.sample()
        else:
            return np.argmax(self.Q_values[state]) if state in self.Q_values else self.environment.action_space.sample()

    def generate_episode(self, epsilon):
        # Generate an episode using the current action_policy
        episode = []
        self.environment.reset()
        terminal = False
        steps = 0
        while not terminal and steps < 100:
            state = self.environment.get_state()
            action = self._epsilon_greedy(state, epsilon)
            _, return_val, terminal, _, _ = self.environment.step(action)
            episode.append((state, action, return_val))
            steps += 1
        return episode

    def learn_policy(self, episode_count=10000):
        # Train the agent using Monte Carlo method
        epsilon = self.initial_epsilon
        for _ in tqdm(range(episode_count), desc="Monte Carlo Training Progress"):
            episode = self.generate_episode(epsilon)
            G = 0
            visited_state_actions = set()
            for state, action, return_val in reversed(episode):
                G = self.gamma * G + return_val
                if (state, action) not in visited_state_actions:
                    visited_state_actions.add((state, action))
                    self.returns[state][action].append(G)
                    self.Q_values[state][action] = np.mean(self.returns[state][action])
                    self.action_policy[state] = np.argmax(self.Q_values[state])
            epsilon = max(self.minimum_epsilon, epsilon * self.epsilon_decay_rate)

    def select_action(self, state):
        # Select action based on the learned action_policy
        return self.action_policy.get(state, self.environment.action_space.sample())

In [7]:
# sokoban_environment.py

import gym
from gym import spaces


class SokobanEnv(gym.Env):
    """
    Custom Sokoban environmentironment for reinforcement learning.
    """

    def __init__(self):
        super(SokobanEnv, self).__init__()

        # Define the grid size
        self.grid_height = 6
        self.grid_width = 7

        # Define action and observation space
        # Actions: Up, Down, Left, Right
        self.action_space = spaces.Discrete(4)
        # Observation: The grid representation
        self.observation_space = spaces.Box(
            low=0, high=5, shape=(self.grid_height, self.grid_width), dtype=np.uint8
        )

        # Define grid elements
        self.EMPTY = 0
        self.WALL = 1
        self.BOX = 2
        self.TARGET = 3
        self.PLAYER = 4
        self.BOX_ON_TARGET = 5

        # Map actions to movements
        self.action_map = {
            0: (-1, 0),  # Up
            1: (1, 0),   # Down
            2: (0, -1),  # Left
            3: (0, 1),   # Right
        }

        self.reset()

    def reset(self):
        # Initialize the grid
        self.grid = np.zeros((self.grid_height, self.grid_width), dtype=int)
        self.grid[1:-1, 1:-1] = self.EMPTY  # Set inner area to empty

        # Add walls around the grid
        self.grid[0, :] = self.WALL
        self.grid[-1, :] = self.WALL
        self.grid[:, 0] = self.WALL
        self.grid[:, -1] = self.WALL

        # Place the player at a random empty position
        empty_positions = list(zip(*np.where(self.grid == self.EMPTY)))
        self.player_position = np.array(empty_positions[np.random.choice(len(empty_positions))])
        self.grid[self.player_position[0], self.player_position[1]] = self.PLAYER

        # Place the box at a random empty position
        empty_positions = list(zip(*np.where(self.grid == self.EMPTY)))
        self.box_position = np.array(empty_positions[np.random.choice(len(empty_positions))])
        self.grid[self.box_position[0], self.box_position[1]] = self.BOX

        # Place the target at a random empty position
        empty_positions = list(zip(*np.where(self.grid == self.EMPTY)))
        self.target_position = np.array(empty_positions[np.random.choice(len(empty_positions))])
        self.grid[self.target_position[0], self.target_position[1]] = self.TARGET

        return self.grid.copy(), {}

    def step(self, action):
        move = self.action_map[action]
        new_player_pos = self.player_position + move

        # Check for wall collision
        if self.grid[new_player_pos[0], new_player_pos[1]] == self.WALL:
            return self.grid.copy(), -1, False, False, {}

        # Check if the player is pushing the box
        if np.array_equal(new_player_pos, self.box_position):
            new_box_pos = self.box_position + move

            # Check if the box can be moved
            if self.grid[new_box_pos[0], new_box_pos[1]] in [self.WALL, self.BOX]:
                return self.grid.copy(), -1, False, False, {}

            # Move the box
            self.grid[self.box_position[0], self.box_position[1]] = self.EMPTY
            self.box_position = new_box_pos

            if np.array_equal(self.box_position, self.target_position):
                self.grid[self.box_position[0], self.box_position[1]] = self.BOX_ON_TARGET
            else:
                self.grid[self.box_position[0], self.box_position[1]] = self.BOX

        # Move the player
        self.grid[self.player_position[0], self.player_position[1]] = self.EMPTY
        self.player_position = new_player_pos
        self.grid[self.player_position[0], self.player_position[1]] = self.PLAYER

        # Check if the box is on the target
        terminal = np.array_equal(self.box_position, self.target_position)
        return_val = 10 if terminal else -1  # Positive return_val if solved, negative otherwise

        return self.grid.copy(), return_val, terminal, False, {}

    def render(self, mode="human"):
        if mode == "human":
            symbols = {
                self.EMPTY: ' ',
                self.WALL: '#',
                self.BOX: '$',
                self.TARGET: '.',
                self.PLAYER: '@',
                self.BOX_ON_TARGET: '*'
            }
            print("\n".join("".join(symbols[cell] for cell in row) for row in self.grid))
        elif mode == "rgb_array":
            # Optional implementation for visualizing the environmentironment
            pass

    def get_state(self):
        # Returns a tuple representing the current state
        return (tuple(self.player_position), tuple(self.box_position), tuple(self.target_position))

In [8]:
# main.py

import time


def simulate_episode(environment, agent, max_steps=100, render=False):
    state, _ = environment.reset()
    total_return_val = 0
    terminal = False
    steps = 0
    while not terminal and steps < max_steps:
        current_state = environment.get_state()
        action = agent.select_action(current_state)
        state, return_val, terminal, _, _ = environment.step(action)
        total_return_val += return_val
        steps += 1
        if render:
            environment.render()
    return total_return_val, steps


def agent_assess(environment, agent, episode_count=100):
    return_vals = []
    step_count_list = []
    for _ in range(episode_count):
        total_return_val, steps = simulate_episode(environment, agent)
        return_vals.append(total_return_val)
        step_count_list.append(steps)
    return np.mean(return_vals), np.mean(step_count_list)


def main():
    environment = SokobanEnv()

    print("Training Dynamic Programming Agent...")
    init_time = time.time()
    dp_agent = DPAgent(environment)
    dp_agent.val_iterate()
    dp_learn_policying_time = time.time() - init_time
    print(f"Dynamic Programming learn_policying completed in {dp_learn_policying_time:.2f} seconds")

    print("\nTraining Monte Carlo Agent...")
    init_time = time.time()
    mc_agent = MCAgent(environment)
    mc_agent.learn_policy(episode_count=10000)
    mc_learn_policying_time = time.time() - init_time
    print(f"Monte Carlo learn_policying completed in {mc_learn_policying_time:.2f} seconds")

    print("\nEvaluating Dynamic Programming Agent...")
    dp_avg_return_val, dp_avg_steps = agent_assess(environment, dp_agent)
    print(f"Dynamic Programming - Average Reward: {dp_avg_return_val:.2f}, Average Steps: {dp_avg_steps:.2f}")

    print("\nEvaluating Monte Carlo Agent...")
    mc_avg_return_val, mc_avg_steps = agent_assess(environment, mc_agent)
    print(f"Monte Carlo - Average Reward: {mc_avg_return_val:.2f}, Average Steps: {mc_avg_steps:.2f}")

    print("\nComparison:")
    print(f"Training Time - DP: {dp_learn_policying_time:.2f}s, MC: {mc_learn_policying_time:.2f}s")
    print(f"Average Reward - DP: {dp_avg_return_val:.2f}, MC: {mc_avg_return_val:.2f}")
    print(f"Average Steps - DP: {dp_avg_steps:.2f}, MC: {mc_avg_steps:.2f}")


if __name__ == "__main__":
    main()

Training Dynamic Programming Agent...


Value Iteration Progress:   0%|          | 0/1000 [00:00<?, ?it/s]

Value Iteration Progress:   0%|          | 1/1000 [03:00<50:00:52, 180.23s/it]


Dynamic Programming training completed in 266.38 seconds

Training Monte Carlo Agent...


Monte Carlo Training Progress: 100%|██████████| 10000/10000 [01:03<00:00, 157.74it/s]


Monte Carlo training completed in 63.40 seconds

Evaluating Dynamic Programming Agent...
Dynamic Programming - Average Reward: -97.81, Average Steps: 98.03

Evaluating Monte Carlo Agent...
Monte Carlo - Average Reward: -88.30, Average Steps: 89.51

Comparison:
Training Time - DP: 266.38s, MC: 63.40s
Average Reward - DP: -97.81, MC: -88.30
Average Steps - DP: 98.03, MC: 89.51
