In [1]:
# Imports
import cv2
from dowhy import CausalModel
import econml
import glob
import gym
from gym import spaces
import matplotlib.pyplot as plt
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
import numpy as np
import pandas as pd
import random
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LassoCV
from sklearn.ensemble import GradientBoostingRegressor
import tensorflow as tf
from tensorflow.keras import backend as k
from tensorflow.keras.layers import Dense, Input
from keras.models import Model
from tensorflow.keras.optimizers import Adam
from time import time

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

import warnings
warnings.filterwarnings("ignore")

pd.set_option('display.max_columns', None)

### Wumpus World Environment

In [2]:
# Defining the Wumpus World Environment.
class WumpusWorldEnvironment(gym.Env):
    """This class implements the Wumpus World environment."""

    def __init__(self, environment_type):
        """This method initializes the environment.

        :param str environment_type: - (It can take two values: 1. 'training' 2. 'testing' indicating the type of
                                    environment.)"""

        self.environment_type = environment_type

        if self.environment_type == 'training':
            self.environment_width = 6
            self.environment_height = 6
            # This defines the total number of grid blocks in the environment.
            self.observation_space = spaces.Discrete(self.environment_width * self.environment_height)
            # This defines that there are 4 discrete actions that the agent can perform.
            self.action_space = spaces.Discrete(4)
            self.number_of_agents = 1  # This defines the number of agents in the environment.
            self.agent_pos = np.asarray([0, 0])  # This defines the agent's default initial position in the environment.
            # This defines the positions of breeze in the environment.
            self.breeze_pos = np.asarray([[1, 0], [3, 0], [5, 0], [2, 1], [4, 1], [1, 2], [3, 2], [5, 2], [0, 3],
                                          [2, 3], [1, 4], [3, 4], [5, 4], [0, 5], [2, 5], [4, 5]])
            self.gold_pos = np.asarray([4, 5])  # This defines the position of gold in the environment.
            self.gold_quantity = 1  # This defines the quantity of gold.
            # This defines the positions of pit in the environment.
            self.pit_pos = np.asarray([[2, 0], [5, 1], [2, 2], [0, 4], [2, 4], [3, 5], [5, 5]])
            # This defines the positions of stench in the environment.
            self.stench_pos = np.asarray([[3, 2], [2, 3], [4, 3], [3, 4]])
            self.wumpus_pos = np.asarray([3, 3])  # This defines the position of the Wumpus in the environment.
            self.timesteps = 0  # This defines the steps the agent has taken during an episode.
            self.max_timesteps = 1000  # This defines the maximum steps the agent can take during an episode.

            # Creating the mapping from the co-ordinates to the state.
            self.coordinates_state_mapping = {}
            for i in range(self.environment_height):
                for j in range(self.environment_width):
                    self.coordinates_state_mapping[f'{np.asarray([j, i])}'] = i * self.environment_width + j

            # Storing the terminal and non-terminal states.
            self.terminal_states = []
            self.non_terminal_states = []
            for position in self.coordinates_state_mapping:
                if np.array_equal(f'{self.wumpus_pos}', position) or np.array_equal(f'{self.gold_pos}', position) or \
                        any(np.array_equal(f'{self.pit_pos[i]}', position) for i in range(len(self.pit_pos))) or \
                        any(np.array_equal(f'{self.breeze_pos[i]}', position) for i in range(len(self.breeze_pos))) or \
                        any(np.array_equal(f'{self.stench_pos[i]}', position) for i in range(len(self.stench_pos))):
                    self.terminal_states.append(self.coordinates_state_mapping[position])
                else:
                    self.non_terminal_states.append(self.coordinates_state_mapping[position])

        elif self.environment_type == 'testing':
            self.environment_width = 6
            self.environment_height = 6
            # This defines the total number of grid blocks in the environment.
            self.observation_space = spaces.Discrete(self.environment_width * self.environment_height)
            # This defines that there are 4 discrete actions that the agent can perform.
            self.action_space = spaces.Discrete(4)
            self.number_of_agents = 1  # This defines the number of agents in the environment.
            self.agent_pos = np.asarray([0, 0])  # This defines the agent's default initial position in the environment.
            # This defines the positions of breeze in the environment.
            self.breeze_pos = np.asarray(
                [[1, 0], [3, 0], [5, 0], [2, 1], [4, 1], [1, 2], [3, 2], [5, 2], [0, 3], [2, 3],
                 [1, 4], [3, 4], [5, 4], [0, 5], [2, 5], [4, 5]])
            self.gold_pos = np.asarray([0, 5])  # This defines the position of gold in the environment.
            self.gold_quantity = 1  # This defines the quantity of gold.
            # This defines the positions of pit in the environment.
            self.pit_pos = np.asarray([[2, 0], [5, 1], [2, 2], [0, 4], [2, 4], [3, 5], [5, 5]])
            # This defines the positions of stench in the environment.
            self.stench_pos = np.asarray([[3, 2], [2, 3], [4, 3], [3, 4]])
            self.wumpus_pos = np.asarray([3, 3])  # This defines the position of the Wumpus in the environment.
            self.timesteps = 0  # This defines the steps the agent has taken during an episode.
            self.max_timesteps = 1000  # This defines the maximum steps the agent can take during an episode.

            # Creating the mapping from the co-ordinates to the state.
            self.coordinates_state_mapping = {}
            for i in range(self.environment_height):
                for j in range(self.environment_width):
                    self.coordinates_state_mapping[f'{np.asarray([j, i])}'] = i * self.environment_width + j

            # Storing the terminal and non-terminal states.
            self.terminal_states = []
            self.non_terminal_states = []
            for position in self.coordinates_state_mapping:
                if np.array_equal(f'{self.wumpus_pos}', position) or np.array_equal(f'{self.gold_pos}', position) or \
                        any(np.array_equal(f'{self.pit_pos[i]}', position) for i in range(len(self.pit_pos))) or \
                        any(np.array_equal(f'{self.breeze_pos[i]}', position) for i in range(len(self.breeze_pos))) or \
                        any(np.array_equal(f'{self.stench_pos[i]}', position) for i in range(len(self.stench_pos))):
                    self.terminal_states.append(self.coordinates_state_mapping[position])
                else:
                    self.non_terminal_states.append(self.coordinates_state_mapping[position])

    def partially_observable_state(self, agent_position):
        """This method returns the array to append to the states for partially observable MDP.
        :param arr agent_position: Integer representation of the state from the environment.

        :return: arr observation: Array representing the partial observation."""

        observation = np.zeros(9 * 5)
        positions_to_evaluate = [agent_position, [agent_position[0] - 1, agent_position[1]],
                                 [agent_position[0] - 1, agent_position[1] + 1],
                                 [agent_position[0], agent_position[1] + 1],
                                 [agent_position[0] + 1, agent_position[1] + 1],
                                 [agent_position[0] + 1, agent_position[1]],
                                 [agent_position[0] + 1, agent_position[1] - 1],
                                 [agent_position[0], agent_position[1] - 1],
                                 [agent_position[0] - 1, agent_position[1] - 1]]

        index = 0
        for position in positions_to_evaluate:
            if any(np.array_equal(position, self.breeze_pos[x]) for x in range(len(self.breeze_pos))):
                observation[index] = 1
            if any(np.array_equal(position, self.stench_pos[x]) for x in range(len(self.stench_pos))):
                observation[index + 1] = 1
            if any(np.array_equal(position, self.pit_pos[x]) for x in range(len(self.pit_pos))):
                observation[index + 2] = 1
            if np.array_equal(position, self.wumpus_pos):
                observation[index + 3] = 1
            if np.array_equal(position, self.gold_pos):
                observation[index + 4] = 1
            index += 5

        return observation

    def reset(self, random_start=False):
        """This method resets the agent position and returns the state as the observation.

        :param bool random_start: - Boolean indicating whether the agent will start in a random or fixed position.

        :returns arr observation: -  Array representing the partial observation."""

        if not random_start:
            self.agent_pos = np.asarray([0, 0])  # Upon resetting the environment the agent's position is set to [0, 0].
        else:
            # Randomly selecting the agent's position.
            random_state = random.choice(self.non_terminal_states)
            self.agent_pos = np.asarray([random_state % self.environment_width,
                                         int(np.floor(random_state / self.environment_width))])

        observation = self.partially_observable_state(self.agent_pos)
        self.timesteps = 0  # Resetting the number of steps taken by the agent.
        self.gold_quantity = 1  # Resetting the Gold quantity to be 1.

        return observation

    def step(self, action):
        """This method implements what happens when the agent takes a particular action. It changes the agent's
        position (While not allowing it to go out of the environment space.), maps the environment co-ordinates to a
        state, defines the rewards for the various states, and determines when the episode ends.

        :param int action: - Integer in the range 0 to 3 inclusive representing the different actions the agent can
        take.

        :returns arr observation: - Array representing the partial observation.
                 int reward: - Integer value that's used to measure the performance of the agent.
                 bool done: - Boolean describing whether the episode has ended.
                 dict info: - A dictionary that can be used to provide additional implementation information."""

        # Describing the outcomes of the various possible actions.
        if action == 0:
            self.agent_pos[0] += 1  # This action causes the agent to go right.
        if action == 1:
            self.agent_pos[0] -= 1  # This action causes the agent to go left.
        if action == 2:
            self.agent_pos[1] += 1  # This action causes the agent to go up.
        if action == 3:
            self.agent_pos[1] -= 1  # This action causes the agent to go down.

        # Ensuring that the agent doesn't go out of the environment.
        self.agent_pos = np.clip(self.agent_pos, a_min=[0, 0],
                                 a_max=[self.environment_width - 1, self.environment_height - 1])
        observation = self.partially_observable_state(self.agent_pos)

        self.timesteps += 1  # Increasing the total number of steps taken by the agent.

        reward = 0
        # Setting the reward to 10 if the agent reaches the gold.
        if np.array_equal(self.agent_pos, self.gold_pos) and self.gold_quantity > 0:
            self.gold_quantity -= 1
            reward = 1000

        for i in range(len(self.pit_pos)):  # Setting the reward to -1 if the agent falls in the pit.
            if np.array_equal(self.agent_pos, self.pit_pos[i]):
                reward = -50

        # Setting the reward to -1 if the agent is killed by Wumpus.
        if np.array_equal(self.agent_pos, self.wumpus_pos):
            reward = -100

        # The episode terminates when the agent reaches the Gold, or is killed by the Wumpus, falls into the pit, or
        # takes more than 10 steps.
        if self.gold_quantity == 0 or \
                np.array_equal(self.agent_pos, self.wumpus_pos):
            done = True
        else:
            done = False
        for i in range(len(self.pit_pos)):
            if np.array_equal(self.agent_pos, self.pit_pos[i]):
                done = True
        if self.timesteps == self.max_timesteps:
            done = True

        info = {}

        return observation, reward, done, info

    def render(self, mode='human', plot=False):
        """This method renders the environment.

        :param str mode: 'human' renders to the current display or terminal and returns nothing. (The value is not read
                     by this implementation.)
        :param bool plot: Boolean indicating whether we show a plot or not. If False, the method returns a resized NumPy
                     array representation of the environment to be used as the state. If True it plots the environment.

        :returns arr preprocessed_image: Single-channel NumPy array representation of the environment (the first
                     channel of the rendered RGBA buffer, resized). Only returned when plot is False; otherwise the
                     method returns None."""

        fig, ax = plt.subplots(figsize=(15, 15))  # Initializing the figure.
        ax.set_xlim(0, self.environment_width)  # Setting the limit on the x-axis.
        ax.set_ylim(0, self.environment_height)  # Setting the limit on the y-axis.

        # Image to draw for every supported combination of objects sharing one cell. The match is exact: a cell whose
        # combination isn't listed here (for example the agent together with the gold) is left blank.
        sprite_files = {
            frozenset({'agent'}): 'agent.png',
            frozenset({'breeze'}): 'breeze.png',
            frozenset({'gold'}): 'gold.png',
            frozenset({'pit'}): 'pit.png',
            frozenset({'stench'}): 'stench.png',
            frozenset({'wumpus'}): 'wumpus.png',
            frozenset({'agent', 'breeze'}): 'agent_breeze.png',
            frozenset({'agent', 'pit'}): 'agent_dead_pit.png',
            frozenset({'agent', 'stench'}): 'agent_stench.png',
            frozenset({'agent', 'breeze', 'stench'}): 'agent_breeze_stench.png',
            frozenset({'agent', 'wumpus'}): 'agent_dead_wumpus_alive.png',
            frozenset({'breeze', 'gold'}): 'breeze_gold.png',
            frozenset({'breeze', 'stench'}): 'breeze_stench.png',
            frozenset({'breeze', 'gold', 'stench'}): 'breeze_gold_stench.png',
            frozenset({'stench', 'gold'}): 'stench_gold.png',
        }
        loaded_sprites = {}  # Every image file is read at most once per render call.

        def plot_image(plot_pos):
            """This is a helper function to render the environment. It checks which objects are in a particular
            position on the grid and renders the appropriate image.

            :param arr plot_pos: Co-ordinates of the grid position which needs to be rendered."""

            # Checking which objects need to be plotted by comparing their positions.
            present = set()
            if np.array_equal(self.agent_pos, plot_pos):
                present.add('agent')
            if any(np.array_equal(cell, plot_pos) for cell in self.breeze_pos):
                present.add('breeze')
            # Gold isn't plotted if it has already been picked up.
            if self.gold_quantity > 0 and np.array_equal(plot_pos, self.gold_pos):
                present.add('gold')
            if any(np.array_equal(cell, plot_pos) for cell in self.pit_pos):
                present.add('pit')
            if any(np.array_equal(cell, plot_pos) for cell in self.stench_pos):
                present.add('stench')
            if np.array_equal(plot_pos, self.wumpus_pos):
                present.add('wumpus')

            file_name = sprite_files.get(frozenset(present))
            if file_name is None:
                return

            if file_name not in loaded_sprites:
                loaded_sprites[file_name] = plt.imread('./images/' + file_name)
            # The image is centred in the cell, hence the half-cell offset.
            sprite = AnnotationBbox(OffsetImage(loaded_sprites[file_name], zoom=0.28),
                                    np.add(plot_pos, [0.5, 0.5]), frameon=False)
            ax.add_artist(sprite)

        # Rendering the images for all states (state index -> [x, y] co-ordinates).
        for state in range(self.environment_height * self.environment_width):
            plot_image(np.asarray([state % self.environment_width, state // self.environment_width]))

        ax.set_xticks(list(range(self.environment_width)))  # Specifying the ticks on the x-axis.
        ax.set_yticks(list(range(self.environment_height)))  # Specifying the ticks on the y-axis.
        ax.grid()  # Setting the plot to be of the type 'grid'.

        if plot:  # Displaying the plot.
            plt.show()
            return None

        # Returning the preprocessed image representation of the environment.
        try:
            fig.canvas.draw()
            # np.array copies the buffer, so the pixels stay valid after the figure is closed.
            img = np.array(fig.canvas.renderer.buffer_rgba())[:, :, :1]
        finally:
            # Closing the figure; otherwise every call would leave another open figure behind in pyplot.
            plt.close(fig)

        # Scaling both dimensions by 84 / 1000.
        width = int(img.shape[1] * 84 / 1000)
        height = int(img.shape[0] * 84 / 1000)
        dim = (width, height)
        # noinspection PyUnresolvedReferences
        preprocessed_image = cv2.resize(img, dim, interpolation=cv2.INTER_AREA)
        return preprocessed_image

In [3]:
class GenerateCausalMap:
    """This class performs the causal inference and generates the casual map."""

    def __init__(self, environment):
        """This method initializes the environment variables."""

        self.environment = environment
        self.samples_to_collect = 25000

        self.history = {'Action Right': [], 'Action Left': [], 'Action Up': [], 'Action Down': [],
                        'State At Breeze': [], 'State At Stench': [], 'State At Pit': [], 'State At Wumpus': [],
                        'State At Gold': [],
                        'State Left Breeze': [], 'State Left Stench': [], 'State Left Pit': [], 'State Left Wumpus': [],
                        'State Left Gold': [],
                        'State Top Left Breeze': [], 'State Top Left Stench': [], 'State Top Left Pit': [],
                        'State Top Left Wumpus': [], 'State Top Left Gold': [],
                        'State Up Breeze': [], 'State Up Stench': [], 'State Up Pit': [], 'State Up Wumpus': [],
                        'State Up Gold': [],
                        'State Top Right Breeze': [], 'State Top Right Stench': [], 'State Top Right Pit': [],
                        'State Top Right Wumpus': [], 'State Top Right Gold': [],
                        'State Right Breeze': [], 'State Right Stench': [], 'State Right Pit': [],
                        'State Right Wumpus': [], 'State Right Gold': [],
                        'State Bottom Right Breeze': [], 'State Bottom Right Stench': [], 'State Bottom Right Pit': [],
                        'State Bottom Right Wumpus': [], 'State Bottom Right Gold': [],
                        'State Down Breeze': [], 'State Down Stench': [], 'State Down Pit': [], 'State Down Wumpus': [],
                        'State Down Gold': [],
                        'State Bottom Left Breeze': [], 'State Bottom Left Stench': [], 'State Bottom Left Pit': [],
                        'State Bottom Left Wumpus': [], 'State Bottom Left Gold': [],
                        'Next State At Breeze': [], 'Next State At Stench': [], 'Next State At Pit': [],
                        'Next State At Wumpus': [], 'Next State At Gold': [],
                        'Next State Left Breeze': [], 'Next State Left Stench': [], 'Next State Left Pit': [],
                        'Next State Left Wumpus': [], 'Next State Left Gold': [],
                        'Next State Top Left Breeze': [], 'Next State Top Left Stench': [],
                        'Next State Top Left Pit': [], 'Next State Top Left Wumpus': [], 'Next State Top Left Gold': [],
                        'Next State Up Breeze': [], 'Next State Up Stench': [], 'Next State Up Pit': [],
                        'Next State Up Wumpus': [], 'Next State Up Gold': [],
                        'Next State Top Right Breeze': [], 'Next State Top Right Stench': [],
                        'Next State Top Right Pit': [], 'Next State Top Right Wumpus': [],
                        'Next State Top Right Gold': [],
                        'Next State Right Breeze': [], 'Next State Right Stench': [], 'Next State Right Pit': [],
                        'Next State Right Wumpus': [], 'Next State Right Gold': [],
                        'Next State Bottom Right Breeze': [], 'Next State Bottom Right Stench': [],
                        'Next State Bottom Right Pit': [], 'Next State Bottom Right Wumpus': [],
                        'Next State Bottom Right Gold': [],
                        'Next State Down Breeze': [], 'Next State Down Stench': [], 'Next State Down Pit': [],
                        'Next State Down Wumpus': [], 'Next State Down Gold': [],
                        'Next State Bottom Left Breeze': [], 'Next State Bottom Left Stench': [],
                        'Next State Bottom Left Pit': [], 'Next State Bottom Left Wumpus': [],
                        'Next State Bottom Left Gold': [],
                        'Reward': [], 'Done': []}
        self.data = pd.DataFrame.from_dict(self.history)
        self.list_of_positions = ['At', 'Left', 'Top Left', 'Up', 'Top Right', 'Right', 'Bottom Right', 'Down',
                                  'Bottom Left']

    def generate_random_data(self):
        """This method fills the history with transitions gathered by acting randomly and rebuilds self.data.

        Episodes start from environment.reset(random_start=True) and run with actions sampled from the environment's
        action space until the episode ends or samples_to_collect transitions have been recorded. Every transition
        stores the one-hot action, the reward, the done flag and the percept flags of the state and the next state."""

        action_columns = ('Action Right', 'Action Left', 'Action Up', 'Action Down')
        percepts = ('Breeze', 'Stench', 'Pit', 'Wumpus', 'Gold')
        log = self.history

        while len(log['Action Right']) < self.samples_to_collect:
            current = self.environment.reset(random_start=True)
            finished = False

            while not finished:
                chosen = self.environment.action_space.sample()
                following, reward, finished, info = self.environment.step(chosen)

                # One-hot encoding of the action (0: right, 1: left, 2: up, 3: down).
                for code, column in enumerate(action_columns):
                    log[column].append(True if chosen == code else False)

                log['Reward'].append(reward)
                log['Done'].append(finished)

                # Each observation holds 5 flags for each of the 9 neighbourhood positions.
                for prefix, percept_vector in (('State ', current), ('Next State ', following)):
                    for slot in range(9):
                        place = self.list_of_positions[slot]
                        for offset, percept in enumerate(percepts):
                            flag = percept_vector[5 * slot + offset]
                            if flag == 1:
                                log[prefix + place + ' ' + percept].append(True)
                            elif flag == 0:
                                log[prefix + place + ' ' + percept].append(False)

                current = following

                # Stopping mid-episode as soon as enough samples have been gathered.
                if len(log['Action Right']) == self.samples_to_collect:
                    break

        self.data = pd.DataFrame.from_dict(log)

    def reward_to_object(self):
        """Estimate the causal effect of the reward on each environment object.

        For every object (Breeze, Gold, Pit, Stench, Wumpus) a DoWhy ``CausalModel`` is built from ``self.data``
        with 'Reward' as the treatment and 'Next State At <object>' as the outcome, using a two-node graph whose
        only edge goes from the reward to the object. The effect is identified, estimated with the
        "backdoor.linear_regression" method and finally passed through four refuters.

        :returns: tuple - (models, estimands, estimates, estimate_values, refutation_estimates). Every element is a
                          dictionary keyed by object name whose values are lists; the refutation lists hold one
                          result per refuter, in the order the refuters are listed below."""

        object_names = ['Breeze', 'Gold', 'Pit', 'Stench', 'Wumpus']
        refuter_names = ['random_common_cause', 'placebo_treatment_refuter', 'data_subset_refuter',
                         'bootstrap_refuter']

        # One (initially empty) result list per object for every stage of the pipeline.
        models = {name: [] for name in object_names}
        estimands = {name: [] for name in object_names}
        estimates = {name: [] for name in object_names}
        estimate_values = {name: [] for name in object_names}
        refutation_estimates = {name: [] for name in object_names}

        # I. Create a causal model from the data and the graph Reward -> object.
        for name in object_names:
            outcome = f'Next State At {name}'
            gml_graph = (f'graph[directed 1node[ id "Reward" label "Reward"]'
                         f'node[ id "{outcome}" label "{outcome}"]'
                         f'edge[source "Reward" target "{outcome}"]]')
            models[name].append(CausalModel(data=self.data, treatment=['Reward'], outcome=[outcome],
                                            graph=gml_graph))

        # II. Identify the causal effect and keep the target estimand.
        for name in object_names:
            estimands[name].append(models[name][0].identify_effect())

        # III. Estimate the target estimand with linear regression.
        for name in object_names:
            fitted = models[name][0].estimate_effect(estimands[name][0],
                                                     method_name="backdoor.linear_regression",
                                                     test_significance=True, control_value=0, treatment_value=1)
            estimates[name].append(fitted)
            estimate_values[name].append(fitted.value)

        # IV. Refute the obtained estimate with every robustness check.
        for name in object_names:
            for refuter in refuter_names:
                refutation_estimates[name].append(
                    models[name][0].refute_estimate(estimands[name][0], estimates[name][0], method_name=refuter))

        return models, estimands, estimates, estimate_values, refutation_estimates

    def object_to_reward(self):
        """Estimate the causal effect of each environment object on the reward, ignoring common causes.

        For every object (Breeze, Gold, Pit, Stench, Wumpus) a DoWhy ``CausalModel`` is built from ``self.data``
        with 'Next State At <object>' as the treatment and 'Reward' as the outcome, using a two-node graph whose
        only edge goes from the object to the reward. The effect is identified, estimated with the
        "backdoor.linear_regression" method and finally passed through four refuters.

        :returns: tuple - (models, estimands, estimates, estimate_values, refutation_estimates). Every element is a
                          dictionary keyed by object name whose values are lists; the refutation lists hold one
                          result per refuter, in the order the refuters are listed below."""

        object_names = ['Breeze', 'Gold', 'Pit', 'Stench', 'Wumpus']
        refuter_names = ['random_common_cause', 'placebo_treatment_refuter', 'data_subset_refuter',
                         'bootstrap_refuter']

        # One (initially empty) result list per object for every stage of the pipeline.
        models = {name: [] for name in object_names}
        estimands = {name: [] for name in object_names}
        estimates = {name: [] for name in object_names}
        estimate_values = {name: [] for name in object_names}
        refutation_estimates = {name: [] for name in object_names}

        # I. Create a causal model from the data and the graph object -> Reward.
        for name in object_names:
            treatment = f'Next State At {name}'
            gml_graph = (f'graph[directed 1node[ id "Reward" label "Reward"]'
                         f'node[ id "{treatment}" label "{treatment}"]'
                         f'edge[source "{treatment}" target "Reward"]]')
            models[name].append(CausalModel(data=self.data, treatment=[treatment], outcome=['Reward'],
                                            graph=gml_graph))

        # II. Identify the causal effect and keep the target estimand.
        for name in object_names:
            estimands[name].append(models[name][0].identify_effect())

        # III. Estimate the target estimand with linear regression.
        for name in object_names:
            fitted = models[name][0].estimate_effect(estimands[name][0],
                                                     method_name="backdoor.linear_regression",
                                                     test_significance=True, control_value=0, treatment_value=1)
            estimates[name].append(fitted)
            estimate_values[name].append(fitted.value)

        # IV. Refute the obtained estimate with every robustness check.
        for name in object_names:
            for refuter in refuter_names:
                refutation_estimates[name].append(
                    models[name][0].refute_estimate(estimands[name][0], estimates[name][0], method_name=refuter))

        return models, estimands, estimates, estimate_values, refutation_estimates

    def object_to_reward_common_causes(self):
        """Estimate the causal effect of each environment object on the reward, with the other objects as common
        causes.

        For every object (Breeze, Gold, Pit, Stench, Wumpus) a DoWhy ``CausalModel`` is built from ``self.data``
        with 'Next State At <object>' as the treatment, 'Reward' as the outcome and the 'Next State At ...' columns
        of the four other objects as common causes. The effect is identified, estimated with the
        "backdoor.linear_regression" method and finally passed through four refuters.

        :returns: tuple - (models, estimands, estimates, estimate_values, refutation_estimates). Every element is a
                          dictionary keyed by object name whose values are lists; the refutation lists hold one
                          result per refuter, in the order the refuters are listed below."""

        object_names = ['Breeze', 'Gold', 'Pit', 'Stench', 'Wumpus']
        refuter_names = ['random_common_cause', 'placebo_treatment_refuter', 'data_subset_refuter',
                         'bootstrap_refuter']
        next_state_columns = ['Next State At Gold', 'Next State At Wumpus', 'Next State At Breeze',
                              'Next State At Stench', 'Next State At Pit']

        # One (initially empty) result list per object for every stage of the pipeline.
        models = {name: [] for name in object_names}
        estimands = {name: [] for name in object_names}
        estimates = {name: [] for name in object_names}
        estimate_values = {name: [] for name in object_names}
        refutation_estimates = {name: [] for name in object_names}

        # I. Create a causal model from the data; every other object's column acts as a common cause.
        for name in object_names:
            treatment = f'Next State At {name}'
            confounders = [column for column in next_state_columns if column != treatment]
            models[name].append(CausalModel(data=self.data, treatment=[treatment], outcome=['Reward'],
                                            common_causes=confounders))

        # II. Identify the causal effect and keep the target estimand.
        for name in object_names:
            estimands[name].append(models[name][0].identify_effect())

        # III. Estimate the target estimand with linear regression.
        for name in object_names:
            fitted = models[name][0].estimate_effect(estimands[name][0],
                                                     method_name="backdoor.linear_regression",
                                                     test_significance=True, control_value=0, treatment_value=1)
            estimates[name].append(fitted)
            estimate_values[name].append(fitted.value)

        # IV. Refute the obtained estimate with every robustness check.
        for name in object_names:
            for refuter in refuter_names:
                refutation_estimates[name].append(
                    models[name][0].refute_estimate(estimands[name][0], estimates[name][0], method_name=refuter))

        return models, estimands, estimates, estimate_values, refutation_estimates
    
    def object_to_done(self):
        """Estimate the causal effect of each environment object on the terminal condition, with the other objects
        as common causes.

        For every object (Breeze, Gold, Pit, Stench, Wumpus) a DoWhy ``CausalModel`` is built from ``self.data``
        with 'Next State At <object>' as the treatment, 'Done' as the outcome and the 'Next State At ...' columns
        of the four other objects as common causes. The effect is identified, estimated with the
        "backdoor.linear_regression" method and finally passed through four refuters.

        :returns: tuple - (models, estimands, estimates, estimate_values, refutation_estimates). Every element is a
                          dictionary keyed by object name whose values are lists; the refutation lists hold one
                          result per refuter, in the order the refuters are listed below."""

        object_names = ['Breeze', 'Gold', 'Pit', 'Stench', 'Wumpus']
        refuter_names = ['random_common_cause', 'placebo_treatment_refuter', 'data_subset_refuter',
                         'bootstrap_refuter']
        next_state_columns = ['Next State At Gold', 'Next State At Wumpus', 'Next State At Breeze',
                              'Next State At Stench', 'Next State At Pit']

        # One (initially empty) result list per object for every stage of the pipeline.
        models = {name: [] for name in object_names}
        estimands = {name: [] for name in object_names}
        estimates = {name: [] for name in object_names}
        estimate_values = {name: [] for name in object_names}
        refutation_estimates = {name: [] for name in object_names}

        # I. Create a causal model from the data; every other object's column acts as a common cause.
        for name in object_names:
            treatment = f'Next State At {name}'
            confounders = [column for column in next_state_columns if column != treatment]
            models[name].append(CausalModel(data=self.data, outcome=['Done'], treatment=[treatment],
                                            common_causes=confounders))

        # II. Identify the causal effect and keep the target estimand.
        for name in object_names:
            estimands[name].append(models[name][0].identify_effect())

        # III. Estimate the target estimand with linear regression.
        for name in object_names:
            fitted = models[name][0].estimate_effect(estimands[name][0],
                                                     method_name="backdoor.linear_regression",
                                                     test_significance=True, control_value=0, treatment_value=1)
            estimates[name].append(fitted)
            estimate_values[name].append(fitted.value)

        # IV. Refute the obtained estimate with every robustness check.
        for name in object_names:
            for refuter in refuter_names:
                refutation_estimates[name].append(
                    models[name][0].refute_estimate(estimands[name][0], estimates[name][0], method_name=refuter))

        return models, estimands, estimates, estimate_values, refutation_estimates
    
    def state_to_next_state(self):
        """Estimate the causal effect of the state on the next state, with the actions as effect modifiers.

        For every environment object and every neighbouring position (Left, Right, Up, Down) a DoWhy
        ``CausalModel`` is built from ``self.data`` with 'State <position> <object>' as the treatment and
        'Next State At <object>' as the outcome. The common causes are the remaining state columns of the same
        object ('State At <object>' plus the three other positions) and the four 'Action ...' columns are the
        effect modifiers. Each model is estimated with the "backdoor.econml.dml.DML" method once per action, using
        only the rows whose 'Action <action>' column is positive as target units.

        :returns: tuple - (models, estimands, estimates, estimate_values, refutation_estimates).
                          ``models`` and ``estimands`` map every object to a list with one entry per position.
                          ``estimates`` and ``estimate_values`` map object -> position -> list with one entry per
                          action (order: Left, Right, Up, Down). ``refutation_estimates`` maps every object to an
                          empty list because no refuter is run for these estimates."""

        actions = ['Left', 'Right', 'Up', 'Down']
        positions = ['Left', 'Right', 'Up', 'Down']
        environment_objects = ['Breeze', 'Gold', 'Pit', 'Stench', 'Wumpus']

        # Necessary dictionaries.
        models = {environment_object: [] for environment_object in environment_objects}
        estimands = {environment_object: [] for environment_object in environment_objects}
        estimates = {environment_object: {position: [] for position in positions}
                     for environment_object in environment_objects}
        estimate_values = {environment_object: {position: [] for position in positions}
                           for environment_object in environment_objects}
        refutation_estimates = {environment_object: [] for environment_object in environment_objects}

        # I. Create one causal model per (object, position) pair.
        for environment_object in environment_objects:
            for position in positions:
                # Common causes: the object's own cell plus its three other neighbouring cells.
                common_causes = [f'State At {environment_object}'] + [
                    f'State {other_position} {environment_object}'
                    for other_position in positions if other_position != position]

                model = CausalModel(data=self.data, treatment=[f'State {position} {environment_object}'],
                                    outcome=[f'Next State At {environment_object}'], common_causes=common_causes,
                                    effect_modifiers=['Action Left', 'Action Right', 'Action Up', 'Action Down'])
                models[environment_object].append(model)

        # II. Identify causal effect and return target estimands.
        for environment_object in environment_objects:
            for i in range(len(positions)):
                estimands[environment_object].append(
                    models[environment_object][i].identify_effect(proceed_when_unidentifiable=True))

        # III. Estimate the target estimand using a statistical method.
        for environment_object in environment_objects:
            for i, position in enumerate(positions):
                for action in actions:
                    dml_estimate = models[environment_object][i].estimate_effect(
                        estimands[environment_object][i], method_name="backdoor.econml.dml.DML",
                        control_value=0, treatment_value=1,
                        # Bind the current action as a default argument: a plain closure would look the loop
                        # variable up lazily, so a callable kept alive by the returned estimate would later
                        # select the rows of the last action instead of its own.
                        target_units=lambda df, action=action: df[f"Action {action}"] > 0,
                        confidence_intervals=False, method_params={
                            "init_params": {'model_y': GradientBoostingRegressor(),
                                            'model_t': GradientBoostingRegressor(),
                                            "model_final": LassoCV(fit_intercept=False),
                                            'featurizer': PolynomialFeatures(degree=1, include_bias=True)},
                            "fit_params": {}})
                    estimates[environment_object][position].append(dml_estimate)
                    estimate_values[environment_object][position].append(dml_estimate.value)

        # IV. No refutation is performed for these estimates; ``refutation_estimates`` is returned with empty
        # lists so that the return value has the same shape as the other estimation methods.

        return models, estimands, estimates, estimate_values, refutation_estimates

    def state_to_reward(self):
        """Estimate the causal effect of the state on the reward, with the actions as effect modifiers.

        For every environment object and every neighbouring position (Left, Right, Up, Down) a DoWhy
        ``CausalModel`` is built from ``self.data`` with 'State <position> <object>' as the treatment and 'Reward'
        as the outcome. The common causes are the 'Next State At ...' columns of the four other objects and the
        four 'Action ...' columns are the effect modifiers. Each model is estimated with the
        "backdoor.econml.dml.DML" method once per action, using only the rows whose 'Action <action>' column is
        positive as target units.

        :returns: tuple - (models, estimands, estimates, estimate_values, refutation_estimates).
                          ``models`` and ``estimands`` map every object to a list with one entry per position.
                          ``estimates`` and ``estimate_values`` map object -> position -> list with one entry per
                          action (order: Left, Right, Up, Down). ``refutation_estimates`` maps every object to an
                          empty list because no refuter is run for these estimates."""

        actions = ['Left', 'Right', 'Up', 'Down']
        positions = ['Left', 'Right', 'Up', 'Down']
        environment_objects = ['Breeze', 'Gold', 'Pit', 'Stench', 'Wumpus']
        next_state_columns = ['Next State At Gold', 'Next State At Wumpus', 'Next State At Breeze',
                              'Next State At Stench', 'Next State At Pit']

        # Necessary dictionaries.
        models = {environment_object: [] for environment_object in environment_objects}
        estimands = {environment_object: [] for environment_object in environment_objects}
        estimates = {environment_object: {position: [] for position in positions}
                     for environment_object in environment_objects}
        estimate_values = {environment_object: {position: [] for position in positions}
                           for environment_object in environment_objects}
        refutation_estimates = {environment_object: [] for environment_object in environment_objects}

        # I. Create one causal model per (object, position) pair.
        for environment_object in environment_objects:
            for position in positions:
                # Common causes: the next-state columns of every other object.
                common_causes = [column for column in next_state_columns
                                 if column != f'Next State At {environment_object}']
                model = CausalModel(data=self.data, treatment=[f'State {position} {environment_object}'],
                                    outcome=['Reward'], common_causes=common_causes,
                                    effect_modifiers=['Action Left', 'Action Right', 'Action Up', 'Action Down'])
                models[environment_object].append(model)

        # II. Identify causal effect and return target estimands.
        for environment_object in environment_objects:
            for i in range(len(positions)):
                estimands[environment_object].append(
                    models[environment_object][i].identify_effect(proceed_when_unidentifiable=True))

        # III. Estimate the target estimand using a statistical method.
        for environment_object in environment_objects:
            for i, position in enumerate(positions):
                for action in actions:
                    dml_estimate = models[environment_object][i].estimate_effect(
                        estimands[environment_object][i], method_name="backdoor.econml.dml.DML",
                        control_value=0, treatment_value=1,
                        # Bind the current action as a default argument: a plain closure would look the loop
                        # variable up lazily, so a callable kept alive by the returned estimate would later
                        # select the rows of the last action instead of its own.
                        target_units=lambda df, action=action: df[f"Action {action}"] > 0,
                        confidence_intervals=False, method_params={
                            "init_params": {'model_y': GradientBoostingRegressor(),
                                            'model_t': GradientBoostingRegressor(),
                                            "model_final": LassoCV(fit_intercept=False),
                                            'featurizer': PolynomialFeatures(degree=1, include_bias=False)},
                            "fit_params": {}})
                    estimates[environment_object][position].append(dml_estimate)
                    estimate_values[environment_object][position].append(dml_estimate.value)

        # IV. No refutation is performed for these estimates; ``refutation_estimates`` is returned with empty
        # lists so that the return value has the same shape as the other estimation methods.

        return models, estimands, estimates, estimate_values, refutation_estimates

In [4]:
# Instantiate the Wumpus World environment twice, once for each environment type the class accepts
# ('training' and 'testing').
wumpus_world_environment_training = WumpusWorldEnvironment(environment_type='training')
wumpus_world_environment_testing = WumpusWorldEnvironment(environment_type='testing')

In [5]:
def _print_causal_results(estimands, estimates, estimate_values, refutation_estimates):
    """Print the estimands, estimates, estimate values and refutation results of one causal analysis."""
    print('Estimands:\n', estimands)
    print('Estimates:\n', estimates)
    print('Estimate Values:\n', estimate_values)
    print('Refutation Estimates:\n:', refutation_estimates)


# Build the causal-map generator on the training environment and collect the data set it analyses.
generate_causal_map = GenerateCausalMap(wumpus_world_environment_training)

print('\nGenerating data through random exploration:')
generate_causal_map.generate_random_data()

# Every analysis below returns (models, estimands, estimates, estimate_values, refutation_estimates). The
# results are kept in notebook-level variables and everything except the models is printed.
print('\nCausal estimate from reward to objects:')
(reward_to_object_models,
 reward_to_object_estimands,
 reward_to_object_estimates,
 reward_to_object_estimate_values,
 reward_to_object_refutation_estimates) = generate_causal_map.reward_to_object()
_print_causal_results(reward_to_object_estimands, reward_to_object_estimates,
                      reward_to_object_estimate_values, reward_to_object_refutation_estimates)

print('\nCausal estimate  from the object to rewards not considering the common causes:')
(object_to_reward_models,
 object_to_reward_estimands,
 object_to_reward_estimates,
 object_to_reward_estimate_values,
 object_to_reward_refutation_estimates) = generate_causal_map.object_to_reward()
_print_causal_results(object_to_reward_estimands, object_to_reward_estimates,
                      object_to_reward_estimate_values, object_to_reward_refutation_estimates)

print('\nCausal estimate from the object to the rewards considering the common causes:')
(object_to_reward_common_causes_models,
 object_to_reward_common_causes_estimands,
 object_to_reward_common_causes_estimates,
 object_to_reward_common_causes_estimate_values,
 object_to_reward_common_causes_refutation_estimates) = generate_causal_map.object_to_reward_common_causes()
_print_causal_results(object_to_reward_common_causes_estimands, object_to_reward_common_causes_estimates,
                      object_to_reward_common_causes_estimate_values,
                      object_to_reward_common_causes_refutation_estimates)

print('\nCausal estimate from the object to the terminal condition considering the common causes:')
(object_to_done_models,
 object_to_done_estimands,
 object_to_done_estimates,
 object_to_done_estimate_values,
 object_to_done_refutation_estimates) = generate_causal_map.object_to_done()
_print_causal_results(object_to_done_estimands, object_to_done_estimates,
                      object_to_done_estimate_values, object_to_done_refutation_estimates)

print('\nCausal estimate from the state to the next state:')
(state_to_next_state_models,
 state_to_next_state_estimands,
 state_to_next_state_estimates,
 state_to_next_state_estimate_values,
 state_to_next_state_refutation_estimates) = generate_causal_map.state_to_next_state()
_print_causal_results(state_to_next_state_estimands, state_to_next_state_estimates,
                      state_to_next_state_estimate_values, state_to_next_state_refutation_estimates)

print('\nCausal estimate from the state to the reward:')
(state_to_reward_models,
 state_to_reward_estimands,
 state_to_reward_estimates,
 state_to_reward_estimate_values,
 state_to_reward_refutation_estimates) = generate_causal_map.state_to_reward()
_print_causal_results(state_to_reward_estimands, state_to_reward_estimates,
                      state_to_reward_estimate_values, state_to_reward_refutation_estimates)



Generating data through random exploration:

Causal estimate from reward to objects:
Estimands:
 {'Breeze': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x0000029222D943A0>], 'Gold': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292B90F8F40>], 'Pit': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292B91B5070>], 'Stench': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292B9118940>], 'Wumpus': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292B9118C10>]}
