<a href="https://colab.research.google.com/github/Tdas-christ/Reinforcement_Learning/blob/main/2348569_RL_Lab6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import random

# Custom Grid Environment
class CustomGridEnvironment:
    """Deterministic square grid world with holes and a single goal.

    The agent starts at ``(0, 0)`` and moves one cell per step. Stepping
    onto a hole or the goal ends the episode; every other step costs a
    small penalty.
    """

    def __init__(self, grid_size=4, holes=None, goal=None):
        """
        Create a grid environment.
        Args:
            grid_size (int): Size of the grid (e.g., 4x4).
            holes (list of tuples): Positions of holes (e.g., [(1, 1), (2, 3)]).
                ``None`` selects the default holes; an empty list means
                "no holes".
            goal (tuple): Position of the goal (e.g., (3, 3)).
                ``None`` selects the default goal ``(3, 3)``.
        """
        self.grid_size = grid_size
        # Compare against None (not truthiness) so that an explicit empty
        # list of holes is honoured instead of being replaced by defaults.
        self.holes = [(1, 1), (1, 3), (3, 0)] if holes is None else holes
        self.goal = (3, 3) if goal is None else goal
        self.start = (0, 0)
        self.state = self.start
        self.actions = ['up', 'down', 'left', 'right']
        self.done = False

    def reset(self):
        """Reset the environment to the start state and return that state."""
        self.state = self.start
        self.done = False
        return self.state

    def step(self, action):
        """
        Take a step in the environment.
        Args:
            action (str): One of 'up', 'down', 'left', or 'right'.
                Any other value is treated as "no movement".
        Returns:
            next_state (tuple): The new state after taking the action.
            reward (float): The reward received after the action.
            done (bool): Whether the episode is finished.
        Raises:
            RuntimeError: If called after the episode has ended and before
                ``reset()``.
        """
        if self.done:
            raise RuntimeError("Episode has ended. Please reset the environment.")

        # Map actions to (row, column) offsets
        moves = {
            'up': (-1, 0),
            'down': (1, 0),
            'left': (0, -1),
            'right': (0, 1)
        }

        # Unknown actions fall back to a zero offset, i.e. the agent stays put
        d_row, d_col = moves.get(action, (0, 0))
        row, col = self.state
        new_row, new_col = row + d_row, col + d_col

        # Moves that would leave the grid keep the agent in its current cell
        in_bounds = (0 <= new_row < self.grid_size
                     and 0 <= new_col < self.grid_size)
        next_state = (new_row, new_col) if in_bounds else self.state

        # Determine reward and terminal condition
        if next_state in self.holes:
            reward = -1  # Falling into a hole
            self.done = True
        elif next_state == self.goal:
            reward = 1  # Reaching the goal
            self.done = True
        else:
            reward = -0.1  # Small penalty for each step

        self.state = next_state
        return next_state, reward, self.done

    def render(self):
        """Print the grid, one row per line, followed by a blank line.

        Markers are written in the order H, G, P, S, so later ones win:
        the player hides a hole or the goal it stands on, and the start
        marker hides the player while it is on the start cell.
        """
        grid = np.full((self.grid_size, self.grid_size), 'F')  # F for Frozen tile
        for hole in self.holes:
            grid[hole] = 'H'  # H for Hole
        grid[self.goal] = 'G'  # G for Goal
        grid[self.state] = 'P'  # P for Player
        grid[self.start] = 'S'  # S for Start
        for row in grid:
            print(" ".join(row))
        print()

