# Deep Reinforcement Learning Agent

Hilfreiche Erklärungen am Beispiel CartPole:
- https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
- https://www.tensorflow.org/agents/tutorials/1_dqn_tutorial

In [None]:
# Install Dependencies
%pip install tensorflow-cpu
%pip install gym
%pip install tf-keras
%pip install tf-agents

In [None]:
import os

# Select the legacy Keras 2 implementation (tf-keras) instead of Keras 3.
# Presumably this has to run before TensorFlow/Keras is imported in order
# to take effect, which is why it lives in its own early cell.
os.environ.update({'TF_USE_LEGACY_KERAS': '1'})

## Umgebung definieren
Wir erstellen eine einfache Umgebung (Environment) mit OpenAI Gym, die die Merkmale eines Loginversuchs (falsches Passwort, Zeit zwischen den Loginversuchen, Anzahl der Fehlversuche, letzte Aktion) als Observation liefert.

In [None]:
# Imports
import gym
from gym import spaces
import numpy as np

from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts

In [None]:
class LoginEnv(gym.Env):
    """Toy login-protection environment for choosing account lock-outs.

    Each episode describes one login attempt through four features: whether
    the password was wrong, the time since the previous attempt, the number
    of failed attempts so far and the last lock action. The agent picks one
    of five lock actions and is rewarded when the action fits the situation.

    Note:
        ``step`` returns ``(reward, done)`` instead of the usual gym
        ``(observation, reward, done, info)`` tuple. The existing callers
        unpack exactly two values, so this form is kept. ``step`` never
        modifies the internal state; only ``reset`` does.
    """

    # Scaling constants used to map the raw features into [0, 1].
    MAX_TIME_BETWEEN_ATTEMPTS = 3600  # exclusive upper bound in seconds (1 h)
    MAX_INCORRECT_PASSWORD_COUNT = 10
    NUM_ACTIONS = 5

    def __init__(self):
        super().__init__()

        # Observation features, each scaled to [0, 1]:
        # incorrect password (0/1), time between login attempts,
        # failed-attempt counter, last action.
        self.observation_space = spaces.Box(low=0, high=1, shape=(4,), dtype=np.float32)

        # Actions: 0 = no lock, 1 = lock 30 s, 2 = lock 1 min,
        # 3 = lock 3 min, 4 = lock permanently
        self.action_space = spaces.Discrete(self.NUM_ACTIONS)

        # Internal (unscaled) state variables
        self.incorrect_password = False
        # 0 .. 3599 seconds (randint's upper bound is exclusive)
        self.time_between_attempts = np.random.randint(0, self.MAX_TIME_BETWEEN_ATTEMPTS)
        self.incorrect_password_count = 0
        self.last_action = 0

    def _get_observation(self):
        """Return the current state as a float32 vector scaled to [0, 1]."""
        return np.array([
            float(self.incorrect_password),
            self.time_between_attempts / self.MAX_TIME_BETWEEN_ATTEMPTS,
            self.incorrect_password_count / self.MAX_INCORRECT_PASSWORD_COUNT,
            self.last_action / (self.NUM_ACTIONS - 1),
        ], dtype=np.float32)

    def reset(self):
        """Sample a new random login situation and return its observation.

        Returns:
            A float32 array of shape (4,) that lies inside
            ``observation_space``. The raw, unscaled values stay available
            on the instance attributes and are what ``step`` evaluates.
        """
        self.incorrect_password = np.random.choice([True, False])
        self.time_between_attempts = np.random.randint(0, self.MAX_TIME_BETWEEN_ATTEMPTS)
        self.incorrect_password_count = np.random.randint(0, self.MAX_INCORRECT_PASSWORD_COUNT + 1)
        # NOTE(review): randint's upper bound is exclusive, so only the
        # actions 1 and 2 are ever drawn as "last action" - confirm intent.
        self.last_action = np.random.randint(1, 3) if self.incorrect_password_count > 0 else 0
        return self._get_observation()

    def step(self, action):
        """Score ``action`` against the current login situation.

        Args:
            action: Integer in ``[0, 4]`` (see ``action_space``).

        Returns:
            A ``(reward, done)`` tuple. ``reward`` is 1 for a fitting
            action, -1 for an unfitting one and 0 for a neutral one;
            ``done`` is True whenever the reward is non-zero. Actions
            outside ``[0, 4]`` yield ``(0, False)``.
        """
        reward = 0
        done = False

        wrong_password = self.incorrect_password
        failed_attempts = self.incorrect_password_count
        # Retries within 3 seconds are treated as suspicious.
        rapid_retry = self.time_between_attempts <= 3

        if action == 0:  # no lock
            if not wrong_password:
                reward, done = 1, True
            elif rapid_retry or failed_attempts >= 10:
                reward, done = -1, True
        elif action == 1:  # lock 30 s
            if not wrong_password:
                reward, done = -1, True
            elif rapid_retry or 3 < failed_attempts <= 6:
                reward, done = 1, True
        elif action == 2:  # lock 1 min
            if not wrong_password:
                reward, done = -1, True
            elif 6 < failed_attempts <= 9:
                reward, done = 1, True
        elif action == 3:  # lock 3 min
            if not wrong_password:
                reward, done = -1, True
            elif 9 < failed_attempts < 10:
                # NOTE(review): no integer satisfies 9 < n < 10, so this
                # action can never earn a positive reward. The intended
                # range is unclear (7-9 belongs to action 2, >= 10 to
                # action 4); left unchanged until the tiers are confirmed.
                reward, done = 1, True
        elif action == 4:  # lock permanently
            reward = 1 if failed_attempts >= 10 else -1
            done = True

        return reward, done



