In [None]:
#PART2 - Basic Parameters

import numpy as np
import matplotlib.pyplot as plt 

#Variables - grid-world configuration read by the State and Agent classes in this cell
#Positions are (row, column) tuples, with row 0 at the top and column 0 on the left
BOARD_ROWS = 5 #Number of rows 
BOARD_COLS = 5 #Number of columns
WIN_STATE = (4, 4) #Position of the win state - bottom right corner
HOLES = [(1, 0), (1, 3), (3, 1), (4, 2)] #Positions of the holes (terminal cells with a negative reward)
START = (0, 0) #Starting position of the agent - top left corner

DEBUG = False  # set to True to print every step and the value table at the start of each episode


class State:
    """Position of the agent on the grid plus an end-of-episode flag.

    Attributes are plain and public: ``state`` is a ``(row, col)`` tuple and
    ``isEnd`` is True once ``is_end_func`` has seen a terminal cell.
    """

    def __init__(self, state=START):
        self.state = state
        self.isEnd = False

    def get_reward(self):
        """Return the reward for standing on the current cell.

        10.0 if the win state is reached, -5.0 if a hole is fallen into,
        and -1.0 for any other cell.
        """
        if self.state == WIN_STATE:
            return 10.0
        if self.state in HOLES:
            return -5.0
        return -1.0

    def is_end_func(self):
        """Refresh ``isEnd``: True on the win state or in a hole, False otherwise.

        The flag is assigned in both directions so that it always describes
        the current position instead of staying True after the state is moved
        back to a non-terminal cell.
        """
        self.isEnd = (self.state == WIN_STATE) or (self.state in HOLES)

    def nxt_position(self, action):
        """Return the cell reached by ``action`` without modifying this state.

        Actions: 0 = up, 1 = down, 2 = left, anything else = right.
        A move that would leave the board returns the current position.
        """
        row, col = self.state[0], self.state[1]
        if action == 0:
            nxt_state = (row - 1, col)  # up
        elif action == 1:
            nxt_state = (row + 1, col)  # down
        elif action == 2:
            nxt_state = (row, col - 1)  # left
        else:
            nxt_state = (row, col + 1)  # right

        # Only accept the move when it stays inside the board.
        if 0 <= nxt_state[0] < BOARD_ROWS and 0 <= nxt_state[1] < BOARD_COLS:
            return nxt_state
        return self.state


