#### The Frozen Lake Environment

(a) Create the Frozen Lake environment with a 4×4 grid using ’FrozenLake-v1’ version.

In [2]:
# Import the neccesary libraries
import gymnasium as gym
import numpy as np
import pandas as pd

In [3]:
# Create the FrozenLake-v1 environment with a 4x4 grid
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True)

# Reset the environment to get the initial state
state, info = env.reset()

- S - Start tile
- G - Goal tile
- F - frozen tile
- H - a tile with a hole

In [4]:
# Print basic info about the environment
print("Initial State:", state)
print("Environment Observation Space:", env.observation_space)
print("Environment Action Space:", env.action_space)

# Print the map layout
print(env.unwrapped.desc.astype(str))

Initial State: 0
Environment Observation Space: Discrete(16)
Environment Action Space: Discrete(4)
[['S' 'F' 'F' 'F']
 ['F' 'H' 'F' 'H']
 ['F' 'F' 'F' 'H']
 ['H' 'F' 'F' 'G']]


In [5]:
# from IPython.display import clear_output
# import time

# # Create environment with render_mode
# env = gym.make("FrozenLake-v1", render_mode="human")

# def render_and_clear():
#     clear_output(wait=True)
#     env.render()

# state, _ = env.reset()
# render_and_clear()

# done = False
# while not done:
#     action = env.action_space.sample()  # Random action
#     state, reward, terminated, truncated, _ = env.step(action)
#     done = bool(terminated) or bool(truncated)  # Explicit conversion to Python boolean
#     render_and_clear()
#     time.sleep(0.5)  # Add delay for better visibility

(b) Collect data from 10,000 episodes of agent interaction within the environment.

In [6]:
# Initialize environment
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True)
goal_pos = (3, 3)  # Bottom-right corner (G)

# Helper function to convert state to (row, col)
def state_to_pos(state):
    return divmod(state, 4)  # For 4x4 grid

# Store data
data = []

n_episodes = 10000

for episode in range(n_episodes):
    state, _ = env.reset()
    done = False
    episode_reward = 0
    episode_data = []

    while not done:
        action = env.action_space.sample()  # Random policy
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Calculate goal proximity (Manhattan distance to (3, 3))
        row, col = state_to_pos(state)
        goal_distance = abs(goal_pos[0] - row) + abs(goal_pos[1] - col)

        # Store current step
        episode_data.append({
            "State": state,
            "Action": action,
            "Reward": reward,
            "Goal_Proximity": goal_distance
        })

        episode_reward += reward
        state = next_state

    # Add total reward to each step of the episode
    for entry in episode_data:
        entry["Total_Reward"] = episode_reward
        data.append(entry)

# Convert to DataFrame
df = pd.DataFrame(data)

# Optional: Save to CSV
df.to_csv("frozenlake_10000_episodes.csv", index=False)

# Show sample
print(df.head())

   State  Action  Reward  Goal_Proximity  Total_Reward
0      0       0     0.0               6           0.0
1      4       1     0.0               5           0.0
2      4       0     0.0               5           0.0
3      8       0     0.0               4           0.0
4      8       1     0.0               4           0.0


(c) Calculate the proportion of episodes where the agent achieves the goal.

In [7]:
# Get unique episodes by filtering at the end of each episode
# Since Total_Reward is constant for all steps in an episode, we can group by episodes

# Step 1: Split episodes by their ends using cumulative counter
episode_counter = 0
episode_rewards = []
current_reward = None
previous_state = None

for i in range(len(df)):
    if i == 0 or df.iloc[i]["State"] < df.iloc[i - 1]["State"]:  # naive check for episode restart
        episode_counter += 1
    current_reward = df.iloc[i]["Total_Reward"]
    if i == len(df) - 1 or df.iloc[i + 1]["Total_Reward"] != current_reward:
        episode_rewards.append(current_reward)

# Step 2: Calculate proportion of success
successes = sum(1 for r in episode_rewards if r == 1)
total_episodes = len(episode_rewards)
success_rate = successes / total_episodes

print(f"Agent reached the goal in {successes} out of {total_episodes} episodes.")
print(f"Proportion of success: {success_rate:.4f}")

Agent reached the goal in 161 out of 323 episodes.
Proportion of success: 0.4985


 (d) Create a value representing the importance of action a taken at the given state s to achieve the final goal.

In [8]:
# Load the dataset if needed
df = pd.read_csv("frozenlake_10000_episodes.csv")

# Group by State and Action, and calculate average Total_Reward as importance
importance_df = df.groupby(['State', 'Action'])['Total_Reward'].mean().reset_index()

importance_df.rename(columns={'Total_Reward': 'Importance'}, inplace=True)