In [None]:
class LoginEnvWrapper(py_environment.PyEnvironment):
    """TF-Agents ``PyEnvironment`` adapter around ``LoginEnv``.

    Exposes the wrapped environment through ``TimeStep`` objects with a
    scalar int32 action in ``[0, 4]`` and a float32 observation of
    shape (4,).
    """

    def __init__(self):
        super().__init__()
        self.env = LoginEnv()  # the wrapped gym environment
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=4, name='action')
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(4,), dtype=np.float32, minimum=0, maximum=1, name='observation')
        self._state = None
        self._episode_ended = False

    def action_spec(self):
        """Return the spec of the scalar action."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec of the 4-element observation vector."""
        return self._observation_spec

    def _reset(self):
        """Start a new episode and return its first ``TimeStep``."""
        self._state = self.env.reset()
        self._episode_ended = False
        return ts.restart(np.array(self._state, dtype=np.float32))

    def _step(self, action):
        """Apply ``action`` once and return the resulting ``TimeStep``.

        If the previous step ended the episode, the action is ignored and
        a fresh episode is started instead.
        """
        if self._episode_ended:
            return self.reset()

        # Step the wrapped environment exactly once. LoginEnv.step only
        # returns (reward, done) and leaves the state untouched, so the
        # observation obtained at the last reset is still the current one.
        reward, done = self.env.step(action)
        self._episode_ended = done

        observation = np.array(self._state, dtype=np.float32)
        if done:
            return ts.termination(observation, reward)
        return ts.transition(observation, reward=reward)

## Umgebung testen

In [None]:
env = LoginEnv()
num_episodes = 10

# Smoke test: play a few episodes with uniformly random actions.
for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    done = False

    # Keep sampling actions until the environment reports the episode end.
    while True:
        action = env.action_space.sample()
        reward, done = env.step(action)
        total_reward += reward
        print(f'State: {state}, Reward: {reward}')
        if done:
            break

    print(f'Episode {episode + 1}: Total Reward = {total_reward}')

## Deep Learning Modell definieren

In [None]:
# Imports
from tensorflow import keras


In [None]:
def build_model():
    """Build and compile a small fully connected Keras model.

    The network maps a 4-element state vector to 5 linear outputs (one per
    action) through two hidden ReLU layers of 64 units each.

    Returns:
        The compiled ``keras.models.Sequential`` model (Adam optimizer with
        learning rate 0.001, mean squared error loss).
    """
    # Input layer: 4 values, matching the size of the state vector
    layers = [keras.layers.Input(shape=(4,))]
    # Two hidden layers with 64 neurons each
    layers += [keras.layers.Dense(units, activation='relu') for units in (64, 64)]
    # Output layer: 5 neurons, one per possible action
    layers.append(keras.layers.Dense(5, activation='linear'))

    network = keras.models.Sequential(layers)

    adam = keras.optimizers.Adam(learning_rate=0.001)
    network.compile(optimizer=adam, loss='mse')  # mean squared error loss
    return network

