In [1]:
import gymnasium as gym
import numpy as np
import keras
from keras.models import Sequential, clone_model
from keras.layers import Conv2D, Flatten, Dense
from keras.optimizers import Adam
from collections import deque
from gymnasium.wrappers import FrameStack
import random
from tqdm import tqdm



In [2]:
env = gym.make("ALE/Tetris-v5", render_mode='human')
env.reset()
done = False

In [3]:
state_size = env.observation_space.shape
action_size = env.action_space.n
memory = deque(maxlen=2000)
batch_size = 64
gamma = 0.95
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
target_update_counter = 0
update_target_every = 5
episodes = 1000
max_steps_per_episode = 1000
learning_rate = 0.001
update_target_every = 10  # Update target DQN every 10 episodes

In [4]:
def build_model(input_shape, action_size):
    model = Sequential([ # Each person should change the amount of Conv2D/Dense layers, as well as the filter amount and kernel_size/strides
        Conv2D(32, kernel_size=(8, 8), strides=(4, 4), activation='relu', input_shape=input_shape),
        Conv2D(64, kernel_size=(4, 4), strides=(2, 2), activation='relu'),
        Conv2D(64, kernel_size=(3, 3), strides=(1, 1), activation='relu'),
        Flatten(),
        Dense(512, activation='relu'),
        Dense(256, activation='relu'),
        Dense(action_size, activation='linear')
    ])
    model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mse')
    return model

In [5]:
dqn = build_model(state_size, action_size)
# Define target DQN model
target_dqn = clone_model(dqn)
target_dqn.set_weights(dqn.get_weights())

  super().__init__(


In [6]:
def preprocess_rgb(state):
    state = cv2.resize(state, (84, 110))
    state = cv2.cvtColor(state, cv2.COLOR_BGR2GRAY)
    state = state[26:110, :]
    state = state.astype(np.float32) / 255.0
    return state.reshape(84, 84, 1)

In [7]:
def preprocess_grayscale(state):
    if state is None:
        return np.zeros((84, 84, 1), dtype=np.float32)  # Return a blank image if state is None
    state = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY) if state.shape[-1] == 3 else state  # Convert RGB to grayscale if needed
    state = cv2.resize(state, (84, 84))  # Resize the image
    state = state.astype(np.float32) / 255.0  # Normalize pixel values
    return np.expand_dims(state, axis=-1)  # Ensure the image has a single channel

In [8]:
# Take a random action
action = env.action_space.sample()

# Perform the action and capture the return values in a single variable
info = env.step(action)

info

(array([[[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0]],
 
        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0]],
 
        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [ 50, 132,  50],
         [ 50, 132,  50],
         [ 50, 132,  50]],
 
        ...,
 
        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0]],
 
        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0]],
 
        [[  0,   0,   0],
         [  0,   0,   0],
         [  0,   0,   0],
         ...,
         [  0,   0,   0],
  

In [10]:
for episode in tqdm(range(episodes), desc='Episode Progress'):
    state, _ = env.reset()
    episode_reward = 0
    done = False
    for step in range(max_steps_per_episode):
        if np.random.rand() <= epsilon:
            action = env.action_space.sample()  # Exploration
        else:
            q_values = dqn.predict(np.array([state]), verbose=None)[0]
            action = np.argmax(q_values)  # Exploitation

        # Ensure action is within bounds
        action = np.clip(action, 0, action_size - 1)

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        episode_reward += reward

        memory.append((state, action, reward, next_state, done))

        state = next_state

        # Perform batch update every few steps
        if len(memory) >= batch_size:
            batch = np.array(random.sample(memory, batch_size))
            states, actions, rewards, next_states, dones = batch[:, 0], batch[:, 1], batch[:, 2], batch[:, 3], batch[:, 4]

            # Calculate target Q-values using target DQN
            target_q_values = target_dqn.predict(next_states)
            targets = rewards + (1 - dones) * gamma * np.amax(target_q_values, axis=1)

            # Update Q-values using the target Q-values
            dqn.fit(states, targets, epochs=1, verbose=0)

        if done:
            break

    # Decay exploration rate
    if epsilon > epsilon_min:
        epsilon *= epsilon_decay

    # Update target DQN weights periodically
    if episode % update_target_every == 0:
        target_dqn.set_weights(dqn.get_weights())

env.close()

KeyboardInterrupt: 

In [11]:
env.close()

: 