Estimates:
 {'Breeze': [<dowhy.causal_estimator.CausalEstimate object at 0x00000292BBE8C4C0>], 'Gold': [<dowhy.causal_estimator.CausalEstimate object at 0x00000292BBEC44F0>], 'Pit': [<dowhy.causal_estimator.CausalEstimate object at 0x00000292BBECCF70>], 'Stench': [<dowhy.causal_estimator.CausalEstimate object at 0x00000292BBEC4A00>], 'Wumpus': [<dowhy.causal_estimator.CausalEstimate object at 0x00000292BBEC4C40>]}
Estimate Values:
 {'Breeze': [0.0009211740925840761],

Estimands:
 {'Breeze': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BD80C970>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BC5A6A00>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BD89C220>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BC3EC160>], 'Gold': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BD81DDF0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BC58EEB0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BC1AA9A0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BC130790>], 'Pit': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BF05F790>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BC202EB0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BC199130>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292BC422190>], 'Stench': [<dowhy.causal_identifier.IdentifiedEstimand o

Estimands:
 {'Breeze': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CAC3B190>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CAC3B250>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACDEC40>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACDE9D0>], 'Gold': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACDECA0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACDEBB0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACDEE80>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACDEFA0>], 'Pit': [<dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACE50D0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACE51C0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACE52B0>, <dowhy.causal_identifier.IdentifiedEstimand object at 0x00000292CACE53A0>], 'Stench': [<dowhy.causal_identifier.IdentifiedEstimand o

In [10]:
from itertools import chain
from sklearn.linear_model import LinearRegression
from sklearn import preprocessing

class Policy:
    """This class derives a rule-based policy from the learned causal estimates and evaluates it.

    The ``identify_*`` and ``generalize_*`` methods read objects that earlier notebook cells leave in the global
    namespace (``object_to_reward_common_causes_estimate_values``, ``object_to_done_estimate_values``,
    ``state_to_next_state_estimate_values`` and ``generate_causal_map``), so those cells must have been run first.
    """

    # Relative positions, in the order in which the flat observation vector stores them.
    _POSITIONS = ('At', 'Left', 'Top Left', 'Up', 'Top Right', 'Right', 'Bottom Right', 'Down', 'Bottom Left')
    # Every position owns a group of five consecutive flags; these are the offsets inside one group.
    _FLAGS_PER_POSITION = 5
    _PIT_OFFSET, _WUMPUS_OFFSET, _GOLD_OFFSET = 2, 3, 4
    # Action used to approach an object that is sensed at the given relative position.
    _ACTION_TOWARDS = {'At': None, 'Left': 1, 'Top Left': 2, 'Up': 2, 'Top Right': 0, 'Right': 0,
                       'Bottom Right': 0, 'Down': 3, 'Bottom Left': 1}
    # Action that moves the agent into each of the four directly adjacent cells.
    _ADJACENT_ACTIONS = {'Left': 1, 'Right': 0, 'Up': 2, 'Down': 3}
    # Action that would undo each action (Right <-> Left, Up <-> Down).
    _OPPOSITE_ACTION = {0: 1, 1: 0, 2: 3, 3: 2}
    _NUMBER_OF_ACTIONS = 4

    def __init__(self, test_environment):
        """This method stores the environment on which the policy is evaluated.

        :param test_environment: - (Environment exposing ``reset``, ``step``, ``action_space``, ``agent_pos`` and
                                 ``gold_pos``, as used by ``evaluate``.)"""
        self.test_environment = test_environment

    def identify_primary_objective(self):
        """This method identifies the primary objective of the agent.

        The primary objective is the object with the highest reward estimate among the objects whose estimated
        effect on ``done`` is non-zero.

        :returns tuple: - (primary_objective, primary_objective_reward); both are None when no object qualifies.)"""
        primary_objective = None
        primary_objective_reward = None
        # Both lookups use the numeric estimate values; indexing the raw estimate objects would always be truthy.
        for reward_key, done_key in zip(object_to_reward_common_causes_estimate_values,
                                        object_to_done_estimate_values):
            if not object_to_done_estimate_values[done_key][0]:
                continue
            reward = object_to_reward_common_causes_estimate_values[reward_key][0]
            if primary_objective is None or primary_objective_reward < reward:
                primary_objective, primary_objective_reward = reward_key, reward

        return primary_objective, primary_objective_reward

    def identify_secondary_objectives(self, primary_objective_reward=None):
        """This method identifies the secondary objectives of the agent.

        An object qualifies when its ``done`` estimate lies in (-0.1, 0.1) and adding its reward estimate to the
        primary reward exceeds the primary reward by more than 0.1 % of the primary reward.

        :param float primary_objective_reward: - (Reward of the primary objective. When omitted it is obtained from
                                               ``identify_primary_objective`` instead of a notebook global.)

        :returns tuple: - (secondary_objectives, secondary_objectives_rewards) as two parallel lists.)"""
        if primary_objective_reward is None:
            _, primary_objective_reward = self.identify_primary_objective()

        secondary_objectives = []
        secondary_objectives_rewards = []
        if primary_objective_reward is None:  # No primary objective to compare against.
            return secondary_objectives, secondary_objectives_rewards

        threshold = primary_objective_reward + 0.001 * primary_objective_reward
        for reward_key, done_key in zip(object_to_reward_common_causes_estimate_values,
                                        object_to_done_estimate_values):
            done_effect = object_to_done_estimate_values[done_key][0]
            reward = object_to_reward_common_causes_estimate_values[reward_key][0]
            if (-0.1 < done_effect < 0.1) and (reward + primary_objective_reward) > threshold:
                secondary_objectives.append(reward_key)
                secondary_objectives_rewards.append(reward)

        return secondary_objectives, secondary_objectives_rewards

    def identify_objects_to_avoid(self, primary_objective_reward=None):
        """This method identifies the objects that the agent should avoid.

        An object qualifies when its ``done`` estimate lies in (0.9, 1.1) and adding its reward estimate to the
        primary reward yields less than the primary reward.

        :param float primary_objective_reward: - (Reward of the primary objective. When omitted it is obtained from
                                               ``identify_primary_objective`` instead of a notebook global.)

        :returns tuple: - (objects_to_avoid, objects_to_avoid_rewards) as two parallel lists.)"""
        if primary_objective_reward is None:
            _, primary_objective_reward = self.identify_primary_objective()

        objects_to_avoid = []
        objects_to_avoid_rewards = []
        if primary_objective_reward is None:  # No primary objective to compare against.
            return objects_to_avoid, objects_to_avoid_rewards

        for reward_key, done_key in zip(object_to_reward_common_causes_estimate_values,
                                        object_to_done_estimate_values):
            done_effect = object_to_done_estimate_values[done_key][0]
            reward = object_to_reward_common_causes_estimate_values[reward_key][0]
            if (0.9 < done_effect < 1.1) and (reward + primary_objective_reward) < primary_objective_reward:
                objects_to_avoid.append(reward_key)
                objects_to_avoid_rewards.append(reward)

        return objects_to_avoid, objects_to_avoid_rewards

    def identify_situations_previously_unencountered(self):
        """This method lists the columns of ``generate_causal_map.data`` whose every entry equals False.

        :returns list: - (Names of the columns in which no row is set.)"""
        data = generate_causal_map.data
        return [column_name for column_name in data.columns if (data[column_name] == False).all()]

    def generalize_for_unencountered_situations(self):
        """This method fits a linear regression on the per-object, per-position action estimates and prints its
        predictions for every object at the Left, Right, Up and Down positions."""

        # One row per (object, position) pair; works for any number of positions per object.
        rows = [{'Object': object_name, 'Position': position, 'Action Estimates': action_estimates}
                for object_name, estimates_by_position in state_to_next_state_estimate_values.items()
                for position, action_estimates in estimates_by_position.items()]
        testdf = pd.DataFrame(rows, columns=['Object', 'Position', 'Action Estimates'])

        # The Gold rows for Left/Right/Down are excluded from the fit.
        # NOTE(review): presumably because those situations were never encountered in training - confirm.
        held_out = (testdf['Object'] == 'Gold') & testdf['Position'].isin(['Left', 'Right', 'Down'])
        testdf = testdf[~held_out].copy()

        print(testdf)

        # Learn the general concepts.
        le = preprocessing.LabelEncoder()
        X = pd.get_dummies(testdf['Position'])
        position_columns = list(X.columns)
        X['Object'] = le.fit_transform(testdf['Object'])
        y = np.asarray(list(testdf['Action Estimates']))

        model = LinearRegression()
        model.fit(X, y)

        # Build every query from the fitted column order instead of assuming a fixed one-hot layout.
        for heading, position in (('Left:\n', 'Left'), ('\nRight:\n', 'Right'), ('\nUp:\n', 'Up'),
                                  ('\nDown:\n', 'Down')):
            print(heading)
            one_hot = [int(column == position) for column in position_columns]
            for object_code in range(len(le.classes_)):
                query = pd.DataFrame([one_hot + [object_code]], columns=X.columns)
                print(model.predict(query))

    @staticmethod
    def policy(state, visited_states_, actions_taken_, previous_action, previous_state, next_state):
        """This method selects the action for the current state.

        It moves towards the Gold whenever Gold is sensed at a position that maps to an action. Otherwise it picks
        at random among the actions that were not already taken in this state, do not undo the previous move, do
        not repeat a move that left the state unchanged, and do not lead into an adjacent Pit or Wumpus. If that
        leaves nothing, only the Pit/Wumpus restriction is kept, and if all four neighbours are hazardous any
        action may be chosen.

        :param state: - (Flat observation: for each of the nine relative positions, five consecutive 0/1 flags
                      [Breeze, Stench, Pit, Wumpus, Gold].)
        :param dict visited_states_: - (Unused; kept for interface compatibility.)
        :param list actions_taken_: - (0/1 flag per action marking the actions already taken in this state; it is
                                    copied, not modified.)
        :param previous_action: - (Action taken on the previous step, or None on the first step.)
        :param previous_state: - (State before the previous action.)
        :param next_state: - (State reached by the previous action, or None on the first step.)

        :returns int: - (The selected action.)"""
        gold_found = None
        pit_found, wumpus_found = [], []
        for position_index, position in enumerate(Policy._POSITIONS):
            base = position_index * Policy._FLAGS_PER_POSITION
            if state[base + Policy._GOLD_OFFSET] == 1:
                gold_found = position
            if state[base + Policy._WUMPUS_OFFSET] == 1:
                wumpus_found.append(position)
            if state[base + Policy._PIT_OFFSET] == 1:
                pit_found.append(position)

        blocked = list(actions_taken_)  # Work on a copy so that the caller's bookkeeping is untouched.
        if np.array_equal(previous_state, next_state):
            # The previous move did not change the state, so repeating it is pointless.
            if previous_action is not None:
                blocked[previous_action] = 1
        else:
            # Do not immediately walk back to where the agent came from.
            opposite = Policy._OPPOSITE_ACTION.get(previous_action)
            if opposite is not None:
                blocked[opposite] = 1

        if gold_found:
            towards_gold = Policy._ACTION_TOWARDS[gold_found]
            if towards_gold is not None:  # 'At' has no associated move; fall through to the safe-move logic.
                return towards_gold

        hazards = set(pit_found) | set(wumpus_found)
        hazardous = [0] * Policy._NUMBER_OF_ACTIONS
        for position, action in Policy._ADJACENT_ACTIONS.items():
            if position in hazards:
                blocked[action] = 1
                hazardous[action] = 1

        candidates = [action for action in range(len(blocked)) if blocked[action] == 0]
        if not candidates:
            # Everything is excluded: relax the history-based restrictions and only avoid hazards.
            candidates = [action for action in range(Policy._NUMBER_OF_ACTIONS) if hazardous[action] == 0]
        if not candidates:
            # Hazards on all four sides: no safe move exists, so any action may be chosen.
            candidates = list(range(Policy._NUMBER_OF_ACTIONS))

        return np.random.choice(candidates)

    def evaluate(self, episodes=100):
        """This method evaluates the performance of the agent after it has finished training.

        Besides printing summary statistics, the per-episode rewards and their running total are stored in
        ``self.rewards_per_episode`` and ``self.cumulative_rewards_evaluation``.

        :param int episodes: - (Number of episodes for which the agent's performance is tested.)

        :raises ValueError: - (If ``episodes`` is not positive.)"""
        if episodes <= 0:
            raise ValueError('episodes must be a positive integer')

        environment = self.test_environment
        number_of_actions = environment.action_space.n
        total_steps, total_penalties = 0, 0  # Totals across all episodes.
        rewards_per_episode = []  # Sum of immediate rewards during each episode.
        cumulative_rewards_evaluation = []  # Running total of the episode rewards.
        gold = 0  # Number of episodes in which the agent reaches the Gold.

        for _ in range(episodes):
            # Resetting the environment for every new episode.
            state = environment.reset(random_start=True)
            visited_states = {f'{state}': [0] * number_of_actions}
            previous_state_, next_state_ = state, None
            previous_action = None
            steps, penalties = 0, 0
            total_reward_episode = 0
            reached_gold = False
            done = False

            while not done:
                actions_taken = visited_states[f'{state}']
                action = self.policy(state, visited_states, actions_taken, previous_action, previous_state_,
                                     next_state_)
                previous_action = action
                actions_taken[action] = 1  # Remember that this action was tried in this state.

                next_state, reward, done, _ = environment.step(action)
                total_reward_episode += reward

                # Count an episode at most once, however long the agent stays on the Gold.
                if environment.agent_pos[0] == environment.gold_pos[0] and \
                        environment.agent_pos[1] == environment.gold_pos[1]:
                    reached_gold = True

                # Increasing the penalties incurred in this episode by checking the reward.
                if reward in (-50, -100):
                    penalties += 1

                previous_state_ = state
                next_state_ = next_state
                state = next_state
                visited_states.setdefault(f'{state}', [0] * number_of_actions)

                steps += 1

            gold += int(reached_gold)
            rewards_per_episode.append(total_reward_episode)
            previous_total = cumulative_rewards_evaluation[-1] if cumulative_rewards_evaluation else 0
            cumulative_rewards_evaluation.append(previous_total + total_reward_episode)
            total_penalties += penalties
            total_steps += steps

        self.rewards_per_episode = rewards_per_episode
        self.cumulative_rewards_evaluation = cumulative_rewards_evaluation

        # Printing some statistics after the evaluation of agent's performance is completed.
        print(f"\nEvaluation of agent's performance across {episodes} episodes:")
        print(f"Average number of steps taken per episode: {total_steps / episodes}")
        print(f"Average penalties incurred per episode: {total_penalties / episodes}")
        print(f"Percentage of episodes in which the agent reaches the Gold: {(gold / episodes) * 100} %")

In [12]:
# Build the policy helper around the testing environment created in an earlier cell.
policy = Policy(wumpus_world_environment_testing)
# Identify the primary objective first; the names assigned in this cell live in the notebook's global namespace,
# so keep this statement order.
primary_objective, primary_objective_reward = policy.identify_primary_objective()
print('Primary Objective:', primary_objective, 'Primary Objective Reward:', primary_objective_reward)
# Objects that are worth collecting on the way to the primary objective.
secondary_objective, secondary_objective_rewards = policy.identify_secondary_objectives()
print('\nSecondary Objective:', secondary_objective, 'Secondary Objective Reward:', secondary_objective_rewards)
# Objects that should be avoided.
objects_to_avoid, objects_to_avoid_rewards = policy.identify_objects_to_avoid()
print('\nObjects to Avoid:', objects_to_avoid, 'Objects to Avoid Rewards:', objects_to_avoid_rewards)
# Columns of the causal-map data in which no row is set.
situations_unencountered = policy.identify_situations_previously_unencountered()
print('\nSituations Unencountered:', situations_unencountered)
print('\n')
# Fit the regression over the action estimates and print its predictions, then run the evaluation episodes.
policy.generalize_for_unencountered_situations()
policy.evaluate()

Primary Objective: Gold Primary Objective Reward: 999.9999999999649

Secondary Objective: [] Secondary Objective Reward: []

Objects to Avoid: ['Pit', 'Wumpus'] Objects to Avoid Rewards: [-49.9999999999997, -99.99999999999929]

Situations Unencountered: ['State At Pit', 'State At Wumpus', 'State At Gold', 'State Left Gold', 'State Top Right Wumpus', 'State Right Gold', 'State Bottom Right Wumpus', 'State Bottom Right Gold', 'State Down Gold', 'State Bottom Left Gold', 'Next State Bottom Right Gold', 'Next State Down Gold', 'Next State Bottom Left Gold']


    Object Position                                   Action Estimates
0   Breeze     Left  [0.9933726435135993, 0.0, 0.001206801242850667...
1   Breeze    Right  [0.0, 1.0003167200765528, 0.0, -2.129871412573...
2   Breeze       Up                 [0.0, 0.0, 0.999046587687463, 0.0]
3   Breeze     Down  [5.247949754243608e-05, 0.00034863611404790105...
6     Gold       Up  [1.3691023713525845e-05, 1.3738830057171007e-0...
8      Pit  