### Assignment 3

**Course**: 3547 - Intelligent Agents

**Name**: Cameron Turner

**Date**: December 9, 2025

---

Define the Global class which contains the application's global variables.

In [3]:
# Global.py

# Global variables.

class Global:
    
    # Global variables.
    
    _display = False
    _debug   = False
    
    # Start room.
    
    _start_room = (1, 1)
    
    # Probabilities.
    
    _pit_probability = 0.2
    
    # Directions
    
    _east  = "east"
    _west  = "west"
    _north = "north"
    _south = "south"
    
    _left  = "left"
    _right = "right"
    
    _orientation_array = [ _north, _east, _south, _west]
    _facing_array      = [ _left, _right]
    
    # Actions
    
    _forward_action   = "Forward"
    _turnLeft_action  = "TurnLeft"
    _turnRight_action = "TurnRight"
    _shoot_action     = "Shoot"
    _grab_action      = "Grab"
    _climb_action     = "Climb"
    
    # Define the set of actions an Agent can take.
    
    _action_array = [ _forward_action, _turnLeft_action, _turnRight_action, _shoot_action, _grab_action, _climb_action ]
    
    # Agent Fear Index
    
    _agent_fear_index = .5



Define the Percepts class to hold the different percepts the Agent can sense.

In [5]:
# PerceptsC.py

class PerceptsC():

    # Constructor

    def __init__(self, stench=False, breeze=False, glitter=False, bump=False, scream=False, move=(), direction=None):

        # Set object attributes.

        self.stench = stench
        self.breeze = breeze
        self.glitter = glitter
        self.bump = bump
        self.scream = scream
        self.move = move
        self.direction = direction


    # Getters and Setters.

    def get_stench(self) -> bool:
        return self.stench 

    def set_stench(self, stench):
        self.stench = stench

    def get_breeze(self) -> bool:
        return self.breeze 

    def set_breeze(self, breeze):
        self.breeze = breeze 

    def get_glitter(self) -> bool:
        return self.glitter

    def set_glitter(self, glitter):
        self.glitter = glitter 

    def get_bump(self) -> bool:
        return self.bump

    def set_bump(self, bump):
        self.bump = bump

    def get_scream(self) -> bool:
        return self.scream

    def set_scream(self, scream):
        self.scream = scream


    def get_move(self) -> tuple([int, int]):
        return self.move

    def set_move(self, move):
        self.move = move

    def get_direction(self) -> str:
        return self.direction

    def set_direction(self, direction):
        self.direction = direction


    # print

    def print(self):

        # Add to the PerceptsC class

        percepts_list = []

        if (self.get_stench()):
            percepts_list.append("Stench")

        if (self.get_breeze()):
            percepts_list.append("Breeze")

        if (self.get_glitter()):
            percepts_list.append("Glitter")

        if (self.get_bump()):
            percepts_list.append("Bump")

        if (self.get_scream()):
            percepts_list.append("Scream")

        if (self.get_move()):
            percepts_list.append("Move")

        if (self.get_direction()):
            percepts_list.append("Direction")

        print (percepts_list)


Define the Abstract Agent class that will be the abstract super class of the Naive Agent, the Move Planning Agent and a future, smarter Agent.

In [7]:
# AgentA.py

# Import libraries.

from abc import ABC, abstractmethod


class AgentA(ABC):


    # Constructor.

    def __init__(self):

        # Do nothing.

        self.percepts = None


    # print_percepts

    def print_percepts(self):

        # Print out the percepts.

        print ("Percepts:\t", end='')
        self.percepts.print()


    # percept

    @abstractmethod
    def percept(self, percepts):

        self.percepts = percepts

    
    # action

    @abstractmethod
    def action(self) -> str:

        # Implement an algorithm for selecting an action to take based on the percepts.
        
        # Return nil for the abstract class.
        
        return nil
    



Define the Naive Agent class that is a concrete sub-class of the Agent abstract class.

In [9]:
# NaiveAgentC.py

# Import libraries.

import random

class NaiveAgentC(AgentA):
    
    # percept

    def percept(self, percepts):

        # For the Naive Agent that isn't using Percepts, just
        # call the super class.

        super().percept(percepts)

        # Print the percepts.

        if Global._display:
            self.print_percepts()


    # action

    def action(self) -> str:
 
        # For the Naive Agent, the percepts will be received but not used
        # in selecting the action to take.  The algorithm for selecting the
        # action to take is random.
        
        # Randomly select one of the possible actions from the action set.

        agent_action = random.randint(0, 5)

        if Global._display: print ("Action:\t\t\t", Global._action_array[agent_action])
        
        # Return the randomly selected action.

        return Global._action_array[agent_action]


Define the Move Planning Agent class that is a concrete sub-class of the Agent abstract class.

In [11]:
# MovePlanningAgentC.py

# Import libraries.

import random
import networkx as nx

class MovePlanningAgentC(AgentA):
    
    # Constructor.

    def __init__(self, location):

        # Set the state for having the Gold and its location.

        # The Agent should only be notified of its original location and can update it as it
        # builds the graph.

        self.has_gold  = False
        self.location  = location
        self.direction = Global._east
        self.exit_plan = []

        # Add the initial location as the root of the path tree.

        self.G = nx.DiGraph()
        self._create_start_node_in_graph(self.location, self.direction)
        

    # percept

    def percept(self, percepts):

        # For the Move Planning Agent, it will use the Percepts for grabbing the gold and
        # climbing out of the cave.  As it also uses the Percepts for detecting a Bump when
        # trying to go out of bounds, it will also use the Percepts to detect that a valid
        # move has been made.

        # Call the super class to register the percepts.

        super().percept(percepts)

        new_location = self.percepts.get_move()
        direction    = self.percepts.get_direction()

        # Determine if a move has been made.

        if (len(new_location) != 0):

            # A move has been made, add the new node to the graph.
                    
            if Global._display: print ("Action Result:\t\tLooking to add", new_location, "facing", direction, "(and other directions) to the visited room graph.")

            self._add_node_to_graph(self.location, new_location, direction)
            
            # Update the Agent's location and direction.

            self.location  = new_location
            self.direction = direction

        # Print the percepts.

        if Global._display: self.print_percepts()


    # action

    def action(self) -> str:
 
        action = None

        # The Agent is either exploring, looking for the Gold or the Agent has
        # found it and is executing on its exit plan.

        # Determine if the Agent is exploring or executing its exit plan.

        if (len(self.exit_plan) > 0):

            # The Agent is executing its exit plan.  Therefore, get the next
            # action in the plan.

            if Global._display: print ("Status:\t\t\t*** Agent is currently executing its exit plan...")

            if Global._display: print ("Exit plan:\t\t\t", self.exit_plan)

            # Get the next action to take.

            action = self.exit_plan[0]

            if Global._display: print ("Action from Exit plan:\t\t", action)

            # Remove the action from the exit plan.

            self.exit_plan.pop(0)

            if Global._display: print ("Exit plan after action:\t\t", self.exit_plan)

        
        else:

            # For the Move Planning Agent, use its sensors to read the percepts to see if the gold 
            # is in the current room or if they can climb out the cave with the gold.

            if Global._display: print ("Status:\t\t\t*** Agent is currently exploring looking for Gold...")

            if (self.percepts.get_glitter()) and (self.has_gold == False):

                if Global._display: print ("Status:\t\t\t*** Agent has detected the Gold and will now grab it...")
    
                # Glitter has been sensed.  Action the Grab.

                action = Global._grab_action

                # Update the Agent's gold attribute.

                self.has_gold = True

                # Create the Exit plan that will be actioned the next time the Agent needs
                # to action.

                self.exit_plan = self._create_exit_plan(self.location, (1, 1), self.direction)

            elif (self.location == Global._start_room) and (self.has_gold == True):

                # The climb action is handled via the Agent's exit plan and the environment.

                action = Global._climb_action

            else:
      
                # Randomly select one of the possible actions from the action set (minus
                # Grab and Climb).

                agent_action = random.randint(0, 3)

                if Global._display: print ("Action:\t\t\t", Global._action_array[agent_action])
        
                action = Global._action_array[agent_action]

        # Return the selected action.

        return action


    # Protected methods.


    # _create_start_node_in_graph
    
    def _create_start_node_in_graph(self, start_node, direction):

        # The start node is the special case - both the current and new nodes will be the
        # same start node.

        self._add_node_to_graph(start_node, start_node, direction)


    # _add_node_to_graph

    def _add_node_to_graph(self, current_node, new_node, direction):

        # Add four nodes to the graph, one for each direction the Agent
        # can face - north, east, south and west.

        node_north = str(new_node) + "-" + Global._north
        node_south = str(new_node) + "-" + Global._south
        node_east  = str(new_node) + "-" + Global._east
        node_west  = str(new_node) + "-" + Global._west

        self.G.add_node(node_north, node=new_node, direction=Global._north)
        self.G.add_node(node_south, node=new_node, direction=Global._south)
        self.G.add_node(node_east,  node=new_node, direction=Global._east)
        self.G.add_node(node_west,  node=new_node, direction=Global._west)

        # Add the edges between the nodes and the action that would get you there.

        self.G.add_edge(node_north, node_west,  action=Global._turnLeft_action)
        self.G.add_edge(node_north, node_east,  action=Global._turnRight_action)
        self.G.add_edge(node_east,  node_north, action=Global._turnLeft_action)
        self.G.add_edge(node_east,  node_south, action=Global._turnRight_action)
        self.G.add_edge(node_south, node_east,  action=Global._turnLeft_action)
        self.G.add_edge(node_south, node_west,  action=Global._turnRight_action)
        self.G.add_edge(node_west,  node_south, action=Global._turnLeft_action)
        self.G.add_edge(node_west,  node_north, action=Global._turnRight_action)

        # Adding the Forward edges if there are two different nodes (i.e., not the
        # initial start node).

        if (current_node != new_node):

            # Add the Forward direction.  The direction of the node will point
            # to the same direction of the new node and the opposite will be true.
            # e.g., (1,1)-East will have an edge with (2,1)-East and (2,1)-West will
            # have an edge with (1,1)-West.

            current_north = str(current_node) + "-" + Global._north
            current_south = str(current_node) + "-" + Global._south
            current_east  = str(current_node) + "-" + Global._east
            current_west  = str(current_node) + "-" + Global._west

            # Determine the current direction and add the edges as indicated above in
            # the comment.

            if (direction == Global._north):
                self.G.add_edge(current_north,  node_north,  action=Global._forward_action)
                self.G.add_edge(node_south,  current_south,  action=Global._forward_action)
            elif (direction == Global._east):
                self.G.add_edge(current_east,  node_east,  action=Global._forward_action)
                self.G.add_edge(node_west,  current_west,  action=Global._forward_action)
            elif (direction == Global._south):
                self.G.add_edge(current_south,  node_south,  action=Global._forward_action)
                self.G.add_edge(node_north,  current_north,  action=Global._forward_action)
            elif (direction == Global._west):
                self.G.add_edge(current_west,  node_west,  action=Global._forward_action)
                self.G.add_edge(node_east,  current_east,  action=Global._forward_action)


        if Global._debug: print ("\nNodes:", self.G.nodes)
        if Global._debug: print ("\nEdges:", self.G.edges)


    # _create_exit_plan

    def _create_exit_plan(self, source, dest, direction):
        
        source_node = str(source) + "-" + direction
        dest_node = str(dest) + "-" + Global._east

        if Global._display: print ("Status:\t\t\tAgent is creating an exit plan from", source_node, "to", dest)

        if Global._debug:   print ("Status:\t\t\tAgent shortest Dijkstra path:", nx.shortest_path(self.G, source_node, dest_node, weight=None, method='dijkstra'))
        if Global._display: print ("Status:\t\t\tAgent Shortest A* path:", nx.astar_path(self.G, source_node, dest_node, heuristic=None, weight='manhattan_distance'))

        short_path = nx.astar_path(self.G, source_node, dest_node, heuristic=None, weight='manhattan_distance')

        # Print out the nodes if in debug mode.

        if Global._debug:

            for node in short_path:

                print (self.G.nodes[node]["node"], " ", self.G.nodes[node]["direction"])

        # The Agent only has to reach (1,1) in order to climb out,  Therefore, remove all
        # other nodes past the first (1,1) in the path.

        first_home_node = next(filter(lambda node: "(1, 1)" in node, short_path), None)

        first_home_node_idx = short_path.index(first_home_node)

        if Global._display: print ("Status:\t\t\tIndex of (1,1) is: ", first_home_node, "at node", first_home_node_idx, "in the path.")

        new_short_path = short_path[:first_home_node_idx+1]

        if Global._display: print ("Status:\t\t\tAgent new short path =", new_short_path)

        # Build the edges from the path.

        path_edges = []
    
        for i in range(len(new_short_path) - 1):

            # Get the current and new nodes.

            node1 = new_short_path[i]
            node2 = new_short_path[i+1]

            # Add the edge.

            path_edges.append((node1, node2))

        action_plan = []

        # Iterate through the edges to build the action plan by finding the edge in the graph's
        # set of edges and getting the associated "action" with that particular edge.
        #
        # As this is a directed graph, and when we added the next set of nodes the directions between
        # the nodes were bi-directional (e.g., North-West = Left and West-North = Right), the
        # action associated with that edge will be the correct action to take to traverse that edge
        # from the current node to the next node in the edge pair.

        for edge in path_edges:
            if Global._debug: print (edge, " ", self.G.edges[edge]["action"])
            action_plan.append(self.G.edges[edge]["action"])

        # Finally, append the Climb action.

        action_plan.append(Global._climb_action)

        if Global._display: print ("Status:\t\t\t*** The Agent's exit plan is:", action_plan)

        return action_plan


