In [1]:
import numpy as np
import gymnasium as gym

# Setting the environment
env = gym.make("FrozenLake-v1", render_mode="ansi")


# Define hyperparameter values to test
#alpha_values
alpha_values = [0.05, 0.1, 0.9]
#gamma_values
gamma_values = [0.3, 0.5, 0.8]
#epsilon_values
epsilon_values = [0.3,0.6,0.9]

# Q Learning implementation
def q_learning(env, alpha=0.1, gamma=0.99, epsilon=0.5, num_episodes=10000, max_steps=100):
    # Initialize the Q-table with zeros
    q_table = np.zeros([env.observation_space.n, env.action_space.n])

    # Training the agent
    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False

        for step in range(max_steps):
            # Choose action using epsilon-greedy strategy
            if np.random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()  # Explore: select a random action
            else:
                action = np.argmax(q_table[state, :])  # Exploit: select the action with max Q-value

            # Take action and observe the outcome
            new_state, reward, done, _, _ = env.step(action)

            # Update the Q-value
            q_table[state, action] = q_table[state, action] + alpha * (
                reward + gamma * np.max(q_table[new_state, :]) - q_table[state, action])

            state = new_state

            if done:
                break

    return q_table

# Evaluating the Q-Learning Agent
def evaluate_q_learning_agent(env, q_table, num_episodes=100):
    total_rewards = 0
    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False
        while not done:
            action = np.argmax(q_table[state, :])
            state, reward, done, _, _ = env.step(action)
            total_rewards += reward
    return total_rewards / num_episodes

# Evaluate alpha for each hyperparameter
for alpha in alpha_values:
    q_table = q_learning(env, alpha=alpha)
    performance = evaluate_q_learning_agent(env, q_table)
    print(f"At alpha {alpha} : Performance = {performance} ")
    # print(f"Performance with alpha={alpha}: {performance}")

# Evaluate gamma for each hyperparameter
for gamma in gamma_values:
    q_table = q_learning(env, gamma=gamma)
    performance = evaluate_q_learning_agent(env, q_table)
    print(f"At alpha {gamma} : Performance = {performance} ")
    # print(f"Performance with gamma={gamma}: {performance}")

# Evaluate epsilon for each hyperparameter
for epsilon in epsilon_values:
    q_table = q_learning(env, epsilon=epsilon)
    performance = evaluate_q_learning_agent(env, q_table)
    print(f"At epsilon {epsilon} : Performance = {performance} ")
    # print(f"Performance with epsilon={epsilon}: {performance}")

# Policy iteration implementation
def policy_evaluation(env, policy, gamma=0.99, theta=1e-10):
    value_table = np.zeros(env.observation_space.n)
    while True:
        delta = 0
        for state in range(env.observation_space.n):
            v = value_table[state]
            action = policy[state]
            value_table[state] = sum([prob * (reward + gamma * value_table[next_state])
                                      for prob, next_state, reward, _ in env.unwrapped.P[state][action]])
            delta = max(delta, abs(v - value_table[state]))
        if delta < theta:
            break
    return value_table

# Policy Improvement
def policy_improvement(env, value_table, gamma=0.99):
    policy = np.zeros(env.observation_space.n, dtype=int)
    for state in range(env.observation_space.n):
        q_values = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            q_values[action] = sum([prob * (reward + gamma * value_table[next_state])
                                    for prob, next_state, reward, _ in env.unwrapped.P[state][action]])
        policy[state] = np.argmax(q_values)
    return policy

# Policy iteration
def policy_iteration(env, gamma=0.99, max_iters=1000):
    policy = np.random.choice(env.action_space.n, env.observation_space.n)
    for i in range(max_iters):
        old_policy = np.copy(policy)
        value_table = policy_evaluation(env, policy, gamma)
        policy = policy_improvement(env, value_table, gamma)
        if np.array_equal(policy, old_policy):
            break
    return policy

# Evaluating the Policy Iteration Agent
def evaluate_policy_iteration_agent(env, policy, num_episodes=100):
    total_rewards = 0
    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False
        while not done:
            action = policy[state]
            state, reward, done, _, _ = env.step(action)
            total_rewards += reward
    return total_rewards / num_episodes

# Compare Q-learning & Policy Iteration
# Policy Iteration
policy = policy_iteration(env)
policy_iteration_performance = evaluate_policy_iteration_agent(env, policy)
print(f"Policy Iteration Performance: {policy_iteration_performance}")
# Q-Learning
q_table = q_learning(env)
q_learning_performance = evaluate_q_learning_agent(env, q_table)
print(f"Q-Learning Performance: {q_learning_performance}")




At alpha 0.05 : Performance = 0.8 
At alpha 0.1 : Performance = 0.71 
At alpha 0.9 : Performance = 0.0 
At alpha 0.3 : Performance = 0.18 
At alpha 0.5 : Performance = 0.08 
At alpha 0.8 : Performance = 0.13 
At epsilon 0.3 : Performance = 0.84 
At epsilon 0.6 : Performance = 0.85 
At epsilon 0.9 : Performance = 0.78 
Policy Iteration Performance: 0.81
Q-Learning Performance: 0.9