In [None]:
# Build the standalone Keras model and print its layer/parameter overview.
# NOTE(review): the DQN agent cells further down construct their own
# QNetwork; this model does not appear to be used by them - confirm whether
# it is still needed.
model = build_model()
model.summary()

## Deep Reinforcement Learning Agenten definieren
Hier definieren wir mit TF-Agents einen DQN-Agenten, dessen Q-Netzwerk (ein einfaches neuronales Netzwerk in TensorFlow) die Aktionen in der Umgebung auswählt.

In [None]:
# Imports
import tensorflow as tf
from tf_agents.networks import q_network
from tf_agents.agents.dqn import dqn_agent
from tf_agents.environments import tf_py_environment
from tf_agents.utils import common

In [None]:
# Define the environments.
# Training and evaluation each get their own Python environment instance so
# that stepping one of them cannot disturb the episode state of the other.
env = LoginEnvWrapper()
train_env = tf_py_environment.TFPyEnvironment(env)
eval_env = tf_py_environment.TFPyEnvironment(LoginEnvWrapper())

In [None]:
# Create the Q-network: two fully connected hidden layers with 64 units each
fc_layer_params = (64, 64)
q_net = q_network.QNetwork(
    input_tensor_spec=train_env.observation_spec(),
    action_spec=train_env.action_spec(),
    fc_layer_params=fc_layer_params,
)

In [None]:
# Configure the DQN agent
train_step_counter = tf.Variable(0)  # incremented by the agent on every train call
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=0.001)

agent = dqn_agent.DqnAgent(
    time_step_spec=train_env.time_step_spec(),
    action_spec=train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter,
)

In [None]:
# Initialize the agent before it is used for data collection and training
agent.initialize()

In [None]:
from tf_agents.drivers import dynamic_step_driver
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import trajectory
from tf_agents.utils import common
from tf_agents.policies import boltzmann_policy



from tf_agents.policies import q_policy

# Training hyperparameters
num_iterations = 20000  # number of training iterations
initial_collect_steps = 1000  # warm-up phase: collect without training
collect_steps_per_iteration = 1  # environment steps between two training steps
batch_size = 64  # batch size sampled from the replay buffer

# Create the Boltzmann exploration policy.
# BoltzmannPolicy requires a wrapped policy whose action distribution is
# parameterized by logits. agent.policy is the greedy policy and does not
# provide that, so a QPolicy over the same Q-network is wrapped instead.
boltzmann_policy = boltzmann_policy.BoltzmannPolicy(
    q_policy.QPolicy(
        train_env.time_step_spec(),
        train_env.action_spec(),
        q_network=q_net))

# Replay buffer for the collected experience. It has to match the batch size
# of the TF environment the driver steps (the raw Python environment is
# unbatched and reports no batch size).
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=train_env.batch_size,
    max_length=100000)

# Experience collection: the dynamic step driver operates on the TF
# environment, not on the wrapped Python environment.
collect_driver = dynamic_step_driver.DynamicStepDriver(
    train_env,
    boltzmann_policy,
    observers=[replay_buffer.add_batch],
    num_steps=collect_steps_per_iteration)

# Wrap the train step in a TF function and reset the step counter
agent.train = common.function(agent.train)
agent.train_step_counter.assign(0)

# Warm-up phase: fill the replay buffer without training
for _ in range(initial_collect_steps):
    collect_driver.run()

# Dataset of two-step trajectories (a transition needs two adjacent steps)
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3,
    sample_batch_size=batch_size,
    num_steps=2).prefetch(3)

iterator = iter(dataset)

# Training loop
for _ in range(num_iterations):
    # Collect new experience
    collect_driver.run()

    # Sample a batch of experience from the replay buffer
    experiences, _unused_info = next(iterator)

    # Train the agent
    train_loss = agent.train(experiences).loss

    step = agent.train_step_counter.numpy()

    if step % 1000 == 0:
        print('Schritt:', step, 'Verlust:', train_loss)