Define the Predicate Helper class for the Probability Agent.

In [13]:
# PredicateC
#
# Courtesy of Larry Simon

from pomegranate.distributions import Categorical
from pomegranate.distributions import ConditionalCategorical
from pomegranate.bayesian_network import BayesianNetwork

class PredicateC():   
    def __init__(self, prob: float):
        self.p = prob
        
    def toList(self):
        return [1-self.p, self.p]
        
    def toCategorical(self):
        return Categorical([self.toList()])


Define the Probability Agent class that is a concrete sub-class of the Move Planning Agent class.

In [15]:
# ProbAgentC

# Import libraries.

import random
import networkx as nx
import math
import torch
from torch._prims_common import Tensor
from pomegranate.distributions import Categorical
from pomegranate.distributions import ConditionalCategorical
from pomegranate.bayesian_network import BayesianNetwork
import numpy


class ProbAgentC(MovePlanningAgentC):

    # Constructor

    def __init__(self, location):

        # Invoke the super class.

        super().__init__(location)
        
        # Initialize the variables.

        self.has_arrow = True
        self.move_plan = []
        self.path_taken = []
        self.rooms_dict = {}

        # Build the model.

        self._build_model()



    # percept

    def percept(self, percepts):

        # For the Probability Agent, it will use the Percepts for grabbing the gold and
        # climbing out of the cave.  As it also uses the Percepts for detecting a Bump when
        # trying to go out of bounds, it will also use the Percepts to detect that a valid
        # move has been made.  It also uses it to detect screams.

        # Call the super class to register the percepts.

        super().percept(percepts)

        new_location = self.percepts.get_move()
        direction    = self.percepts.get_direction()

        # Determine if a scream has been heard from the wumpus.

        if (self.percepts.get_scream()):

            # A scream has been heard.

            if Global._display: print ("Status:\t\t\t*** Agent has detected the Scream from a dead Wumpus.  Setting Wumpus probability to 0..")

            # As the Wumpus is dead, the probability of dying from the Wumpus is now 0.
            # Update the Wumpus predicates and Wumpus list.

            new_prob = 0
            wumpus_categorical = PredicateC(new_prob).toCategorical()

            # Iterate over rooms and set the new probability info.

            for room_key in self.rooms_dict.keys():

                self.wumpus_predicate_list[room_key] = wumpus_categorical
                self.wumpus_list[room_key] = 0

        # Determine if a move has been made.

        if (len(new_location) != 0):

            # A move has been made, add the new node to the graph.
                    
            if Global._display: print ("Action Result:\t\tLooking to add", new_location, "facing", direction, "(and other directions) to the visited room graph.")

            self._add_node_to_graph(self.location, new_location, direction)
            
            # Update the Agent's location and direction.

            self.location  = new_location
            self.direction = direction

            # The graph node is in the form (1, 1).  Convert it into a dictionary lookup value "1-1".

            current_room = str(self.location[0]) + "-" + str(self.location[1])

            # If the Agent got to this point, the Agent is still alive so this room is neither a pit
            # or the location of the Wumpus.

            self.pit_list[current_room] = 0
            self.wumpus_list[current_room] = 0

            # As the Wumpus has an original 1/15 probability of being in a room, if this room
            # does not have the Wumpus then the probability of the other rooms having the Wumpus
            # increases.

            # Count how many rooms already have been determined to not contain the Wumpus.

            wumpus_free_room_count = 0

            for wumpus_room in self.wumpus_list.keys():

                # If the room was already Wumpus-free, add it to the count.

                if (self.wumpus_list[wumpus_room] == 0):

                    wumpus_free_room_count = wumpus_free_room_count + 1

            # Calculate the new probability for the Wumpus.

            new_wumpus_prob = 1/(15 - wumpus_free_room_count) if wumpus_free_room_count < 15 else 0
            if Global._display: print ("Status:\t\t\t*** Agent has calculated the new Wumpus probability for each unknown room as", new_wumpus_prob)

            # Iterate over the rooms and update the Wumpus predicate and probability info.

            for room_key in self.rooms_dict.keys():

                # Update the Wumpus room probability if the room is 'unknown'.  The other rooms
                # would already have a 0 (e.g., 1-1).

                if (self.wumpus_list[room_key] == -1):

                    # Set the new probability.

                    wumpus_categorical = PredicateC(new_wumpus_prob).toCategorical()
                    self.wumpus_predicate_list[room_key] = wumpus_categorical


            # Finally, add this room to the path taken.

            self.path_taken.append(current_room)


    # action

    def action(self) -> str:
 
        action = None

        # The Agent is either exploring, looking for the Gold or the Agent has
        # found it and is executing on its exit plan.

        # Determine if the Agent is exploring or executing its exit plan.

        if (len(self.exit_plan) > 0):

            # The Agent is executing its exit plan.  Therefore, get the next
            # action in the plan.

            if Global._display: print ("Status:\t\t\t*** Agent is currently executing its exit plan...")

            if Global._display: print ("Exit plan:\t\t\t", self.exit_plan)

            # Get the next action to take.

            action = self.exit_plan[0]

            if Global._display: print ("Action from Exit plan:\t\t", action)

            # Remove the action from the exit plan.

            self.exit_plan.pop(0)

            if Global._display: print ("Exit plan after action:\t\t", self.exit_plan)


        elif (len(self.move_plan) > 0):

            # The Agent is executing its move plan to get to the next best node.  Therefore, get the next
            # action in the plan.

            if Global._display: print ("Status:\t\t\t*** Agent is currently in a movement plan...")

            if Global._display: print ("Move plan:\t\t\t", self.move_plan)

            # Get the next action to take.

            action = self.move_plan[0]

            if Global._display: print ("Action from Move plan:\t\t", action)

            # Remove the action from the exit plan.

            self.move_plan.pop(0)

            if Global._display: print ("Move plan after action:\t\t", self.move_plan)

        
        else:

            # For the Probability Agent, use its sensors to read the percepts to see if the gold 
            # is in the current room or if they can climb out the cave with the gold. Otherwise, if it 
            # detects a stench, fire the arrow (if it still has it) or look for the next best room to move into.

            if Global._display: print ("Status:\t\t\t*** Agent is currently exploring looking for Gold...")

            if (self.percepts.get_glitter()) and (self.has_gold == False):

                if Global._display: print ("Status:\t\t\t*** Agent has detected the Gold and will now grab it...")
    
                # Glitter has been sensed.  Action the Grab.

                action = Global._grab_action

                # Update the Agent's gold attribute.

                self.has_gold = True

                # Create the Exit plan that will be actioned the next time the Agent needs
                # to action.

                self.exit_plan = self._create_exit_plan(self.location, (1, 1), self.direction)

            elif (self.location == Global._start_room) and (self.has_gold == True):

                # The climb action is handled via the Agent's exit plan and the environment.

                action = Global._climb_action

            elif ((self.percepts.get_stench()) and (self.has_arrow == True)):

                # Stench has been detected and we have the arrow.  May as well give it a shot.

                action = Global._shoot_action
                self.has_arrow = False

            else:
      
                # Move Action.  First, identify the Agent location and direction.

                if Global._display: print ("Status:\t\t\t*** Agent is looking to move...")

                # The graph node is in the form (1, 1).  Convert it into a dictionary lookup value "1-1".

                current_room = str(self.location[0]) + "-" + str(self.location[1])

                # See if this is a Breeze room based on the Percepts.

                self.breeze_list[current_room] = 1 if (self.percepts.get_breeze()) else 0

                # See if this is a Stench room based on the Percepts.

                self.stench_list[current_room] = 1 if (self.percepts.get_stench()) else 0

                # Get the possible move options based on the possibility that the next room is a pit or the Wumpus.

                if Global._display: print ("Status:\t\t\t*** Agent is looking to see which move can avoid a pit...")
                pit_move_options = self._get_move_options_to_avoid_pit(current_room)

                if Global._display: print ("Status:\t\t\t*** Agent is looking to see which move can avoid the Wumpus...")
                wumpus_move_options = self._get_move_options_to_avoid_wumpus(current_room)

                # Now get the best room option based on the probabilities calculated above that the neighbour rooms
                # to which the Agent can move contains a pit or the Wumpus.

                best_room_option = self._choose_best_move_option(current_room, pit_move_options, wumpus_move_options, self.direction)
             
                if Global._display: print ("Status:\t\t\t*** The best room for the Agent to move to is", best_room_option)

                # If the best room option is Exit, the Agent has determined that it is too dangerous to move as the
                # probability of dying is > 50 %.  Exit out of the cave.

                if (best_room_option == 'Exit'):

                    if Global._display: print ("Status:\t\t\t*** Next moves have been deemed too dangerous.  Agent is bailing.")

                    # Do one final grab just in case!

                    action = Global._grab_action

                    # Create the Exit plan that will be actioned the next time the Agent needs
                    # to action.

                    self.exit_plan = self._create_exit_plan(self.location, (1, 1), self.direction)
                
                else:

                    # Get the move plan and execute it.  The move plan will be something like ["TurnLeft", "Forward"].

                    self.move_plan = self._get_move_plan(current_room, best_room_option, self.direction)

                    if Global._display: print ("Status:\t\t\t*** The Agent's move plan is", self.move_plan)

                    # Get the first action to take.

                    action = self.move_plan.pop(0)


        # Return the selected action.

        return action


    # Private Methods

    # _build_model

    def _build_model(self):

        # Build the neighbours model.

        # Iterate over the x-grid.

        for i in range (1,5):

            # Iterate over the y-grid.

            for j in range (1,5):

                # Identify the current room's neighboours and add them to the set.

                neighbours = []

                if ((i-1) > 0):
                    west_neighbour = str(i-1) + "-" + str(j)
                    neighbours.append(west_neighbour)

                if ((i+1) < 5):
                    east_neighbour = str(i+1) + "-" + str(j)
                    neighbours.append(east_neighbour)

                if ((j+1) < 5):
                    north_neighbour = str(i) + "-" + str(j+1)
                    neighbours.append(north_neighbour)

                if ((j-1) > 0):
                    south_neighbour = str(i) + "-" + str(j-1)
                    neighbours.append(south_neighbour)

                if Global._debug: print ("Status:\t\t\t*** The Agent's neighbours at (", i, ",", j, ") -> ", neighbours)

                # Construct the room key and add it and the neighbours to the room dictionary.

                my_key = str(i) + "-" + str(j)

                self.rooms_dict[my_key] = neighbours

        # Print the rooms dictionary.

        if Global._debug: print ("Status:\t\t\t*** The Agent's room dictionary is:", self.rooms_dict)

        # Now that the set of rooms and each room's neighbours has been constructed, set all
        # rooms to be the standard 20% probability that the room could be a pit (before any
        # evidence is gathered).

        self.pit_predicate_list = {}
        pit_categorical = PredicateC(0.2).toCategorical()
        
        # All of the rooms, except for 1-1, can be a pit.  Therefore, create a pit list of all
        # the rooms and set them to be -1 (unknown).  These will be updated as the Agent gathers
        # more information about the cave and which rooms are safe.

        self.pit_list = {}
        pit_unknown = -1

        # Keep a list of all rooms that have a breeze.

        self.breeze_list = {}
        breeze_unknown = -1

        # Add the Wumpus model. To start, the probability of the Wumpus being in a room is 1/15 (the Wumpus
        # can't be in the start room).  However, as the cave is explored, the probability of the Wumpus being
        # in each future room increases.

        self.wumpus_predicate_list = {}
        wumpus_categorical = PredicateC(1/15).toCategorical() 

        self.wumpus_list = {}
        wumpus_unknown = -1

        # Add the Stench model.

        self.stench_list = {}
        stench_unknown = -1

        # Iterate over the room keys which will include all rooms in the cave.
        # Create the categorical probability for the dictionary
        # Set each room to be unknown for a pit and unknown for a breeze.

        for room_key in self.rooms_dict.keys():

            # Pit and breeze info.  NOTE: don't need separate unknowns for breeze, pit etc. as they are the same.

            self.pit_predicate_list[room_key] = pit_categorical
            self.pit_list[room_key]           = pit_unknown
            self.breeze_list[room_key]        = breeze_unknown

            # Wumpus and stench info.

            self.wumpus_predicate_list[room_key] = wumpus_categorical
            self.wumpus_list[room_key]           = wumpus_unknown
            self.stench_list[room_key]           = stench_unknown


        # Room 1-1 cannot be a pit or wumpus.  Therefore, it will be set to 0 (safe).

        self.pit_list["1-1"] = 0
        self.wumpus_list["1-1"] = 0

        # Finally, add the starting room to the path taken.

        self.path_taken.append("1-1")


    # _get_move_plan

    def _get_move_plan(self, src_room, dest_room, direction):

        move_plan = []

        # Get the x, y coordinates of the source and destination rooms.

        src_x = src_room[0]
        src_y = src_room[2]

        dest_x = dest_room[0]
        dest_y = dest_room[2]

        # Determine if the destination node is North, East, West or South 
        # from the source node.

        if (dest_x < src_x):
            destination = Global._west

        elif (dest_x > src_x):
            destination = Global._east

        elif (dest_y < src_y):
            destination = Global._south

        elif (dest_y > src_y):
            destination = Global._north

        # Now figure out how to turn to get there.

        if (((direction == Global._east) and (destination == Global._north)) or
            ((direction == Global._west) and (destination == Global._south)) or
            ((direction == Global._south) and (destination == Global._east)) or
            ((direction == Global._north) and (destination == Global._west))):

            # These are the only directions and destinations where turning left
            # is shorter.

            move_plan.append(Global._turnLeft_action)
        else:

            # Might as well turn right.

            while (direction != destination):

                # Continue turning right and updating the direction until it matches the
                # destination.

                direction = self._turn_right(direction)
                move_plan.append(Global._turnRight_action)

        # Add the forward action to the plan.

        move_plan.append(Global._forward_action)

        return move_plan 


    # _turn_right
    
    def _turn_right(self, direction):

        # By turning right, return the new direction.

        if (direction == Global._north):
            return Global._east
        elif (direction == Global._east):
            return Global._south
        elif (direction == Global._south):
            return Global._west
        elif (direction == Global._west):
            return Global._north


    # _choose_best_move_option
            
    def _choose_best_move_option(self, current_location, neighbour_prob_dict, wumpus_neighbour_prob_dict, direction):

        # Set the best room variables.

        best_false_value = 0
        best_room_option = ""
        best_room_options = []

        # Iterate over the current room's neighbours.

        for best_room in neighbour_prob_dict.keys():        # pit_neighbour and wumpus_neighbour are the same thing

            # Get the True / False dictionary for this neighbour from the Pit network and extract the False value.
            # The False value is the % that the room does not have a Pit or Wumpus (i.e., it would be
            # True if it contained a pit or wumpus).

            true_false_dict = neighbour_prob_dict[best_room]
            false_value = true_false_dict["False"]

            # Get the True / False dictionary from the Wumpus network.

            wumpus_true_false_dict = wumpus_neighbour_prob_dict[best_room]
            wumpus_false_value = wumpus_true_false_dict["False"]

            if Global._display: print ("Status:\t\t\t*** Evaluating room", best_room, "the % that it is NOT a pit is", false_value)
            if Global._display: print ("Status:\t\t\t*** Evaluating room", best_room, "the % that it is NOT a wumpus is", wumpus_false_value)

            # Calculate the % that the room is a pit or wumpus. 

            not_a_pit_or_wumpus = false_value * wumpus_false_value

            if Global._display: print ("Status:\t\t\t*** Evaluating room", best_room, "the % that it is NOT both is (1-p)(1-w)", not_a_pit_or_wumpus)

            # Sometimes, the % calculation from the Bayesian Network is not a number.  Convert it to 1.

            is_nan = isinstance(not_a_pit_or_wumpus, float) and math.isnan(not_a_pit_or_wumpus)

            if (is_nan):
                not_a_pit_or_wumpus = 1
                if Global._display: print ("Status:\t\t\t*** Converting room:", best_room, ": the % that it is NOT both is (1-p)(1-w)", not_a_pit_or_wumpus)

            # Determine if the new % is greater or equal to the current best False value.

            if (not_a_pit_or_wumpus >= best_false_value):

                # Ensure that the best room is not in the path of rooms visited so that it doesn't go into
                # an endless loop.  Always explore further!
                #last_node = self.path_taken[-2] if (len(self.path_taken) > 1) else self.path_taken[-1]
                #if ((best_room != last_node) and (len(self.path_taken) == 1)):

               if (best_room not in (self.path_taken)):

                    # If the combined % > Best False value.
                    # Set the best room options to be just this room.

                    if (not_a_pit_or_wumpus > best_false_value):
                        best_room_options = [best_room] 
                    else:
                        # If the combined % = Best False value.
                        # Add this room to the set of best room options.

                        best_room_options.append(best_room) 

                    # Set the new high watermark.

                    best_false_value = not_a_pit_or_wumpus

        # Check if there is a set of best rooms (e.g., tied with their False values).

        if (len(best_room_options) > 1):

            shortest_path = 100    # Make it large to start!

            # Iterate over the best room options.

            for candidate_option in best_room_options:

                # Calculate the shortest path to get to that room.

                path_len = self._calculate_shortest_path(current_location, candidate_option, direction)

                if Global._display: print ("Status:\t\t\t*** Path Len from ", current_location, "to", candidate_option, "while facing", direction, "is", path_len)

                # Check to see if this path length is the shortest path between the nodes.

                if (path_len < shortest_path):
                    shortest_path = path_len
                    best_room_option = candidate_option

        elif (len(best_room_options) == 1):

            # There is only one best room in the set.  Just set it.

            best_room_option = best_room_options[0]

        else:

            # There are no best rooms in the set - could be because of a Tensor computation where the
            # value is NaN.  Therefore, just bail.

            best_room_option = 'Exit'

        # Determine if it is best if the Agent just leaves.  If the probability that the room is safe (i.e., False)
        # is lower than the Agent Fear Index (e.g., 50%), it is best for the Agent to bail.

        if (best_false_value < Global._agent_fear_index):

            # Best for the Agent to leave.

            best_room_option = 'Exit'

        # Return the best room option.

        return best_room_option

    
    # _calculate_shortest_path

    def _calculate_shortest_path(self, source, dest, direction):

        return len(self._get_move_plan(source, dest, direction))


    # _create_neighbour_cases_for_room

    def _create_neighbour_cases_for_room(self, current_room) -> []:

        # Get the current room's neighbours.

        current_room_neighbours = self.rooms_dict[current_room]
        current_room_neighbours_count = len(current_room_neighbours)

        # Based on the number of neighbours, create the cases.

        if (current_room_neighbours_count == 2):

            current_room_cases = self._create_cases_with_two_neighbours()

        elif (current_room_neighbours_count == 3):

            current_room_cases = self._create_cases_with_three_neighbours()

        elif (current_room_neighbours_count == 4):

            current_room_cases = self._create_cases_with_four_neighbours()

        return current_room_cases


    # _get_move_options_to_avoid_wumpus

    def _get_move_options_to_avoid_wumpus(self, current_room):

        # Get the neighbours for the current room.

        current_room_neighbours = self.rooms_dict[current_room]

        # Build the neighbour cases for the current room.

        current_room_cases = self._create_neighbour_cases_for_room(current_room)

        # Create the conditional categorical object now that we have the cases for the current room's neighbours.
        
        stench_condition_categorical = ConditionalCategorical([current_room_cases])

        # Create the variables (stench room and its adjacent rooms) and the
        # edges (an edge is from an adjacent room to the stench room).

        variable_list        = []
        edge_list            = []
        room_and_stench_list = []

        # Iterate over the current room's neighbours.

        for current_room_neighbour in current_room_neighbours:

            # If the stench has been detected, the Wumpus is in one of the neighbouring rooms.
            # Therefore, update the probability.
              
            if (self.percepts.get_stench()):

                # Update the probability.

                if (self.wumpus_list[current_room_neighbour] != 0):

                    # Calculate the new probability and set it.

                    new_prob = 1/len(current_room_neighbours)
                    wumpus_categorical = PredicateC(new_prob).toCategorical()
                    self.wumpus_predicate_list[current_room_neighbour] = wumpus_categorical
            
            # Create the room predicate.

            current_room_neighbour_predicate = self.wumpus_predicate_list[current_room_neighbour]
            variable_list.append(current_room_neighbour_predicate)

            # Create the edge.

            edge_list.append((current_room_neighbour_predicate, stench_condition_categorical))

            # Add the knowledge of if the neighbour is a wumpus to the tensor.
            # -1 is unknown; 0 is safe; 1 is a pit

            current_room_neighbour_wumpus_knowledge = self.wumpus_list[current_room_neighbour]
            room_and_stench_list.append(current_room_neighbour_wumpus_knowledge)

                
        # If there is a stench, then the Wumpus is in one of the neighbouring rooms.
        # Therefore, set the rest of the locations to 0 (no Wumpus) and the
        # predicates to 0 (no chance of a Wumpus).
        # Note that this only applies to the Wumpus since there is only one.  This can't
        # be applied to the pits as there can be many.
        
        if (self.percepts.get_stench()):

            # Iterate over the Wumpus list.

            for room in self.wumpus_list.keys():

                # Update the knowledge about which room can have the Wumpus.

                if ((self.wumpus_list[room] == -1) and (room not in current_room_neighbours)):

                    no_wumpus_categorical = PredicateC(0).toCategorical()
                    self.wumpus_predicate_list[room] = no_wumpus_categorical
                    self.wumpus_list[room] = 0

        # Now add the predicate for the stench room at the end of the list.
        # Add the stench knowledge - 0 if no stench is detected, 1 if a stench is detected.

        stench_condition_knowledge = self.stench_list[current_room]  

        variable_list.append(stench_condition_categorical)
        room_and_stench_list.append(stench_condition_knowledge)

        if Global._display: print ("Status:\t\t\t*** Evaluating probability for the Wumpus - room and stench list", room_and_stench_list)

        # Run the Pomegranate model to get the neighbour rooms' probabilities.

        neighbour_prob_dict = self._run_bayesian_network_model(current_room_neighbours, variable_list, edge_list, room_and_stench_list)

        if Global._display: print ("Status:\t\t\t*** Evaluating probability for the Wumpus - neighbour probabilities", neighbour_prob_dict)

        return neighbour_prob_dict


    # _get_move_options_to_avoid_pit

    def _get_move_options_to_avoid_pit(self, current_room):

        # Get the neighbours for the current room.

        current_room_neighbours = self.rooms_dict[current_room]

        # Build the neighbour cases for the current room.

        current_room_cases = self._create_neighbour_cases_for_room(current_room)

        # Create the conditional categorical object now that we have the cases for the current room's neighbours.
         
        breeze_condition_categorical = ConditionalCategorical([current_room_cases])

        # Create the variables (breeze room and its adjacent rooms) and the
        # edges (an edge is from an adjacent room to the breeze room).

        variable_list        = []
        edge_list            = []
        room_and_breeze_list = []

        # Iterate over the current room's neighbours.

        for current_room_neighbour in current_room_neighbours:

            # If the breeze has been detected, a pit is in one of the neighbouring rooms.
            # Therefore, update the probability.

            if (self.percepts.get_breeze()):

                # Update the probability.
                
                if (self.pit_list[current_room_neighbour] != 0):  # !!! NEW

                    # Calculate the new probability and set it.

                    new_prob = 1/len(current_room_neighbours)
                    pit_categorical = PredicateC(new_prob).toCategorical()
                    self.pit_predicate_list[current_room_neighbour] = pit_categorical
      
            # Create the room predicate.

            current_room_neighbour_predicate = self.pit_predicate_list[current_room_neighbour]
            variable_list.append(current_room_neighbour_predicate)

            # Create the edge.

            edge_list.append((current_room_neighbour_predicate, breeze_condition_categorical))

            # Add the knowledge of if the neighbour is a pit to the tensor.
            # -1 is unknown; 0 is safe; 1 is a pit

            current_room_neighbour_pit_knowledge = self.pit_list[current_room_neighbour]
            room_and_breeze_list.append(current_room_neighbour_pit_knowledge)

        # Now add the predicate for the breeze room at the end of the list.
        # Add the breeze knowledge - 0 if no breeze is detected, 1 if a breeze is detected.

        breeze_condition_knowledge = self.breeze_list[current_room]  #1 # breeze is detected

        variable_list.append(breeze_condition_categorical)
        room_and_breeze_list.append(breeze_condition_knowledge)

        if Global._display: print ("Status:\t\t\t*** Evaluating probability for the pit - room and breeze list", room_and_breeze_list)

        # Run the Pomegranate model to get the neighbour rooms' probabilities.

        neighbour_prob_dict = self._run_bayesian_network_model(current_room_neighbours, variable_list, edge_list, room_and_breeze_list)

        if Global._display: print ("Status:\t\t\t*** Evaluating probability for the pit - neighbour probabilities", neighbour_prob_dict)

        return neighbour_prob_dict


    # _run_bayesian_network_model

    def _run_bayesian_network_model(self, breeze_room_neighbours, variables, edges, room_and_breeze_list): 

        # Construct the model with the three variables, and two edges

        # Display the variables and edges only if required during debugging.

        #if Global._display: print ("Status:\t\t\t*** Running the BayesianNetwork model - variables:", variables)
        #if Global._display: print ("Status:\t\t\t*** Running the BayesianNetwork model - edges:", edges)

        # Construct the Bayesian Network model using the variables and edges to get the probabilities that
        # each of the neighbours could be either a pit or Wumpus (depending on which method called it).

        bayesian_network_model = BayesianNetwork(variables, edges)

        # Create the tensor based on the neighbours and the current room.

        # For example:
        #       X = torch.tensor([[-1, -1, 0]])      # pit_1 ?, pit_2 ?, breeze is false 
        #       X = torch.tensor([[-1, -1, -1, 0]])  # pit_1 ?, pit_2 ?, pit_3 ?, breeze is false    
        #       X = torch.tensor([[1, 1, 0, -1, 0]]) # pit_1 T, pit_2 T, pit_3 F, pit_4 ?, breeze is false

        X = torch.tensor([room_and_breeze_list])

        X_masked = torch.masked.MaskedTensor(X, mask=X >= 0)
        if Global._debug: print ("Status:\t\t\t*** Running the BayesianNetwork model - Tensor mask:", X_masked)

        # Do the prediction.

        bayesian_network_tensors = bayesian_network_model.predict_proba(X_masked)
        if Global._display: print ("Status:\t\t\t*** Running the BayesianNetwork model - Tensors:", bayesian_network_tensors)

        # Extract the probabilities for each of the neighbours to allow the Agent to make the best decision based
        # on probability reasoning.

        neighbour_prob_dict = {}
        tensor_idx = 0

        # Iterate over the room's neighbours.

        for breeze_room_neighbour in breeze_room_neighbours:
            
            # Extract the True and False values, put it into a dictionary and set it on the 
            # neighbour probability dictionary.

            breeze_room_neighbour_tensor = bayesian_network_tensors[tensor_idx]
            breeze_room_neighbour_tensor_probs_list = breeze_room_neighbour_tensor.tolist()

            true_false_dict = {}
            true_false_dict["False"] = breeze_room_neighbour_tensor_probs_list[0][0]
            true_false_dict["True"]  = breeze_room_neighbour_tensor_probs_list[0][1]

            neighbour_prob_dict[breeze_room_neighbour] = true_false_dict

            # Go to the next Tensor.

            tensor_idx = tensor_idx + 1

        # Return the dictionary of neighbours and their probabilities.

        return neighbour_prob_dict


    # _create_cases_with_two_neighbours
    #
    # Courtesy of Larry Simon

    def _create_cases_with_two_neighbours(self) -> []:

        # Initialize the grid.

        grid = []

        # For two neighbours, the grid will include just layers.

        # Iterate over the two T/F values.

        for i in [False, True]:

            # Layer level.

            layer = []

            # Iterate over the two T/F values.

            for j in [False, True]:

                # Case level.

                case = i or j      

                if case:
                    p = 1.0
                else:
                    p = 0.0

                # Append the case to the layer.

                layer.append(PredicateC(p).toList())  # row

            # Append the layer to the grid.

            grid.append(layer)

        if Global._display: print ("Status:\t\t\t*** Cases grid with 2 neighbours:", grid)

        return grid


    # _create_cases_with_three_neighbours
    #
    # Extending Larry Simon's method.

    def _create_cases_with_three_neighbours(self) -> []:

        # Initialize the grid.

        grid = []

        # For three neighbours, the grid will include a cube and layers.

        # Iterate over the two T/F values.

        for h in [False, True]:

            # Cube level.

            cube = []

            # Iterate over the two T/F values.

            for i in [False, True]:

                # Layer level.

                layer = []

                # Iterate over the two T/F values.

                for j in [False, True]:

                    # Case level.

                    case = i or j      

                    if case:
                        p = 1.0
                    else:
                        p = 0.0

                    # Append the case to the layer.

                    layer.append(PredicateC(p).toList())  # row

                # Append the layer to the cube.

                cube.append(layer)
                
            # Append the cube to the grid.

            grid.append(cube)

        if Global._display: print ("Status:\t\t\t*** Cases grid with 3 neighbours:", grid)

        return grid


    # _create_cases_with_four_neighbours
    #
    # Extending Larry Simon's method.

    def _create_cases_with_four_neighbours(self) -> []:

        # Initialize the grid.

        grid = []

        # For four neighbours, the grid will include a structure, cube and layers.

        # Iterate over the two T/F values.

        for g in [False, True]:

            # Structure level.

            structure = [] 

            # Iterate over the two T/F values.

            for h in [False, True]:

                # Cube level.

                cube = []

                # Iterate over the two T/F values.

                for i in [False, True]:

                    # Layer level.

                    layer = []

                    # Iterate over the two T/F values.

                    for j in [False, True]:
                        
                        # Case level.

                        case = i or j     

                        if case:
                            p = 1.0
                        else:
                            p = 0.0

                        # Append the case to the layer.

                        layer.append(PredicateC(p).toList())  # row

                    # Append the layer to the cube.

                    cube.append(layer)

                # Append the cube to the structure.
                
                structure.append(cube)

            # Append the structure to the grid.

            grid.append(structure)

        if Global._display: print ("Status:\t\t\t*** Cases grid with 4 neighbours:", grid)

        return grid


