<a href="https://colab.research.google.com/github/ayaselimm/Airbnb-price-category-prediction/blob/main/Assignment_1_RL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
from enum import IntEnum
from copy import deepcopy
import matplotlib.pyplot as plt
# Matplotlib 3.6 renamed its bundled seaborn style sheets to "seaborn-v0_8-*"
# and later releases dropped the old names, so prefer the new name and fall
# back to the legacy one on older installations.
for _style_suffix in ("notebook", "whitegrid"):
    _style_name = f"seaborn-v0_8-{_style_suffix}"
    if _style_name not in plt.style.available:
        _style_name = f"seaborn-{_style_suffix}"
    plt.style.use(_style_name)
import matplotlib.colors as mcolors

  plt.style.use('seaborn-notebook')
  plt.style.use('seaborn-whitegrid')


**1) Linear Solver Method**

In [2]:
class Action(IntEnum):
    """The four moves available in the gridworld, stored as small integers."""
    up, right, down, left = range(4)


# Human-readable label for each action (easier to read than the raw number).
action_to_str = {move: move.name for move in Action}

# (row delta, column delta) applied to a cell position by each action.
# Row indices grow downwards, so "up" decreases the row.
action_to_offset = dict([
    (Action.up, (-1, 0)),
    (Action.right, (0, 1)),
    (Action.down, (1, 0)),
    (Action.left, (0, -1)),
])