class Agent:
    """Tabular Q-learning agent for the grid world described by the module constants.

    Q-values live in ``action_values``, a dict keyed by ``(row, col, action)``.
    """

    def __init__(self, gamma=0.9, alpha=0.5, eps=0.1):
        """Create an agent with a zero-initialised Q-table.

        Args:
            gamma: Discount factor applied to the bootstrapped next-state value.
            alpha: Learning rate (step size) of the Q-learning update.
            eps: Probability of choosing a random action (epsilon-greedy).
        """
        self.states = []
        self.actions = [0, 1, 2, 3]  # up, down, left, right
        self.State = State()
        self.gamma = gamma
        self.alpha = alpha
        self.eps = eps
        self.episode_rewards = []  # total reward of every finished episode

        # Q(s, a) starts at 0.0 for every cell/action pair.
        self.action_values = {
            (i, j, k): 0.0
            for i in range(BOARD_ROWS)
            for j in range(BOARD_COLS)
            for k in range(len(self.actions))
        }

        self.new_action_values = []

    def choose_action(self, current_state):
        """Select an action for ``current_state`` with an epsilon-greedy policy."""
        # np.random.uniform samples from [0, 1), so the strict "<" explores with
        # probability exactly eps and never explores when eps is 0.
        if np.random.uniform(0, 1) < self.eps:
            action = np.random.choice(self.actions)
            if DEBUG:
                print("selecting random action")
        else:
            action = self.best_action(current_state)
        return action

    def take_action(self, action):
        """Execute ``action`` and move the agent's State to the resulting cell."""
        self.State.state = self.State.nxt_position(action)

    def best_action(self, state):
        """Return the action with the highest Q-value in ``state``.

        Ties go to the last tied action in ``self.actions``.
        """
        row, col = state[0], state[1]
        # max() keeps the first maximal item it meets; scanning in reverse
        # therefore prefers the later action on ties, with no sentinel value.
        return max(reversed(self.actions),
                   key=lambda a: self.action_values[row, col, a])

    def q_max(self, state):
        """Return the largest Q-value over all actions in ``state``."""
        return self.action_values[state[0], state[1], self.best_action(state)]

    def q_learning(self, episodes):
        """Run Q-learning for ``episodes`` episodes, each starting from START.

        Updates ``action_values`` in place and appends the total reward of
        every episode to ``episode_rewards``.
        """
        for episode in range(episodes):
            # Init S: back to the start cell with the terminal flag cleared.
            self.State.isEnd = False
            self.State.state = START
            step = 0
            total_reward = 0  # total reward collected in this episode

            if DEBUG:
                print("**** Beginning episode", episode, "****")
                self.show_values()

            while True:  # repeat for each step of the episode (until S is terminal)
                # Remember S for the Q update.
                current_state = (self.State.state[0], self.State.state[1])

                # Choose A from S using the policy derived from Q (eps-greedy).
                action = self.choose_action(current_state)

                # Take A, observe R and S'.
                self.take_action(action)
                reward = self.State.get_reward()
                total_reward += reward
                self.State.is_end_func()
                next_state = self.State.state[0], self.State.state[1]

                # Q(S, A) <- Q(S, A) + alpha * (R + gamma * max_a Q(S', a) - Q(S, A)).
                # A terminal S' has no future return, so its bootstrap value is 0.
                key = (current_state[0], current_state[1], action)
                old_q = self.action_values[key]
                max_q = 0.0 if self.State.isEnd else self.q_max(next_state)
                new_q = old_q + self.alpha * (reward + self.gamma * max_q - old_q)
                self.action_values[key] = new_q

                step += 1
                if DEBUG:
                    print("step", step, "state", current_state, "action", action, "reward", reward,
                          "next_state", next_state, "old_q", old_q, "max_q", max_q, "new_q", new_q)

                # Stop once S' is terminal; otherwise S <- S' already happened in take_action.
                if self.State.isEnd:
                    self.episode_rewards.append(total_reward)
                    break

    def show_values(self):
        """Print the max action value of every cell as a bordered grid."""
        separator = '-----------------------------------------------'
        for i in range(BOARD_ROWS):
            print(separator)
            out = '| '
            for j in range(BOARD_COLS):
                best_value = max(self.action_values[(i, j, a)] for a in self.actions)
                out += str(round(best_value, 3)).ljust(6) + ' | '
            print(out)
        print(separator)

    def max_action_value_output(self, filename):
        """Write the max action value estimate of every state to ``filename`` (.txt).

        One line per cell, in row-major order. An existing file is overwritten.
        """
        with open(filename, 'w', encoding='utf-8') as f:
            for i in range(BOARD_ROWS):
                for j in range(BOARD_COLS):
                    max_value = max(self.action_values[(i, j, a)] for a in self.actions)
                    f.write(f"State ({i}, {j}): {max_value}\n")

    def plot_rewards_per_episode(self):
        """Plot the total reward of each episode with max and average reference lines.

        Prints a notice and returns without plotting when no episode has been
        recorded yet.
        """
        if not self.episode_rewards:
            print("No episode rewards recorded yet - run q_learning() before plotting.")
            return

        episode_numbers = range(1, len(self.episode_rewards) + 1)
        max_reward = max(self.episode_rewards)
        avg_reward = self.average_reward()

        plt.figure(figsize=(15, 8))
        plt.plot(episode_numbers, self.episode_rewards, color='black')
        plt.axhline(y=max_reward, color='red', linewidth=3, linestyle='--', label=f'Max Reward: {max_reward}')
        plt.axhline(y=avg_reward, color='green', linewidth=3, linestyle='--', label=f'Avg Reward: {avg_reward}')
        plt.xlabel('Episode')
        plt.ylabel('Reward')
        plt.title('Reward per Episode')
        plt.legend()
        plt.grid(True)
        plt.show()

    def average_reward(self):
        """Return the mean episode reward, or 0.0 when no episode has finished."""
        return np.mean(self.episode_rewards) if self.episode_rewards else 0.0
        
         
        
