# Aufgabe 2.3 - CartPole Umgebung

Imports:

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt

Einfache Visualisierung mit zufällig ausgewählten Aktionen:

In [None]:
# Render a few episodes driven by uniformly sampled actions to get a feeling
# for the environment.
env = gym.make("CartPole-v1", render_mode="human")

visualization_iterations = 10

try:
    for _ in range(visualization_iterations):
        env.reset()

        episode_over = False

        while not episode_over:
            action = env.action_space.sample()  # random right or left
            # only the end-of-episode flags are needed here
            _, _, terminated, truncated, _ = env.step(action)

            episode_over = terminated or truncated
finally:
    # close the environment even if the loop is interrupted or raises
    env.close()

### b) - Implementierung eines intuitiven Lösungsansatzes

Environment setup:

In [None]:
# Environment created without a render_mode; it is reused by the three
# hand-written policy evaluations below and closed afterwards.
env = gym.make("CartPole-v1")

Config:

In [None]:
# runs: how often the full sequence of episodes is repeated; the per-episode
# rewards are averaged over all runs.
runs = 100
# num_episodes: number of episodes played in each run.
num_episodes = 200

Policy 1: Wenn der Pole nach links geneigt ist, fahre nach links. Wenn der Pole nach rechts geneigt ist, fahre nach rechts.

In [None]:
# Policy 1: push the cart towards the side the pole is leaning to.
observation, info = env.reset()

# rewards[i] collects the return of episode i, summed over all runs
rewards = np.zeros(num_episodes)

for _ in range(runs):
    for episode_index in range(num_episodes):
        observation, info = env.reset()
        episode_reward = 0

        while True:
            # observation[2] < 0 means the pole leans to the left -> push
            # left (0), otherwise push right (1)
            action = 0 if observation[2] < 0 else 1

            observation, reward, terminated, truncated, info = env.step(action)
            episode_reward += reward

            if terminated or truncated:
                break

        rewards[episode_index] += episode_reward

# average return per episode index over all runs
rewards_p1 = rewards / runs

Policy 2: Wenn der Pole nach links geneigt ist aber eine positive Winkelgeschwindigkeit hat, fahre wieder nach rechts. Analog für Neigung nach rechts.

In [None]:
# Policy 2: decide by the sign of the pole's angular velocity instead of its
# angle.
observation, info = env.reset()

# rewards[i] collects the return of episode i, summed over all runs
rewards = np.zeros(num_episodes)

for _ in range(runs):
    for episode_index in range(num_episodes):
        observation, info = env.reset()
        episode_reward = 0

        while True:
            # a negative angular velocity (observation[3]) means the pole is
            # falling to the left -> push left (0), otherwise push right (1)
            falling_left = observation[3] < 0
            action = 0 if falling_left else 1

            observation, reward, terminated, truncated, info = env.step(action)
            episode_reward += reward

            if terminated or truncated:
                break

        rewards[episode_index] += episode_reward

# average return per episode index over all runs
rewards_p2 = rewards / runs

Policy 3: Zusätzliche Vermeidung des "Herausfahrens" aus der Umgebung:

In [None]:
# Policy 3: like policy 2, but steer back towards the centre once the cart
# gets close to the edge of the track.
observation, info = env.reset()

# rewards[i] collects the return of episode i, summed over all runs
rewards = np.zeros(num_episodes)

for _ in range(runs):
    for episode_index in range(num_episodes):
        observation, info = env.reset()
        episode_reward = 0

        while True:
            cart_position = observation[0]

            if cart_position <= -2.2:
                # too far left: the position guard overrides the pole rule
                action = 1
            elif cart_position >= 2.2:
                # too far right: the position guard overrides the pole rule
                action = 0
            else:
                # negative angular velocity -> pole falls left -> push left
                action = 0 if observation[3] < 0 else 1

            observation, reward, terminated, truncated, info = env.step(action)
            episode_reward += reward

            if terminated or truncated:
                break

        rewards[episode_index] += episode_reward

# average return per episode index over all runs
rewards_p3 = rewards / runs

Environment schließen:

In [None]:
# The hand-written policies are done; release the environment they shared.
env.close()

Auswerten der Policy-Ergebnisse:

In [None]:
# One integer x position per episode index (0 .. num_episodes - 1).
# np.linspace(0, num_episodes, num_episodes) would space the points by
# num_episodes / (num_episodes - 1) and thus not line up with the indices.
x = np.arange(num_episodes)

plt.plot(x, rewards_p1, label = 'Policy 1')
plt.plot(x, rewards_p2, label = 'Policy 2')
plt.plot(x, rewards_p3, label = 'Policy 3')
plt.xlabel('Episode')
plt.ylabel('Average Reward per Episode')
plt.legend()
plt.show()

### c) - RL-Ansatz

ObservationWrapper erstellen:

In [None]:
import numpy as np

class DiscreteObservationWrapper(gym.ObservationWrapper):
    """Reduce the 4-component observation to one of four discrete state indices.

    Only the third and fourth observation components (unpacked as pole
    position and pole velocity) are used. Each is reduced to a sign category
    (1 for values below 0.0, 2 for values >= 0.0) and the two categories are
    combined into one integer, giving indices from ``min_state_idx`` to
    ``min_state_idx + num_states - 1``.
    """

    def __init__(self, env):
        super().__init__(env)
        # Bin edges for np.digitize. Every finite value is greater than -inf,
        # so category 0 never occurs and the categories are 1 and 2.
        self.pole_pos_bins = np.array([-np.inf, 0.0, np.inf])
        self.pole_vel_bins = np.array([-np.inf, 0.0, np.inf])
        self.num_states = 4

        # Category tuple (1, 1) yields the smallest index that can occur;
        # callers subtract it to obtain a zero-based index.
        self.min_state_idx = self.get_state_index(1, 1)

        # Advertise the transformed observations: the wrapper no longer emits
        # the wrapped environment's continuous vector but a single integer.
        self.observation_space = gym.spaces.Discrete(
            self.num_states, start=self.min_state_idx
        )

    def get_state_index(self, pole_pos_category, pole_vel_category):
        """Combine the two sign categories into a single state index."""
        return 2 * pole_pos_category + pole_vel_category

    def observation(self, observation):
        """Return the discrete state index for a raw observation."""
        _, _, pole_pos, pole_vel = observation

        pole_pos_category = np.digitize(pole_pos, self.pole_pos_bins)
        pole_vel_category = np.digitize(pole_vel, self.pole_vel_bins)

        # plain Python int so the result can be used directly as a list index
        return int(self.get_state_index(pole_pos_category, pole_vel_category))

Den Banditen durch eine Klasse modellieren:

In [None]:
import numpy as np
import random

class StateBandit():
    """Epsilon-greedy two-armed bandit with sample-average value estimates.

    Attributes:
        epsilon: probability of picking a random action instead of the
            currently best one.
        Q: estimated value of each of the two actions.
        N: how often each action has been updated.
        G: sum of all rewards received so far.
        reward_per_action: every received reward, in order of arrival.
    """

    def __init__(self, epsilon, initial_Q = 0.0):
        self.epsilon = epsilon
        # Force a float array: with an integer initial_Q numpy would infer an
        # integer dtype and silently truncate every incremental update.
        self.Q = np.array([initial_Q, initial_Q], dtype=float)
        self.N = np.array([0, 0])
        self.G = 0
        self.reward_per_action = []

    def get_action(self):
        """Return 0 or 1: the greedy action, or a random one when exploring."""
        action = int(np.argmax(self.Q))

        # Strict comparison: random.random() lies in [0, 1), so epsilon == 0
        # never explores and epsilon == 1 always does.
        explore = random.random() < self.epsilon
        if explore:
            action = int(random.random() >= 0.5)

        return action

    def update_metrics(self, action, reward):
        """Record ``reward`` for ``action`` and update its running mean."""
        self.reward_per_action.append(reward)
        self.G += reward
        self.N[action] += 1
        # incremental sample average: Q <- Q + (r - Q) / N
        self.Q[action] += (reward - self.Q[action]) / self.N[action]

Für jeden State einen Banditen erstellen und die Simulation laufen lassen, um die Banditen anzulernen:

In [None]:
from sys import stdout # flushing stdout for progress printing

# Plain CartPole environment, wrapped so that observations arrive as
# discrete state indices.
env = gym.make('CartPole-v1')
wrapped_env = DiscreteObservationWrapper(env)