In [1]:
import numpy as np
import gym
import tensorflow as tf
from tensorflow.keras import layers
from collections import deque
import random

# Hyperparameters
alpha = 0.001  # Learning rate
gamma = 0.99   # Discount factor
epsilon = 1.0  # Exploration rate
epsilon_min = 0.1
epsilon_decay = 0.995
batch_size = 32
num_episodes = 1000  # Reduce number of episodes for better run time 
replay_memory_size = 10000
target_update_freq = 1000
learning_start = 500

# Setting the environment
env = gym.make("CartPole-v1")

# Neural Network Model
def build_model(input_shape, action_space):
    model = tf.keras.Sequential([
        layers.Dense(24, activation='relu', input_shape=input_shape),
        layers.Dense(24, activation='relu'),
        layers.Dense(action_space, activation='linear')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=alpha), loss='mse')
    return model

# Experience Replay
class ReplayMemory:
    def __init__(self, max_size):
        self.memory = deque(maxlen=max_size)
    
    def add(self, experience):
        self.memory.append(experience)
    
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

# Training the agent with performance tracking
def train_agent():
    global epsilon
    input_shape = (env.observation_space.shape[0],)  # Shape of the observation space
    action_space = env.action_space.n
    model = build_model(input_shape, action_space)
    target_model = build_model(input_shape, action_space)
    target_model.set_weights(model.get_weights())
    replay_memory = ReplayMemory(replay_memory_size)

    def predict_action(state):
        state_input = np.expand_dims(state, axis=0)  # Ensure state is correctly shaped
        return np.argmax(model.predict(state_input))

    total_steps = 0
    rewards_per_episode = []
    epsilon_history = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0
        steps = 0

        while not done:
            if np.random.rand() <= epsilon:
                action = env.action_space.sample()  # Explore
            else:
                action = predict_action(state)  # Exploit

            next_state, reward, done, _, _ = env.step(action)
            
            replay_memory.add((state, action, reward, next_state, done))

            if len(replay_memory.memory) >= batch_size:
                experiences = replay_memory.sample(batch_size)
                states, actions, rewards, next_states, dones = zip(*experiences)
                
                target_q_values = model.predict(np.array(next_states))
                target_q_values_next = target_model.predict(np.array(next_states))
                targets = np.array(rewards) + gamma * np.max(target_q_values_next, axis=1) * np.logical_not(dones)
                
                with tf.GradientTape() as tape:
                    q_values = model(np.array(states))
                    actions_one_hot = tf.one_hot(actions, action_space)
                    q_values = tf.reduce_sum(q_values * actions_one_hot, axis=1)
                    loss = tf.reduce_mean(tf.square(targets - q_values))
                
                grads = tape.gradient(loss, model.trainable_variables)
                model.optimizer.apply_gradients(zip(grads, model.trainable_variables))
            
            state = next_state
            total_reward += reward
            total_steps += 1
            steps += 1

            if total_steps > learning_start and total_steps % target_update_freq == 0:
                target_model.set_weights(model.get_weights())

        rewards_per_episode.append(total_reward)
        epsilon_history.append(epsilon)

        if epsilon > epsilon_min:
            epsilon *= epsilon_decay

        print(f"Episode {episode + 1}/{num_episodes} - Total Reward: {total_reward} - Steps: {steps} - Epsilon: {epsilon}")

    return model, rewards_per_episode, epsilon_history

# Evaluate the trained agent
def evaluate_agent(env, model, num_episodes=100):
    total_rewards = 0
    def predict_action(state):
        state_input = np.expand_dims(state, axis=0)  # Ensure state is correctly shaped
        return np.argmax(model.predict(state_input))
    
    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0
        while not done:
            action = predict_action(state)  # Predict action based on state
            next_state, reward, done, _, _ = env.step(action)
            state = next_state
            episode_reward += reward
        total_rewards += episode_reward
    return total_rewards / num_episodes

# Metrics Calculation
def calculate_metrics(rewards_per_episode, epsilon_history):
    average_reward = np.mean(rewards_per_episode)
    steps_to_threshold = np.argmax(np.array(rewards_per_episode) >= 195)  # Example threshold for CartPole-v1
    final_epsilon = epsilon_history[-1]
    return average_reward, steps_to_threshold, final_epsilon

# Train the agent
model, rewards_per_episode, epsilon_history = train_agent()

# Evaluate the trained agent
performance = evaluate_agent(env, model)
print(f"\nAverage Performance (Final 100 Episodes): {performance}")

# Calculate and display metrics
average_reward, steps_to_threshold, final_epsilon = calculate_metrics(rewards_per_episode, epsilon_history)
print(f"\nMetrics:")
print(f"Average Reward per Episode: {average_reward}")
print(f"Episodes to Reach Threshold (195 Reward): {steps_to_threshold}")
print(f"Final Epsilon Value: {final_epsilon}")


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
  if not isinstance(terminated, (bool, np.bool8)):


Episode 1/10 - Total Reward: 21.0 - Steps: 21 - Epsilon: 0.995
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 118ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
