In [None]:
import numpy as np
import random
from collections import defaultdict
import matplotlib.pyplot as plt
from frozenlake_env import FrozenLakeEnv


In [None]:
# set random seed for reproducibility
np.random.seed(25)
random.seed(25)

In [None]:
# define hyperparameters
GAMMA = 0.99  # discount factor
#EPSILON = 0.1  # exploration rate
EPISODES = 2000  # no. of episodes
#EPSILON_DECAY = 0.995  # epsilon decay rate
#EPSILON_MIN = 0.01  # minimum epsilon
GRID_SIZE = 10  # grid size (4x4 or 10x10)
HOLE_FRACTION = 0.25  # fraction of holes
USE_DEFAULT_MAP = True  # use default map if available

# create environment
env = FrozenLakeEnv(grid_size=GRID_SIZE, hole_fraction=HOLE_FRACTION, use_default_map=USE_DEFAULT_MAP)


class MonteCarloControl: 
    def __init__(self, env, gamma=GAMMA):
        self.env = env
        self.gamma = gamma
        self.Q = defaultdict(lambda: np.zeros(len(env.actions)))
        self.returns = defaultdict(list)
        self.episode_rewards = []
        self.record_goal = []
        self.record_fail = []
        self.record_path_length = []

    # no epsilon greedy behaviour policy, choose the greedy action (no exploration)

    def choose_action(self, state):
        return max(self.env.actions.keys(), key=lambda a: self.Q[state][a])  # Exploit

    def train(self, episodes):
        for episode in range(episodes):
            state = self.env.reset()
            episode_history = []
            episode_rewards = 0
            done = False
            path_length = 0

            # generate episode
            while not done:
                action = self.choose_action(state)
                next_state, reward, done = self.env.step(action)
                episode_history.append((state, action, reward))
                episode_rewards += reward
                state = next_state
                path_length += 1

            self.episode_rewards.append(episode_rewards)
            self.record_path_length.append(path_length)

            if reward == 1:
                self.record_goal.append(1)
                self.record_fail.append(0)
            else:
                self.record_goal.append(0)
                self.record_fail.append(1)

            # update Q-values using first-visit MC
            G = 0
            visited_state_actions = set()
            for t in range(len(episode_history) - 1, -1, -1):
                state, action, reward = episode_history[t]
                G = self.gamma * G + reward
                if (state, action) not in visited_state_actions:
                    visited_state_actions.add((state, action))
                    self.returns[(state, action)].append(G)
                    self.Q[state][action] = np.mean(self.returns[(state, action)])

        return self.Q, self.episode_rewards
    
    # Save the Q-table to a plain text file.
    def save_q_table_text(self, filename):
        with open(filename, 'w') as f:
            for state, actions in self.Q.items():
                action_values = ' '.join(map(str, actions))
                f.write(f"{state} {action_values}\n")
                
    # Load the Q-table from a plain text file.
    def load_q_table_text(self, filename):
        self.Q = defaultdict(lambda: np.zeros(len(env.actions)))
        with open(filename, 'r') as f:
            for line in f:
                parts = line.strip().split()
                state = int(parts[0])
                actions = np.array([float(x) for x in parts[1:]])
                self.Q[state] = actions   
    
    def visualize_policy(self, Q, env):
        policy = np.zeros((env.grid_size, env.grid_size), dtype=str)
        for state in Q:
            row, col = state // env.grid_size, state % env.grid_size
            action = np.argmax(Q[state])
            policy[row, col] = env.actions[action][0]  # Use first letter of action (U, D, R, L)
        print("Learned Policy:")
        print(policy)


    def test_policy(self, Q, env, episodes=10):
        total_rewards = 0
        for episode in range(episodes):
            state = env.reset()
            done = False
            episode_reward = 0
            while not done:
                action = np.argmax(Q[state])  
                next_state, reward, done = env.step(action)
                episode_reward += reward
                state = next_state
            total_rewards += episode_reward
            print(f"Episode {episode + 1}: Reward = {episode_reward}")
        print(f"Average Reward over {episodes} episodes: {total_rewards / episodes}")

    def visualize_value_function(self, Q, env):
        value_function = np.zeros((env.grid_size, env.grid_size))
        for state in Q:
            row, col = state // env.grid_size, state % env.grid_size
            value_function[row, col] = np.max(Q[state])
        print("Value Function:")
        print(value_function)

    def render_policy(self, Q, env):
        state = self.env.reset()
        done = False
        while not done:
            env.render()
            action = np.argmax(Q[state])  # Greedy policy
            next_state, reward, done = env.step(action)
            state = next_state
        env.final()  # Show the final path