In [3]:
class GridWorld:
    """
    A rectangular gridworld MDP with one goal cell and optional danger/blocked cells.

    Cells are addressed by a single row-major index: ``state = row * width + col``.
    State values are double-buffered: ``set_value`` writes into a scratch copy that
    only becomes visible through ``get_value`` once ``set_next_values`` is called.
    """

    def __init__(self, height, width, goal, goal_value=5.0, danger=None, danger_value=-5.0, blocked=None, noise=0.0):
        """
        Initialize the GridWorld environment.
        Creates a gridworld like MDP
         - height (int): Number of rows
         - width (int): Number of columns
         - goal (int): Index number of goal cell
         - goal_value (float): Reward given for goal cell
         - danger (list of int, optional): Indices of cells marked as danger (default: none)
         - danger_value (float): Reward given for danger cell
         - blocked (list of int, optional): Indices of cells marked as blocked (can't enter; default: none)
         - noise (float): probability of resulting state not being what was expected, in [0, 1)
        """
        self._width = width
        self._height = height
        self._grid_values = [0 for _ in range(height * width)]
        self._goal_value = goal_value
        self._danger_value = danger_value
        self._goal_cell = goal
        # Copy the inputs and never fall back on a shared default list, so one
        # instance cannot leak cells into another or into the caller's lists.
        self._danger_cells = list(danger) if danger is not None else []
        self._blocked_cells = list(blocked) if blocked is not None else []
        self._noise = noise  # Noise level in the environment.
        assert noise >= 0 and noise < 1  # Ensure valid noise value.
        self.create_next_values()  # Initialize the next state values.

    def reset(self):
        """
        Reset every state value (and the scratch buffer) to zero.
        """
        self._grid_values = [0 for _ in range(self._height * self._width)]
        self.create_next_values()

    def _inbounds(self, state):
        """
        Check if a state index is within the grid boundaries.
        """
        return state >= 0 and state < self._width * self._height

    def _inbounds_rc(self, state_r, state_c):
        """
        Check if row and column indices are within the grid boundaries.
        """
        return state_r >= 0 and state_r < self._height and state_c >= 0 and state_c < self._width

    def _state_to_rc(self, state):
        """
        Convert a state index to (row, column) indices.
        """
        return state // self._width, state % self._width

    def _state_from_action(self, state, action):
        """
        Gets the state that results from applying the given action.
        Moves that would leave the grid or enter a blocked cell leave the state unchanged.
        """
        row, col = self._state_to_rc(state)
        offset_row, offset_col = action_to_offset[action]
        next_row, next_col = row + offset_row, col + offset_col
        next_state = next_row * self._width + next_col
        # The row/column check must come first: an out-of-grid column would
        # otherwise wrap around to a valid-looking flat index.
        if self._inbounds_rc(next_row, next_col):
            if next_state not in self._blocked_cells:
                return next_state
        return state

    def is_terminal(self, state):
        """
        Returns true if a state is terminal (goal, or danger)
        """
        return state == self._goal_cell or state in self._danger_cells

    def get_states(self):
        """
        Gets all non-blocked states in the environment.
        Terminal states (goal and danger cells) are included in the result.
        """
        return [s for s in range(self._height * self._width) if s not in self._blocked_cells]

    def get_actions(self, state):
        """
        Returns a list of valid actions given the current state.
        Terminal states have no actions.
        """
        if self.is_terminal(state):
            return []
        return list(Action)

    def get_reward(self, state):
        """
        Get the reward for being in the given state
        """
        assert self._inbounds(state)
        # Reward is non-zero for danger or goal
        if state == self._goal_cell:
            return self._goal_value
        elif state in self._danger_cells:
            return self._danger_value
        return 0.0

    def get_transitions(self, state, action):
        """
        Get a list of transitions as a result of attempting the action in the current state
        Each item in the list is a dictionary, containing the probability of reaching that state and the state itself.
        The same resulting state may appear in more than one item (e.g. several moves bumping into a wall).
        """
        # A terminal state only transitions to itself.
        if self.is_terminal(state):
            return [{'prob': 1.0, 'state': state}]

        # The intended move succeeds with probability 1 - noise.
        intended_state = self._state_from_action(state, action)
        transitions = [{'prob': 1 - self._noise, 'state': intended_state}]

        # The noise mass is split evenly between the remaining actions.
        if self._noise > 0:
            alternative_actions = [a for a in self.get_actions(state) if a != action]
            if alternative_actions:
                noise_probability = self._noise / len(alternative_actions)
                for alt_action in alternative_actions:
                    unintended_state = self._state_from_action(state, alt_action)
                    transitions.append({'prob': noise_probability, 'state': unintended_state})

        return transitions

    def get_value(self, state):
        """
        Get the current (committed) value of the state
        """
        assert self._inbounds(state)
        return self._grid_values[state]

    def create_next_values(self):
        """
        Creates a temporary storage for state value updating
        If this is not used, then asynchronous updating may result in unexpected results
        To use properly, run this at the start of each iteration
        """
        self._next_values = list(self._grid_values)

    def set_next_values(self):
        """
        Set the state values from the temporary copied values
        To use properly, run this at the end of each iteration
        """
        self._grid_values = self._next_values.copy()

    def set_value(self, state, value):
        """
        Set the value of the state into the temporary copy
        This value will not update into main storage until self.set_next_values() is called.
        """
        assert self._inbounds(state)
        self._next_values[state] = value

    def _state_to_index(self, states):
        """Build a dictionary that maps each state to its position in `states`."""
        return {state: idx for idx, state in enumerate(states)}

    def solve_linear_system(self, discount_factor=1.0):
        """
        Evaluate the equiprobable random policy by solving A * V = b directly.

        Every action of a non-terminal state is weighted by 1 / (number of actions)
        and moves deterministically via `_state_from_action`; `self._noise` is not
        consulted here.

        :param discount_factor: The discount factor for future rewards.
        :return: numpy array of shape (height, width) with the values rounded to 2 decimals.
                 The same rounded values are also committed to the grid, so `get_value`
                 and `str(self)` reflect the solution afterwards.

        NOTE(review): terminal rows pin V(terminal) to the terminal reward while the
        neighbouring rows also add that reward on entry, so a terminal reward reaches
        its neighbours as reward + discount * reward. A solver that keeps terminal
        values at 0 will not reproduce these numbers -- confirm which convention is intended.
        """
        num_states = self._width * self._height
        # Row `s` of A / entry `s` of b encode the equation for state `s`.
        A = np.zeros((num_states, num_states))
        b = np.zeros(num_states)

        for state in range(num_states):
            A[state, state] = 1
            if state in self._blocked_cells or self.is_terminal(state):
                # Terminal or blocked: the value is fixed to the immediate reward.
                b[state] = self.get_reward(state)
                continue

            # V(s) - sum_a pi(a) * discount * V(s') = sum_a pi(a) * R(s')
            actions = self.get_actions(state)
            transition_probability = 1 / len(actions)
            for action in actions:
                next_state = self._state_from_action(state, action)
                A[state, next_state] -= discount_factor * transition_probability
                b[state] += self.get_reward(next_state) * transition_probability

        # np.linalg.solve raises LinAlgError if A is singular.
        V = np.linalg.solve(A, b)

        # Round for readability, then write the values AND commit them; without
        # the commit they would stay in the scratch buffer and never be visible.
        V_rounded = np.round(V, 2)
        for state in range(num_states):
            self.set_value(state, V_rounded[state])
        self.set_next_values()

        return V_rounded.reshape((self._height, self._width))

    def __str__(self):
        """
        Pretty print the state values, one text row per grid row.
        Blocked cells print as "----", the goal as "GOAL", danger cells as their reward.
        """
        out_str = ""
        for r in range(self._height):
            for c in range(self._width):
                cell = r * self._width + c
                if cell in self._blocked_cells:
                    out_str += "{:>6}".format("----")
                elif cell == self._goal_cell:
                    out_str += "{:>6}".format("GOAL")
                elif cell in self._danger_cells:
                    out_str += "{:>6.2f}".format(self._danger_value)
                else:
                    out_str += "{:>6.2f}".format(self._grid_values[cell])
                out_str += " "
            out_str += "\n"
        return out_str

