In [12]:
# Install Gym with its Atari extras, the ROM-license helper, and ale-py.
# NOTE(review): versions are unpinned, while the training loop's handling of
# env.reset() / env.step() depends on the installed Gym API version -- consider pinning.
pip install gym[atari] gym[accept-rom-license] ale-py




In [13]:
import gym
import tensorflow as tf
import numpy as np
import datetime
import random
from collections import deque
from tensorflow.keras import layers
import cv2
import matplotlib.pyplot as plt

In [14]:
# Hyperparameters

# --- Observation / action space ---
STATE_SHAPE = (84, 80)           # (height, width) of one preprocessed frame
# Size of the network's output layer; random exploration also draws from
# range(ACTION_SIZE).
# NOTE(review): ALE Pong may expose more than 4 actions -- confirm that
# restricting the agent to the first 4 indices is intended.
ACTION_SIZE = 4

# --- Optimisation ---
LEARNING_RATE = 1e-4             # Adam step size used in build_model
GAMMA = 0.99                     # discount applied to the bootstrapped next-state value
MINI_BATCH_SIZE = 8              # transitions sampled from the replay buffer per update
REPLAY_BUFFER_CAPACITY = 50_000  # maximum transitions kept; oldest are dropped first
TARGET_UPDATE_RATE = 10          # copy policy weights into the target net every N episodes

# --- Exploration schedule (epsilon is multiplied by the decay once per episode) ---
EPSILON_INIT = 1.0
EPSILON_MIN = 0.1
EPSILON_DECAY = 0.995

# --- Run length ---
EPISODES = 500

In [15]:
# Set up environment
# Create the Gym environment registered under the id 'PongDeterministic-v4'.
# The training loop drives this single module-level instance through
# env.reset() and env.step().
env = gym.make('PongDeterministic-v4')


In [16]:
# Frame Preprocessing Functions
def img_crop(img):
    """Drop the first 30 and the last 12 rows of a 3-D frame.

    The original note describes these rows as irrelevant parts of a Pong
    frame. Columns and channels are left untouched.
    """
    kept_rows = slice(30, -12)
    everything = slice(None)
    return img[kept_rows, everything, everything]

def downsample(img):
    """Keep every second element along the first two axes (factor-2 downsample)."""
    every_other = slice(None, None, 2)
    return img[every_other, every_other]

def to_grayscale(img):
    """Average over axis 2 (the colour channels) and truncate the result to uint8."""
    channel_mean = np.mean(img, axis=2)
    return channel_mean.astype(np.uint8)

def normalize_grayscale(img):
    """Scale 0..255 grayscale intensities into the range [-1, 1).

    Args:
        img: Array-like of grayscale intensities (typically uint8).

    Returns:
        float32 array of the same shape where 0 maps to -1.0, 128 maps to 0.0
        and 255 maps to 127/128.
    """
    # Cast to float *before* subtracting: on a uint8 array `img - 128` wraps
    # around for every value below 128, which silently corrupts the result.
    pixels = np.asarray(img, dtype=np.float32)
    # Centering on 128 and dividing by 128 already yields [-1, 1); the former
    # extra "- 1" shifted the range to [-2, 0).
    return (pixels - 128.0) / 128.0

# Combined preprocessing pipeline (inlines the crop / downsample / grayscale / normalize steps defined above)
def preprocess_frame(img, target_shape=(84, 80)):
    """Convert a raw colour frame into a normalized single-channel observation.

    Steps: drop the first 30 and last 12 rows, keep every second pixel along
    both image axes, average the colour channels, scale intensities to
    [-1, 1), and resize to ``target_shape`` when the result is not already
    that size.

    Args:
        img: Frame indexed as (row, column, channel).
        target_shape: Desired (height, width) of the output.

    Returns:
        float32 array of shape ``(target_shape[0], target_shape[1], 1)``.
    """
    img = img[30:-12, :, :]  # Crop to remove irrelevant parts
    img = img[::2, ::2]  # Downsample by a factor of 2
    img = np.mean(img, axis=2).astype(np.uint8)  # Grayscale

    # Cast to float before subtracting: uint8 arithmetic wraps around for
    # values below 128. Centering on 128 and dividing by 128 gives [-1, 1);
    # the former extra "- 1" shifted everything to [-2, 0).
    img = (img.astype(np.float32) - 128.0) / 128.0

    height, width = target_shape
    if img.shape != (height, width):
        # cv2.resize expects dsize as (width, height), i.e. the reverse of a
        # NumPy (height, width) shape. Passing target_shape directly produced
        # a transposed-size (80, 84) image.
        img = cv2.resize(img, (width, height), interpolation=cv2.INTER_AREA)

    return np.expand_dims(img, axis=-1)  # Shape: (height, width, 1)



