In [2]:
import gymnasium as gym
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import numpy as np
import cv2
import time
import ale_py
import random
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Input
from tensorflow.keras.optimizers import Adam
import rl
import rl_agents

pygame 2.6.0 (SDL 2.28.4, Python 3.12.3)
Hello from the pygame community. https://www.pygame.org/contribute.html


2024-09-30 17:36:31.172463: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
env = gym.make("SpaceInvaders-v4", render_mode='rgb_array')
height, width, channels = env.observation_space.shape
actions = env.action_space.n

A.L.E: Arcade Learning Environment (version 0.9.1+aff5939)
[Powered by Stella]


In [4]:
env.unwrapped.get_action_meanings()


['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']

In [5]:
episodes = 5
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0
    while not done:
        env.render()
        action = random.choice([0,1,2,3,4,5])
        n_state, reward, done, info, _ = env.step(action)
        score += reward
    print("Episode:{} Score:{}".format(episode, score))
env.close()

Episode:1 Score:130.0
Episode:2 Score:410.0
Episode:3 Score:155.0
Episode:4 Score:405.0
Episode:5 Score:45.0


In [6]:
def build_model(height, width, channels, actions):
    model = Sequential()
    model.add(Conv2D(32, (8,8), strides=(4,4), activation="relu", input_shape = (height, width, channels)))
    model.add(Conv2D(64, (4,4), strides=(2,2), activation="relu"))
    model.add(Conv2D(64, (3,3), activation="relu"))
    model.add(Flatten())
    model.add(Dense(512, activation="relu"))
    model.add(Dense(256, activation="relu"))
    model.add(Dense(actions, activation="linear"))
    return model

In [7]:
model = build_model(height, width, channels, actions)
model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
model.summary()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [8]:
class ReplayBuffer:
    def __init__(self, max_size):
        self.buffer = deque(maxlen=max_size)

    def store(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        batch = [self.buffer[i] for i in indices]
        return map(np.array, zip(*batch))

    def size(self):
        return len(self.buffer)

def build_agent(model, actions, buffer_size=10000, batch_size=64, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.98, learning_rate=0.05):
    memory = ReplayBuffer(max_size=buffer_size)
    optimizer = Adam(learning_rate=learning_rate)
    def act(state, epsilon):
        if np.random.rand() < epsilon:
            return random.randrange(actions)
        else:
            q_values = model.predict(state)
            return np.argmax(q_values[0])
    def train_model():
        if memory.size() < batch_size:
            return
        states, actions, rewards, next_states, dones = memory.sample(batch_size)
        states = np.reshape(states, (batch_size, height, width, channels))
        next_states = np.reshape(next_states, (batch_size, height, width, channels))
        q_next = model.predict(next_states)
        target_q = rewards + (gamma * np.amax(q_next, axis=1) * (1 - dones))
        q_values = model.predict(states)
        for i in range(batch_size):
            q_values[i][actions[i]] = target_q[i]
        model.train_on_batch(states, q_values)
    def remember(state, action, reward, next_state, done):
        memory.store((state, action, reward, next_state, done))
    def decay_epsilon(epsilon):
        return max(epsilon_min, epsilon_decay * epsilon)
    return {
        'act': act,
        'train': train_model,
        'remember': remember,
        'decay_epsilon': decay_epsilon,
        'epsilon': epsilon
    }

In [9]:
agent = build_agent(model, actions)

In [12]:
def train_dqn_agent(env, agent, episodes=5, batch_size=64, max_steps=200):
    state_shape = env.observation_space.shape
    print("State shape from observation space:", state_shape)
    epsilon = agent['epsilon']
    episode_rewards = []
    
    for episode in range(episodes):
        state, _ = env.reset()
        print("State type:", type(state), "State shape:", state.shape)
        state = np.reshape(state, (1, height, width, channels))
        total_reward = 0
        
        for step in range(max_steps):
            action = agent['act'](state, epsilon)
            result = env.step(action) 
            if len(result) == 5:
                next_state, reward, done, truncated, info = result
            elif len(result) == 4:
                next_state, reward, done, info = result
                truncated = False 
            else:
                raise ValueError("Unexpected result from env.step: {}".format(result))
            next_state = np.reshape(next_state, (1, height, width, channels))
            agent['remember'](state, action, reward, next_state, done)
            agent['train']()
            state = next_state
            total_reward += reward
            print(step)
            if done:
                break
        epsilon = agent['decay_epsilon'](epsilon)
        episode_rewards.append(total_reward)
        if episode % 1 == 0:
            print(f"Episode {episode}/{episodes}, Total Reward: {total_reward}, Epsilon: {epsilon}")
    return episode_rewards
rewards = train_dqn_agent(env, agent)

State shape from observation space: (210, 160, 3)
State type: <class 'numpy.ndarray'> State shape: (210, 160, 3)
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 75ms/step
0
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
1
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
2
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
3
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
4
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
5
[1m2/2[0m

In [14]:
def test_dqn_agent(env, agent, episodes=5, max_steps=200):
    epsilon = 0.01
    episode_rewards = []
    
    for episode in range(episodes):
        state, _ = env.reset()
        state = np.reshape(state, (1, height, width, channels))
        total_reward = 0
        
        for step in range(max_steps):
            action = agent['act'](state, epsilon)
            result = env.step(action) 
            if len(result) == 5:
                next_state, reward, done, truncated, info = result
            elif len(result) == 4:
                next_state, reward, done, info = result
                truncated = False 
            else:
                raise ValueError("Unexpected result from env.step: {}".format(result))
            next_state = np.reshape(next_state, (1, height, width, channels))
            agent['remember'](state, action, reward, next_state, done)
            agent['train']()
            state = next_state
            total_reward += reward
            print(step)
            if done:
                break
        epsilon = agent['decay_epsilon'](epsilon)
        episode_rewards.append(total_reward)
        if episode % 1 == 0:
            print(f"Episode {episode}/{episodes}, Total Reward: {total_reward}, Epsilon: {epsilon}")
    return episode_rewards
test_rewards = test_dqn_agent(env, agent, episodes=5)
print(f"Average Reward over {len(test_rewards)} episodes: {np.mean(test_rewards)}")
average_reward = np.mean(test_rewards)
print(f"Average reward after {len(test_rewards)} test episodes: {average_reward}")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 372ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
0
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
1
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 72ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
2
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step
3
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m