In [4]:
# Build the deterministic 5x5 grid (cells are numbered row by row):
# goal at cell 14, danger at cells 2, 18 and 21, blocked cells 6, 7, 11 and 12.
simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.0)

# Solve the linear system with the default discount_factor of 1.0
print("Solving with linear solver: (discount_factor=1)")
values_grid = simple_gw.solve_linear_system()
print(values_grid)

Solving with linear solver: (discount_factor=1)
[[-9.97 -9.99 -5.   -3.33  0.  ]
 [-9.96  0.    0.    0.    3.33]
 [-9.94  0.    0.    0.    5.  ]
 [-9.93 -9.88 -9.71 -5.   -1.6 ]
 [-9.96 -5.   -9.24 -8.02 -4.81]]


In [5]:
# Same grid as above, now discounting future rewards by 0.95 per step.
print("Solving with linear solver: (discount_factor=0.95)")
values_grid = simple_gw.solve_linear_system(discount_factor=0.95)
print(values_grid)

Solving with linear solver: (discount_factor=0.95)
[[-5.25 -7.02 -5.   -3.2  -0.  ]
 [-4.59  0.    0.   -0.    3.2 ]
 [-4.89  0.    0.   -0.    5.  ]
 [-6.22 -7.62 -8.   -5.   -1.1 ]
 [-7.46 -5.   -7.78 -6.72 -3.54]]


In [6]:
# Same grid again with a stronger discount of 0.75.
print("Solving with linear solver: (discount_factor=0.75)")
values_grid = simple_gw.solve_linear_system(discount_factor=0.75)
print(values_grid)

Solving with linear solver: (discount_factor=0.75)
[[-1.37 -3.91 -5.   -2.69  0.  ]
 [-0.66  0.    0.    0.    2.69]
 [-0.84  0.    0.    0.    5.  ]
 [-2.14 -4.29 -4.78 -5.   -0.3 ]
 [-4.14 -5.   -4.74 -4.09 -1.32]]




---



**2) Dynamic Programming Solver**

