<a href="https://colab.research.google.com/github/YingzeH/Multi-feature-SEIR/blob/main/RL_Grid_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [170]:
import numpy as np
import random

Simlple grid game, goal is to reach the bottom corner

In [271]:
class GridEnvironment:
    def __init__(self, n, m, start=(0, 0)):
        self.n = n  # Number of rows
        self.m = m  # Number of columns
        self.goal = (n - 1, m - 1)  # Goal position at the bottom-right corner
        self.state = start  # Initialize the state to a specified starting position

    def reset(self, start=(0, 0)):
        """Resets the environment to the initial state."""
        self.state = start  # Reset to the specified starting position
        return self.state

    def step(self, action):
        """Executes the given action and returns the new state, reward, and done flag."""
        x, y = self.state

        # Define action behavior: 0=up, 1=down, 2=left, 3=right
        if action == 0 and y > 0:  # Move up
            y -= 1
        elif action == 1 and y < self.n - 1:  # Move down
            y += 1
        elif action == 2 and x > 0:  # Move left
            x -= 1
        elif action == 3 and x < self.m - 1:  # Move right
            x += 1

        # Update state
        self.state = (x, y)

        # Check if the goal is reached
        if self.is_goal_reached():
            reward = 1
            done = True
        else:
            reward = -0.01  # Penalty for each step to encourage shorter paths
            done = False

        return self.state, reward, done

    def is_goal_reached(self):
        """Checks if the current state is the goal."""
        return self.state == self.goal

    def get_action_space(self):
        """Returns the number of possible actions (4: up, down, left, right)."""
        return 4

    def get_state_space(self):
        """Returns the state space dimensions (n x m)."""
        return self.n, self.m