#Creates an agent, trains it with q-learning for 10000 episodes, prints the max action value of
#each cell, writes those values to a .txt file and plots the reward per episode
if __name__ == "__main__":
    ag = Agent()
    ag.q_learning(10000)
    ag.show_values()
    #NOTE(review): absolute, machine-specific path - writing fails if this folder does not exist
    ag.max_action_value_output('C:/Users/Cathal/OneDrive/Masters/2nd Year/Sem 2/Agents, Multi-Agent Systems and Reinforcement Learning/Assignments/Assignment 2/Max Action Values (.txt)/Original_Parameters.txt')  #Save values to file
    ag.plot_rewards_per_episode()

In [None]:
#PART2 - Epsilon Decay Method with same parameters as previous method 

import numpy as np
import matplotlib.pyplot as plt 

#Variables - grid-world configuration read by the State and Agent classes in this cell
#Positions are (row, column) tuples, with row 0 at the top and column 0 on the left
BOARD_ROWS = 5 #Number of rows 
BOARD_COLS = 5 #Number of columns
WIN_STATE = (4, 4) #Position of the win state - bottom right corner
HOLES = [(1, 0), (1, 3), (3, 1), (4, 2)] #Positions of the holes (terminal cells with a negative reward)
START = (0, 0) #Starting position of the agent - top left corner

DEBUG = False  # set to True to print every step and the value table at the start of each episode


class State:
    """Position of the agent on the grid plus an end-of-episode flag.

    Attributes are plain and public: ``state`` is a ``(row, col)`` tuple and
    ``isEnd`` is True once ``is_end_func`` has seen a terminal cell.
    """

    def __init__(self, state=START):
        self.state = state
        self.isEnd = False

    def get_reward(self):
        """Return the reward for standing on the current cell.

        10.0 if the win state is reached, -5.0 if a hole is fallen into,
        and -1.0 for any other cell.
        """
        if self.state == WIN_STATE:
            return 10.0
        if self.state in HOLES:
            return -5.0
        return -1.0

    def is_end_func(self):
        """Refresh ``isEnd``: True on the win state or in a hole, False otherwise.

        The flag is assigned in both directions so that it always describes
        the current position instead of staying True after the state is moved
        back to a non-terminal cell.
        """
        self.isEnd = (self.state == WIN_STATE) or (self.state in HOLES)

    def nxt_position(self, action):
        """Return the cell reached by ``action`` without modifying this state.

        Actions: 0 = up, 1 = down, 2 = left, anything else = right.
        A move that would leave the board returns the current position.
        """
        row, col = self.state[0], self.state[1]
        if action == 0:
            nxt_state = (row - 1, col)  # up
        elif action == 1:
            nxt_state = (row + 1, col)  # down
        elif action == 2:
            nxt_state = (row, col - 1)  # left
        else:
            nxt_state = (row, col + 1)  # right

        # Only accept the move when it stays inside the board.
        if 0 <= nxt_state[0] < BOARD_ROWS and 0 <= nxt_state[1] < BOARD_COLS:
            return nxt_state
        return self.state


