## AML Assignment -3

### Aishvarya S MDS202302

In [2]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import time

# Environment Documentation
"""
Acrobot-v1:
- Observation Space: 6 continuous variables representing the angular velocities and angles of two links.
- Action Space: 3 discrete actions (torque applied to the joint: -1, 0, or +1).
- Rewards:
  - Base reward: -1 per step until the goal is achieved (environment's default reward).
  - Shaped reward: Additional positive reward proportional to the angular velocity of the second link, encouraging progress toward the goal. Specifically:
    Reward = -1 (default) + 10 * angular_velocity_of_second_link.
  - The total reward for an episode is accumulated over steps, with better rewards incentivizing faster achievement of the goal.

"""

# Hyperparameters
GAMMA = 0.98  # Discount factor for future rewards
LR = 5e-4  # Learning rate for optimizer
BATCH_SIZE = 64  # Number of samples per batch for training
MEMORY_SIZE = 10000  # Capacity of the replay memory buffer
EPSILON_START = 1.0  # Initial exploration rate (fully random actions)
EPSILON_END = 0.1  # Minimum exploration rate
EPSILON_DECAY = 1000  # Number of steps to decay epsilon
TARGET_UPDATE = 10  # Frequency to update target network
MAX_EPISODES = 1000  # Total number of episodes to train

class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 256),  # Fully connected layer, input_dim -> 256
            nn.ReLU(),
            nn.Linear(256, 256),  # Fully connected layer, 256 -> 256
            nn.ReLU(),
            nn.Linear(256, output_dim)  # Fully connected layer, 256 -> output_dim (actions)
        )

    def forward(self, x):
        return self.fc(x)
    
