In [1]:
import gymnasium as gym
from gymnasium.wrappers import GrayScaleObservation, ResizeObservation, FrameStack

from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv, VecTransposeImage
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.logger import HParam
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback, EvalCallback, CallbackList
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.policies import ActorCriticPolicy

import torch
import torch.nn as nn

from typing import Optional

import numpy as np
import matplotlib.pyplot as plt

import math

import gc

import os

import csv

# Variable Parameter

In [2]:
training_steps = 5000000
custom_eval_freq = 100000
eval_episodes = 30
experiment = "training_3/PPO"
kombinationen_path = "./3_kombinationen_PPO.csv"
ergebnisse_path = "./3_ergebnisse_PPO.csv"

In [3]:
target_update_interval = 10000
loop_start = 0
vf_coef = 1
clip_range = 0.1

# Unveränderte Parameter

In [4]:
game = "ALE/Pacman-v5"
frameskip = 3
image_size = 84
frame_stack = 4
input_shape = (4, 84, 84)
LOG_DIR = f"./experiments/{experiment}/logs/"
BEST_MODEL_LOG_DIR = f"./experiments/{experiment}/logs/best_model/"
EVAL_ENV_LOG_DIR = f"./experiments/{experiment}/logs/eval_log/"
BEST_MODEL_DIR = f"./experiments/{experiment}/train/"
MODEL_WEIGHTS_FILE = "best_model.zip"

# Eigene CNN-Klasse und Callback-Klassen

In [5]:
class RewardTensorboardCallback(BaseCallback):
    def __init__(self, verbose=0):
        super(RewardTensorboardCallback, self).__init__(verbose)
        self.total_reward = 0

    def _on_step(self) -> bool:
        reward = self.locals["rewards"]
        self.total_reward += np.sum(reward)

        if self.locals["dones"][0]:
            self.logger.record("total_reward_steps", self.total_reward)
            
            self.total_reward = 0

        return True

In [6]:
class HParamCallback(BaseCallback):

    def _on_training_start(self) -> None:
        hparam_dict = {
            "algorithm": self.model.__class__.__name__,
            "learning rate": self.model.learning_rate,
            "gamma": self.model.gamma,
            "batch size": self.model.batch_size,
            "entropiekoeffizient": self.model.ent_coef
        }

        metric_dict = {
            "rollout/ep_len_mean": 0,
            "train/value_loss": 0.0,
        }
        self.logger.record(
            "hparams",
            HParam(hparam_dict, metric_dict),
            exclude=("stdout", "log", "json", "csv"),
        )

    def _on_step(self) -> bool:
        return True

In [7]:
class CustomEvalCallback(EvalCallback):
    def _on_step(self) -> bool:
        continue_training = True

        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0 and self.n_calls >= 500000:
            # Sync training and eval env if there is VecNormalize
            if self.model.get_vec_normalize_env() is not None:
                try:
                    sync_envs_normalization(self.training_env, self.eval_env)
                except AttributeError as e:
                    raise AssertionError(
                        "Training and eval env are not wrapped the same way, "
                        "see https://stable-baselines3.readthedocs.io/en/master/guide/callbacks.html#evalcallback "
                        "and warning above."
                    ) from e

            # Reset success rate buffer
            self._is_success_buffer = []

            episode_rewards, episode_lengths = evaluate_policy(
                self.model,
                self.eval_env,
                n_eval_episodes=self.n_eval_episodes,
                render=self.render,
                deterministic=self.deterministic,
                return_episode_rewards=True,
                warn=self.warn,
                callback=self._log_success_callback,
            )

            if self.log_path is not None:
                self.evaluations_timesteps.append(self.num_timesteps)
                self.evaluations_results.append(episode_rewards)
                self.evaluations_length.append(episode_lengths)

                kwargs = {}
                # Save success log if present
                if len(self._is_success_buffer) > 0:
                    self.evaluations_successes.append(self._is_success_buffer)
                    kwargs = dict(successes=self.evaluations_successes)

                np.savez(
                    self.log_path,
                    timesteps=self.evaluations_timesteps,
                    results=self.evaluations_results,
                    ep_lengths=self.evaluations_length,
                    **kwargs,
                )

            mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)
            mean_ep_length, std_ep_length = np.mean(episode_lengths), np.std(episode_lengths)
            self.last_mean_reward = mean_reward

            if self.verbose >= 1:
                print(f"Eval num_timesteps={self.num_timesteps}, " f"episode_reward={mean_reward:.2f} +/- {std_reward:.2f}")
                print(f"Episode length: {mean_ep_length:.2f} +/- {std_ep_length:.2f}")
            # Add to current Logger
            self.logger.record("eval/mean_reward", float(mean_reward))
            self.logger.record("eval/mean_ep_length", mean_ep_length)

            if len(self._is_success_buffer) > 0:
                success_rate = np.mean(self._is_success_buffer)
                if self.verbose >= 1:
                    print(f"Success rate: {100 * success_rate:.2f}%")
                self.logger.record("eval/success_rate", success_rate)

            # Dump log so the evaluation results are printed with the correct timestep
            self.logger.record("time/total_timesteps", self.num_timesteps, exclude="tensorboard")
            self.logger.dump(self.num_timesteps)

            if mean_reward > self.best_mean_reward:
                if self.verbose >= 1:
                    print("New best mean reward!")
                if self.best_model_save_path is not None:
                    self.model.save(os.path.join(self.best_model_save_path, "best_model"))
                self.best_mean_reward = mean_reward
                # Trigger callback on new best model, if needed
                if self.callback_on_new_best is not None:
                    continue_training = self.callback_on_new_best.on_step()

            # Trigger callback after every evaluation, if needed
            if self.callback is not None:
                continue_training = continue_training and self._on_event()

        return continue_training