class Agent:
    """Tabular Q-learning agent with a linearly decaying exploration rate.

    Q-values live in ``action_values``, a dict keyed by ``(row, col, action)``.
    """

    def __init__(self, gamma=0.9, alpha=0.5, eps=1.0, min_eps=0.0, eps_decay=0.0001):
        """Create an agent with a zero-initialised Q-table.

        Args:
            gamma: Discount factor applied to the bootstrapped next-state value.
            alpha: Learning rate (step size) of the Q-learning update.
            eps: Initial probability of choosing a random action.
            min_eps: Lower bound that epsilon is never decayed below.
            eps_decay: Amount subtracted from epsilon after every episode.
        """
        self.states = []
        self.actions = [0, 1, 2, 3]  # up, down, left, right
        self.State = State()
        self.gamma = gamma
        self.alpha = alpha
        self.eps = eps  # current value of epsilon
        self.min_eps = min_eps
        self.eps_decay = eps_decay
        self.episode_rewards = []  # total reward of every finished episode

        # Q(s, a) starts at 0.0 for every cell/action pair.
        self.action_values = {
            (i, j, k): 0.0
            for i in range(BOARD_ROWS)
            for j in range(BOARD_COLS)
            for k in range(len(self.actions))
        }

        self.new_action_values = []

    def choose_action(self, current_state):
        """Select an action for ``current_state`` with an epsilon-greedy policy."""
        # np.random.uniform samples from [0, 1), so the strict "<" explores with
        # probability exactly eps and never explores once eps has decayed to 0.
        if np.random.uniform(0, 1) < self.eps:
            action = np.random.choice(self.actions)
            if DEBUG:
                print("selecting random action")
        else:
            action = self.best_action(current_state)
        return action

    def take_action(self, action):
        """Execute ``action`` and move the agent's State to the resulting cell."""
        self.State.state = self.State.nxt_position(action)

    def best_action(self, state):
        """Return the action with the highest Q-value in ``state``.

        Ties go to the last tied action in ``self.actions``.
        """
        row, col = state[0], state[1]
        # max() keeps the first maximal item it meets; scanning in reverse
        # therefore prefers the later action on ties, with no sentinel value.
        return max(reversed(self.actions),
                   key=lambda a: self.action_values[row, col, a])

    def q_max(self, state):
        """Return the largest Q-value over all actions in ``state``."""
        return self.action_values[state[0], state[1], self.best_action(state)]

    def _decay_epsilon(self):
        """Lower epsilon by ``eps_decay``, never going below ``min_eps``."""
        self.eps = max(self.min_eps, self.eps - self.eps_decay)

    def q_learning(self, episodes):
        """Run Q-learning for ``episodes`` episodes, each starting from START.

        Updates ``action_values`` in place, appends the total reward of every
        episode to ``episode_rewards`` and decays epsilon once per episode.
        """
        for episode in range(episodes):
            # Init S: back to the start cell with the terminal flag cleared.
            self.State.isEnd = False
            self.State.state = START
            step = 0
            total_reward = 0  # total reward collected in this episode

            if DEBUG:
                print("**** Beginning episode", episode, "****")
                self.show_values()

            while True:  # repeat for each step of the episode (until S is terminal)
                # Remember S for the Q update.
                current_state = (self.State.state[0], self.State.state[1])

                # Choose A from S using the policy derived from Q (eps-greedy).
                action = self.choose_action(current_state)

                # Take A, observe R and S'.
                self.take_action(action)
                reward = self.State.get_reward()
                total_reward += reward
                self.State.is_end_func()
                next_state = self.State.state[0], self.State.state[1]

                # Q(S, A) <- Q(S, A) + alpha * (R + gamma * max_a Q(S', a) - Q(S, A)).
                # A terminal S' has no future return, so its bootstrap value is 0.
                key = (current_state[0], current_state[1], action)
                old_q = self.action_values[key]
                max_q = 0.0 if self.State.isEnd else self.q_max(next_state)
                new_q = old_q + self.alpha * (reward + self.gamma * max_q - old_q)
                self.action_values[key] = new_q

                step += 1
                if DEBUG:
                    print("step", step, "state", current_state, "action", action, "reward", reward,
                          "next_state", next_state, "old_q", old_q, "max_q", max_q, "new_q", new_q)

                # Stop once S' is terminal; otherwise S <- S' already happened in take_action.
                if self.State.isEnd:
                    self.episode_rewards.append(total_reward)
                    break

            # Explore a little less in the next episode.
            self._decay_epsilon()

        # To inspect the final epsilon, add print("Final Epsilon Value:", self.eps) here.
        # NOTE - with the default settings and 10000 episodes the value was observed to be
        # around 9.381755897326649e-14 rather than exactly 0, because the repeated float
        # subtraction accumulates rounding error.

    def show_values(self):
        """Print the max action value of every cell as a bordered grid."""
        separator = '-----------------------------------------------'
        for i in range(BOARD_ROWS):
            print(separator)
            out = '| '
            for j in range(BOARD_COLS):
                best_value = max(self.action_values[(i, j, a)] for a in self.actions)
                out += str(round(best_value, 3)).ljust(6) + ' | '
            print(out)
        print(separator)

    def max_action_value_output(self, filename):
        """Write the max action value estimate of every state to ``filename`` (.txt).

        One line per cell, in row-major order. An existing file is overwritten.
        """
        with open(filename, 'w', encoding='utf-8') as f:
            for i in range(BOARD_ROWS):
                for j in range(BOARD_COLS):
                    max_value = max(self.action_values[(i, j, a)] for a in self.actions)
                    f.write(f"State ({i}, {j}): {max_value}\n")

    def plot_rewards_per_episode(self):
        """Plot the total reward of each episode with max and average reference lines.

        Prints a notice and returns without plotting when no episode has been
        recorded yet.
        """
        if not self.episode_rewards:
            print("No episode rewards recorded yet - run q_learning() before plotting.")
            return

        episode_numbers = range(1, len(self.episode_rewards) + 1)
        max_reward = max(self.episode_rewards)
        avg_reward = self.average_reward()

        plt.figure(figsize=(15, 8))
        plt.plot(episode_numbers, self.episode_rewards, color='black')
        plt.axhline(y=max_reward, color='red', linewidth=3, linestyle='--', label=f'Max Reward: {max_reward}')
        plt.axhline(y=avg_reward, color='green', linewidth=3, linestyle='--', label=f'Avg Reward: {avg_reward}')
        plt.xlabel('Episode')
        plt.ylabel('Reward')
        plt.title('Reward per Episode')
        plt.legend()
        plt.grid(True)
        plt.show()

    def average_reward(self):
        """Return the mean episode reward, or 0.0 when no episode has finished."""
        return np.mean(self.episode_rewards) if self.episode_rewards else 0.0
        
        