Define the Abstract Character State, the abstract super class for the Environment to manage the Agent and Wumpus characters' states .

In [17]:
# CharacterStateA.py

class CharacterStateA:

    # Constructor.

    def __init__(self, location):

        # General.

        self.location = location
        self.isAlive = True


    # Getters and Setters.

    def get_location(self) -> ():
        return self.location

    def set_location(self, location):
        self.location = location

    def get_isAlive(self) -> bool:
        return self.isAlive

    def set_isAlive(self, isAlive):
        self.isAlive = isAlive


Define the Agent State class, the concrete sub-class of the Abstract Character State so the Environment can manage the state of the Agent.

In [19]:
# AgentStateC.py


class AgentStateC(CharacterStateA):


    # Default constructor.

    def __init__(self, location):

        # Call the super class to set the location.

        super().__init__(location)

        # Define attributes specific to AgentStateC.

        self.orientation = Global._east
        self.facing = Global._right

        self.hasGold = False
        self.hasArrow = True
        self.hasClimbedOut = False

        # Initialize the score to 0.

        self.score = 0


    # Define Getters and Setters.

    def get_orientation(self) -> str:
        return self.orientation

    def set_orientation(self, orientation):
        self.orientation = orientation

    def get_facing(self) -> str:
        return self.facing

    def set_facing(self, facing):
        self.facing = facing

    def get_hasGold(self) -> bool:
        return self.hasGold

    def set_hasGold(self, hasGold):
        self.hasGold = hasGold

    def get_hasArrow(self) -> bool:
        return self.hasArrow

    def set_hasArrow(self, hasArrow):
        self.hasArrow = hasArrow
    
    def get_hasClimbedOut(self) -> bool:
        return self.hasClimbedOut

    def set_hasClimbedOut(self, hasClimbedOut):
        self.hasClimbedOut = hasClimbedOut

    def get_score(self) -> int:
        return self.score

    def set_score(self, score):
        self.score = score

    def update_score(self, score_value):
        self.score = self.score + score_value


    # forward

    def forward(self) -> ():

        # Only advance the Agent if within the cave boundaries.

        current_loc_col = self.location[0]
        current_loc_row = self.location[1]

        if (self.orientation == Global._south):
            current_loc_row = current_loc_row - 1
        elif (self.orientation == Global._north):
            current_loc_row = current_loc_row + 1
        elif (self.orientation == Global._east):
            current_loc_col = current_loc_col + 1
        elif (self.orientation == Global._west):
            current_loc_col = current_loc_col - 1

        if Global._display: print ("Action Result:\t\tAgent is looking to move", self.orientation, "to", (current_loc_col, current_loc_row))

        return (current_loc_col, current_loc_row)


    # turnLeft

    def turnLeft(self):

        # Update the Agent's orientation based on its current orientation.

        orientation_index = Global._orientation_array.index(self.orientation)
        self.orientation = Global._orientation_array[orientation_index-1]

        if Global._display: print ("Action Result:\t\tAgent has turned left and is now facing", self.orientation)
        
    
    # turnRight

    def turnRight(self):

        # Update the Agent's orientation based on its current orientation.

        orientation_index = Global._orientation_array.index(self.orientation)
        right_index = orientation_index + 1

        # Unlike the turnRight method, an index larger than the array is an out
        # of bounds error.  Check for this condition (len will be the index of the
        # out of bounds as array starts at 0).

        if (right_index == len(Global._orientation_array)):
            right_index = 0

        self.orientation = Global._orientation_array[right_index]

        if Global._display: print ("Action Result:\t\tAgent has turned right and is now facing", self.orientation)



