In [None]:
## Import Libraries
import numpy as np
import gym
import random
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
from IPython.display import clear_output

In [None]:
## Initialize the CartPole environment
# Module-level environment instance: later cells read `env` for the
# observation/action space sizes and step it during training and evaluation.
env = gym.make('CartPole-v1')

In [None]:
## Set up DQN parameters
state_space_size = env.observation_space.shape[0]
action_space_size = env.action_space.n

In [None]:
# Hyperparameters
random_seed = 42                # seed for Python, NumPy and PyTorch RNGs (see below)

# Training schedule
num_episodes = 1000
max_steps_per_episode = 200

# Optimisation
learning_rate = 0.001
discount_rate = 0.99

# Epsilon-greedy exploration: starts at the maximum and is decayed
# exponentially towards the minimum after every episode.
max_exploration_rate = 1
min_exploration_rate = 0.01
exploration_rate = max_exploration_rate
exploration_decay_rate = 0.001

# Replay buffer / target network
batch_size = 64
memory_size = 10000
target_update_freq = 10         # episodes between target-network syncs

# Seed every RNG this notebook draws from directly so that weight
# initialisation, epsilon-greedy draws and replay sampling are repeatable
# across "Restart & Run All". The environment's own RNG is not seeded here.
random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)

In [None]:
# Define the Q-network
class QNetwork(nn.Module):
    """Fully connected network that maps a state vector to one value per action.

    Architecture: Linear -> ReLU -> Linear -> ReLU -> Linear (no activation on
    the output layer).

    Args:
        state_space_size: Number of input features.
        action_space_size: Number of outputs (one per action).
        hidden_size: Width of both hidden layers. Defaults to 24.
    """

    def __init__(self, state_space_size, action_space_size, hidden_size=24):
        super().__init__()
        # Attribute names are kept as fc1/fc2/fc3 so state_dict keys stay
        # compatible with previously saved or copied weights.
        self.fc1 = nn.Linear(state_space_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_space_size)

    def forward(self, x):
        """Return the network output for input ``x`` (last dim = state_space_size)."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [None]:
# Replay buffer (oldest transitions are evicted once `memory_size` is reached)
# and the per-episode reward log filled in by the training loop.
memory = deque(maxlen=memory_size)
rewards_all_episodes = []

# Online network plus a target network that starts as an exact copy of it.
model = QNetwork(state_space_size, action_space_size)
target_model = QNetwork(state_space_size, action_space_size)
target_model.load_state_dict(model.state_dict())

# Loss and optimizer; only the online network's parameters are optimised.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
## Training the DQN
def _reset_env(environment):
    """Reset ``environment`` and return only the initial observation.

    Handles both reset conventions: a bare observation (older gym releases)
    and an ``(observation, info)`` tuple (gym >= 0.26).
    """
    reset_result = environment.reset()
    if isinstance(reset_result, tuple) and len(reset_result) == 2:
        return reset_result[0]
    return reset_result


def _step_env(environment, action):
    """Apply ``action`` and normalise the result across gym API versions.

    Returns:
        ``(observation, reward, terminated, episode_over)`` where
        ``terminated`` marks a true terminal state (no bootstrapping) and
        ``episode_over`` tells the caller to stop stepping. With the 4-tuple
        API the two cannot be distinguished, so both equal ``done``.
    """
    step_result = environment.step(action)
    if len(step_result) == 5:
        observation, reward, terminated, truncated, _ = step_result
        return observation, reward, bool(terminated), bool(terminated or truncated)
    observation, reward, done, _ = step_result
    return observation, reward, bool(done), bool(done)


def _optimize_on_minibatch(minibatch):
    """Run one gradient step of the online network on sampled transitions.

    Each transition is ``(state, action, reward, next_state, done)`` as stored
    by ``train_dqn``. Targets are built in fresh tensors, so the tensors held
    in the replay buffer are never modified.
    """
    states = torch.cat([transition[0] for transition in minibatch])
    actions = torch.tensor(
        [int(transition[1]) for transition in minibatch], dtype=torch.int64
    ).unsqueeze(1)
    rewards = torch.cat([transition[2] for transition in minibatch])
    next_states = torch.cat([transition[3] for transition in minibatch])
    dones = torch.cat([transition[4] for transition in minibatch])

    # Bellman target from the frozen target network; (1 - done) removes the
    # bootstrap term for terminal transitions.
    with torch.no_grad():
        best_next_q = target_model(next_states).max(dim=1)[0]
        targets = rewards + discount_rate * best_next_q * (1.0 - dones)

    # Only the Q-value of the action actually taken contributes to the loss.
    predicted = model(states).gather(1, actions).squeeze(1)
    loss = criterion(predicted, targets)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


def train_dqn():
    """Train the online network with epsilon-greedy DQN and experience replay.

    Reads the module-level ``env``, ``model``, ``target_model``, ``memory``,
    ``optimizer``, ``criterion`` and hyperparameters. Side effects:

    * appends one total reward per episode to ``rewards_all_episodes``;
    * appends every transition to ``memory``;
    * updates ``model`` (one batched gradient step per environment step once
      the buffer holds more than ``batch_size`` transitions);
    * copies ``model`` into ``target_model`` every ``target_update_freq``
      episodes;
    * rebinds the global ``exploration_rate`` after each episode.
    """
    global exploration_rate

    for episode in range(num_episodes):
        state = torch.tensor(
            np.reshape(_reset_env(env), [1, state_space_size]), dtype=torch.float32
        )
        rewards_current_episode = 0

        for _ in range(max_steps_per_episode):
            # Exploration-exploitation trade-off
            if np.random.rand() <= exploration_rate:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q_values = model(state)
                action = torch.argmax(q_values[0]).item()

            next_observation, reward, terminated, episode_over = _step_env(env, action)
            next_state = torch.tensor(
                np.reshape(next_observation, [1, state_space_size]), dtype=torch.float32
            )
            memory.append((
                state,
                action,
                torch.tensor([reward], dtype=torch.float32),
                next_state,
                torch.tensor([float(terminated)], dtype=torch.float32),
            ))
            state = next_state
            rewards_current_episode += float(reward)

            if len(memory) > batch_size:
                _optimize_on_minibatch(random.sample(memory, batch_size))

            if episode_over:
                break

        if episode % target_update_freq == 0:
            target_model.load_state_dict(model.state_dict())  # Update target model

        # Exponential decay of epsilon from max towards min.
        exploration_rate = min_exploration_rate + \
            (max_exploration_rate - min_exploration_rate) * np.exp(-exploration_decay_rate * episode)

        rewards_all_episodes.append(rewards_current_episode)



In [None]:
## Run training
# Runs the whole training loop: appends one total reward per episode to
# `rewards_all_episodes`, updates `model`'s weights via the optimizer and
# rebinds the global `exploration_rate`.
train_dqn()

In [None]:
## Calculate and print the average reward per thousand episodes
# Chunk by the number of rewards actually recorded rather than by
# `num_episodes`, so this cell also works when the episode count is not a
# multiple of 1000 or when training has been run more than once.
episodes_per_chunk = 1000
rewards_array = np.array(rewards_all_episodes)
rewards_per_thousand_episodes = [
    rewards_array[start:start + episodes_per_chunk]
    for start in range(0, len(rewards_array), episodes_per_chunk)
]

print("Average reward per thousand episodes")
count = 0
for r in rewards_per_thousand_episodes:
    count += len(r)  # label = number of episodes covered so far
    # True mean of the chunk, which stays correct for a shorter final chunk.
    print(count, ": ", str(r.mean()))

In [None]:
## Plotting the results
# The x-axis is derived from the recorded rewards themselves, so the plot
# cannot fail with a length mismatch if the log is shorter or longer than
# `num_episodes` (e.g. interrupted or repeated training).
episode_indices = range(len(rewards_all_episodes))

fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(episode_indices, rewards_all_episodes)
ax.set_xlabel('Episode')
ax.set_ylabel('Reward')
ax.set_title('Reward vs Episode')
plt.show()

In [None]:
## Plot Q-values as a heatmap for a given state
def plot_q_values_heatmap_for_state(model, state):
    """Draw the model's output for one state as a single-row annotated heatmap.

    Args:
        model: Callable network; evaluated on ``state`` with gradients disabled.
        state: Input tensor passed to ``model``. Only the first row of the
            resulting output is plotted.
    """
    with torch.no_grad():
        first_output_row = model(state).numpy()[0]
    # The heatmap is given 2-D data, so the value vector becomes a 1 x N grid.
    heatmap_grid = np.reshape(first_output_row, (1, -1))

    plt.figure(figsize=(8, 8))
    sns.heatmap(heatmap_grid, cmap='viridis', annot=True, fmt=".1f", cbar=True)
    for apply_text, text in (
        (plt.title, 'Q-values for Given State'),
        (plt.xlabel, 'Action'),
        (plt.ylabel, 'Q-value'),
    ):
        apply_text(text)
    plt.show()

In [None]:
## Visualize the agent's performance
def visualize_agent_performance(env, model):
    """Play one greedy episode, print its total reward and plot the first state component.

    The episode runs for at most ``max_steps_per_episode`` steps (module-level
    setting) and always takes the action with the highest model output.
    Works with both gym step/reset conventions: the older
    ``obs`` / ``(obs, reward, done, info)`` form and the newer
    ``(obs, info)`` / ``(obs, reward, terminated, truncated, info)`` form.

    Args:
        env: Environment exposing ``reset()`` and ``step(action)``.
        model: Network mapping a ``[1, state_space_size]`` float tensor to
            one value per action.
    """
    reset_result = env.reset()
    # Newer gym releases return (observation, info); older ones return the
    # observation alone.
    if isinstance(reset_result, tuple) and len(reset_result) == 2:
        observation = reset_result[0]
    else:
        observation = reset_result
    state = torch.tensor(np.reshape(observation, [1, state_space_size]), dtype=torch.float32)

    total_reward = 0
    path = []  # (state reached after the step, action taken) per time step

    for _ in range(max_steps_per_episode):
        with torch.no_grad():
            q_values = model(state)
        action = torch.argmax(q_values[0]).item()

        step_result = env.step(action)
        if len(step_result) == 5:
            next_observation, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            next_observation, reward, done, _ = step_result

        state = torch.tensor(
            np.reshape(next_observation, [1, state_space_size]), dtype=torch.float32
        )
        total_reward += reward
        path.append((state.numpy(), action))

        if done:
            break

    print(f"Total Reward: {total_reward}")

    # Plot the path: component 0 of each visited state, labelled as cart position.
    cart_positions = [visited_state[0][0] for visited_state, _ in path]
    plt.figure(figsize=(12, 6))
    plt.plot(cart_positions, label='Cart Position')
    plt.xlabel('Time Steps')
    plt.ylabel('Position')
    plt.title("Agent's Cart Position Over Time")
    plt.legend()
    plt.show()



In [None]:
## Run the visualization
# Plays one greedy episode with the current `model` on `env`, prints the
# episode's total reward and plots the recorded cart-position trace.
visualize_agent_performance(env, model)