#Creates an agent, trains it with q-learning for 10000 episodes, prints the max action value of
#each cell, writes those values to a .txt file and plots the reward per episode
if __name__ == "__main__":
    ag = Agent()
    ag.q_learning(10000)
    ag.show_values()
    #NOTE(review): absolute, machine-specific path - writing fails if this folder does not exist
    ag.max_action_value_output('C:/Users/Cathal/OneDrive/Masters/2nd Year/Sem 2/Agents, Multi-Agent Systems and Reinforcement Learning/Assignments/Assignment 2/Max Action Values (.txt)/Epsilon_Decay.txt')  #Save values to file
    ag.plot_rewards_per_episode()

REFERENCES:
- Lecture Notes
- Matplotlib Documentation - https://matplotlib.org/stable/

In [None]:
#PART2 - Improved Parameters (Based on the first implementation, just with adjusted hyperparameters)

import numpy as np
import matplotlib.pyplot as plt 

#Variables - grid-world configuration read by the State and Agent classes in this cell
#Positions are (row, column) tuples, with row 0 at the top and column 0 on the left
BOARD_ROWS = 5 #Number of rows 
BOARD_COLS = 5 #Number of columns
WIN_STATE = (4, 4) #Position of the win state - bottom right corner
HOLES = [(1, 0), (1, 3), (3, 1), (4, 2)] #Positions of the holes (terminal cells with a negative reward)
START = (0, 0) #Starting position of the agent - top left corner

DEBUG = False  # set to True to print every step and the value table at the start of each episode