Define the Wumpus State class, the concrete sub-class of the Abstract Character State so the Environment can manage the state of the Wumpus.

In [21]:
# WumpusStateC.py

class WumpusStateC(CharacterStateA):

    # Constructor.

    def __init__(self, location):

        # Call the super class to set the location.

        super().__init__(location)

        # No other attributes required for WumpusState other than 
        # what is provided in the super class (location, isAlive).


Define the Environment class.

In [23]:
# Import libaries.

import random

# class: EnvironmentC

class EnvironmentC:
    
    # Constructor.
    
    def __init__(self, width=4, height=4, allowClimbWithoutGold=True, pitProb=0.2):

        self.active_episode = True
        self.width = width
        self.height = height
        self.allowClimbWithoutGold = allowClimbWithoutGold
        self.pitProb = pitProb

        # Create an array to keep track of which rooms are not available.

        unavailable_rooms_array = []

        # Initialize the Agent and add its location to the unavailable rooms list.
    
        self.agent_location = (1, 1)
        self.agentState = AgentStateC(self.agent_location)
        unavailable_rooms_array.append(self.agent_location)

        # Initialize the location of the Wumpus and add its location to the unavailable rooms list (maybe future).

        self.wumpus_location = self.__get_random_coordinate(unavailable_rooms_array)
        self.wumpusState = WumpusStateC(self.wumpus_location)
        # unavailable_rooms_array.append(self.wumpus_location)

        # Initialize the location of the Gold and add its location to the unavailable rooms list (maybe future).

        self.gold_location = self.__get_random_coordinate(unavailable_rooms_array)
        # unavailable_rooms_array.append(self.gold_location)

        # Initialize the location of the pits based on the unavailable list (currently, just the Agent).

        self.pit_locations = self.__determine_pit_locations(unavailable_rooms_array)


    # is_active_episode

    def is_active_episode(self) -> bool:

        # An episode is still active if the Agent is still alive and has not yet
        # climbed out of the cave.

        if ((self.agentState.get_isAlive()) and (self.agentState.get_hasClimbedOut() == False)):
            return True

        # Inactive episode.

        return False


    # get_Agent_Score

    def get_Agent_Score(self) -> int:

        return self.agentState.get_score()


    # display_initial_episode

    def display_initial_episode(self):

        print ("The Episode:")
        print ("------------")
        print ("The Agent is at: \t", self.agent_location)
        print ("The Wumpus is at: \t", self.wumpus_location)
        print ("The Gold is at: \t", self.gold_location)
        print ("The Pit(s) are at: \t", self.pit_locations)
        print ("The Agent is facing: \t", self.agentState.get_orientation())
        print ("------------")

        # Display the initial episode board.

        self.display_board()


    # display_board

    def display_board(self):
        
        print ("The agent score is:\t", self.agentState.get_score())

        print ("\n\t     ", end='')

        # Draw the x grid coordinates.

        for x in range(1, self.width+1):
            print (' ', x, '   ', end='')

        print ("\n")

        # Get the orientation of the Agent.

        agent_orientation = self.agentState.get_orientation()

        # Draw the y grid starting from height.

        for y in range(self.height, 0, -1):
            
            # Draw the northern orientation layer.
            
            print ("\t     ", end='')

            for z in range(1, self.width+1):

                # Determine if this is the area the Agent is facing north.

                if ((z, y) == self.agent_location) and (agent_orientation == Global._north):
                    print ('  ^    ', end='')
                else:
                    print ('       ', end='')

            # Tab over for more readability and output the y grid coordinate.

            print ("\n\t", y, "  ", end='')

            # Draw the x grid starting from 1.

            for x in range(1, self.width+1):

                if ((x, y) == self.agent_location) and (agent_orientation == Global._west):

                    # Display the Agent facing west.

                    print ('< A  ', ' ', end='')

                elif ((x, y) == self.agent_location) and (agent_orientation == Global._east):

                    # Display the Agent facing east.

                    print ('  A >', ' ', end='')

                elif ((x, y) == self.agent_location):

                    # Display the Agent (facing either north or south).

                    print ('  A  ', ' ', end='')

                elif ((x, y) == self.wumpus_location):

                    # Determine if the Wumpus is dead or alive.

                    if (self.wumpusState.get_isAlive()):
                        print ('  W  ', ' ', end='')
                    else:
                        print ('  w  ', ' ', end='')

                elif ((x, y) == self.gold_location) and (self.agentState.get_hasGold() == False):

                    # Determine if the Gold has already been grabbed.

                    print ('  G  ', ' ', end='')

                elif ((x, y) in self.pit_locations):
                    print ('  P  ', ' ', end='')

                else:
                    print ('     ', ' ', end='')

            # Draw the southern orientation layer.
             
            print ("\n\t     ", end='')
          
            for z in range(1, self.width+1):

                # Determine if this is the area the Agent is facing south.

                if ((z, y) == self.agent_location) and (agent_orientation == Global._south):
                    print ('  V    ', end='')
                else:
                    print ('       ', end='')

            print("\n")


    # get_percepts

    def get_percepts(self) -> PerceptsC:

        agent_percepts = PerceptsC()

        # Pits.  If the Agent is adjacent to a pit send a breeze percept.

        for loc in self.pit_locations:

            pit_adjacent_rooms = []

            curr_loc_col = loc[0]
            curr_loc_row = loc[1]

            pit_adjacent_rooms.append((curr_loc_col, curr_loc_row+1)) # north
            pit_adjacent_rooms.append((curr_loc_col, curr_loc_row-1)) # south
            pit_adjacent_rooms.append((curr_loc_col+1, curr_loc_row)) # east
            pit_adjacent_rooms.append((curr_loc_col-1, curr_loc_row)) # west

            if self.agent_location in pit_adjacent_rooms:
                agent_percepts.set_breeze(True)
                break


        # Wampus.  If the Agent is adjacent to the Wumpus or is on top of a dead
        # Wampus, send a stench percept.

        wampus_adjacent_rooms = []

        wampus_col = self.wumpus_location[0]
        wampus_row = self.wumpus_location[1]

        wampus_adjacent_rooms.append((wampus_col, wampus_row+1)) # north
        wampus_adjacent_rooms.append((wampus_col, wampus_row-1)) # south
        wampus_adjacent_rooms.append((wampus_col+1, wampus_row)) # east
        wampus_adjacent_rooms.append((wampus_col-1, wampus_row)) # west

        if self.agent_location in wampus_adjacent_rooms:
            agent_percepts.set_stench(True)

        # Gold.  If the Agent is in the same room as the gold
        # send a glitter percept.

        if (self.agent_location == self.gold_location):
            agent_percepts.set_glitter(True)

        return agent_percepts


    # take_action

    def take_action(self, action) -> PerceptsC:

        my_actionPercepts = PerceptsC()

        # Time to take action.  Regardless of the outcome, update 
        # the score by -1.

        self.agentState.update_score(-1)

        if Global._display: print ("Action Result:\t\tAgent is currently at", self.agent_location, "facing", self.agentState.get_orientation())

        if (action == Global._forward_action):  
            my_actionPercepts = self.__forward_action()

        elif (action == Global._turnLeft_action):
            self.__turnLeft_action()

        elif (action == Global._turnRight_action):
            self.__turnRight_action()

        elif (action == Global._shoot_action):
            my_actionPercepts = self.__shoot_action()

        elif (action == Global._grab_action):
            self.__grab_action()

        elif (action == Global._climb_action):
            self.__climb_action()

        else:
            if Global._display: print ("Invalid action", action)

        return my_actionPercepts


    # Private methods


    # __get_random_coordinate

    def __get_random_coordinate(self, unavailable_rooms_array) -> tuple([int, int]):

        # Get a random coordinate.  Keep trying until the random coordinate
        # is available.

        attempts = 0

        grid_size = self.width * self.height

        while (attempts != grid_size):
            random_row = random.randint(1, self.height)
            random_col = random.randint(1, self.width)

            if ((random_col, random_row) not in unavailable_rooms_array):
                break

            attempts = attempts + 1

        # Check to see if all points on the board are unavailable.

        if (attempts == grid_size):
            random_col, random_row = 0

        return (random_col, random_row)


    # __determine_pit_locations

    def __determine_pit_locations(self, unavailable_rooms_array) -> list [(int, int)]:

        pit_list = []
        pit_or_nopit = [ 'P', '-' ]
        pit_probabilities = [self.pitProb, (1 - self.pitProb)]

        for i in range(1, self.height+1):

            for j in range(1, self.width+1):

                if ((i, j) not in unavailable_rooms_array):

                    pit = random.choices(pit_or_nopit, weights=pit_probabilities, k=1)[0]

                    if (pit == 'P'):
                        pit_list.append((i, j))

        return pit_list


    # __forward_action

    def __forward_action(self) -> PerceptsC:

        my_actionPercepts = PerceptsC()

        # Get the candidate move location from the Agent moving forward.
        # The move location is only a 'candidate' until the Environment can
        # ascertain that it is a valid grid location.

        candidate_move_loc = self.agentState.forward()

        candidate_loc_col = candidate_move_loc[0]
        candidate_loc_row = candidate_move_loc[1]
        
        if ((candidate_loc_col < 1) or (candidate_loc_col > self.width) or 
            (candidate_loc_row < 1) or (candidate_loc_row > self.height)):

            # Invalid forward move.  Set the Bump percept.

            if Global._display: print("Action Result:\t\tInvalid forward move (out of bounds).  Agent remains at",self.agent_location)
            my_actionPercepts.set_bump(True)

        else:

            # Valid forward move.
        
            self.agentState.set_location(candidate_move_loc)
            self.agent_location = candidate_move_loc
            my_actionPercepts.set_move(candidate_move_loc)
            my_actionPercepts.set_direction(self.agentState.get_orientation())

            if Global._display: print("Action Result:\t\tValid forward move.  Agent is now at", self.agent_location)

            self.__determine_forward_fate()

        return my_actionPercepts


    # __determine_forward_fate

    def __determine_forward_fate(self):

        # Determine if the move forward action by the Agent has resulted in running into a
        # pit or an alive Wumpus.

        if (self.agent_location in self.pit_locations):

            if Global._display: print ("Action Result:\t\t*** Agent has fallen into a pit ***")
            self.agentState.set_isAlive(False)

            # The Agent has fallen into a pit.  Update the score -1000.

            self.agentState.update_score(-1000)


        elif ((self.agent_location == self.wumpus_location) and
              (self.wumpusState.get_isAlive())):

            if Global._display: print ("Action Result:\t\t*** Agent has been eaten by the Wumpus ***")
            self.agentState.set_isAlive(False)

            # The Agent has been eaten by the Wumpus.  Update the score -1000.

            self.agentState.update_score(-1000)
        else:

            # No fate on the Agent moving forward.
            # Agent can continue.

            pass


    # __shoot_action

    def __shoot_action(self) -> PerceptsC:

        my_actionPercepts = PerceptsC()

        if Global._display: print ("Action Result:\t\tDoes the Agent have the arrow?", self.agentState.get_hasArrow())

        # If the Agent has the arrow then it can be fired.

        if (self.agentState.get_hasArrow()):

            # The arrow has been shot.  Update the score -10.
            if Global._display: print ("Action Result:\t\tThe Agent has shot the arrow")

            self.agentState.update_score(-10)

            orientation = self.agentState.get_orientation()
            current_loc = self.agentState.get_location()
            current_loc_col = current_loc[0]
            current_loc_row = current_loc[1]

            arrow_flight_path = []

            # Determine the arrow flight path through the cave based on the
            # direction the Agent is facing.

            if (orientation == Global._south):
                # South

                for i in range(current_loc_row-1, 0, -1):
                    arrow_flight_path.append((current_loc_col, i))

            elif (orientation == Global._north):
                # North
                
                for i in range(current_loc_row+1, 4+1):
                    arrow_flight_path.append((current_loc_col, i))
            elif (orientation == Global._east):
                # East

                for i in range(current_loc_col+1, 4+1):
                    arrow_flight_path.append((i, current_loc_row))
            elif (orientation == Global._west):
                # West

                for i in range(current_loc_col-1, 0, -1):
                    arrow_flight_path.append((i, current_loc_row))
                    
            # Iterate over the rooms that are in the shooting path.

            for room_location in arrow_flight_path:

                if Global._display: print ("Action Result:\t\tArrow is travelling through room", room_location, '.. ', end='')

                # Is the Wumpus in this room?

                if (room_location == self.wumpus_location):
                   # Yes.

                    if Global._display: print ("*** The Wumpus has been killed ***")

                    # Update the precepts and set the Wumpus alive status to False.

                    my_actionPercepts.set_scream(True)
                    self.wumpusState.set_isAlive(False)
                    break;
                else:
                    # No.

                    if Global._display: print ("The Wumpus is not in room", room_location)

            # Remove the arrow from the Agent state.

            self.agentState.set_hasArrow(False)

        else:
            
            # The arrow has already been shot.
            if Global._display: print ("Action Result:\t\tThe Agent has already shot the arrow")


        # Return the new set of percepts that occurred post-action firing.

        return my_actionPercepts


    # __grab_action

    def __grab_action(self):

        # Determine if the Gold is in the same room as the Agent.

        if (self.agent_location == self.gold_location):
            if Global._display: print("Action Result:\t\t*** The Agent has grabbed the gold ***")
            self.agentState.set_hasGold(True)

        else:
            if Global._display: print("Action Result:\t\tThere is no gold in room", self.agent_location)


    # __climb_action

    def __climb_action(self):

        # The Agent can only climb out from the original room.

        if (self.agent_location == Global._start_room):

            # Determine if the Agent has the gold.

            if (self.agentState.get_hasGold()):
                
                # The gold has been grabbed and climbing out.  Update the score +1000.

                self.agentState.update_score(1000)

                if Global._display: print ("Action Result:\t\t*** The Agent is climbing out with the gold :-) ***")
                self.agentState.set_hasClimbedOut(True)

            else:
                # The Agent can only climb out without the gold if the
                # episode permits its.

                if (self.allowClimbWithoutGold):
                    if Global._display: print ("Action Result:\t\t*** The Agent is climbing out without the gold :-( ***")
                    self.agentState.set_hasClimbedOut(True)

                else:
                    if Global._display: print ("Action Result:\t\tThe Agent cannot climb out without the gold")

        else:
            # The Agent is not in the original room.

            if Global._display: print ("Action Result:\t\tThe Agent cannot climb out as they are not in the original room", Global._start_room)


    # __turnRight_action

    def __turnRight_action(self):

        # Agent turns right.

        self.agentState.turnRight()


    # __turnLeft_action

    def __turnLeft_action(self):

        # Agent turns left.

        self.agentState.turnLeft()


