In [22]:
import numpy as np
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
import pandas as pd
import time
from tqdm import tqdm

In [23]:
# Custom FrozenLake layout, 5 rows by 4 columns, one string per row
# (S = start, F = frozen, H = hole, G = goal).
desc = [
    "SFFF",
    "FHHH",
    "FFFF",
    "HFHF",
    "FFGF",
]

# Non-slippery environment built from the layout above.
# To watch the agent, add render_mode='human' to this call.
env = gym.make(
    'FrozenLake-v1',
    desc=desc,
    map_name="5x4",
    is_slippery=False,
)
observation, info = env.reset()

In [24]:
# Custom rewards used for reward shaping in the training loop.
# The keys are shaping labels, not FrozenLake tile letters.
custom_rewards = {
    'S': 0.0,  # Safe move: applied in training when the episode has not terminated
    'F': -0.75,  # Fall: applied in training when the episode terminates with zero environment reward
    'G': 1.0,   # Goal reward; NOTE(review): not read by any cell shown here
}

# Attach the reward dict to the wrapped environment object.
# NOTE(review): the training loop applies custom_rewards itself; confirm that
# the environment actually reads a `rewards` attribute, otherwise this is a no-op.
env.env.rewards = custom_rewards

In [25]:
def custom_policy(state, n_rows=5, n_cols=4):
    """Return the actions that do not push the agent off the grid.

    States are numbered row by row (``state = row * n_cols + col``) and the
    actions follow the FrozenLake encoding: 0 = left, 1 = down, 2 = right,
    3 = up. Every border the agent touches removes the matching action, so
    a corner state loses two actions rather than only one.

    Args:
        state: Integer index of the agent's current tile.
        n_rows: Number of grid rows (default matches the 5x4 map).
        n_cols: Number of grid columns (default matches the 5x4 map).

    Returns:
        List of allowed action indices in ascending order.
    """
    row, col = divmod(state, n_cols)

    allowed = []
    if col > 0:  # not in the leftmost column
        allowed.append(0)
    if row < n_rows - 1:  # not in the bottom row
        allowed.append(1)
    if col < n_cols - 1:  # not in the rightmost column
        allowed.append(2)
    if row > 0:  # not in the top row
        allowed.append(3)
    return allowed

In [26]:
def calculate_average_rewards(rewards_list, window_size):
    """Average the most recent ``window_size`` entries of ``rewards_list``.

    Returns None when the list holds fewer than ``window_size`` entries.
    """
    has_full_window = len(rewards_list) >= window_size
    if not has_full_window:
        return None
    recent_rewards = rewards_list[-window_size:]
    return np.mean(recent_rewards)

def check_convergence(rewards_list, window_size, threshold):
    """Report whether the latest reward is within ``threshold`` of the window mean.

    The window mean comes from ``calculate_average_rewards``; when there are
    not yet ``window_size`` rewards the answer is always False.
    """
    window_mean = calculate_average_rewards(rewards_list, window_size)
    if window_mean is None:
        return False
    gap_to_latest = np.abs(window_mean - rewards_list[-1])
    return gap_to_latest < threshold

In [27]:
# Hyperparameters read by the training loop
learning_rate = 0.5  # Step size of the Q-value update
discount_factor = 0.95  # Weight given to the next state-action value in the update target
epsilon = 0.9  # Probability of taking a random allowed action instead of the greedy one
max_exploration_rate = 1.0  # NOTE(review): not referenced by any cell shown here
min_exploration_rate = 0.01  # Lower bound applied to the exploration rate as it decays
exploration_decay_rate = 0.001  # Decay knob for the exploration rate; see the training loop for how it is applied
num_episodes = 1000  # Maximum number of training episodes per run
window_size = 200  # Number of most recent per-step rewards averaged by the convergence check
threshold = 0.05  # Largest gap between the window mean and the latest reward that still counts as converged

In [28]:
# Per-run result accumulators: every training run appends one entry to each list.
train_successrates = []  # training success rate of each run
train_episodes = []  # number of episodes each run executed
train_time = []  # training duration of each run, in seconds
test_successrates = []  # evaluation success rate of each run (appended by test_sarsa)
test_steps = []  # average evaluation episode length of each run (appended by test_sarsa)