In [None]:
# Function to plot the performance of the robot in the figures
def plot_results(record_goal, record_fail, record_path_length, reward_list, performance_bar):
    fig = plt.figure()
    plt.subplots_adjust(wspace=0.5, hspace=0.7)
    f1 = fig.add_subplot(2, 2, 1)
    f2 = fig.add_subplot(2, 2, 2)
    f3 = fig.add_subplot(2, 2, 3)
    f4 = fig.add_subplot(2, 2, 4)

    # Plot the No. of Successful episodes
    f1.plot(range(len(record_goal)), record_goal, color='red')
    f1.set_title("Successful Episodes")
    f1.set_xlabel("Number of trained episodes")
    f1.set_ylabel("Sucessful Episodes")

    # Plot the No. of Failing episodes
    f2.plot(range(len(record_fail)), record_fail, color='orange')
    f2.set_title("Failing Episodes")
    f2.set_xlabel("Number of trained episodes")
    f2.set_ylabel("Failed Episodes")

    # Plot the path length
    f3.plot(range(len(record_path_length)), record_path_length, color='blue')
    f3.set_title("Successful Path Length")
    f3.set_xlabel("Number of trained episodes")
    f3.set_ylabel("Path Length")

    # Plot the episode reward
    f4.plot(range(len(reward_list)), reward_list, color='yellow')
    f4.set_title("Episode reward")
    f4.set_xlabel("Number of trained episodes")
    f4.set_ylabel("Episode reward")

    plt.figure()
    performance_list = ['Success', 'Fail']
    color_list = ['blue', 'red']
    plt.bar(np.arange(len(performance_bar)), performance_bar, tick_label=performance_list, color=color_list)
    plt.title('Bar/Success and Fail')
    plt.ylabel('Numbers')

    # Show the figures
    plt.show()

In [None]:
print("Training with Monte Carlo Control...")
mc_agent = MonteCarloControl(env)
Q_mc, mc_rewards = mc_agent.train(EPISODES)
print("Training complete!")

# save the Q-table
mc_agent.save_q_table_text('q_table_mc_4x4.txt')
print("Q-table saved to 'q_table_mc_4x4.txt'")

# load the Q-table (optional, if want to test without retraining)
mc_agent.load_q_table_text('q_table_mc_4x4.txt')
print("Q-table loaded from 'q_table_mc4x4.txt'")

# visualize the policy 
print("visualize policy")
mc_agent.visualize_policy(Q_mc, env)  
# test the policy
print("test policy")
mc_agent.test_policy(Q_mc, env)
# visualize the value function
print("visualize value function")
mc_agent.visualize_value_function(Q_mc, env)
# render the policy
print("render policy")
mc_agent.render_policy(Q_mc, env)

# plot cumulative rewards
plt.figure(figsize=(12, 6))
plt.plot(np.cumsum(mc_rewards), label=f"Monte Carlo Control (Map Size: {GRID_SIZE}x{GRID_SIZE})")
plt.title("Cumulative Rewards Over Episodes")
plt.xlabel("Episodes")
plt.ylabel("Cumulative Reward")
plt.legend()
plt.grid()
plt.show()

# Plot moving average of rewards
window_size = 10  # Moving average window
plt.figure(figsize=(12, 6))
plt.plot(np.convolve(mc_rewards, np.ones(window_size)/window_size, mode='valid'),
         label=f"Monte Carlo Control (Map Size: {GRID_SIZE}x{GRID_SIZE})")
plt.title(f"Moving Average of Rewards (Window Size = {window_size})")
plt.xlabel("Episodes")
plt.ylabel("Average Reward")
plt.legend()
plt.grid()
plt.show()

# Collect data for plotting
record_goal = mc_agent.record_goal
record_fail = mc_agent.record_fail
record_path_length = mc_agent.record_path_length
reward_list = mc_rewards
performance_bar = [sum(record_goal), sum(record_fail)]

# Call the plot_results function
plot_results(record_goal, record_fail, record_path_length, mc_rewards, performance_bar)

In [None]:
# Plot Steps per Episode
plt.figure(figsize=(12, 6))
plt.plot(record_path_length, label="Steps per Episode", color="blue")
plt.title("Steps Taken per Episode")
plt.xlabel("Episodes")
plt.ylabel("Steps")
plt.legend()
plt.grid()
plt.show()

# Plot Success Rate (Rolling Average)
window_size = 10  # Rolling window to smooth success rate
rolling_success = np.convolve(record_goal, np.ones(window_size)/window_size, mode='valid')

plt.figure(figsize=(12, 6))
plt.plot(rolling_success, label="Success Rate (Rolling Avg)", color="green")
plt.title("Moving Average of Success Rate (Window Size = 10)")
plt.xlabel("Episodes")
plt.ylabel("Success Rate")
plt.legend()
plt.grid()
plt.show()

final_success_rate = sum(mc_agent.record_goal) / len(mc_agent.record_goal) * 100
print(f"Final Success Rate: {final_success_rate:.2f}%")