Define the Episode Controller which will create the Agent and the Environment and then run the episode.

In [25]:
# EpisodeControllerC.py


class EpisodeControllerC():
    
    # Constructor.

    def __init__(self, agent, environment):
        
        # The Episode Controller requires an Agent and an Environment.

        self.agent = agent
        self.environment = environment


    # playEpisode

    def playEpisode(self) -> int:

        # Display the initial board.

        if Global._display: self.environment.display_initial_episode()

        # Execute the episode until it is no longer active.

        agent_move = 0
        
        while (self.environment.is_active_episode()):

            # Increase the agent_move to track the number of movements in the episode.
            
            agent_move = agent_move + 1
            
            if Global._display: print (">>----- Agent Move", agent_move, "----->>")
            
            # Get the pre-action Percepts to notify the Agent.

            pre_action_percepts = self.environment.get_percepts()

            # Notify the Agent of the pre-action percepts.

            self.agent.percept(pre_action_percepts)

            # The Agent will now select its next action.

            action = self.agent.action()

            # Take the action selected by the Agent in the Environment.

            post_action_percepts = self.environment.take_action(action)
           
            # Determine if this is still an active episode after the action
            # has been taken (e.g., the Agent hasn't fallen into a pit or
            # has been eaten by the Wumpus).

            if (self.environment.is_active_episode()):

                # Notify the Agent of the post-action percepts.

                self.agent.percept(post_action_percepts)

            # Display the game board after each action.

            if Global._display: self.environment.display_board()


        # Return the final score for the Agent.

        return self.environment.get_Agent_Score(), agent_move