In [8]:
kombinationen = []

with open(kombinationen_path, 'r') as file:
    reader = csv.reader(file, delimiter='\t')
    next(reader)
    for row in reader:
        row = [int(val) if val.isdigit() else float(val.replace(',', '.')) for val in row]
        kombinationen.append(row)

# Erstellung der Trainungs- und Testumgebung

In [9]:
env = gym.make(game, frameskip=frameskip)
env = GrayScaleObservation(env, keep_dim=True)
env = ResizeObservation(env, image_size)
env = DummyVecEnv([lambda: env])
env = VecFrameStack(env, frame_stack, channels_order="last")

eval_env = gym.make(game, frameskip=frameskip)
eval_env = Monitor(eval_env, EVAL_ENV_LOG_DIR)
eval_env = GrayScaleObservation(eval_env, keep_dim=True)
eval_env = ResizeObservation(eval_env, image_size)
eval_env = DummyVecEnv([lambda: eval_env])
eval_env = VecFrameStack(eval_env, frame_stack, channels_order="last")
eval_env = VecTransposeImage(eval_env)

num_actions = env.action_space.n

# Erstellung der Callbacks

In [10]:
hparam_callback = HParamCallback()
reward_callback = RewardTensorboardCallback()
eval_callback = CustomEvalCallback(eval_env, best_model_save_path=BEST_MODEL_DIR,
                                  log_path=BEST_MODEL_LOG_DIR, eval_freq=custom_eval_freq,
                                  deterministic=True, render=False, n_eval_episodes=eval_episodes)
callback_list = CallbackList([eval_callback, reward_callback, hparam_callback])

# Training

In [11]:
for i in range(loop_start, 1):
    if i != 1 or i != 3 or i != 6:
        hparam_callback = HParamCallback()
        reward_callback = RewardTensorboardCallback()
        eval_callback = CustomEvalCallback(eval_env, best_model_save_path=f"{BEST_MODEL_DIR}{str(i)}/",
                                          log_path=f"{BEST_MODEL_LOG_DIR}{str(i)}/", eval_freq=custom_eval_freq,
                                          deterministic=True, render=False, n_eval_episodes=eval_episodes)
        callback_list = CallbackList([eval_callback, reward_callback, hparam_callback])

        learning_rate = 0.0001
        gamma = 0.95
        batch_size = 32
        ent_coef = 0.01

        # Modell erstellen
        model = PPO('CnnPolicy', env, gamma=gamma, learning_rate=learning_rate, verbose=2,            
                batch_size=batch_size, tensorboard_log=f"{LOG_DIR}{str(i)}/",
                ent_coef=ent_coef, vf_coef=vf_coef, clip_range=clip_range)

        # Modell trainieren
        model.learn(total_timesteps=training_steps, callback=callback_list)

        del model
        gc.collect()

        # Bestes Modell laden
        best_model = PPO.load(f"{BEST_MODEL_DIR}{i}/{MODEL_WEIGHTS_FILE}")

        # Modell evaluieren
        observation = env.reset()
        episodes = 0
        done = False
        all_rewards = []
        total_reward = 0

        print(f"Start evaluation Nr. {i}:")

        while episodes < eval_episodes:
            if done:
                observation = env.reset()
                all_rewards.append(total_reward)
                episodes += 1
                print("Episode", episodes, "Reward:", total_reward)
                total_reward = 0

            action, _ = best_model.predict(observation, deterministic=True)
            observation, reward, done, info = env.step(action)
            total_reward += reward

        env.reset()

        del best_model
        gc.collect()

        mean_reward = np.mean(all_rewards)
        std_reward = np.std(all_rewards)

        print("Durchschnittliche Belohnung:", mean_reward)
        print("Standardabweichung der Belohnungen:", std_reward)

        # Ergebnisse in CSV-Datei schreiben
        neue_zeile = [i, mean_reward, std_reward]

        with open(ergebnisse_path, 'a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(neue_zeile)

Using cuda device
Wrapping the env in a VecTransposeImage.
Logging to ./experiments/training_3/PPO/logs/0/PPO_1
---------------------------------
| time/              |          |
|    fps             | 325      |
|    iterations      | 1        |
|    time_elapsed    | 6        |
|    total_timesteps | 2048     |
| total_reward_steps | 9        |
---------------------------------
------------------------------------------
| time/                   |              |
|    fps                  | 334          |
|    iterations           | 2            |
|    time_elapsed         | 12           |
|    total_timesteps      | 4096         |
| total_reward_steps      | 15           |
| train/                  |              |
|    approx_kl            | 0.0016605644 |
|    clip_fraction        | 0.0217       |
|    clip_range           | 0.1          |
|    entropy_loss         | -1.61        |
|    explained_variance   | -0.00697     |
|    learning_rate        | 0.0001       |
|    loss     