# One epsilon-greedy bandit per discrete state.
epsilon_value = 0.1
initial_Q = 1.0
bandits = [StateBandit(epsilon_value, initial_Q) for _ in range(wrapped_env.num_states)]

# Training length. The counter below only advances when an episode ends, so
# this is the number of complete episodes, not of single environment steps.
simulation_steps = 40_000

state_index, _ = wrapped_env.reset()

progress = -1
sim_step = 0
while sim_step < simulation_steps:

    # print the percentage only when it changes
    new_progress = ((sim_step + 1) * 100) // simulation_steps
    if new_progress > progress:
        progress = new_progress
        print(f'\rProgress: {progress} %', end='', flush=True)

    # play one complete episode
    episode_over = False
    while not episode_over:
        # the bandit of the state the action is taken in receives the reward
        bandit_index = state_index - wrapped_env.min_state_idx
        bandit = bandits[bandit_index]

        action = bandit.get_action()
        state_index, reward, terminated, truncated, _ = wrapped_env.step(action)

        # replace the reward of a terminating step by a penalty
        if terminated:
            reward = -10

        bandit.update_metrics(action, reward)

        episode_over = terminated or truncated

    sim_step += 1
    state_index, _ = wrapped_env.reset()

Plotten des Lernprozesses. Eine Art „Dictionary“ bildet dabei den Banditen-Index auf die Zustandsbeschreibung ab. Ein vollwertiges Python-Dict ist hier nicht notwendig, da die Banditen ab Index 0 aufsteigend geplottet werden; die Zustandsbeschreibungen können daher einfach in derselben Reihenfolge in einer Liste abgelegt werden.

In [None]:
# Human-readable description of each bandit's state; position i belongs to
# bandits[i].
state_index_description = [
    'Negative angle, negative velocity',
    'Negative angle, positive velocity',
    'Positive angle, Negative velocity',
    'Positive angle, Positive velocity',
]

In [None]:
# One subplot per bandit showing every reward it received during training,
# in order of arrival.
fig, ax = plt.subplots(len(bandits), figsize=(15, 15))
for bandit_index, bandit in enumerate(bandits):
    axis = ax[bandit_index]
    description = state_index_description[bandit_index]
    axis.plot(bandit.reward_per_action, label = f'State {bandit_index} ({description})')
    axis.set_xlabel('# Actions')
    axis.set_ylabel('Reward per Action')
    axis.legend()

plt.show()

Testen, wie gut die Banditen sind:

In [None]:
# Evaluate the trained bandits. update_metrics is not called here, so the
# value estimates stay frozen; get_action still explores with probability
# epsilon.
num_episodes = 200
runs = 100

state_index, info = wrapped_env.reset()

# rewards[i] collects the return of episode i, summed over all runs
rewards = np.zeros(num_episodes)

for _ in range(runs):
    for episode_index in range(num_episodes):
        state_index, info = wrapped_env.reset()
        episode_reward = 0

        while True:
            # shift the state index so that it addresses the bandit list
            bandit_index = state_index - wrapped_env.min_state_idx
            action = bandits[bandit_index].get_action()

            state_index, reward, terminated, truncated, info = wrapped_env.step(action)
            episode_reward += reward

            if terminated or truncated:
                break

        rewards[episode_index] += episode_reward

# average return per episode index over all runs
rewards_RL = rewards / runs

Darstellung der Testergebnisse:

In [None]:
# One integer x position per episode index (0 .. num_episodes - 1).
# np.linspace(0, num_episodes, num_episodes) would space the points by
# num_episodes / (num_episodes - 1) and thus not line up with the indices.
x = np.arange(num_episodes)

plt.plot(x, rewards_RL, label = 'Reinforcement Learning')
plt.xlabel('Episode')
plt.ylabel('Average Reward per Episode')
plt.ylim(0, 240)
plt.legend()
plt.show()

Schließen der Environments (Da diese noch für den Test verwendet wurden, kann man sie nicht direkt nach dem Training schließen):

In [None]:
# Release both handles once training and evaluation are finished.
# NOTE(review): closing the wrapper presumably already closes the wrapped
# environment, which would make the second call redundant - confirm.
wrapped_env.close()
env.close()