In [29]:
def test_sarsa():
    numSuccesses = 0
    numTestEpisodes = 10000
    steps = 0
    for episode in range(numTestEpisodes):
        state_tuple = env.reset()  # State is a tuple
        state = state_tuple[0]  # Extract the integer state value
        done = False

        while not done:
            steps += 1  # Increment step counter for each step taken
            action = np.argmax(Q[state, :])

            # Take action and observe the next state, reward, done flag, and info
            step_result = env.step(action)

            next_state = step_result[0]  # Extract the next state tuple
            reward = step_result[1]
            terminated = step_result[2]  # Extract the done flags
            truncated = step_result[3] # Extract the done flags
            done = terminated or truncated

            state = next_state

            if done and reward > 0.9:
                numSuccesses += 1

    averageSteps = steps/numTestEpisodes
    testSuccessRate = numSuccesses/numTestEpisodes
    test_successrates.append(testSuccessRate)
    test_steps.append(averageSteps)

In [30]:
def select_action(q_table, state, exploration_rate):
    """Epsilon-greedy choice for ``state``.

    With probability ``exploration_rate`` a random action allowed by
    ``custom_policy`` is drawn; otherwise the highest-valued action is taken.
    """
    if np.random.rand() < exploration_rate:
        return np.random.choice(custom_policy(state))
    return np.argmax(q_table[state, :])


num_runs = 1000  # independent training runs; each contributes one row of results

for test in tqdm(range(num_runs), desc='Training Progress'):
    # Start from small random Q-values so greedy ties are broken arbitrarily.
    Q = np.random.rand(env.observation_space.n, env.action_space.n) * 0.01
    numSuccesses = 0
    converged = False
    episode = 0
    rewards_list = []  # shaped reward of every step taken in this run
    # Each run restarts exploration from the configured rate; the `epsilon`
    # hyperparameter itself is never modified, so runs do not affect each other.
    exploration_rate = epsilon
    start_time = time.perf_counter()

    while not converged and episode < num_episodes:
        episode += 1
        state, info = env.reset()
        done = False
        # Visit counts are tracked per episode.
        state_visits = {s: 0 for s in range(env.observation_space.n)}

        # SARSA is on-policy: the action executed next must be the same one
        # used in the update target, so the first action is chosen up front.
        action = select_action(Q, state, exploration_rate)

        while not done:
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            # Penalty doubles with every visit to the same state in this episode.
            # NOTE(review): this grows exponentially and can dominate Q-values
            # in long episodes — confirm that is intended.
            state_visits[next_state] += 1
            visit_penalty = -0.01 * (2 ** state_visits[next_state])

            if next_state == state:
                # The move did not change the state: only the penalty applies.
                reward = visit_penalty
            else:
                if terminated and reward == 0:
                    reward = custom_rewards["F"]  # terminated without reward: fell into a hole
                elif done and reward > 0.9:
                    numSuccesses += 1  # reached the goal; keep the environment reward
                elif not terminated:
                    reward = custom_rewards["S"]  # ordinary safe move
                reward += visit_penalty

            # SARSA update: bootstrap from the action that will actually be
            # taken next; a terminal state has no future value.
            next_action = select_action(Q, next_state, exploration_rate)
            future_value = 0.0 if terminated else Q[next_state, next_action]
            td_target = reward + discount_factor * future_value
            Q[state, action] += learning_rate * (td_target - Q[state, action])

            rewards_list.append(reward)

            # Stop the whole run once the recent step rewards look stable.
            if check_convergence(rewards_list, window_size, threshold):
                converged = True
                break

            state, action = next_state, next_action

        # Shrink exploration by a factor of (1 - decay rate) once per episode,
        # never going below the configured minimum.
        exploration_rate = max(
            min_exploration_rate,
            exploration_rate * (1 - exploration_decay_rate),
        )

    end_time = time.perf_counter()
    training_time = end_time - start_time  # seconds spent training this run

    train_episodes.append(episode)
    train_time.append(training_time)
    # Normalise by the episodes actually run; convergence may stop a run early.
    trainSuccessRate = numSuccesses / max(episode, 1)
    train_successrates.append(trainSuccessRate)

    # Greedy evaluation of the Q-table learned in this run.
    test_sarsa()

Training Progress:   1%|▏         | 13/1000 [00:08<10:02,  1.64it/s]

: 

In [None]:
# Training and evaluation are finished, so close the environment.
env.close()

In [None]:
# One row per training run: training metrics first, then evaluation metrics.
df = pd.DataFrame(
    {
        'Train Episodes': train_episodes,
        'Train Success Rates': train_successrates,
        'Train Time': train_time,
        'Test Steps': test_steps,
        'Test Success Rates': test_successrates,
    }
)

In [None]:
# Persist the per-run results table to sarsa.csv (path is relative to the working directory).
df.to_csv('sarsa.csv')