In [5]:
import numpy as np
import tensorflow as tf
import gymnasium as gym
from collections import deque
import random
import matplotlib.pyplot as plt
from tqdm import tqdm
import time


# Create the environment: Acrobot-v1, opened with the "human" render mode.
# NOTE(review): Gymnasium documents "human" mode as drawing a window during every
# step()/reset(); if so, the 1000-episode training loop below will be throttled by
# rendering. Confirm, and consider render_mode=None or "rgb_array" for training.
env = gym.make("Acrobot-v1", render_mode="human")


In [6]:

# Define the DQN model
def create_model(state_shape, action_shape, hidden_units=64, learning_rate=0.001):
    """Build and compile the Q-network mapping a state to one Q-value per action.

    Args:
        state_shape: Shape of a single observation, without the batch axis.
        action_shape: Number of discrete actions (size of the output layer).
        hidden_units: Width of each of the two ReLU hidden layers.
        learning_rate: Learning rate passed to the Adam optimizer.

    Returns:
        A compiled ``tf.keras.Sequential`` model using MSE loss.
    """
    model = tf.keras.Sequential([
        # Explicit Input object instead of the deprecated `input_shape=` layer kwarg.
        tf.keras.Input(shape=tuple(state_shape)),
        tf.keras.layers.Dense(hidden_units, activation='relu'),
        tf.keras.layers.Dense(hidden_units, activation='relu'),
        # Linear head: raw Q-value estimates. int() because callers pass a NumPy integer.
        tf.keras.layers.Dense(int(action_shape), activation='linear'),
    ])
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='mse',
    )
    return model


In [7]:

# Define the DQN Agent
class DQNAgent:
    """Epsilon-greedy DQN agent with a replay memory and a separate target network.

    ``model`` is trained on sampled minibatches; ``target_model`` supplies the
    bootstrap values and is only updated when ``target_train`` is called.
    """

    def __init__(self, state_shape, action_shape, memory_size=20000, gamma=0.95,
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01, batch_size=32):
        """Create the online/target networks and initialise the hyperparameters.

        Args:
            state_shape: Shape of one observation (no batch axis).
            action_shape: Number of discrete actions.
            memory_size: Maximum number of transitions kept in the replay memory.
            gamma: Discount factor applied to the bootstrapped next-state value.
            epsilon: Initial exploration probability.
            epsilon_decay: Multiplicative decay applied to epsilon per ``replay`` call.
            epsilon_min: Lower bound for epsilon.
            batch_size: Number of transitions sampled per ``replay`` call.
        """
        self.state_shape = state_shape
        self.action_shape = action_shape
        self.model = create_model(state_shape, action_shape)
        self.target_model = create_model(state_shape, action_shape)
        self.target_model.set_weights(self.model.get_weights())
        self.replay_memory = deque(maxlen=memory_size)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.replay_memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action epsilon-greedily.

        Args:
            state: Observation with a leading batch axis of size 1.

        Returns:
            The chosen action index as a Python ``int``.
        """
        if np.random.rand() <= self.epsilon:
            # Sample from the agent's own action count rather than a global `env`,
            # so the agent does not depend on notebook-level state.
            return int(np.random.randint(int(self.action_shape)))
        q_values = self.model.predict(state, verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self):
        """Train the online network on one random minibatch and decay epsilon.

        Does nothing until the memory holds at least ``batch_size`` transitions.
        Note that epsilon decays once per call, i.e. once per training step.
        """
        if len(self.replay_memory) < self.batch_size:
            return

        minibatch = random.sample(self.replay_memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)
        batch = len(minibatch)

        # Reshape to (batch, *state_shape) explicitly; np.squeeze would also drop
        # the batch axis when batch_size == 1.
        batched_shape = (batch,) + tuple(self.state_shape)
        states = np.asarray(states).reshape(batched_shape)
        next_states = np.asarray(next_states).reshape(batched_shape)
        actions = np.asarray(actions, dtype=np.intp)
        rewards = np.asarray(rewards, dtype=np.float32)
        dones = np.asarray(dones, dtype=bool)

        # Start from the current predictions so only the taken action's Q-value
        # contributes to the loss; np.array() guarantees a writable copy.
        targets = np.array(self.model.predict(states, verbose=0))
        next_q = np.asarray(self.target_model.predict(next_states, verbose=0))

        # Terminal transitions get the bare reward; others add the discounted max.
        bootstrap = np.where(dones, 0.0, self.gamma * np.max(next_q, axis=1))
        targets[np.arange(batch), actions] = rewards + bootstrap

        # One gradient step on the whole minibatch regardless of Keras' default of 32.
        self.model.fit(states, targets, batch_size=batch, epochs=1, verbose=0)

        # Clamp so epsilon never falls below the configured floor.
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def target_train(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())


In [None]:
# Train the agent: one environment step and one replay step per loop iteration.
agent = DQNAgent(env.observation_space.shape, env.action_space.n)
episodes = 1000
batch_size = agent.batch_size  # keep the warm-up threshold in sync with the agent

rewards = []
epsilons = []

render_interval = 50  # Render every 50 episodes
render_frames = 100  # Number of frames to render when rendering
target_update_interval = 10  # Sync target network / log progress every N episodes

state_dim = env.observation_space.shape[0]

for episode in tqdm(range(episodes)):
    state, _ = env.reset()
    state = np.reshape(state, [1, state_dim])
    done = False
    total_reward = 0
    frame_count = 0

    while not done:
        # Render only on specified intervals and for a limited number of frames.
        # NOTE(review): if `env` was created with render_mode="human", Gymnasium is
        # documented to draw on every step anyway, so this gate may have no effect.
        if episode % render_interval == 0 and frame_count < render_frames:
            env.render()
            time.sleep(0.01)  # Small delay for visibility

        action = agent.act(state)
        # Gymnasium's step returns (obs, reward, terminated, truncated, info).
        next_state, reward, terminated, truncated, _ = env.step(action)
        # The episode ends on either signal; ignoring `truncated` lets an episode
        # run past the environment's time limit.
        done = terminated or truncated
        next_state = np.reshape(next_state, [1, state_dim])
        # Store only `terminated`: a time-limit cut-off is not a true terminal
        # state, so its target should still bootstrap from the next state.
        agent.remember(state, action, reward, next_state, terminated)
        state = next_state
        total_reward += reward

        if len(agent.replay_memory) > batch_size:
            agent.replay()

        frame_count += 1

    rewards.append(total_reward)
    epsilons.append(agent.epsilon)

    if episode % target_update_interval == 0:
        agent.target_train()
        # tqdm.write keeps the progress bar intact while logging.
        tqdm.write(f"Episode: {episode}, Total Reward: {total_reward}, Epsilon: {agent.epsilon}")

env.close()


  0%|          | 0/1000 [00:00<?, ?it/s]

In [None]:

# Visualization: two stacked panels, per-episode reward on top and epsilon below.
plt.figure(figsize=(12, 6))

# (series, title, y-axis label) for each panel, top to bottom.
panels = (
    (rewards, 'Total Reward per Episode', 'Total Reward'),
    (epsilons, 'Epsilon per Episode', 'Epsilon'),
)

for row, (series, title, ylabel) in enumerate(panels, start=1):
    plt.subplot(2, 1, row)
    plt.plot(series)
    plt.title(title)
    plt.xlabel('Episode')
    plt.ylabel(ylabel)

plt.tight_layout()
plt.show()


In [None]:

# Test the trained agent: run one greedy (no exploration) episode with rendering.
env = gym.make("Acrobot-v1", render_mode="human")
try:
    state, _ = env.reset()
    state = np.reshape(state, [1, env.observation_space.shape[0]])
    done = False
    total_reward = 0

    while not done:
        env.render()
        q_values = agent.model.predict(state, verbose=0)
        action = int(np.argmax(q_values[0]))
        # Gymnasium's step returns (obs, reward, terminated, truncated, info);
        # stop on either flag so the episode also ends at the time limit.
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        state = np.reshape(next_state, [1, env.observation_space.shape[0]])
        total_reward += reward
        time.sleep(0.01)  # Add a small delay to make the rendering visible

    print(f"Total reward in test episode: {total_reward}")
finally:
    # Always release the render window, even if the cell is interrupted.
    env.close()