In [17]:
# Replay Buffer Class
class ReplayBuffer:
    """Fixed-capacity FIFO store of experiences with uniform random sampling."""

    def __init__(self, capacity):
        """Create an empty buffer that holds at most ``capacity`` items."""
        # A bounded deque discards its oldest entry once it is full.
        self.buffer = deque([], maxlen=capacity)

    def store(self, experience):
        """Append one experience at the newest end of the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored experiences chosen at random.

        Raises ValueError (from ``random.sample``) when ``batch_size`` exceeds
        the number of stored items.
        """
        drawn = random.sample(self.buffer, batch_size)
        return drawn

    def size(self):
        """Return how many experiences are currently stored."""
        stored = len(self.buffer)
        return stored


In [18]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam


In [19]:
# DQN Model Builder
def build_model(input_shape, action_size):
    """Build and compile the convolutional Q-network.

    Args:
        input_shape: Sequence whose first two entries are the frame height
            and width. The network input always has 4 channels.
        action_size: Number of units in the linear output layer (one value
            per action).

    Returns:
        A compiled ``tf.keras.Sequential`` model using Adam with the
        module-level LEARNING_RATE and a mean-squared-error loss.
    """
    frame_height, frame_width = input_shape[0], input_shape[1]
    keras_layers = tf.keras.layers

    network = tf.keras.Sequential()
    # Three convolution stages followed by a dense head.
    network.add(keras_layers.Conv2D(32, (8, 8), strides=4, activation='relu',
                                    input_shape=(frame_height, frame_width, 4)))
    network.add(keras_layers.Conv2D(64, (4, 4), strides=2, activation='relu'))
    network.add(keras_layers.Conv2D(64, (3, 3), strides=1, activation='relu'))
    network.add(keras_layers.Flatten())
    network.add(keras_layers.Dense(512, activation='relu'))
    network.add(keras_layers.Dense(action_size, activation='linear'))

    adam = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    network.compile(optimizer=adam, loss='mse')
    return network

In [20]:
@tf.function
def _epsilon_greedy_action_graph(state, model, epsilon):
    """Traced epsilon-greedy selection; ``epsilon`` must already be a float32 tensor."""
    # Generate a random value
    random_val = tf.random.uniform([], dtype=tf.float32)
    if random_val < epsilon:
        # Explore: choose a random action index in [0, ACTION_SIZE)
        return tf.random.uniform([], 0, ACTION_SIZE, dtype=tf.int32)
    else:
        # Exploit: choose the action with the highest predicted value
        q_values = model(state, training=False)
        return tf.argmax(q_values[0], axis=-1, output_type=tf.int32)


def epsilon_greedy_action(state, model, epsilon):
    """Select an action using an epsilon-greedy strategy.

    Args:
        state: Batch containing a single stacked observation; only the first
            row of the model output is used.
        model: Callable network returning one value per action.
        epsilon: Exploration probability (Python float or tensor).

    Returns:
        Scalar int32 tensor holding the chosen action index.
    """
    # Convert epsilon to a tensor *before* entering the traced function.
    # A Python float argument is treated as a constant by tf.function, so the
    # graph would be retraced for every new epsilon value (once per episode
    # while epsilon decays).
    epsilon = tf.convert_to_tensor(epsilon, dtype=tf.float32)
    return _epsilon_greedy_action_graph(state, model, epsilon)


# Training on Mini-Batches
@tf.function
def train_minibatch(states, actions, rewards, next_states, dones, policy_net, target_net):
    """Run one gradient step on ``policy_net`` for a batch of transitions.

    The regression target for each transition is
    ``reward + (1 - done) * GAMMA * max_a target_net(next_state)[a]``, and the
    loss is the mean squared difference between that target and the value
    ``policy_net`` predicts for the action actually taken.

    Returns:
        The scalar loss for this batch.
    """
    # Targets are computed outside the gradient tape, so they act as constants.
    target_predictions = target_net(next_states, training=False)
    best_next_q = tf.reduce_max(target_predictions, axis=1)
    continue_weight = (1 - dones) * GAMMA  # zero where the episode ended
    td_targets = rewards + continue_weight * best_next_q

    with tf.GradientTape() as tape:
        predictions = policy_net(states, training=True)
        # One-hot mask selects the predicted value of the action taken.
        action_mask = tf.one_hot(actions, ACTION_SIZE)
        chosen_q = tf.reduce_sum(predictions * action_mask, axis=1)
        td_error = td_targets - chosen_q
        loss = tf.reduce_mean(tf.square(td_error))

    weights = policy_net.trainable_variables
    grads = tape.gradient(loss, weights)
    policy_net.optimizer.apply_gradients(zip(grads, weights))
    return loss

# Initialize Models, Replay Buffer, and Epsilon
# Two networks with the same architecture: policy_net is the one trained by
# train_minibatch, target_net supplies the bootstrapped next-state values.
policy_net = build_model(STATE_SHAPE, ACTION_SIZE)
target_net = build_model(STATE_SHAPE, ACTION_SIZE)
# Start the target network as an exact copy of the policy network.
target_net.set_weights(policy_net.get_weights())

