In [1]:
import gym
from gym.wrappers import RecordVideo
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import random
import cv2
import matplotlib.pyplot as plt
from IPython.display import clear_output
physical_devices = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(physical_devices))


Num GPUs Available:  0


In [2]:
# Environment initialisieren
env = gym.make("ALE/Pong-v5", render_mode='rgb_array')


# Hyperparameter
learning_rate = 0.0001
discount_factor = 0.99
epsilon = 1.0
epsilon_decay = 0.995
epsilon_min = 0.01
batch_size = 32
num_episodes = 1000

# Netzwerkarchitektur
def create_model():
    model = models.Sequential([
        layers.Conv2D(32, (8, 8), strides=(4, 4), activation='relu', input_shape=(84, 84, 4)),
        layers.Conv2D(64, (4, 4), strides=(2, 2), activation='relu'),
        layers.Conv2D(64, (3, 3), strides=(1, 1), activation='relu'),
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.Dense(env.action_space.n, activation='linear')
    ])
    return model

# Agenteninitialisierung
model = create_model()
target_model = create_model()
target_model.set_weights(model.get_weights())

optimizer = tf.keras.optimizers.Adam(learning_rate)
huber_loss = tf.keras.losses.Huber()

# Erfahrungsspeicher
class ExperienceReplayBuffer:
    def __init__(self, capacity=10000):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def add(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return zip(*random.sample(self.buffer, batch_size))

# Preprocessing-Funktion für den Zustand
def preprocess_state(state):
    if isinstance(state, np.ndarray):
        if len(state.shape) == 3 and state.shape[2] == 3:
            gray_state = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)
            resized_state = cv2.resize(gray_state, (84, 84))
            return resized_state.astype(np.float32) / 255.0
        else:
            raise ValueError("Der Zustand hat nicht die erwartete Form (Höhe, Breite, Kanäle).")
    elif isinstance(state, tuple) and len(state) == 2:
        return preprocess_state(state[0])
    else:
        raise ValueError("Der Zustand ist kein NumPy-Array.")
    
# Trainingsmetriken
episode_rewards = []
episode_lengths = []
episode_losses = []


# Training
buffer = ExperienceReplayBuffer()



for episode in range(num_episodes):
    if episode % 50 == 0:
        video_env = gym.wrappers.RecordVideo(env, './videos', episode_trigger=lambda episode_id: True, video_length=0)
        state = video_env.reset()
    else:
        state = env.reset()
        
    state = preprocess_state(state)  # Preprocessing des Zustands
    state = np.stack([state] * 4, axis=2)  # Stacke den Zustand viermal entlang der 3. Achse
    done = False
    total_reward = 0
    step = 0  # Schrittzähler innerhalb der Episode
    losses = []

    while not done:
        if np.random.rand() <= epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(model.predict(np.expand_dims(state, axis=0), verbose=0))

        # Führe den Schritt in der Umgebung aus und erhalte die Rückgabewerte
        step_result = env.step(action)

        # Extrahiere die ersten vier Rückgabewerte und ignoriere den Rest
        next_state, reward, done, _ = step_result[:4]

        next_state = preprocess_state(next_state)  # Preprocessing des nächsten Zustands
        next_state = np.append(state[:, :, 1:], np.expand_dims(next_state, axis=2), axis=2)

        total_reward += reward

        buffer.add(state, action, reward, next_state, done)

        state = next_state
        step += 1  # Schrittzähler erhöhen

        if len(buffer.buffer) > batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)

            # Q-Learning
            future_rewards = target_model.predict(np.array(next_states), verbose=0)
            dones = np.array(dones, dtype=int)

            updated_q_values = rewards + discount_factor * np.max(future_rewards, axis=1) * (1 - dones)

            masks = tf.one_hot(actions, env.action_space.n)

            with tf.GradientTape() as tape:
                q_values = model(np.array(states))
                q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
                loss = huber_loss(updated_q_values, q_action)
                losses.append(loss.numpy())

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        print(f"Episode: {episode + 1}, Step: {step}, Step Reward: {reward}, Total Reward: {total_reward}, Epsilon: {epsilon}")

    # Episode abgeschlossen, Metriken aufzeichnen
    episode_rewards.append(total_reward)
    episode_lengths.append(step)
    episode_losses.append(np.mean(losses))

    # Ausgabe der Schrittanzahl nach Abschluss der Episode
    print(f"Episode {episode + 1} abgeschlossen mit {step} Schritten, Gesamtbelohnung: {total_reward}, Epsilon: {epsilon}")

    # Update des Zielnetzwerks
    if episode % 10 == 0:
        target_model.set_weights(model.get_weights())

    epsilon = max(epsilon_min, epsilon * epsilon_decay)

    # Plotten der Metriken
    if (episode + 1) % 50 == 0:
        plt.figure(figsize=(12, 8))

        plt.subplot(3, 1, 1)
        plt.plot(episode_rewards, label="Total Reward")
        plt.xlabel("Episode")
        plt.ylabel("Total Reward")
        plt.legend()

        plt.subplot(3, 1, 2)
        plt.plot(episode_lengths, label="Episode Length")
        plt.xlabel("Episode")
        plt.ylabel("Length")
        plt.legend()

        plt.subplot(3, 1, 3)
        plt.plot(episode_losses, label="Loss")
        plt.xlabel("Episode")
        plt.ylabel("Loss")
        plt.legend()

        plt.tight_layout()
        plt.show()

# Speichern des fertigen Modells
model.save("pong_rl_model.h5")

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
  logger.warn(
  logger.warn(
  if not isinstance(terminated, (bool, np.bool8)):


Episode: 1, Step: 1, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 2, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 3, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 4, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 5, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 6, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 7, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 8, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 9, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 10, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 11, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 12, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 13, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
Episode: 1, Step: 14, Step Reward: 0.0, Total Reward: 0.0, Epsilon: 1.0
E

KeyboardInterrupt: 