In [7]:
def value_iteration(grid_world, discount_factor, tolerance=0.1):
    """
    Solve the gridworld with synchronous value iteration.

    The grid is reset first; each sweep then replaces the value of every
    non-terminal state by the best one-step look-ahead over its actions,
    reading the previous sweep's values and committing the new ones at the
    end of the sweep.

    Args:
        grid_world: The gridworld environment (values are updated in place).
        discount_factor: The discount factor for future rewards.
        tolerance: Sweeping stops once the largest value change in a sweep
            is strictly below this threshold.

    Returns:
        The number of sweeps performed, including the final converged one.
    """

    def look_ahead(state, action):
        # Expected reward-on-entry plus discounted successor value for one action.
        total = 0
        for outcome in grid_world.get_transitions(state, action):
            successor = outcome['state']
            likelihood = outcome['prob']
            total += likelihood * (grid_world.get_reward(successor) + discount_factor * grid_world.get_value(successor))
        return total

    grid_world.reset()
    sweeps = 0
    converged = False

    while not converged:
        sweeps += 1
        largest_change = 0

        # Writes go to the scratch buffer so every state sees the old values.
        grid_world.create_next_values()

        for state in grid_world.get_states():
            if not grid_world.is_terminal(state):  # terminal values stay fixed
                best = max([look_ahead(state, action) for action in grid_world.get_actions(state)])
                largest_change = max(largest_change, abs(best - grid_world.get_value(state)))
                grid_world.set_value(state, best)

        grid_world.set_next_values()
        converged = largest_change < tolerance

    return sweeps

# Arrow glyph (padded with one space on each side) used to draw each action
# in the printed policy grid.
action_symbols = dict(
    zip(
        (Action.up, Action.right, Action.down, Action.left),
        (" ↑ ", " → ", " ↓ ", " ← "),
    )
)
def print_policy(grid_world, discount):
    """
    Print the greedy policy implied by the grid's current state values.

    One text row is printed per grid row. Blocked, goal and danger cells get
    an emoji marker; every other cell shows the arrow of the action with the
    highest one-step look-ahead value (the first such action wins ties).

    Args:
        grid_world: The gridworld whose committed values are read.
        discount: Discount factor applied to successor values in the look-ahead.
    """
    n_rows, n_cols = grid_world._height, grid_world._width
    # Every cell starts as a single space and is overwritten below.
    cells = [[" "] * n_cols for _ in range(n_rows)]

    def look_ahead(state, action):
        # Expected reward-on-entry plus discounted successor value for one action.
        total = 0
        for outcome in grid_world.get_transitions(state, action):
            successor = outcome['state']
            likelihood = outcome['prob']
            total += likelihood * (grid_world.get_reward(successor) + discount * grid_world.get_value(successor))
        return total

    for state in range(n_rows * n_cols):
        r, c = grid_world._state_to_rc(state)

        if state in grid_world._blocked_cells:
            cells[r][c] = "🚫"
            continue
        if state == grid_world._goal_cell:
            cells[r][c] = "🥳"
            continue
        if state in grid_world._danger_cells:
            cells[r][c] = "☠️"
            continue
        if grid_world.is_terminal(state):
            # Terminal but not goal/danger: keep the blank placeholder.
            continue

        chosen, chosen_value = None, float('-inf')
        for action in grid_world.get_actions(state):
            candidate = look_ahead(state, action)
            if candidate > chosen_value:  # strict: earlier actions win ties
                chosen, chosen_value = action, candidate

        cells[r][c] = action_symbols[chosen]

    for line in cells:
        print(" ".join(line))



In [8]:
# Two copies of the same 5x5 layout: a deterministic one (noise=0.0) and one
# where the intended move happens with probability 0.8 and the remaining 0.2
# is split between the other actions.
simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.0)
noisy_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.2)
# Default discount factor and convergence tolerance for the value-iteration experiments.
discount = 0.95
tolerance = 0.1

**Trial 1**
We use a discount factor of 0.95 and a tolerance of 0.1; the environment has no noise.