class State:
    """Position of the agent on the grid plus an end-of-game flag.

    Attributes are plain and public: ``state`` is a ``(row, col)`` tuple and
    ``isEnd`` is True once ``is_end_func`` has seen a terminal cell.
    """

    def __init__(self, state=START):
        self.state = state
        self.isEnd = False

    def get_reward(self):
        """Return the reward for standing on the current cell.

        10.0 if the win state is reached, -5.0 if a hole is fallen into,
        and -1.0 for any other cell.
        """
        if self.state == WIN_STATE:
            return 10.0
        if self.state in HOLES:
            return -5.0
        return -1.0

    def is_end_func(self):
        """Refresh ``isEnd``: True on the win state or in a hole, False otherwise.

        The flag is assigned in both directions so that it always describes
        the current position instead of staying True after the state is moved
        back to a non-terminal cell.
        """
        self.isEnd = (self.state == WIN_STATE) or (self.state in HOLES)

    def nxt_position(self, action):
        """Return the cell reached by ``action`` without modifying this state.

        Actions: 0 = up, 1 = down, 2 = left, anything else = right.
        A move that would leave the board returns the current position.
        """
        row, col = self.state[0], self.state[1]
        if action == 0:
            nxt_state = (row - 1, col)  # up
        elif action == 1:
            nxt_state = (row + 1, col)  # down
        elif action == 2:
            nxt_state = (row, col - 1)  # left
        else:
            nxt_state = (row, col + 1)  # right

        # Only accept the move when it stays inside the board.
        if 0 <= nxt_state[0] < BOARD_ROWS and 0 <= nxt_state[1] < BOARD_COLS:
            return nxt_state
        return self.state