Define the Wumpus Driver that is the entry point for the application.

In [27]:
# WumpusDriver.py

# Import libraries.

import warnings

# main

def main(agent_arg, episodes):

    # Filter out all warnings.

    warnings.filterwarnings("ignore")

    # Set local variables.

    number_of_episodes = episodes
    episode_wins = 0
    episode_scares = 0
    episode_total_score = 0

    print ("Ready Player 1")
    print ("--------------")
    print ("Starting the game... there are", number_of_episodes, "episodes to play.")

    for i in range(number_of_episodes):

        # Initialize the Agent.

        # For Project 1, use the Naive Agent (does not store location, all actions are random)
        # For Project 2, use the Move Planning Agent (stores location, move / turns are random, exit plan on gold)
        # For Project 3, use the Probability Agent (stores location, moves based on probability, exit plan on gold or danger)

        myAgent = None

        # Determine the Agent to use.

        if (agent_arg == "Naive"):

            myAgent = NaiveAgentC()

        elif (agent_arg == "MovePlanning"):

            myAgent = MovePlanningAgentC(Global._start_room)
        
        elif (agent_arg == "Prob"):

            myAgent = ProbAgentC(Global._start_room)

        else:
            # Default to the Naive Agent.

            myAgent = NaiveAgentC()

        # Initialize the environment.

        myEnvironment = EnvironmentC()

        # Initialize the game controller  and play the episode.

        episodeController = EpisodeControllerC(myAgent, myEnvironment)
        episodeScore, episodeMoves = episodeController.playEpisode()

        # Print the final score for the Agent.

        if Global._display: print ("Episode Complete: the Agent's final score is:", episodeScore, "in", episodeMoves, "moves.")

        # Capture the wins.

        if (episodeScore > 0):
            episode_wins = episode_wins + 1

        # Capture the times the Agent was too scared to continue.

        if ((episodeScore < 0) and (episodeScore > -10)):
            episode_scares = episode_scares + 1

        episode_total_score = episode_total_score + episodeScore
        if Global._display: print ("episode_total_score:", episode_total_score)

        # Print out a status message every 100 episodes.

        if ((i != 0) and ((i % 100) == 0)):
            print ("Completed", i, "episodes...")


    print ("\nStats:")
    print ("------")
    print ("Number of episodes:", number_of_episodes)
    print ("Number of episode wins:", episode_wins)
    win_format = f"% of wins: {(episode_wins / number_of_episodes) * 100:.2f}"
    print (win_format)
    print ("Number of times Agent was too scared or climbed out quickly:", episode_scares)
    scare_format = f"% times Agent was too scared or climbed out quickly: {(episode_scares / number_of_episodes) * 100:.2f}"
    print (scare_format)
    print ("Average score:", (episode_total_score / number_of_episodes))
    print ("\nNote: for the Naive Agent, instead of being scared with reason, it is just simply climbing out")
    print ("        for the Move Planning Agent, it cannot climb out without having the gold")
    print ("        for the Probability Agent, it can climb out without having the gold if the next move is too dangerous for it")




