In [None]:
from typing import List, Tuple
import matplotlib.pyplot as plt
import numpy as np
from itertools import combinations
np.random.seed(1)

In [None]:
# Environment Definition (Already Implemented)

actions = {
    "up": (0, 1),
    "left": (-1, 0),
    "right": (1, 0),
    "down": (0, -1),
}

p_e = 1/3

def get_adjacent_positions(position: Tuple[int, int]) -> List[Tuple[int, int]]:
    x, y = position
    return [
            (x, y + 1),
            (x, y - 1),
            (x + 1, y),
            (x - 1, y),
    ]

class DungeonEnvironment:
    def __init__(self):
        self.start_pos = (0,0)
        self.goal_pos = (3,4)
        self.holes = [(0,4), (3,2)]
        self.walls = [(0,2), (2,0), (2,2), (2,3)]
        self.current_position = (0,0)

    def get_echo(self):
        for adjacent in get_adjacent_positions(self.current_position):
            if adjacent in self.holes:
                # hear an each with prob 1/3
                if np.random.random() < p_e:
                    return True
        return False

    def reset(self):
        """
        Called at the start of the game.
        """
        self.current_position = self.start_pos

    def step(self, action: str) -> Tuple[dict, str, bool]:
        """
        Updates the environment with the action of the agent.
        :returns: new observation for the agent, as well as if the game ended and the outcome.
        """
        act_x, act_y = actions[action]
        curr_x, curr_y = self.current_position

        new_x = curr_x + act_x
        new_y = curr_y + act_y

        # we bump if we go into a wall, or if we go out of bound
        if (new_x, new_y) in self.walls or not (0<=new_x<=4 and 0<=new_y<=4):
            bump = True
        else:
            # if we do not bump, update the position.
            bump = False
            self.current_position = (new_x, new_y)

        observation = {
            "echo" : self.get_echo(),
            "position": self.current_position,
        }

        if self.current_position == self.goal_pos:
            outcome = "Escaped"
            terminated = True
        elif self.current_position in self.holes:
            outcome = "Fell into hole"
            terminated = True

        else:
            outcome = None
            terminated = False

        return observation, outcome, terminated

In [None]:
# Belief State (To be completed)


class BeliefState:
    """
    Maintains what we believe, using the knowledge base.
    """
    def __init__(self, initial_hole_belief=0.5):
        self.hole_beliefs = {
            (0, 4): initial_hole_belief,
            (3, 2): initial_hole_belief,
            (4, 0): initial_hole_belief,
        }  # list of positions where we sensed echoes
        self.current_position = (0, 0)

    def update(self, echo: bool, position: Tuple[int, int]):
        """
        :param echo: did we hear an echo ?
        :param position: our current position.
        """
        self.current_position = position
        
        new_belief = self.hole_beliefs.copy()

        for adjacent_pos in get_adjacent_positions(position):
            if adjacent_pos not in self.hole_beliefs:
                continue
            # We are adjacent to a hole position
            # TODO:
            # update the belief
                
        self.hole_beliefs = new_belief
        
        
    def visualise(self):
        grid_size = (5, 5)
        grid = np.zeros(grid_size)

        for x in range(5):
            for y in range(5):
                grid[y, x] = self.hole_beliefs.get((x,y), 0)

        plt.figure()
        plt.imshow(grid, cmap="viridis", origin="lower", extent=[0, 5, 0, 5])

        plt.colorbar(label="Hole probability")
        plt.xlabel("x")
        plt.ylabel("y")
        plt.xticks(range(0, 6))
        plt.yticks(range(0, 6))
        plt.grid(True, linestyle="--", alpha=0.6, color="black")
        plt.title("Holes belief")
        plt.show()

In [None]:
# function for running any agent on the dungeon environment (Already implemented)
def play_game(policy, belief_state):
    terminated = False
    environment = DungeonEnvironment()
    environment.reset()
    num_steps = 0
    trajectory = []
    while not terminated:
        action = policy(belief_state)
        observation, outcome, terminated = environment.step(action)
        belief_state.update(**observation)
        trajectory.append(environment.current_position)
        num_steps += 1

        if num_steps >= 500:
            print("Check your code, probably your agent is stuck somewhere")
            outcome = "Timeout"
            terminated = True
    
    return outcome, trajectory

In [None]:
# Agent definitions

# Naive agent Example 
def random_agent(belief) -> str:
    """
    This is a naive agent, picking actions randomly.
    """
    action_names = list(actions.keys())
    return np.random.choice(action_names)


def smarter_agent(belief) -> str:
    """
    This function selects a new action based on the belief state.
    :param belief: Current belief state.
    :return: action picked by the policy (e.g., "up"):
    """
    # TODO (optional): try implementing a smarter agent
    pass

In [None]:
belief_state = BeliefState()

print("Initial belief:")
belief_state.visualise()

for episode in range(1, 21):
    outcome, trajectory = play_game(random_agent, belief_state)
    
    print(f"Episode {episode}:")
    print(f"The game ended in {len(trajectory)} steps with the following outcome: {outcome}.")
    
    print("Belief:")
    belief_state.visualise()