class Agent:
    """Tabular Q-learning agent for the grid world described by the module constants.

    Q-values live in ``action_values``, a dict keyed by ``(row, col, action)``.
    """

    def __init__(self, gamma=0.6, alpha=0.75, eps=0.05):
        """Create an agent with a zero-initialised Q-table.

        Args:
            gamma: Discount factor applied to the bootstrapped next-state value.
            alpha: Learning rate (step size) of the Q-learning update.
            eps: Probability of choosing a random action (epsilon-greedy).
        """
        self.states = []
        self.actions = [0, 1, 2, 3]  # up, down, left, right
        self.State = State()
        self.gamma = gamma
        self.alpha = alpha
        self.eps = eps
        self.episode_rewards = []  # total reward of every finished episode

        # Q(s, a) starts at 0.0 for every cell/action pair.
        self.action_values = {
            (i, j, k): 0.0
            for i in range(BOARD_ROWS)
            for j in range(BOARD_COLS)
            for k in range(len(self.actions))
        }

        self.new_action_values = []

    def choose_action(self, current_state):
        """Select an action for ``current_state`` with an epsilon-greedy policy."""
        # np.random.uniform samples from [0, 1), so the strict "<" explores with
        # probability exactly eps and never explores when eps is 0.
        if np.random.uniform(0, 1) < self.eps:
            action = np.random.choice(self.actions)
            if DEBUG:
                print("selecting random action")
        else:
            action = self.best_action(current_state)
        return action

    def take_action(self, action):
        """Execute ``action`` and move the agent's State to the resulting cell."""
        self.State.state = self.State.nxt_position(action)

    def best_action(self, state):
        """Return the action with the highest Q-value in ``state``.

        Ties go to the last tied action in ``self.actions``.
        """
        row, col = state[0], state[1]
        # max() keeps the first maximal item it meets; scanning in reverse
        # therefore prefers the later action on ties, with no sentinel value.
        return max(reversed(self.actions),
                   key=lambda a: self.action_values[row, col, a])

    def q_max(self, state):
        """Return the largest Q-value over all actions in ``state``."""
        return self.action_values[state[0], state[1], self.best_action(state)]

    def q_learning(self, episodes):
        """Run Q-learning for ``episodes`` episodes, each starting from START.

        Updates ``action_values`` in place and appends the total reward of
        every episode to ``episode_rewards``.
        """
        for episode in range(episodes):
            # Init S: back to the start cell with the terminal flag cleared.
            self.State.isEnd = False
            self.State.state = START
            step = 0
            total_reward = 0  # total reward collected in this episode

            if DEBUG:
                print("**** Beginning episode", episode, "****")
                self.show_values()

            while True:  # repeat for each step of the episode (until S is terminal)
                # Remember S for the Q update.
                current_state = (self.State.state[0], self.State.state[1])

                # Choose A from S using the policy derived from Q (eps-greedy).
                action = self.choose_action(current_state)

                # Take A, observe R and S'.
                self.take_action(action)
                reward = self.State.get_reward()
                total_reward += reward
                self.State.is_end_func()
                next_state = self.State.state[0], self.State.state[1]

                # Q(S, A) <- Q(S, A) + alpha * (R + gamma * max_a Q(S', a) - Q(S, A)).
                # A terminal S' has no future return, so its bootstrap value is 0.
                key = (current_state[0], current_state[1], action)
                old_q = self.action_values[key]
                max_q = 0.0 if self.State.isEnd else self.q_max(next_state)
                new_q = old_q + self.alpha * (reward + self.gamma * max_q - old_q)
                self.action_values[key] = new_q

                step += 1
                if DEBUG:
                    print("step", step, "state", current_state, "action", action, "reward", reward,
                          "next_state", next_state, "old_q", old_q, "max_q", max_q, "new_q", new_q)

                # Stop once S' is terminal; otherwise S <- S' already happened in take_action.
                if self.State.isEnd:
                    self.episode_rewards.append(total_reward)
                    break

    def show_values(self):
        """Print the max action value of every cell as a bordered grid."""
        separator = '-----------------------------------------------'
        for i in range(BOARD_ROWS):
            print(separator)
            out = '| '
            for j in range(BOARD_COLS):
                best_value = max(self.action_values[(i, j, a)] for a in self.actions)
                out += str(round(best_value, 3)).ljust(6) + ' | '
            print(out)
        print(separator)

    def max_action_value_output(self, filename):
        """Write the max action value estimate of every state to ``filename`` (.txt).

        One line per cell, in row-major order. An existing file is overwritten.
        """
        with open(filename, 'w', encoding='utf-8') as f:
            for i in range(BOARD_ROWS):
                for j in range(BOARD_COLS):
                    max_value = max(self.action_values[(i, j, a)] for a in self.actions)
                    f.write(f"State ({i}, {j}): {max_value}\n")

    def plot_rewards_per_episode(self):
        """Plot the total reward of each episode with max and average reference lines.

        Prints a notice and returns without plotting when no episode has been
        recorded yet.
        """
        if not self.episode_rewards:
            print("No episode rewards recorded yet - run q_learning() before plotting.")
            return

        episode_numbers = range(1, len(self.episode_rewards) + 1)
        max_reward = max(self.episode_rewards)
        avg_reward = self.average_reward()

        plt.figure(figsize=(15, 8))
        plt.plot(episode_numbers, self.episode_rewards, color='black')
        plt.axhline(y=max_reward, color='red', linewidth=3, linestyle='--', label=f'Max Reward: {max_reward}')
        plt.axhline(y=avg_reward, color='green', linewidth=3, linestyle='--', label=f'Avg Reward: {avg_reward}')
        plt.xlabel('Episode')
        plt.ylabel('Reward')
        plt.title('Reward per Episode')
        plt.legend()
        plt.grid(True)
        plt.show()

    def average_reward(self):
        """Return the mean episode reward, or 0.0 when no episode has finished."""
        return np.mean(self.episode_rewards) if self.episode_rewards else 0.0
        
         
        
#Creates an agent, trains it with q-learning for 10000 episodes, prints the max action value of
#each cell, writes those values to a .txt file and plots the reward per episode
if __name__ == "__main__":
    ag = Agent()
    ag.q_learning(10000)
    ag.show_values()
    #NOTE(review): absolute, machine-specific path - writing fails if this folder does not exist
    ag.max_action_value_output('C:/Users/Cathal/OneDrive/Masters/2nd Year/Sem 2/Agents, Multi-Agent Systems and Reinforcement Learning/Assignments/Assignment 2/Max Action Values (.txt)/Improved_Parameters.txt')  #Save values to file
    ag.plot_rewards_per_episode()