In [None]:
import os
import collections
import random
import gym
import numpy as np
from typing import Deque

In [None]:
from cartpoleA2CNN import Actor, Critic

In [None]:
# Root folder of the project. The original author's local folder remains the
# default; set the CARTPOLE_PROJECT_PATH environment variable to point the
# notebook at a different location without editing the code.
PROJECT_PATH = os.path.abspath(
    os.environ.get("CARTPOLE_PROJECT_PATH", "C:/Selbststudium/Udemy/Udemy_AI_")
)
# Folder holding the saved network files.
MODELS_PATH = os.path.join(PROJECT_PATH, "models")
# Files the trained actor / critic are written to and later loaded from.
ACTOR_PATH = os.path.join(MODELS_PATH, "actor_cartpole.h5")
CRITIC_PATH = os.path.join(MODELS_PATH, "critic_cartpole.h5")

In [None]:
class Agent:
    """Online advantage actor-critic (A2C) agent for a discrete-action gym env.

    The actor and the critic are updated after every single environment step
    (see ``update_policy``). ``Actor`` and ``Critic`` come from
    ``cartpoleA2CNN``; this class only relies on them being callable on a
    state batch and on their ``fit``, ``save_model`` and ``load_model``
    methods.

    The environment is driven through the classic gym API: ``reset()`` returns
    the observation and ``step()`` returns ``(observation, reward, done, info)``.
    """

    # Episode return that counts as a win; an episode ending below it is
    # treated as a failure. NOTE(review): 500 matches the CartPole-v1 time
    # limit with +1 reward per step - confirm before using another env.
    MAX_EPISODE_REWARD = 500
    # Training reward substituted for the env reward on a failing final step.
    FAILURE_PENALTY = -100.0
    # Training stops early once the mean of the last 10 episode returns
    # reaches this value.
    SOLVED_MEAN_REWARD = 400

    def __init__(self, env: "gym.Env"):
        """Read the env's space sizes and build the actor and critic.

        Args:
            env: Environment with a discrete action space (``action_space.n``
                is read). The annotation is quoted so it is not evaluated
                when the class is defined.
        """
        # Environment description
        self.env = env
        # Note: this is the observation *shape* tuple, not a count.
        self.num_observations = self.env.observation_space.shape
        self.num_actions = self.env.action_space.n
        self.num_values = 1
        # NOTE(review): the replay-buffer settings below are not used by any
        # method of this class (updates are done online, step by step). They
        # are kept so existing attribute access keeps working.
        self.replay_buffer_size = 50_000
        self.train_start = 1_000
        self.memory = collections.deque(maxlen=self.replay_buffer_size)
        # Discount factor for the bootstrapped one-step target.
        self.gamma = 0.95
        # Networks
        self.learning_rate_actor = 1e-3
        self.learning_rate_critic = 5e-3
        self.actor = Actor(
            self.num_observations,
            self.num_actions,
            self.learning_rate_actor
        )
        self.critic = Critic(
            self.num_observations,
            self.num_values,
            self.learning_rate_critic
        )

    @staticmethod
    def _as_batch(observation) -> np.ndarray:
        """Return the observation as a float32 array of shape (1, -1)."""
        # The shape is passed positionally: the ``newshape`` keyword is
        # deprecated in recent NumPy releases.
        return np.reshape(observation, (1, -1)).astype(np.float32)

    def _state_value(self, state) -> float:
        """Return the critic's estimate for a single-state batch as a float."""
        # The critic returns a batch of outputs; pull out the one scalar so
        # later arithmetic and element assignment work on plain numbers.
        return float(np.ravel(self.critic(state))[0])

    def _save_models(self) -> None:
        """Write actor and critic to ACTOR_PATH / CRITIC_PATH."""
        # Create missing target folders first so a finished training run is
        # not lost to a missing directory.
        for path in (ACTOR_PATH, CRITIC_PATH):
            directory = os.path.dirname(path)
            if directory:
                os.makedirs(directory, exist_ok=True)
        self.actor.save_model(ACTOR_PATH)
        self.critic.save_model(CRITIC_PATH)

    def get_action(self, state: np.ndarray):
        """Sample an action index from the actor's output for ``state``.

        Args:
            state: State batch holding one state; the first row of the
                actor's output is used as the action probabilities.

        Returns:
            The sampled action index in ``range(self.num_actions)``.
        """
        policy = self.actor(state)[0]
        action = np.random.choice(self.num_actions, p=policy)
        return action

    def train(self, num_episodes) -> None:
        """Train for up to ``num_episodes`` episodes and save both networks.

        After every step the networks are updated with ``update_policy``.
        When an episode ends with a return below ``MAX_EPISODE_REWARD`` the
        final step is trained with ``FAILURE_PENALTY`` instead of the env
        reward. The logged and averaged episode return always uses the raw
        env rewards. Training stops early once the mean return over the last
        10 episodes reaches ``SOLVED_MEAN_REWARD``.

        Args:
            num_episodes: Maximum number of episodes to run.
        """
        last_rewards: Deque = collections.deque(maxlen=10)
        for episode in range(1, num_episodes + 1):
            total_reward = 0.0
            state = self._as_batch(self.env.reset())

            while True:
                action = self.get_action(state)
                next_state, reward, done, _ = self.env.step(action)
                next_state = self._as_batch(next_state)
                total_reward += reward

                # Decide after counting this step's reward, so an episode
                # that reaches the full return is not punished as a failure.
                failed = done and total_reward < self.MAX_EPISODE_REWARD
                train_reward = self.FAILURE_PENALTY if failed else reward

                self.update_policy(state, action, train_reward, next_state, done)
                state = next_state

                if done:
                    break

            last_rewards.append(total_reward)
            current_reward_mean = np.mean(last_rewards)
            print(f"Episode: {episode} --- Reward: {total_reward} --- Mean reward: {current_reward_mean}")

            if current_reward_mean >= self.SOLVED_MEAN_REWARD:
                break
        self._save_models()

    def update_policy(self, state, action, reward, next_state, done) -> None:
        """Run one actor and one critic fit for a single transition.

        The critic target is ``reward`` for a terminal transition and
        ``reward + gamma * V(next_state)`` otherwise. The actor target is
        zero everywhere except at ``action``, where it holds the advantage
        ``target - V(state)``.

        Args:
            state: State batch (one state) the action was taken in.
            action: Index of the action that was taken.
            reward: Training reward for this step.
            next_state: State batch (one state) reached after the action.
            done: Whether the episode ended with this transition.
        """
        values = np.zeros(shape=(1, self.num_values))
        advantages = np.zeros(shape=(1, self.num_actions))

        value = self._state_value(state)
        if done:
            # Nothing follows a terminal state, so there is no bootstrap
            # term and the critic need not be evaluated on next_state.
            target = reward
        else:
            target = reward + self.gamma * self._state_value(next_state)

        advantages[0][action] = target - value
        values[0][0] = target

        self.actor.fit(state, advantages)
        self.critic.fit(state, values)

    def play(self, num_episodes, render=True) -> None:
        """Load the saved networks and run episodes without training.

        Args:
            num_episodes: Number of episodes to play.
            render: If true, ``env.render()`` is called before every step.
        """
        self.actor.load_model(ACTOR_PATH)
        self.critic.load_model(CRITIC_PATH)
        for episode in range(1, num_episodes + 1):
            total_reward = 0.0
            state = self._as_batch(self.env.reset())
            while True:
                if render:
                    self.env.render()
                action = self.get_action(state)
                next_state, reward, done, _ = self.env.step(action)
                total_reward += reward
                state = self._as_batch(next_state)
                if done:
                    print(f"Episode: {episode} --- Reward: {total_reward}")
                    break

In [None]:
if __name__ == "__main__":
    # Train on CartPole-v1, wait for the user to confirm, then replay the
    # saved policy with rendering switched on.
    env = gym.make("CartPole-v1")
    try:
        agent = Agent(env)
        agent.train(num_episodes=100)
        input("Play?")
        agent.play(num_episodes=10, render=True)
    finally:
        # Always release the environment (and any render window it opened),
        # even if training or playback raised or was interrupted.
        env.close()