<img src="https://upload.wikimedia.org/wikipedia/commons/0/06/LMU_Muenchen_Logo.svg" alt="Image" width="200" style="float: right;">

*Gruppe A - Thema 5*

### **RL-Policy-Training unter Vermeidung von Zuständen mit sehr geringer Datendichte**

- Erzeugung eines Datensatzes für das Abschätzung der Datendichte

- Sampeln eines Gym-Environments 
    - Entfernen vordefinierter Bereiche, die z.B. durch die eine optimale Policy laufen würde

- Modell-based RL

- Datendichte im Reward abbilden, aber nicht übergewichten

**Dozent:** [Dr. Michel Tokic](https://www.tokic.com/)

**Studenten:** Maximilian Schieder, Leon Lantz

---


## **Konzept**

Offline Reinforcement Learning!

1. Sampling im Environment
    - Ramdom oder gezielt (Heuristik, Rewards von Environment ändern)
    - Zustand und entsprechende Aktion jedes Schrittes speichern
2. Bestimmung der Datendichte
3. Entfernung von Zuständen mit geringer Datendichte
4. Verwendung des gefilterten Datensatzes für das Training
5. Model-Based RFL ...

Anstatt seltene Zustände komplett zu entfernen, könnte man sie auch weniger stark gewichten?

---

## 📦 **Imports**

- `tensorflow` as `tf` for building and training machine learning models

- `numpy` as `np` for numerical operations and handling arrays

- `random` for generating random numbers and randomizing data

- `gym` for creating and interacting with reinforcement learning environments (OpenAI)

- `pandas` as `pd` for data manipulation and analysis

- `matplotlib.colors` for color normalization in plots

- `matplotlib.pyplot` as `plt` for creating visualizations and plots

In [None]:
import tensorflow as tf
import numpy as np
import random 
import gymnasium as gym
import numpy as np
import pandas as pd
from matplotlib.colors import LogNorm
import matplotlib.pyplot as plt
from stable_baselines3 import A2C
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
import imageio

Every time the code is run, the random numbers generated will be the same, leading to **reproducible results**

In [None]:
np.random.seed (0)
random.seed(0)
tf.random.set_seed(0)

## 🌍 **Environment**

### 🚗 **Mountain Car**


<img src="https://gymnasium.farama.org/_images/mountain_car.gif" alt="Mountain Car GIF" width="400" >

Gym Environment -> Mountain Car MDP [**(Documentation)**](https://gymnasium.farama.org/environments/classic_control/mountain_car/)
- **Goal**: Reach the goal state at the top of the right hill
- **Problem**: The car's engine is not strong enough to scale the mountain in a single pass. Therefore, the only way to succeed is to drive back and forth to build up momentum
- **Starting Position**: Car starts stochastically at the bottom of a valley

#### **Observation Space**
- 0: position of the car along the x-axis [-1.2, 0.6]
- 1: velocity of the car [-0.07, 0.07]

#### **Action Space**

Discrete deterministic actions
- 0: Accelerate to the left
- 1: Don’t accelerate
- 2: Accelerate to the right

#### **Reward**

The goal is to reach the flag placed on top of the right hill as quickly as possible, as such the agent is penalised with a reward of -1 for each timestep.

#### **Episode End**
- **Termination**: The position of the car is greater than or equal to 0.5 (the goal position on top of the right hill)
- **Truncation**: The length of the episode is 200.





In [None]:
# for details see: https://github.com/openai/gym/blob/master/gym/envs/classic_control/mountain_car.py
CAR_POS = "carPos"
CAR_VEL = "carVel"
EPISODE = "episode"
STEP = "step"
ACTION = "action"


#OPTIONS: Set by User !!!
RENDERED = True
HEURISTIC = False


def heuristic_policy(velocity):
    if velocity >= 0.0:
        return 2 # Accelerate right
    else:
        return 0 # Accelerate left


def sample_data(episodes=1, seed=0):

    if(RENDERED): env = gym.make("MountainCar-v0", render_mode="human")
    else: env = gym.make("MountainCar-v0")
    
    env.reset()

    ### Create empty Pandas dataset
    transitions = []

    ### SAMPLE DATA
    for episode in range(episodes):
        obs = env.reset()
        step = 0
        done = False

        while step < 200 and not done:
            step += 1

            if(step == 1):
                action = env.action_space.sample()
            else:
                if (HEURISTIC): action = heuristic_policy(obs[1])
                else: action = env.action_space.sample()

            obs, reward, done, _, _ = env.step(action)

            transitions.append(
                {
                    CAR_POS: obs[0],
                    CAR_VEL: obs[1],
                    EPISODE: episode,
                    STEP: step,
                    ACTION: action,
                }
            )
        print("Steps: ", step)

    return pd.DataFrame(transitions)

df = sample_data(episodes=100, seed=0)

**Anderer Ansatz:**

-> Reward ändern. z.B. wenn Car halbwegs gute Ergebnisse erzielt (weit oben auf Hügel) Reward +1 setzen

<p style="color: red;">Zustände nahe des Gipfels sind <b>selten, aber wichtig</b> um das Auto in den Gipfelbereich zu bringen. Das Vermeiden solcher Zustände könnte dazu führen, dass das Modell nicht lernt, wie man erfolgreich den Gipfel erreicht.</p>

### 🌑 **Lunar Lander**

<img src="https://www.gymlibrary.dev/_images/lunar_lander.gif" alt="Lunar Lander GIF" width="400" >

Gym Environment -> Lunar Lander [**(Documentation)**](https://www.gymlibrary.dev/environments/box2d/lunar_lander/)

- **Ziel**: Das Ziel besteht darin, den Lander sicher auf dem Landepad zu landen.

- **Ausgangsposition**: Der Lander startet in der oberen Mitte des Ansichtsbereichs mit einer zufälligen Anfangskraft, die auf seinen Schwerpunkt angewendet wird.

#### **Beobachtungsraum**
Der Beobachtungsraum ist ein 8-dimensionaler Vektor
- [0] : x-Koordinate des Landers
- [1] : y-Koordinate des Landers
- [2] : lineare Geschwindigkeit in x-Richtung
- [3] : lineare Geschwindigkeit in y-Richtung
- [4] : Winkel
- [5] : Winkelgeschwindigkeit
- [6] : linkes Bein in Kontakt mit dem Boden (Boolean)
- [7] : rechtes Bein in Kontakt mit dem Boden (Boolean)

#### **Aktionsraum**
Vier diskrete Aktionen
- [0] : nichts tun
- [1] : linke Orientierungsturbine feuern
- [2] : Hauptturbine feuern
- [3] : rechte Orientierungsturbine feuern

#### **Belohnung**
- Die Belohnung erhöht/verringert sich, je näher/weiter der Lander dem Landepad ist.
- Die Belohnung erhöht/verringert sich, je langsamer/schneller sich der Lander bewegt.
- Die Belohnung verringert sich, je mehr der Lander geneigt ist (Winkel nicht horizontal).
- Die Belohnung erhöht sich um 10 Punkte für jedes Bein, das den Boden berührt.
- Die Belohnung verringert sich um 0,03 Punkte für jeden Frame, in dem eine Seitenturbine feuert.
- Die Belohnung verringert sich um 0,3 Punkte für jeden Frame, in dem die Hauptturbine feuert.

#### **Episodenbeendigung**
- Der Lander stürzt ab (der Landerkörper kommt mit dem Mond in Kontakt).
- Der Lander gerät außerhalb des Ansichtsbereichs (x-Koordinate ist größer als 1).
- Der Lander ist nicht wach. Laut den Box2D-Dokumenten ist ein Körper, der nicht wach ist, ein Körper, der sich nicht bewegt und mit keinem anderen Körper kollidiert.

---

In [None]:
# Setze den Environment Name
environment_name = 'LunarLander-v2'

#### 🌎 **Zufällige Action-Auswahl**

In [None]:
# Definiere die Anzahl der Episoden für die Bewertung
num_episodes = 100

# Funktion zur Bewertung des Modells
def evaluate(env, num_episodes=100):
    returns = []
    for episode in range(num_episodes):
        state, info = env.reset()
        done = False
        terminated = False
        total_reward = 0
        while not done and not terminated:
            action = env.action_space.sample()
            state, reward, done, terminated, info = env.step(action)
            total_reward += reward
        returns.append(total_reward)
        #print(f"Episode {episode + 1}: Total Reward: {total_reward}")
    
    # Durchschnittliche Rückgabe berechnen
    average_return = sum(returns) / num_episodes
    print(f"Durchschnittlicher Reward über {num_episodes} Episoden: {average_return}")
    return returns

# Modell bewerten
env = gym.make(environment_name)
returns = evaluate(env, num_episodes=num_episodes)

In [None]:
env = gym.make(environment_name, render_mode="rgb_array")
episodes = 10

# Funktion zur Aufnahme einer Episode und Speicherung als GIF
for episode in range(episodes):
    output_path = f"results/random/episode_{episode}.gif"
    state, info = env.reset()
    frames = []
    cum_reward = 0
    done = False
    
    while not done:
        # Füge das aktuelle Bild der Frames-Liste hinzu
        frames.append(env.render())
        
        # Zufällige Auswahl einer Aktion
        action = env.action_space.sample()
        state, reward, done, terminated, info = env.step(action)
        cum_reward += reward
    print('Episode:{} Kummulierter Reward:{}'.format(episode, cum_reward))
    
    with imageio.get_writer(output_path, mode='I', duration=0.03) as writer:
        for frame in frames:
            writer.append_data(frame)

env.close()

<p style="color:lightgreen;">Ergebnis:</p>  Bei der Durchführung von zehn Episoden in der <b>Lunar Lander-Umgebung</b>, in denen der Agent zufällig Aktionen auswählt, waren die Ergebnisse erwartungsgemäß schlecht. Der kumulierte Reward ist in der Regel negativ, da zufällige Aktionen selten zu einer erfolgreichen Landung führen. Diese Ergebnisse zeigen deutlich, wie wichtig durchdachte Strategien und gut trainierte Modelle für den Erfolg in komplexen Umgebungen wie Lunar Lander sind.

#### 🏋🏼 **Trainiere ein Modell mit dem A2C-Algorithmus** <b style="color:red;">(Optional)</b>

In [None]:
env = gym.make(environment_name)
env = DummyVecEnv([lambda: env])
model = A2C('MlpPolicy', env, ent_coef=0.1, verbose=1)
model.learn(total_timesteps=80000)
model.save("models/A2C_model")

#### 👨‍🏫 **Bewertung der Modelle**

In [None]:
env = gym.make(environment_name)
env = DummyVecEnv([lambda: env])

#del model #optional
#model = A2C.load('models/A2C_model', env=env)
model2 = A2C.load('models/a2c-LunarLander-v2')
# Andere Modelle zur Datengenerierung 

Durchschnitt über 100 Episoden

In [None]:
# Definiere die Anzahl der Episoden für die Bewertung
num_episodes = 100

# Funktion zur Bewertung des Modells
def evaluate_model(model, env, num_episodes=100):
    returns = []
    for episode in range(num_episodes):
        obs = env.reset()
        done = False
        total_reward = 0
        while not done:
            action, _states = model.predict(obs)
            obs, reward, done, info = env.step(action)
            total_reward += reward
        returns.append(total_reward)
        #print(f"Episode {episode + 1}: Total Reward: {total_reward}")
    
    # Durchschnittliche Rückgabe berechnen
    average_return = sum(returns) / num_episodes
    print(f"Durchschnittlicher Reward über {num_episodes} Episoden: {average_return}")
    return returns

# Modell bewerten
returns = evaluate_model(model2, env, num_episodes=num_episodes)

Speichere 10 Episoden als GIF

In [None]:
env = gym.make(environment_name, render_mode="rgb_array")
env = DummyVecEnv([lambda: env])

episodes = 10

# Funktion zur Aufnahme einer Episode und Speicherung als GIF
for episode in range(episodes):
    output_path = f"results/pretrained_model/episode_{episode}.gif"
    state = env.reset()
    frames = []
    cum_reward = 0
    done = False
    
    while not done:
        # Füge das aktuelle Bild der Frames-Liste hinzu
        frames.append(env.render())
        
        # Vorhersage der Aktion durch das Modell
        action, _states = model.predict(state)
        state, reward, done, info = env.step(action)
        cum_reward += reward
    print('Episode:{} Kummulierter Reward:{}'.format(episode, cum_reward))
    
    with imageio.get_writer(output_path, mode='I', duration=0.03) as writer:
        for frame in frames:
            writer.append_data(frame)

# Schließe das Environment
env.close()

## 💿 **Data Sampling**

### **Model Action Sampling**

In [None]:
# Konstanten für die Spaltennamen
X_POS = "xPos"
Y_POS = "yPos"
X_VEL = "xVel"
Y_VEL = "yVel"
ANGLE = "angle"
ANGULAR_VEL = "angularVel"
LEFT_LEG_CONTACT = "leftLegContact"
RIGHT_LEG_CONTACT = "rightLegContact"
EPISODE = "episode"
STEP = "step"
ACTION = "action"

def sample_data(episodes=10000):
    # Erstelle die LunarLander-Umgebung
    env = gym.make(environment_name)
    
    # Erstelle eine leere Liste zur Speicherung der Übergänge
    transitions = []

    # Sammle Daten aus der Umgebung
    for episode in range(episodes):
        # Setze die Umgebung zurück und erhalte die Anfangsbeobachtung
        obs, info = env.reset()
        step = 0
        done = False
        terminated = False

        while not done and not terminated:
            step += 1
            # Wähle eine zufällige Aktion
            action, _states = model2.predict(obs)
            print(action, _states)

            # Füge den aktuellen Zustand und die Aktion zur Liste der Übergänge hinzu
            transitions.append({
                X_POS: obs[0], 
                Y_POS: obs[1], 
                X_VEL: obs[2], 
                Y_VEL: obs[3],
                ANGLE: obs[4], 
                ANGULAR_VEL: obs[5], 
                LEFT_LEG_CONTACT: int(obs[6]), 
                RIGHT_LEG_CONTACT: int(obs[7]),
                EPISODE: episode, 
                STEP: step, 
                ACTION: int(action)
            })

            # Führe die Aktion aus und erhalte den nächsten Zustand und die Belohnung
            obs, reward, done, terminated, _ = env.step(action)
            if(terminated):
                print("Halllo")

    # Konvertiere die Liste der Übergänge in ein Pandas DataFrame
    return pd.DataFrame(transitions)

# Sammle Daten aus der LunarLander-v2 Umgebung
df = sample_data(episodes=200)
print(len(df))


### **Random Action Sampling**

In [None]:
# Konstanten für die Spaltennamen
X_POS = "xPos"
Y_POS = "yPos"
X_VEL = "xVel"
Y_VEL = "yVel"
ANGLE = "angle"
ANGULAR_VEL = "angularVel"
LEFT_LEG_CONTACT = "leftLegContact"
RIGHT_LEG_CONTACT = "rightLegContact"
EPISODE = "episode"
STEP = "step"
ACTION = "action"

def sample_data(episodes=10000):
    # Erstelle die LunarLander-Umgebung
    env = gym.make("LunarLander-v2")
    
    # Erstelle eine leere Liste zur Speicherung der Übergänge
    transitions = []

    # Sammle Daten aus der Umgebung
    for episode in range(episodes):
        # Setze die Umgebung zurück und erhalte die Anfangsbeobachtung
        obs, info = env.reset()
        step = 0
        done = False

        while not done:
            step += 1
            # Wähle eine zufällige Aktion
            action = env.action_space.sample()

            # Füge den aktuellen Zustand und die Aktion zur Liste der Übergänge hinzu
            transitions.append({
                X_POS: obs[0], 
                Y_POS: obs[1], 
                X_VEL: obs[2], 
                Y_VEL: obs[3],
                ANGLE: obs[4], 
                ANGULAR_VEL: obs[5], 
                LEFT_LEG_CONTACT: int(obs[6]), 
                RIGHT_LEG_CONTACT: int(obs[7]),
                EPISODE: episode, 
                STEP: step, 
                ACTION: action
            })

            # Führe die Aktion aus und erhalte den nächsten Zustand und die Belohnung
            obs, reward, done, truncated, _ = env.step(action)

            done = done or truncated

    # Konvertiere die Liste der Übergänge in ein Pandas DataFrame
    return pd.DataFrame(transitions)

# Sammle Daten aus der LunarLander-v2 Umgebung
df = sample_data(episodes=2000)
print(len(df))


In [None]:
df.dtypes

In [None]:
# Plot a 2D histogram of xPos vs yPos
plt.figure(figsize=(10, 7))
h = plt.hist2d(df[X_POS], df[Y_POS], bins=40, norm=LogNorm(), cmap='Reds')
plt.colorbar(h[3])
plt.grid(True)
plt.xlabel("X Position")
plt.ylabel("Y Position")
plt.title("2D Histogram of X Position vs Y Position")
plt.show()


In [None]:
df.plot(subplots=True, figsize=(10,15), grid=True)

## ✂️ **Cut Out Data**

TODO

## 📊 **Data density**

TODO

## 🗺️ **Model Based RFL**

sss

In [None]:
def create_training_data(data, input_col, target_col, window_size=1, training_pattern_percent=0.7):
    
    data_train = data

    mean_in, std_in = mean_and_std(input_col, data_train)
    mean_out, std_out = mean_and_std(target_col, data_train)
    
    print(f"mean in = {mean_in}" )
    print(f"std in = {std_in}")
    print(f"mean out =  {mean_out}")
    print(f"std out = {std_out}")

    grouped = data_train.groupby(['episode'])

    inputs_all = []
    labels_all = []

    for g in grouped:
        g = g[1].sort_values(by='step')

        past_history = window_size
        future_target = 0
        STEP = 1

        inputs, labels = multivariate_data(
            dataset=g[input_col][:].values,
            target=g[target_col][:].values,
            start_index=0,
            end_index=g[input_col][:].values.shape[0] - future_target,
            history_size=past_history,
            target_size=future_target,
            step=STEP,
            single_step=True
        )

        for i in range(len(inputs)):
            inputs_all.append(inputs[i])
            labels_all.append(labels[i])
  
    length = len(inputs_all)

    c = list(zip(inputs_all, labels_all))
    np.random.shuffle(c)
    inputs_all, labels_all = zip(*c)

    split = int(training_pattern_percent * length)

    inputs_all = np.array(inputs_all)
    labels_all = np.array(labels_all)

    return ((inputs_all[:split], labels_all[:split]), (inputs_all[split:], labels_all[split:])), mean_in, std_in, mean_out, std_out

def mean_and_std(columns, data):
    mean = np.zeros(len(columns))
    std = np.zeros(len(columns))
    for index, c in enumerate(columns):
        mean[index], std[index] = get_normalizations(data[c])
    return mean, std

def get_normalizations(data):
    mean = data.mean()
    std = data.std()
    return mean, std

def multivariate_data(dataset, target, start_index, end_index, history_size, target_size, step, single_step=False):
    data = []
    labels = []

    start_index = start_index + history_size
    if end_index is None:
       end_index = len(dataset) - target_size

    for i in range(start_index, end_index):
        indices = range(i - history_size, i, step)
        data.append(dataset[indices])

        if single_step:
            labels.append(target[i + target_size])
        else:
            labels.append(target[i:i + target_size])

    return np.array(data, dtype=np.float32), np.array(labels, dtype=np.float32)

def prepare_data(df, input_col, target_col, window_size, training_batch_size=50, validation_batch_size=50, training_pattern_percent=0.7):
    
    global x_train_multi, y_train_multi
    
    ((x_train_multi, y_train_multi), (x_val_multi, y_val_multi)), mean_in, std_in, mean_out, std_out = \
                                    create_training_data(df, input_col, target_col, window_size=window_size,
                                                        training_pattern_percent=training_pattern_percent)
    
    print("----")
    print(x_train_multi[0])
    print("----")
    print(y_train_multi[0])
    print("----")

    print('trainData: Single window of past history : {}'.format(x_train_multi[0].shape))
    print('trainData: Single window of future : {}'.format(y_train_multi[1].shape))
    print('valData: Single window of past history : {}'.format(x_val_multi[0].shape))
    print('valData: Single window of future : {}'.format(y_val_multi[1].shape))
    print('trainData: number of training examples: {}'.format(x_train_multi.shape))
    print('valData: number of training examples: {}'.format(x_val_multi.shape))

    train_data = tf.data.Dataset.from_tensor_slices((x_train_multi, y_train_multi))
    train_data = train_data.shuffle(x_train_multi.shape[0]).batch(training_batch_size).repeat()

    val_data = tf.data.Dataset.from_tensor_slices((x_val_multi, y_val_multi))
    val_data = val_data.batch(validation_batch_size).repeat()
    input_shape = x_train_multi[0].shape[-2:]
    print("InputShape: ", input_shape)
    return train_data, val_data, input_shape, mean_in, std_in, mean_out, std_out


In [None]:
window_size = 4
input_col = [X_POS, X_VEL, Y_POS, Y_VEL, ANGLE, ANGULAR_VEL, LEFT_LEG_CONTACT, RIGHT_LEG_CONTACT, ACTION]
target_col = [X_POS, X_VEL, Y_POS, Y_VEL, ANGLE, ANGULAR_VEL, LEFT_LEG_CONTACT, RIGHT_LEG_CONTACT]

# Vorbereitung der Daten
train_data, val_data, input_shape, mean_in, std_in, mean_out, std_out = prepare_data(
    df, input_col, target_col, window_size=window_size, training_pattern_percent=0.7
)

print("Input-Shape: ", input_shape)

In [None]:
print(train_data)
for i, (X, y) in enumerate(train_data):
    print(f"Element {i+1}:")
    print("X (Input):")
    print(X.numpy())
    print("y (Target):")
    print(y.numpy())
    break

### **Create Lunar-Lander state-transition model**

In [None]:
class NormalizeLayer(tf.keras.layers.Layer):
    def __init__(self, mean, std, **kwargs):
        super(NormalizeLayer, self).__init__(**kwargs)
        self.mean = tf.constant(mean, dtype=tf.float32)
        self.std = tf.constant(std, dtype=tf.float32)

    def call(self, inputs):
        return (inputs - self.mean) / self.std

    def get_config(self):
        config = super(NormalizeLayer, self).get_config()
        config.update({
            'mean': self.mean.numpy().tolist(),
            'std': self.std.numpy().tolist(),
        })
        return config

# Registrieren Sie Ihre benutzerdefinierte Schicht
tf.keras.utils.get_custom_objects()['NormalizeLayer'] = NormalizeLayer

def build_single_step_model(mean_in, std_in, mean_out, std_out, input_shape, optimizer=tf.keras.optimizers.RMSprop()):
    print(f"mean = {mean_in}, std = {std_in}, mean = {mean_out}, std = {std_out}")
    single_step_model = tf.keras.models.Sequential()
    
    # Ersetzen Sie den Lambda-Layer durch den benutzerdefinierten NormalizeLayer
    # Erhöhen Sie die Modellkapazität mit zusätzlichen LSTM- und Dense-Schichten
    single_step_model.add(tf.keras.layers.LSTM(150, return_sequences=True, dtype=np.float32))
    single_step_model.add(tf.keras.layers.LSTM(100, dtype=np.float32))
    single_step_model.add(tf.keras.layers.Dense(100, activation="relu"))
    single_step_model.add(tf.keras.layers.Dense(len(mean_out), activation="linear"))
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=1e-4)  # Niedrigere Lernrate
    single_step_model.compile(optimizer=optimizer, loss="mse")

    return single_step_model


In [None]:
modelpath = "model.keras"
max_epochs = 1000
steps_per_epoch = 500
validation_steps = 100
validation_freq = 1

# Callbacks
es_callback = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=50, restore_best_weights=True, verbose=True)
mc_trainLoss_callback = tf.keras.callbacks.ModelCheckpoint(filepath="%s_bestTrainLoss.keras" % modelpath, monitor='loss', verbose=1, save_best_only=True, mode='min')
mc_valLoss_callback = tf.keras.callbacks.ModelCheckpoint(filepath="%s_bestValLoss.keras" % modelpath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./model_logs_tb", histogram_freq=1)

# Modell erstellen
step_model = build_single_step_model(mean_in, std_in, mean_out, std_out, input_shape)

# Modell trainieren
history = step_model.fit(train_data, epochs=max_epochs, steps_per_epoch=steps_per_epoch,
                        validation_data=val_data, validation_steps=validation_steps, validation_freq=validation_freq,
                        callbacks=[mc_trainLoss_callback, mc_valLoss_callback, es_callback, tensorboard_callback])

print(history)

# Modell speichern
step_model.save(modelpath)

Evaluate model quality

In [None]:
dfEval = sample_data(episodes=1)
#dfEval.describe()
dfEval = dfEval[dfEval.episode==0]

#row_max_steps = dfEval[dfEval.step == dfEval.step.max()]
#dfEval = dfEval[dfEval.episode==int(row_max_steps.episode)]

In [None]:
print(dfEval)

In [None]:
output_min = y_train_multi.min(axis=0)
output_max = y_train_multi.max(axis=0)
print ("min(output)_data: ", output_min)
print ("max(output)_data: ", output_max)

In [None]:
import collections

# Aktivieren Sie unsichere Deserialisierung, wenn Sie dem Modell vertrauen
#tf.keras.config.enable_unsafe_deserialization()

# Modellpfad (Stellen Sie sicher, dass dies den richtigen Pfad und Dateinamen hat)
modelpath = "model.keras_bestValLoss.keras"

# Modell laden
model = tf.keras.models.load_model(modelpath, compile=False)

# FIFO-Puffer, der den Zustand des neuronalen Netzwerks speichert
stateBuffer = collections.deque(maxlen=window_size)

# Hier werden die Ausgaben des neuronalen Netzwerks gespeichert
transitions = []

# Annahme: dfEval ist ein DataFrame, das die Evaluierungsdaten enthält
for i in range(len(dfEval)): 

    
                            
    # Initialisierung des ersten Zustands
    if i < window_size: 
        state_data = np.float32([dfEval[X_POS].values[i], dfEval[Y_POS].values[i],
                               dfEval[X_VEL].values[i], dfEval[Y_VEL].values[i],
                               dfEval[ANGLE].values[i], dfEval[ANGULAR_VEL].values[i],
                               dfEval[LEFT_LEG_CONTACT].values[i], dfEval[RIGHT_LEG_CONTACT].values[i],
                               dfEval[ACTION].values[i]])
        stateBuffer.append(state_data)
        #print("Filling initState: %s" % state_data)
    
    # Vorhersage des Nachfolgezustands
    else: 
        
        ###########################
        # Abrufen der Vorhersage vom neuronalen Netzwerk
        ###########################
        # Vorhersage des Nachfolgezustands
        state = np.array([list(stateBuffer)])

        print(state)

        netOutput = model.predict(np.float32(state))[0]
        print(netOutput)
        
        if i == 5:
            print(state)
        netOutput = model.predict(np.float32(state))[0]
        print(f"Predicted Output: {netOutput}")


        # Ausgabe auf die Beobachtungsdaten beschränken
        netOutput = np.clip(netOutput, output_min, output_max)
        
        # Überprüfen, ob der Wert die Grenze erreicht hat
        #if np.any(netOutput == output_min) or np.any(netOutput == output_max):
        #    print("Bound-hit at step: ", i, " => terminating further evaluation")
        #    break
        
        # Hinzufügen von Plot-Daten
        transitions.append({
            X_POS: netOutput[0], Y_POS: netOutput[1],
            X_VEL: netOutput[2], Y_VEL: netOutput[3],
            ANGLE: netOutput[4], ANGULAR_VEL: netOutput[5],
            LEFT_LEG_CONTACT: netOutput[6], RIGHT_LEG_CONTACT: netOutput[7]
        })
        
        # Aktualisieren des RNN-Zustands
        stateBuffer.append(np.float32([netOutput[0], netOutput[1], 
                                       netOutput[2], netOutput[3],
                                       netOutput[4], netOutput[5],
                                       netOutput[6], netOutput[7],
                                       dfEval[ACTION].values[i]]))
        
# Erstellen des DataFrames aus den Vorhersagen
dfNet = pd.DataFrame(transitions)

In [None]:
# Beispielhafte Datenfelder für LunarLander
fields = [X_POS, Y_POS, X_VEL, Y_VEL, ANGLE, ANGULAR_VEL, LEFT_LEG_CONTACT, RIGHT_LEG_CONTACT]

# Plot erstellen
fig, axs = plt.subplots(8, 1, figsize=(12, 20))

# Für jede der relevanten Beobachtungen plotten
for i in range(len(fields)):
    f = fields[i]
    if f in dfEval.columns and f in dfNet.columns:
        axs[i].plot(range(len(dfNet)), dfEval[f].values[window_size:window_size+len(dfNet)], label=f"True {f}")
        axs[i].plot(range(len(dfNet)), dfNet[f].values, label=f"Predicted {f}", ls="--")
        axs[i].grid()
        axs[i].legend(loc="best")
    else:
        axs[i].plot([], [], label=f"Field {f} not available")
        axs[i].grid()
        axs[i].legend(loc="best")

plt.tight_layout()
plt.show()


In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader, TensorDataset


In [None]:
class TransformerModel(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, output_dim):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim, model_dim)
        self.transformer = nn.Transformer(d_model=model_dim, nhead=num_heads, num_encoder_layers=num_layers, num_decoder_layers=num_layers)
        self.fc = nn.Linear(model_dim, output_dim)
        
    def forward(self, src, tgt):
        src = self.embedding(src)
        tgt = self.embedding(tgt)
        output = self.transformer(src, tgt)
        output = self.fc(output)
        return output


In [None]:
# Hyperparameters
input_dim = input_data.shape[1]
model_dim = 64
num_heads = 4
num_layers = 2
output_dim = output_data.shape[0]
learning_rate = 0.001
num_epochs = 100

# Instantiate model, loss function, and optimizer
model = TransformerModel(input_dim, model_dim, num_heads, num_layers, output_dim)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()
    for src, tgt in dataloader:
        optimizer.zero_grad()
        # Assuming we use src and tgt as both input and target here
        output = model(src, src)
        loss = criterion(output.squeeze(0), tgt.squeeze(0))
        loss.backward()
        optimizer.step()
    
    if epoch % 10 == 0:
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')

print('Training complete!')


---

# **Autoren**: Maximilian Schieder, Leon Lantz