In [9]:
# Solve the deterministic grid with value iteration, taking the discount and
# tolerance from the configuration variables so the header, the solver and the
# policy printout cannot drift apart. value_iteration() resets the grid itself,
# so no explicit reset is needed here.
print(f"Solving with value iteration (discount_factor={discount}, noise=0.0):")
iterations = value_iteration(simple_gw, discount, tolerance=tolerance)
print(simple_gw)
print("-"*30)
print(f"Solution found after {iterations} trials.\n")
print("-"*30)
print_policy(simple_gw, discount)

Solving with value iteration (discount_factor=0.95, noise=0.0):
  3.15   2.99  -5.00   4.51   4.75 
  3.32   ----   ----   4.75   5.00 
  3.49   ----   ----   5.00   GOAL 
  3.68   3.87   4.07  -5.00   5.00 
  3.49  -5.00   4.29   4.51   4.75 

------------------------------
Solution found after 12 trials.

------------------------------
 ↓   ←  ☠️  →   ↓ 
 ↓  🚫 🚫  →   ↓ 
 ↓  🚫 🚫  →  🥳
 →   →   ↓  ☠️  ↑ 
 ↑  ☠️  →   →   ↑ 


**Trial 2**
We use a discount factor of 0.75 and a tolerance of 0.1; the environment has no noise.

In [10]:
# Deterministic grid with a discount factor of 0.75.
# (The explicit reset is harmless: value_iteration() also resets the grid on entry.)
simple_gw.reset()
print("Solving with value iteration (discount_factor=0.75, noise=0.0):")
iterations = value_iteration(simple_gw, discount_factor=0.75, tolerance=0.1)
print("-"*30)
print(simple_gw)
print(f"Solution found after {iterations} trials.\n")
print("-"*30)
# The policy must be extracted with the same discount the values were computed with.
print_policy(simple_gw,discount=0.75)

Solving with value iteration (discount_factor=0.75, noise=0.0):
------------------------------
  0.38   0.28  -5.00   2.81   3.75 
  0.50   ----   ----   3.75   5.00 
  0.67   ----   ----   5.00   GOAL 
  0.89   1.19   1.58  -5.00   5.00 
  0.67  -5.00   2.11   2.81   3.75 

Solution found after 12 trials.

------------------------------
 ↓   ←  ☠️  →   ↓ 
 ↓  🚫 🚫  →   ↓ 
 ↓  🚫 🚫  →  🥳
 →   →   ↓  ☠️  ↑ 
 ↑  ☠️  →   →   ↑ 


For a noisy grid world

**Trial 3**

We use a discount factor of 0.95 and a tolerance of 0.1; the environment is noisy (noise = 0.2).

In [11]:
# Noisy grid. The discount and tolerance come from the configuration variables
# for the header, the solver AND the policy printout, so the policy is always
# extracted with the same discount the values were computed with.
print(f"Solving with value iteration (discount_factor={discount}, noise=0.2):")
iterations = value_iteration(noisy_gw, discount_factor=discount, tolerance=tolerance)
print(noisy_gw)
print("-"*30)
print(f"Solution found after {iterations} trials.\n")
print("-"*30)
print_policy(noisy_gw, discount)

Solving with value iteration (discount_factor=0.95, noise=0.2):
  0.42  -0.10  -5.00   3.60   4.51 
  0.56   ----   ----   4.49   4.88 
  0.65   ----   ----   4.22   GOAL 
  0.72   0.83   1.40  -5.00   4.17 
  0.23  -5.00   2.09   2.90   3.84 

------------------------------
Solution found after 15 trials.

------------------------------
 ↓   ←  ☠️  →   ↓ 
 ↓  🚫 🚫  →   ↓ 
 ↓  🚫 🚫  →  🥳
 →   →   ↓  ☠️  ↑ 
 ↑  ☠️  →   →   ↑ 