## <font color="Blue">Agent Episodes</font>

### <font color="Green">Naive Agent</font>

In [30]:
# Release the Wumpus and the Naive Agent!

main("Naive", 1000)


Ready Player 1
--------------
Starting the game... there are 1000 episodes to play.
Completed 100 episodes...
Completed 200 episodes...
Completed 300 episodes...
Completed 400 episodes...
Completed 500 episodes...
Completed 600 episodes...
Completed 700 episodes...
Completed 800 episodes...
Completed 900 episodes...

Stats:
------
Number of episodes: 1000
Number of episode wins: 14
% of wins: 1.40
Number of times Agent was too scared or climbed out quickly: 349
% times Agent was too scared or climbed out quickly: 34.90
Average score: -323.897

Note: for the Naive Agent, instead of being scared with reason, it is just simply climbing out
        for the Move Planning Agent, it cannot climb out without having the gold
        for the Probability Agent, it can climb out without having the gold if the next move is too dangerous for it


### <font color="Green">Move Planning Agent</font>

In [32]:
# Release the Wumpus and the Move Planning Agent!

main("MovePlanning", 1000)


Ready Player 1
--------------
Starting the game... there are 1000 episodes to play.
Completed 100 episodes...
Completed 200 episodes...
Completed 300 episodes...
Completed 400 episodes...
Completed 500 episodes...
Completed 600 episodes...
Completed 700 episodes...
Completed 800 episodes...
Completed 900 episodes...