print(importance_df.head(10))

   State  Action  Importance
0      0       0    0.019884
1      0       1    0.017623
2      0       2    0.016863
3      0       3    0.016648
4      1       0    0.008525
5      1       1    0.013410
6      1       2    0.012260
7      1       3    0.023576
8      2       0    0.037474
9      2       1    0.028405


(e) Train a suitable machine learning model to predict the state-action value for a given state s and action a.

In [9]:
# Create FrozenLake environment
env = gym.make('FrozenLake-v1', is_slippery=True)

# Initialize Q-table: states x actions
Q = np.zeros((env.observation_space.n, env.action_space.n))

# Hyperparameters
alpha = 0.8       # learning rate
gamma = 0.95      # discount factor
epsilon = 1.0     # exploration rate (epsilon-greedy)
epsilon_decay = 0.9995
epsilon_min = 0.01
num_episodes = 10000
max_steps_per_episode = 100

for episode in range(num_episodes):
    state, _ = env.reset()
    done = False

    for step in range(max_steps_per_episode):
        # Choose action using epsilon-greedy policy
        if np.random.random() < epsilon:
            action = env.action_space.sample()  # Explore: random action
        else:
            action = np.argmax(Q[state])        # Exploit: best known action

        # Take action and observe result
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Q-learning update rule
        best_next_action = np.argmax(Q[next_state])
        td_target = reward + gamma * Q[next_state][best_next_action]
        td_error = td_target - Q[state][action]
        Q[state][action] += alpha * td_error

        state = next_state

        if done:
            break

    # Decay epsilon to reduce exploration over time
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

# After training, Q contains the learned state-action values
print("Training finished.")

# Example: print Q-values for state 0
print(f"Q-values for state 0 (Left, Down, Right, Up) : {Q[0]}")

# Optional: Evaluate the learned policy's success rate
successes = 0
test_episodes = 1000

for _ in range(test_episodes):
    state, _ = env.reset()
    done = False
    for _ in range(max_steps_per_episode):
        action = np.argmax(Q[state])  # Always pick best action
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        if done:
            successes += reward  # reward=1 if reached goal
            break

print(f"Success rate over {test_episodes} test episodes: {successes / test_episodes:.4f}")

Training finished.
Q-values for state 0 (Left, Down, Right, Up) : [0.10331174 0.0631334  0.0627047  0.06376185]
Success rate over 1000 test episodes: 0.3690


(f) Design and implement an algorithm to guide the agent to achieve the goal using the predictive model you have trained in part (e).

In [10]:
def run_episode(env, Q, max_steps=100, render=False):
    """
    Run one episode following the policy derived from Q-table.

    Args:
        env: Gym environment
        Q: Trained Q-table (states x actions)
        max_steps: Max steps before terminating
        render: Whether to render the environment (visual)

    Returns:
        total_reward: cumulative reward received in the episode
        steps_taken: number of steps before termination
        success: boolean if goal was reached
    """
    state, _ = env.reset()
    total_reward = 0
    steps_taken = 0
    done = False

    while not done and steps_taken < max_steps:
        if render:
            env.render()

        # Select action greedily from Q-table
        action = np.argmax(Q[state])

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        total_reward += reward
        steps_taken += 1
        state = next_state

    success = total_reward > 0  # Reward = 1 if goal reached, else 0
    return total_reward, steps_taken, success

# Example usage:
env = gym.make('FrozenLake-v1', is_slippery=True)

total_reward, steps, success = run_episode(env, Q, render=False)
print(f"Episode finished - Total Reward: {total_reward}, Steps Taken: {steps}, Success: {success}")

Episode finished - Total Reward: 0.0, Steps Taken: 23, Success: False


(g) Evaluate the performance of your algorithm based on 10000 episodes.

In [11]:
# Assume Q-table from part (e) is already trained and available as Q
env = gym.make('FrozenLake-v1', is_slippery=True)

def run_episode(env, Q, max_steps=100, render=False):
    state, _ = env.reset()
    total_reward = 0
    steps_taken = 0
    done = False

    while not done and steps_taken < max_steps:
        if render:
            env.render()
        action = np.argmax(Q[state])
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward
        steps_taken += 1
        state = next_state

    success = total_reward > 0
    return total_reward, steps_taken, success

# Evaluation over 10,000 episodes
num_episodes = 10000
success_count = 0
total_rewards = 0
total_steps = 0

for _ in range(num_episodes):
    reward, steps, success = run_episode(env, Q, render=False)
    success_count += success
    total_rewards += reward
    total_steps += steps

print(f"Evaluation over {num_episodes} episodes:")
print(f"Success Rate: {success_count / num_episodes:.4f}")
print(f"Average Reward: {total_rewards / num_episodes:.4f}")
print(f"Average Steps Taken: {total_steps / num_episodes:.2f}")