In [12]:
# Repeat of the previous noisy run with the same settings. value_iteration()
# resets the grid on entry, so the result is identical to the cell above.
# The discount and tolerance come from the configuration variables so that the
# solver and the policy printout always use the same discount.
print(f"Solving with value iteration (discount_factor={discount}, noise=0.2):")
iterations = value_iteration(noisy_gw, discount_factor=discount, tolerance=tolerance)
print(noisy_gw)
print("-"*30)
print(f"Solution found after {iterations} trials.\n")
print("-"*30)
print_policy(noisy_gw, discount)
# optimum policy

Solving with value iteration (discount_factor=0.95, noise=0.2):
  0.42  -0.10  -5.00   3.60   4.51 
  0.56   ----   ----   4.49   4.88 
  0.65   ----   ----   4.22   GOAL 
  0.72   0.83   1.40  -5.00   4.17 
  0.23  -5.00   2.09   2.90   3.84 

------------------------------
Solution found after 15 trials.

------------------------------
 ↓   ←  ☠️  →   ↓ 
 ↓  🚫 🚫  →   ↓ 
 ↓  🚫 🚫  →  🥳
 →   →   ↓  ☠️  ↑ 
 ↑  ☠️  →   →   ↑ 


**Trial 4**
We use a discount factor of 0.75 and a tolerance of 0.1; the environment is noisy (noise = 0.2).

In [13]:
# Noisy grid with a discount factor of 0.75 and the loose tolerance of 0.1.
print("Solving with value iteration (discount_factor=0.75, noise=0.2):")
iterations = value_iteration(noisy_gw, discount_factor=0.75, tolerance=0.1)
print(noisy_gw)
print(f"Solution found after {iterations} trials.\n")
print("-"*30)
print_policy(noisy_gw,discount=0.75)
# Bad policy: the recorded run stops after only a few sweeps, and some arrows
# on the left side of the grid differ from the run with tolerance=0.00001,
# so the values here should be treated as not yet converged.

Solving with value iteration (discount_factor=0.75, noise=0.2):
 -0.02  -0.39  -5.00   1.81   3.12 
 -0.00   ----   ----   3.17   4.54 
 -0.00   ----   ----   4.03   GOAL 
 -0.04  -0.39  -0.08  -5.00   4.00 
 -0.40  -5.00   0.51   1.40   2.74 

Solution found after 7 trials.

------------------------------
 ↓   ←  ☠️  ↓   ↓ 
 ←  🚫 🚫  →   ↓ 
 ↑  🚫 🚫  →  🥳
 ↑   ←   ↓  ☠️  ↑ 
 ↑  ☠️  →   →   ↑ 


**Trial 5**
We use a discount factor of 0.75 and a much smaller tolerance of 0.00001; the environment is noisy (noise = 0.2).

In [14]:
# Same noisy setting as the previous trial, but with a much tighter tolerance
# so the sweeps continue until the values have essentially stopped changing.
print("Solving with value iteration (discount_factor=0.75, noise=0.2):")
iterations = value_iteration(noisy_gw, discount_factor=0.75, tolerance=0.00001)
print(noisy_gw)
print(f"Solution found after {iterations} trials.\n")
print("**----------------------**")
print_policy(noisy_gw,discount=0.75)

Solving with value iteration (discount_factor=0.75, noise=0.2):
 -0.02  -0.39  -5.00   1.82   3.13 
 -0.00   ----   ----   3.18   4.54 
 -0.01   ----   ----   4.03   GOAL 
 -0.04  -0.37  -0.03  -5.00   4.00 
 -0.40  -5.00   0.54   1.41   2.75 

Solution found after 20 trials.

**----------------------**
 ↓   ←  ☠️  ↓   ↓ 
 →  🚫 🚫  →   ↓ 
 ↑  🚫 🚫  →  🥳
 ↑   →   ↓  ☠️  ↑ 
 ↑  ☠️  →   →   ↑ 


In [15]:
# compare using: different discount factors, noise values, number of iterations, tolerance values