# Experience store sampled during training, and the current exploration rate
# (decayed once per episode by the training loop).
replay_buffer = ReplayBuffer(REPLAY_BUFFER_CAPACITY)
epsilon = EPSILON_INIT


In [None]:
# Training Loop
# For each episode: reset the environment, play until it ends while storing
# transitions and training on random mini-batches, then sync the target
# network on schedule and decay epsilon.
rewards_per_episode = []
for episode in range(EPISODES):
    # Gym < 0.26 returns the observation from reset(); Gym >= 0.26 returns an
    # (observation, info) tuple. The dict branch is kept for backward
    # compatibility with the previous handling.
    reset_result = env.reset()
    if isinstance(reset_result, tuple):
        state = reset_result[0]
    elif isinstance(reset_result, dict):
        state = reset_result['state']
    else:
        state = reset_result

    state = preprocess_frame(state)  # Process the state frame
    # Seed the 4-frame stack by repeating the first frame along the channel axis.
    state_stack = np.repeat(state, 4, axis=-1)

    total_reward = 0
    done = False
    while not done:
        # Convert the action tensor to a Python int once and reuse it.
        action = int(epsilon_greedy_action(np.expand_dims(state_stack, axis=0), policy_net, epsilon))

        # Gym >= 0.26 returns (obs, reward, terminated, truncated, info);
        # older versions return (obs, reward, done, info).
        step_result = env.step(action)
        if len(step_result) == 5:
            next_state, reward, terminated, truncated, _ = step_result
            done = bool(terminated or truncated)
        else:
            next_state, reward, done, _ = step_result

        next_state = preprocess_frame(next_state)
        # Slide the window: drop the oldest frame, append the newest.
        next_state_stack = np.append(state_stack[:, :, 1:], next_state, axis=-1)

        # NOTE(review): every stored stack is a full float array; at
        # REPLAY_BUFFER_CAPACITY entries this can take several GB -- confirm
        # the available memory or store a more compact dtype.
        replay_buffer.store((state_stack, action, reward, next_state_stack, done))
        state_stack = next_state_stack
        total_reward += reward

        # Train on every step once enough transitions are available.
        if replay_buffer.size() >= MINI_BATCH_SIZE:
            minibatch = replay_buffer.sample(MINI_BATCH_SIZE)
            states, actions, rewards, next_states, dones = map(np.array, zip(*minibatch))
            rewards = rewards.astype(np.float32)
            dones = dones.astype(np.float32)
            train_minibatch(states, actions, rewards, next_states, dones, policy_net, target_net)

    rewards_per_episode.append(total_reward)
    avg_reward = np.mean(rewards_per_episode[-5:])  # mean over the last (up to) 5 episodes

    # with train_summary_writer.as_default():
    #     tf.summary.scalar('Total Reward', total_reward, step=episode)
    #     tf.summary.scalar('Average Reward (Last 5)', avg_reward, step=episode)
    #     tf.summary.scalar('Epsilon', epsilon, step=episode)

    # Periodically copy the policy weights into the target network.
    if episode % TARGET_UPDATE_RATE == 0:
        target_net.set_weights(policy_net.get_weights())

    epsilon = max(EPSILON_MIN, epsilon * EPSILON_DECAY)
    print(f"Episode {episode + 1}: Total Reward = {total_reward}, Avg Reward = {avg_reward:.2f}, Epsilon = {epsilon:.3f}")

# Plot Rewards
# One point per episode: the total reward collected in that episode.
plt.plot(rewards_per_episode)
plt.title('Rewards Over Episodes')
plt.ylabel('Total Reward')
plt.xlabel('Episode')
plt.show()


Episode 1: Total Reward = -21.0, Avg Reward = -21.00, Epsilon = 0.995
Episode 2: Total Reward = -20.0, Avg Reward = -20.50, Epsilon = 0.990
Episode 3: Total Reward = -21.0, Avg Reward = -20.67, Epsilon = 0.985
Episode 4: Total Reward = -20.0, Avg Reward = -20.50, Epsilon = 0.980
Episode 5: Total Reward = -21.0, Avg Reward = -20.60, Epsilon = 0.975
Episode 6: Total Reward = -18.0, Avg Reward = -20.00, Epsilon = 0.970
Episode 7: Total Reward = -21.0, Avg Reward = -20.20, Epsilon = 0.966
Episode 8: Total Reward = -20.0, Avg Reward = -20.00, Epsilon = 0.961
Episode 9: Total Reward = -20.0, Avg Reward = -20.00, Epsilon = 0.956
Episode 10: Total Reward = -19.0, Avg Reward = -19.60, Epsilon = 0.951
Episode 11: Total Reward = -20.0, Avg Reward = -20.00, Epsilon = 0.946
Episode 12: Total Reward = -21.0, Avg Reward = -20.00, Epsilon = 0.942
Episode 13: Total Reward = -21.0, Avg Reward = -20.20, Epsilon = 0.937
Episode 14: Total Reward = -21.0, Avg Reward = -20.40, Epsilon = 0.932
Episode 15: Tot