Evaluation over 10000 episodes:
Success Rate: 0.3595
Average Reward: 0.3595
Average Steps Taken: 40.14


 (h) Improve your learning algorithm by incorporating the concept of exploration–exploitation trade-off.

In [12]:
env = gym.make('FrozenLake-v1', is_slippery=True)

Q = np.zeros((env.observation_space.n, env.action_space.n))

alpha = 0.8
gamma = 0.95
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.9998  # slower decay for better exploration
num_episodes = 10000
max_steps = 100

for episode in range(num_episodes):
    state, _ = env.reset()
    done = False

    for step in range(max_steps):
        # Epsilon-greedy action selection
        if np.random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(Q[state])

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        best_next_action = np.argmax(Q[next_state])
        td_target = reward + gamma * Q[next_state][best_next_action]
        td_error = td_target - Q[state][action]
        Q[state][action] += alpha * td_error

        state = next_state

        if done:
            break

    # Decay epsilon but not below minimum threshold
    epsilon = max(epsilon_min, epsilon * epsilon_decay)

print("Improved Q-learning with exploration-exploitation trade-off finished training.")

Improved Q-learning with exploration-exploitation trade-off finished training.


(i) Evaluate the performance of your improved learning algorithm based on 10000
 episodes and compare it with the algorithm implemented in part (f).

In [13]:
def run_episode(env, Q, max_steps=100, render=False):
    state, _ = env.reset()
    total_reward = 0
    steps_taken = 0
    done = False

    while not done and steps_taken < max_steps:
        if render:
            env.render()
        action = np.argmax(Q[state])
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward
        steps_taken += 1
        state = next_state

    success = total_reward > 0
    return total_reward, steps_taken, success


# Environment
env = gym.make('FrozenLake-v1', is_slippery=True)

# --- Original Q-learning policy (part f) ---
# Assume Q_original is the Q-table you got from basic Q-learning without exploration decay
# For demonstration, suppose Q_original is already available (replace with your Q)
Q_original = np.zeros((env.observation_space.n, env.action_space.n))
# ... (load or train Q_original accordingly) ...

# --- Improved Q-learning with epsilon decay (part h) ---
# Train improved Q (already done)
Q_improved = np.zeros((env.observation_space.n, env.action_space.n))
alpha = 0.8
gamma = 0.95
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.9998
num_training_episodes = 10000
max_steps = 100

for episode in range(num_training_episodes):
    state, _ = env.reset()
    done = False
    for step in range(max_steps):
        if np.random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(Q_improved[state])
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        best_next_action = np.argmax(Q_improved[next_state])
        td_target = reward + gamma * Q_improved[next_state][best_next_action]
        td_error = td_target - Q_improved[state][action]
        Q_improved[state][action] += alpha * td_error

        state = next_state
        if done:
            break
    epsilon = max(epsilon_min, epsilon * epsilon_decay)

# --- Evaluation function ---
def evaluate_policy(env, Q, episodes=10000):
    success_count = 0
    total_rewards = 0
    total_steps = 0
    for _ in range(episodes):
        reward, steps, success = run_episode(env, Q)
        success_count += success
        total_rewards += reward
        total_steps += steps
    return success_count / episodes, total_rewards / episodes, total_steps / episodes


# Evaluate original policy
success_orig, avg_reward_orig, avg_steps_orig = evaluate_policy(env, Q_original)
print(f"Original Q-learning policy over 10,000 episodes:")
print(f"Success rate: {success_orig:.4f}, Avg reward: {avg_reward_orig:.4f}, Avg steps: {avg_steps_orig:.2f}")

# Evaluate improved policy
success_impr, avg_reward_impr, avg_steps_impr = evaluate_policy(env, Q_improved)
print(f"Improved Q-learning policy over 10,000 episodes:")
print(f"Success rate: {success_impr:.4f}, Avg reward: {avg_reward_impr:.4f}, Avg steps: {avg_steps_impr:.2f}")

# Compare
print("\nComparison:")
print(f"Success rate improvement: {success_impr - success_orig:.4f}")
print(f"Average reward improvement: {avg_reward_impr - avg_reward_orig:.4f}")
print(f"Average steps difference: {avg_steps_impr - avg_steps_orig:.2f}")

Original Q-learning policy over 10,000 episodes:
Success rate: 0.0000, Avg reward: 0.0000, Avg steps: 17.87
Improved Q-learning policy over 10,000 episodes:
Success rate: 0.4088, Avg reward: 0.4088, Avg steps: 28.93

Comparison:
Success rate improvement: 0.4088
Average reward improvement: 0.4088
Average steps difference: 11.06