class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)  # Circular buffer for storing transitions

    def push(self, transition):
        """Store a transition in memory."""
        self.memory.append(transition)

    def sample(self, batch_size):
        """Sample a random batch of transitions."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
    
def select_action(state, policy_net, epsilon, action_space):
    """Select an action using an epsilon-greedy strategy."""
    if random.random() < epsilon:
        return random.randint(0, action_space - 1)  # Explore: Random action
    with torch.no_grad():
        state_tensor = torch.FloatTensor(state).unsqueeze(0)  # Convert state to tensor
        return policy_net(state_tensor).argmax(dim=1).item()  # Exploit: Action with max Q-value
    
def train(policy_net, target_net, memory, optimizer):
    """Train the policy network using samples from replay memory."""
    if len(memory) < BATCH_SIZE:
        return

    # Sample a batch of transitions
    batch = memory.sample(BATCH_SIZE)
    states, actions, rewards, next_states, dones = zip(*batch)

    # Convert to tensors
    states = torch.FloatTensor(states)
    actions = torch.LongTensor(actions).unsqueeze(1)
    rewards = torch.FloatTensor(rewards)
    next_states = torch.FloatTensor(next_states)
    dones = torch.FloatTensor(dones)

    # Calculate Q-values for the current state-action pairs
    q_values = policy_net(states).gather(1, actions).squeeze()
    
    # Calculate the target Q-values using the target network
    next_q_values = target_net(next_states).max(1)[0]
    target_q_values = rewards + (GAMMA * next_q_values * (1 - dones))  # No future rewards if done

    # Compute the loss (Mean Squared Error)
    loss = nn.MSELoss()(q_values, target_q_values)
    
    # Update the policy network
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Initialize the environment and parameters
env = gym.make("Acrobot-v1")
obs_space = env.observation_space.shape[0]  # Number of observation space variables (6)
action_space = env.action_space.n  # Number of possible actions (3)

# Initialize the networks and optimizer
policy_net = DQN(obs_space, action_space)  # Main network for learning
target_net = DQN(obs_space, action_space)  # Target network for stable updates
target_net.load_state_dict(policy_net.state_dict())  # Synchronize weights
target_net.eval()  # Set target network to evaluation mode
optimizer = optim.Adam(policy_net.parameters(), lr=LR)
memory = ReplayMemory(MEMORY_SIZE)

epsilon = EPSILON_START
trained_rewards = []  # Track rewards for each episode
best_episode_reward = float('-inf')  # Track the best episode reward
best_actions = []  # Store actions of the best episode

# Training loop
for episode in range(MAX_EPISODES):
    state, _ = env.reset(seed=123)  # Reset the environment
    total_reward = 0  # Initialize total reward for the episode
    actions_in_episode = []  # Store actions in the current episode

    for t in range(1000):  # Limit the number of steps in each episode
        action = select_action(state, policy_net, epsilon, action_space)  # Choose an action
        next_state, reward, done, truncated, _ = env.step(action)  # Take a step in the environment

        # Reward shaping to encourage progress
        reward += 10 * next_state[1]  # Add scaled velocity to reward for progress

        # Store the transition in replay memory
        memory.push((state, action, reward, next_state, float(done)))
        state = next_state  # Update the current state
        total_reward += reward  # Accumulate reward
        actions_in_episode.append(action)  # Store action taken

        train(policy_net, target_net, memory, optimizer)  # Train the policy network
        if done or truncated:  # End episode if the goal is achieved or environment resets
            break

    trained_rewards.append(total_reward)

    # Save the best episode if it has the highest reward
    if total_reward > best_episode_reward:
        best_episode_reward = total_reward
        best_actions = actions_in_episode.copy()

    # Update the target network periodically
    if episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

    # Decay epsilon for less exploration over time
    epsilon = max(EPSILON_END, epsilon - (EPSILON_START - EPSILON_END) / EPSILON_DECAY)

    # Log progress every 100 episodes
    if episode % 100 == 0:
        print(f"Episode {episode + 1}, Reward: {total_reward}, Epsilon: {epsilon:.2f}")

env.close()

print(f"Total reward for replayed best episode: {best_episode_reward}")


Episode 1, Reward: -489.92917716735974, Epsilon: 1.00
Episode 101, Reward: -486.7900119643309, Epsilon: 0.91
Episode 201, Reward: -439.64176238980144, Epsilon: 0.82
Episode 301, Reward: -309.76763312797993, Epsilon: 0.73
Episode 401, Reward: -414.92011077178176, Epsilon: 0.64
Episode 501, Reward: -364.4581777811982, Epsilon: 0.55
Episode 601, Reward: -289.28535945131443, Epsilon: 0.46
Episode 701, Reward: -272.36909495666623, Epsilon: 0.37
Episode 801, Reward: -235.46276442924864, Epsilon: 0.28
Episode 901, Reward: -216.27351706381887, Epsilon: 0.19
Total reward for replayed best episode: -107.37903186690528


In [3]:
random_env = gym.make("Acrobot-v1", render_mode=None)  # No rendering
random_rewards = []
for episode in range(100):
    state, _ = random_env.reset(seed=123)
    total_reward = 0
    for t in range(1000):
        action = random_env.action_space.sample()
        next_state, reward, done, truncated, _ = random_env.step(action)
        total_reward += reward
        if done or truncated:
            break
    random_rewards.append(total_reward)

random_env.close()

print("Comparison of Performance:")
print(f"Trained Agent Average Reward: {np.mean(trained_rewards[-100:]):.2f}")
print(f"Random Agent Average Reward: {np.mean(random_rewards):.2f}")


Comparison of Performance:
Trained Agent Average Reward: -201.91
Random Agent Average Reward: -499.06


In [4]:
import gymnasium as gym
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

# Replay the best episode and capture frames for animation
env = gym.make('Acrobot-v1', render_mode='rgb_array')  # Use 'rgb_array' for frame capture
state, _ = env.reset(seed=123)
frames = []
total_reward = 0

for action in best_actions:
    state, reward, done, truncated, _ = env.step(action)
    total_reward += reward
    frames.append(env.render())  # Capture each frame
    if done or truncated:
        break

env.close()

# Create animation
fig, ax = plt.subplots()
img = ax.imshow(frames[0], animated=True)  # Initialize with an empty frame for animation
ax.axis('off')  # Turn off axis for cleaner visualization

def update(frame):
    img.set_data(frame)
    return [img]

ani = FuncAnimation(fig, update, frames=frames, interval=50)  # 50ms interval between frames

# Clear the figure display to show only the animation
plt.close(fig)

# Display the animation in the notebook
HTML(ani.to_jshtml())