Stats:
------
Number of episodes: 1000
Number of episode wins: 195
% of wins: 19.50
Number of times Agent was too scared or climbed out quickly: 0
% times Agent was too scared or climbed out quickly: 0.00
Average score: -653.685

Note: for the Naive Agent, instead of being scared with reason, it is just simply climbing out
        for the Move Planning Agent, it cannot climb out without having the gold
        for the Probability Agent, it can climb out without having the gold if the next move is too dangerous for it


### <font color="Green">Probability Agent</font>

#### <font color="Tomato">Danger Index = 50%</font>

In [35]:
# Release the Wumpus and the Probability Agent!

main("Prob", 1000)


Ready Player 1
--------------
Starting the game... there are 1000 episodes to play.
Completed 100 episodes...
Completed 200 episodes...
Completed 300 episodes...
Completed 400 episodes...
Completed 500 episodes...
Completed 600 episodes...
Completed 700 episodes...
Completed 800 episodes...
Completed 900 episodes...

Stats:
------
Number of episodes: 1000
Number of episode wins: 192
% of wins: 19.20
Number of times Agent was too scared or climbed out quickly: 283
% times Agent was too scared or climbed out quickly: 28.30
Average score: -168.429

Note: for the Naive Agent, instead of being scared with reason, it is just simply climbing out
        for the Move Planning Agent, it cannot climb out without having the gold
        for the Probability Agent, it can climb out without having the gold if the next move is too dangerous for it


#### <font color="Tomato">Danger Index = 5%</font>

In [37]:
# Agent Fear Index

Global._agent_fear_index = .05

# Release the Wumpus and the Probability Agent!

main("Prob", 1000)

Ready Player 1
--------------
Starting the game... there are 1000 episodes to play.
Completed 100 episodes...
Completed 200 episodes...
Completed 300 episodes...
Completed 400 episodes...
Completed 500 episodes...
Completed 600 episodes...
Completed 700 episodes...
Completed 800 episodes...
Completed 900 episodes...

Stats:
------
Number of episodes: 1000
Number of episode wins: 204
% of wins: 20.40
Number of times Agent was too scared or climbed out quickly: 0
% times Agent was too scared or climbed out quickly: 0.00
Average score: -510.831

Note: for the Naive Agent, instead of being scared with reason, it is just simply climbing out
        for the Move Planning Agent, it cannot climb out without having the gold
        for the Probability Agent, it can climb out without having the gold if the next move is too dangerous for it


#### <font color="Tomato">Individual Probability Agent Runs</font>

In [39]:
# Return the Agent Fear Index to 50%
# Update the Display variable to show the Agent in action.

# Agent Fear Index

Global._agent_fear_index = .5

Global._display = True

##### <font color="Grey">Probability Agent Run #1</font>

In [41]:
# Release the Wumpus and the Probability Agent!

main("Prob", 1)


Ready Player 1
--------------
Starting the game... there are 1 episodes to play.
The Episode:
------------
The Agent is at: 	 (1, 1)
The Wumpus is at: 	 (2, 1)
The Gold is at: 	 (3, 4)
The Pit(s) are at: 	 [(2, 3)]
The Agent is facing: 	 east
------------
The agent score is:	 0

	       1      2      3      4    

	                                 
	 4                   G           
	                                 

	                                 
	 3            P                  
	                                 

	                                 
	 2                               
	                                 

	                                 
	 1     A >    W                  
	                                 

>>----- Agent Move 1 ----->>
Percepts:	['Stench']
Status:			*** Agent is currently exploring looking for Gold...
Action Result:		Agent is currently at (1, 1) facing east
Action Result:		Does the Agent have the arrow? True
Action Result:		The Agent has shot the

##### <font color="Grey">Probability Agent Run #2</font>

In [43]:
# Release the Wumpus and the Probability Agent!

main("Prob", 1)


Ready Player 1
--------------
Starting the game... there are 1 episodes to play.
The Episode:
------------
The Agent is at: 	 (1, 1)
The Wumpus is at: 	 (1, 3)
The Gold is at: 	 (2, 3)
The Pit(s) are at: 	 [(1, 4), (3, 3), (3, 4), (4, 4)]
The Agent is facing: 	 east
------------
The agent score is:	 0

	       1      2      3      4    

	                                 
	 4     P             P      P    
	                                 

	                                 
	 3     W      G      P           
	                                 

	                                 
	 2                               
	                                 

	                                 
	 1     A >                       
	                                 

>>----- Agent Move 1 ----->>
Percepts:	[]
Status:			*** Agent is currently exploring looking for Gold...
Status:			*** Agent is looking to move...
Status:			*** Agent is looking to see which move can avoid a pit...
Status:			*** Cases g

##### <font color="Grey">Probability Agent Run #3</font>

In [45]:
# Release the Wumpus and the Probability Agent!

main("Prob", 1)


Ready Player 1
--------------
Starting the game... there are 1 episodes to play.
The Episode:
------------
The Agent is at: 	 (1, 1)
The Wumpus is at: 	 (2, 2)
The Gold is at: 	 (3, 1)
The Pit(s) are at: 	 [(2, 1), (3, 3), (4, 4)]
The Agent is facing: 	 east
------------
The agent score is:	 0

	       1      2      3      4    

	                                 
	 4                          P    
	                                 

	                                 
	 3                   P           
	                                 

	                                 
	 2            W                  
	                                 

	                                 
	 1     A >    P      G           
	                                 

>>----- Agent Move 1 ----->>
Percepts:	['Breeze']
Status:			*** Agent is currently exploring looking for Gold...
Status:			*** Agent is looking to move...
Status:			*** Agent is looking to see which move can avoid a pit...
Status:			*** Cases g

## <font color="Blue">Agent Analysis</font>

For the **Naive Agent**, all movements are completely random.  The Naive Agent can simply climb out at the start of the episode.  As the movements are completely random, it only wins on average ~1% of the time.  Approximately 30-35% of the time, the Naive Agent simply climbs out or does a few other actions first.  However, the average score over 1000 episodes is ~-300 to -400.

For the **Move Planning Agent** the movements and shooting the arrow are random.  It creates an exit plan once it grabs the gold and it cannot climb out without having the gold. It wins about 17-20% of the time as it cannot climb out quickly.  However, it's score over 1000 episodes is ~ -700 as it will get killed more often by a pit or the Wumpus.

For the **Probability Agent**, the movements are probabilistic and the arrow is only shot (in this instantiation) when the stench of the Wumpus is detected to give it the best chance of killing the Wumpus.  At a 50% danger / fear index (i.e., if the room options for its next move is less than 50% safe to go into a room, the Agent will bail out and invoke its exit plan), it wins about 15-17% of the time, bails out about 30% of the episodes but has a pretty good average score over 1000 episodes of -200.  At a 5% danger / fear index, it wins slightly more at about 20% of the time, bails out 0% of the time but has a lower average score of -700 as it will get killed more often by a pit or the Wumpus.

In summary, the more time the Agent spends in the cave, the greater the chance that it will find the gold and win.  However, it will die more so that will bring down its overall score.