# Model-Based Agent
class ModelBasedAgent:
    """Tabular model-based agent.

    Builds an empirical model of the environment (transition counts and
    per state-action rewards) from random interaction, then plans on that
    model with value iteration to obtain a greedy policy.
    """

    def __init__(self, env):
        """Store the environment and initialise empty model tables.

        Args:
            env: Environment exposing ``grid_size``, ``actions``,
                ``reset()`` and ``step(action)``.
        """
        self.env = env
        self.states = [(i, j) for i in range(env.grid_size) for j in range(env.grid_size)]
        self.actions = env.actions
        self.transitions = {}  # (s, a) -> {s': visit count}, used to estimate T(s, a, s')
        self.rewards = {}  # (s, a) -> mean observed reward, R(s, a)
        self.policy = {}  # s -> greedy action, or None if s was never acted from

    def collect_data(self, num_episodes=100):
        """Collect transition and reward data with a uniformly random policy.

        Args:
            num_episodes (int): Number of episodes to run to termination.
        """
        for _ in range(num_episodes):
            state = self.env.reset()
            done = False
            while not done:
                action = random.choice(self.actions)
                next_state, reward, done = self.env.step(action)
                key = (state, action)

                counts = self.transitions.setdefault(key, {})
                counts[next_state] = counts.get(next_state, 0) + 1

                # Keep a running mean of the reward rather than only the
                # last sample; for a deterministic reward this is unchanged.
                previous = self.rewards.get(key)
                if previous is None:
                    self.rewards[key] = reward
                else:
                    visits = sum(counts.values())
                    self.rewards[key] = previous + (reward - previous) / visits

                state = next_state

    def _q_value(self, state, action, values, gamma):
        """Return the expected one-step return of (state, action) under the learned model."""
        counts = self.transitions[(state, action)]
        total = sum(counts.values())
        reward = self.rewards.get((state, action), 0)
        return sum(
            (count / total) * (reward + gamma * values[s_next])
            for s_next, count in counts.items()
        )

    def compute_policy(self, gamma=0.9, iterations=100):
        """Compute a greedy policy using Value Iteration on the learned model.

        Args:
            gamma (float): Discount factor.
            iterations (int): Number of synchronous value-iteration sweeps.

        States with no observed (state, action) pair keep a value of 0 and
        get ``None`` as their policy entry.
        """
        values = {s: 0 for s in self.states}  # Initialize values
        for _ in range(iterations):
            # Synchronous sweep: every update reads the previous sweep's values
            new_values = values.copy()
            for s in self.states:
                action_values = [
                    self._q_value(s, a, values, gamma)
                    for a in self.actions
                    if (s, a) in self.transitions
                ]
                if action_values:
                    new_values[s] = max(action_values)
            values = new_values

        # Derive policy: first action with the strictly highest Q-value wins
        for s in self.states:
            best_action = None
            best_value = float('-inf')
            for a in self.actions:
                if (s, a) not in self.transitions:
                    continue
                q_value = self._q_value(s, a, values, gamma)
                if q_value > best_value:
                    best_value = q_value
                    best_action = a
            self.policy[s] = best_action

    def act(self, state):
        """Choose an action based on the learned policy.

        Falls back to a uniformly random action when the state is missing
        from the policy or its entry is ``None`` (no data for that state).
        """
        action = self.policy.get(state)
        if action is None:
            return random.choice(self.actions)
        return action

# Main Program
if __name__ == "__main__":
    # Create the environment
    env = CustomGridEnvironment(grid_size=4)

    # Render the environment
    print("Initial Environment:")
    env.render()

    # Initialize the agent
    agent = ModelBasedAgent(env)

    # Collect data by interacting with the environment
    print("Collecting data...")
    agent.collect_data(num_episodes=500)

    # Compute the optimal policy
    print("Computing policy...")
    agent.compute_policy()

    # Test the policy
    print("Testing policy:")
    state = env.reset()
    env.render()
    done = False
    # Cap the rollout: a policy planned on an incomplete model may cycle
    # between non-terminal cells, which would otherwise loop forever.
    max_steps = 10 * env.grid_size ** 2
    steps = 0
    while not done and steps < max_steps:
        action = agent.act(state)
        print(f"Action: {action}")
        state, _, done = env.step(action)
        env.render()
        steps += 1
    if done:
        print("Episode finished!")
    else:
        print(f"Stopped after {max_steps} steps without reaching a terminal state.")


Initial Environment:
S F F F
F H F H
F F F F
H F F G

Collecting data...
Computing policy...
Testing policy:
S F F F
F H F H
F F F F
H F F G

Action: down
S F F F
P H F H
F F F F
H F F G

Action: down
S F F F
F H F H
P F F F
H F F G

Action: right
S F F F
F H F H
F P F F
H F F G

Action: down
S F F F
F H F H
F F F F
H P F G

Action: right
S F F F
F H F H
F F F F
H F P G

Action: right
S F F F
F H F H
F F F F
H F F P

Episode finished!