In [272]:
class QLearningAgent:
    def __init__(self, environment, alpha=0.1, gamma=0.9, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.env = environment
        self.alpha = alpha  # Learning rate
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate
        self.epsilon_decay = epsilon_decay  # Epsilon decay rate
        self.epsilon_min = epsilon_min  # Minimum exploration rate
        self.q_table = np.zeros((self.env.n, self.env.m, self.env.get_action_space()))  # Initialize Q-table

    def choose_action(self, state):
        """Choose an action using an epsilon-greedy strategy."""
        if random.uniform(0, 1) < self.epsilon:
            return random.randint(0, self.env.get_action_space() - 1)  # Explore
        else:
            return np.argmax(self.q_table[state])  # Exploit

    def update_q_table(self, state, action, reward, next_state):
        """Update the Q-table based on the action taken and reward received."""
        best_next_action = np.argmax(self.q_table[next_state])
        td_target = reward + self.gamma * self.q_table[next_state][best_next_action]
        td_delta = td_target - self.q_table[state][action]
        self.q_table[state][action] += self.alpha * td_delta

    def train(self, episodes):
      """Train the agent over a number of episodes."""
      total_reward = 0  # To accumulate total reward for each episode
      rewards = []  # List to keep track of rewards per episode

      for episode in range(episodes):
          state = self.env.reset()  # Reset the environment for a new episode
          done = False
          episode_reward = 0  # Reward for the current episode

          while not done:
            action = self.choose_action(state)  # Choose action
            next_state, reward, done = self.env.step(action)  # Take action in the environment
            self.update_q_table(state, action, reward, next_state)  # Update Q-table
            state = next_state  # Transition to the next state

            episode_reward += reward  # Accumulate reward for this episode

          rewards.append(episode_reward)  # Store the total reward for this episode
          total_reward += episode_reward

          # Decay epsilon after each episode
          if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

          # Print progress every 100 episodes
          if (episode + 1) % 100 == 0:
            avg_reward = total_reward / 100  # Average reward over the last 100 episodes
            print(f"Episode {episode + 1}/{episodes}, Average Reward: {avg_reward:.2f}")
            total_reward = 0  # Reset total reward for the next average calculation


    def print_q_table(self):
        """Prints the Q-table in a formatted way."""
        print("State       Up           Down         Left         Right")
        print("---------------------------------------------------------------")
        for i in range(self.env.n):
            for j in range(self.env.m):
                state = (i, j)
                q_values = self.q_table[i, j]
                # Convert state tuple to a formatted string
                state_str = str(state).ljust(12)
                print(f"{state_str} {q_values[0]:<12.2f} {q_values[1]:<12.2f} {q_values[2]:<12.2f} {q_values[3]:<12.2f}")

    def get_optimal_path(self, start):
        """Returns the optimal path from the given starting point to the goal."""
        path = [start]
        state = start

        while state != self.env.goal:
            action = np.argmax(self.q_table[state])  # Choose the best action based on Q-values
            if action == 0:  # Up
                state = (state[0], state[1] - 1) if state[1] > 0 else state
            elif action == 1:  # Down
                state = (state[0], state[1] + 1) if state[1] < self.env.n - 1 else state
            elif action == 2:  # Left
                state = (state[0] - 1, state[1]) if state[0] > 0 else state
            elif action == 3:  # Right
                state = (state[0] + 1, state[1]) if state[0] < self.env.m - 1 else state

            path.append(state)  # Add the new state to the path

        return path



In [273]:
# Example usage:
n, m = 3, 3  # Grid size
env = GridEnvironment(n, m)
agent = QLearningAgent(env)

# Train the agent
agent.train(1000)  # Train for 1000 episodes

# Print the Q-table
agent.print_q_table()

Episode 100/1000, Average Reward: 0.88
Episode 200/1000, Average Reward: 0.95
Episode 300/1000, Average Reward: 0.95
Episode 400/1000, Average Reward: 0.96
Episode 500/1000, Average Reward: 0.97
Episode 600/1000, Average Reward: 0.97
Episode 700/1000, Average Reward: 0.97
Episode 800/1000, Average Reward: 0.97
Episode 900/1000, Average Reward: 0.97
Episode 1000/1000, Average Reward: 0.97
State       Up           Down         Left         Right
---------------------------------------------------------------
(0, 0)       0.62         0.70         0.62         0.70        
(0, 1)       0.52         0.66         0.64         0.79        
(0, 2)       0.50         0.55         0.58         0.89        
(1, 0)       0.70         0.79         0.61         0.77        
(1, 1)       0.70         0.89         0.70         0.88        
(1, 2)       0.79         0.89         0.79         1.00        
(2, 0)       0.45         0.89         0.50         0.52        
(2, 1)       0.56         1.00   

In [274]:
# Input starting coordinates
start_x = int(input("Enter starting x coordinate (0 to {}): ".format(n-1)))
start_y = int(input("Enter starting y coordinate (0 to {}): ".format(m-1)))

# Output the optimal path from the given starting point
optimal_path = agent.get_optimal_path((start_x, start_y)) # Pass start_x and start_y as a tuple
print("Optimal Path:", optimal_path)

Enter starting x coordinate (0 to 2): 0
Enter starting y coordinate (0 to 2): 1
Optimal Path: [(0, 1), (1, 1), (1, 2), (2, 2)]


There are traps in the diagnoles, try to avoid them!

In [297]:
class GridEnvironment:
    def __init__(self, n, m, start=(0, 0)):
        self.n = n  # Number of rows
        self.m = m  # Number of columns
        self.goal = (n - 1, m - 1)  # Goal position at the bottom-right corner
        self.state = start  # Initialize the state to a specified starting position
        self.traps = {(i, i) for i in range(1, min(n, m) - 1)}    # Set of trap positions on the diagonal

        # Check if the starting position is a trap
        if self.state in self.traps:
            raise ValueError("Starting position is a trap! Game terminates.")

    def reset(self, start=(0, 0)):
        """Resets the environment to the initial state."""
        self.state = start  # Reset to the specified starting position

        # Check if the new starting position is a trap
        if self.state in self.traps:
            raise ValueError("Starting position is a trap! Game terminates.")

        return self.state

    def step(self, action):
        """Executes the given action and returns the new state, reward, and done flag."""
        x, y = self.state

        # Define action behavior: 0=up, 1=down, 2=left, 3=right
        if action == 0 and y > 0:  # Move up
            y -= 1
        elif action == 1 and y < self.n - 1:  # Move down
            y += 1
        elif action == 2 and x > 0:  # Move left
            x -= 1
        elif action == 3 and x < self.m - 1:  # Move right
            x += 1

        # Update state
        self.state = (x, y)

        # Check for traps
        if self.state in self.traps:
            reward = -1  # Penalty for landing on a trap
            done = True  # The game ends when hitting a trap
            print("Landed in a trap! Game terminates.")
        elif self.is_goal_reached():
            reward = 1
            done = True
        else:
            reward = -0.01  # Penalty for each step to encourage shorter paths
            done = False

        return self.state, reward, done

    def is_goal_reached(self):
        """Checks if the current state is the goal."""
        return self.state == self.goal

    def get_action_space(self):
        """Returns the number of possible actions (4: up, down, left, right)."""
        return 4

    def get_state_space(self):
        """Returns the state space dimensions (n x m)."""
        return self.n, self.m


In [282]:
class QLearningAgent:
    def __init__(self, environment, alpha=0.1, gamma=0.9, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        self.env = environment
        self.alpha = alpha  # Learning rate
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate
        self.epsilon_decay = epsilon_decay  # Epsilon decay rate
        self.epsilon_min = epsilon_min  # Minimum exploration rate
        self.q_table = np.zeros((self.env.n, self.env.m, self.env.get_action_space()))  # Initialize Q-table

    def choose_action(self, state):
        """Choose an action using an epsilon-greedy strategy."""
        if random.uniform(0, 1) < self.epsilon:
            return random.randint(0, self.env.get_action_space() - 1)  # Explore
        else:
            return np.argmax(self.q_table[state])  # Exploit

    def update_q_table(self, state, action, reward, next_state):
        """Update the Q-table based on the action taken and reward received."""
        best_next_action = np.argmax(self.q_table[next_state])
        td_target = reward + self.gamma * self.q_table[next_state][best_next_action]
        td_delta = td_target - self.q_table[state][action]
        self.q_table[state][action] += self.alpha * td_delta

    def train(self, episodes):
        """Train the agent over a number of episodes."""
        for episode in range(episodes):
            state = self.env.reset()  # Reset the environment for a new episode
            done = False

            while not done:
                action = self.choose_action(state)  # Choose action
                next_state, reward, done = self.env.step(action)  # Take action in the environment
                self.update_q_table(state, action, reward, next_state)  # Update Q-table
                state = next_state  # Transition to the next state

            # Decay epsilon after each episode
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay

            # Output training progress
            if (episode + 1) % 100 == 0:  # Every 100 episodes
                print(f"Episode {episode + 1}/{episodes}, Epsilon: {self.epsilon:.4f}")

    def print_q_table(self):
        """Prints the Q-table in a formatted way."""
        print("State       Up           Down         Left         Right")
        print("---------------------------------------------------------------")
        for i in range(self.env.n):
            for j in range(self.env.m):
                state = (i, j)
                q_values = self.q_table[i, j]
                # Convert state tuple to a formatted string
                state_str = str(state).ljust(12)
                print(f"{state_str} {q_values[0]:<12.2f} {q_values[1]:<12.2f} {q_values[2]:<12.2f} {q_values[3]:<12.2f}")

    def get_optimal_path(self, start):
        """Returns the optimal path from the given starting point to the goal."""
        path = [start]
        state = start

        while state != self.env.goal:
            action = np.argmax(self.q_table[state])  # Choose the best action based on Q-values
            if action == 0:  # Up
                state = (state[0], state[1] - 1) if state[1] > 0 else state
            elif action == 1:  # Down
                state = (state[0], state[1] + 1) if state[1] < self.env.n - 1 else state
            elif action == 2:  # Left
                state = (state[0] - 1, state[1]) if state[0] > 0 else state
            elif action == 3:  # Right
                state = (state[0] + 1, state[1]) if state[0] < self.env.m - 1 else state

            path.append(state)  # Add the new state to the path

        return path


In [300]:
# Example usage:
n, m = 5, 5  # Grid size
env = GridEnvironment(n, m)
agent = QLearningAgent(env)

# Train the agent
agent.train(1000)  # Train for 1000 episodes

# Print the Q-table
agent.print_q_table()

Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Game terminates.
Landed in a trap! Ga

In [301]:
# Input starting coordinates
start_x = int(input("Enter starting x coordinate (0 to {}): ".format(n-1)))
start_y = int(input("Enter starting y coordinate (0 to {}): ".format(m-1)))

# Check if the entered starting position is a trap
if (start_x, start_y) in env.traps:
    print("Starting position is a trap! Game terminates.")
else:
    # Output the optimal path from the given starting point
    optimal_path = agent.get_optimal_path((start_x, start_y))  # Pass start_x and start_y as a tuple
    print("Optimal Path:", optimal_path)

Enter starting x coordinate (0 to 4): 0
Enter starting y coordinate (0 to 4): 2
Optimal Path: [(0, 2), (0, 3), (0, 4), (1, 4), (2, 4), (3, 4), (4, 4)]
