<a href="https://colab.research.google.com/github/dantae74/Reinforcement-Learning/blob/main/06-05-DQN-CartPole-2013.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

모두를 위한 머신러닝에서 가져왔습니다.

In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import random
import gym
import numpy as np
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam, RMSprop

In [2]:
env = gym.make('CartPole-v0')
# env = gym.wrappers.Monitor(env, directory="gym-results/", force=True)

# Constants defining our neural network
INPUT_SIZE = env.observation_space.shape[0]
OUTPUT_SIZE = env.action_space.n

DISCOUNT_RATE = 0.99
REPLAY_MEMORY = 50000
BATCH_SIZE = 64
MAX_EPISODE = 5000
TRAIN_START = 1000


# minimum epsilon for epsilon greedy
MIN_E = 0.0
# epsilon will be `MIN_E` at `EPSILON_DECAYING_EPISODE`
EPSILON_DECAYING_EPISODE = MAX_EPISODE * 0.01

In [3]:
def OurModel(input_shape, action_space):
    X_input = Input(input_shape)

    # 'Dense' is the basic form of a neural network layer
    # Input Layer of state size(4) and Hidden Layer with 512 nodes
    X = Dense(512, input_shape=input_shape, activation="relu", kernel_initializer='he_uniform')(X_input)

    # Hidden layer with 256 nodes
    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)
    
    # Hidden layer with 64 nodes
    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    # Output Layer with # of actions: 2 nodes (left, right)
    X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

#     model = Model(inputs = X_input, outputs = X, name='CartPole DQN model')
    model = Model(inputs = X_input, outputs = X)
    model.compile(loss="mse", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])

    model.summary()
    return model

In [4]:
class DQN:
    def __init__(self, input_size, output_size, name = "main"):
        self.state_size = input_size
        self.action_size = output_size
        self.net_name = name
        
        self.model = OurModel(INPUT_SIZE, OUTPUT_SIZE)
    
    def predict(self, state):
        return self.model.predict(state)

In [5]:
def replay(mainDQN, train_batch):

    states = np.vstack(x[0] for x in train_batch)
    actions = np.array(x[1] for x in train_batch)
    rewards = np.varray(x[2] for x in train_batch)
    next_states = np.vstack(x[3] for x in train_batch)
    dones = np.array(x[4] for x in train_batch)

    target = mainDQN.predict(states)
    target_next = mainDQN.predict(next_states)

    for i in range(BATCH_SIZE):
      if done[i]:
        target[i][actions[i]] = reward[i]
      else:
        target[i][actions[i]] = rewards[i] + DISCOUNT_RATE * np.max(target_next[i])

    mainDQN.fit(state, target,batch_size=BATCH_SIZE, verbose=0)

In [6]:
def run(mainDQN):
  state = env.reset()
  total_reward = 0

  while True:
    env.render()
    action = np.argmax(mainDQN.predict(state))
    state, reward, done, info = env.step(action)
    total_reward += reward

    if done:
      print("Total score: {}".format(total_reward))
      break

In [7]:
def annealing_epsilon(episode: int, min_e: float, max_e: float, target_episode: int) -> float:
    """Return an linearly annealed epsilon
    Epsilon will decrease over time until it reaches `target_episode`
         (epsilon)
             |
    max_e ---|\
             | \
             |  \
             |   \
    min_e ---|____\_______________(episode)
                  |
                 target_episode
     slope = (min_e - max_e) / (target_episode)
     intercept = max_e
     e = slope * episode + intercept
    Args:
        episode (int): Current episode
        min_e (float): Minimum epsilon
        max_e (float): Maximum epsilon
        target_episode (int): epsilon becomes the `min_e` at `target_episode`
    Returns:
        float: epsilon between `min_e` and `max_e`
    """

    slope = (min_e - max_e) / (target_episode)
    intercept = max_e

    return max(min_e, slope * episode + intercept)

In [8]:
def main():
  replay_buffer = deque(maxlen=REPLAY_MEMORY)
  last_100_game_reward = deque(maxlen=100)

  mainDQN = DQN(INPUT_SIZE, OUTPUT_SIZE)

  for episode in range(MAX_EPISODES):
    e = annealing_epsilon(episode, MIN_E, 1.0, EPSILON_DECAYING_EPISODE)
    done = False
    state = env.reset()

    step_count = 0
    while not done:
      if np.random.rand() < e:
        action = env.action_space.sample()
      else:
        action = np.argmax(mainDQN.predict(state))

      next_state, reward, done, info = env.step(action)

      if done:
        reward = -1
      
      replay_buffer.append(state, action, reward, next_state, done)

      state = next_state
      step_count += 1

      if len(replay_buffer) > TRAIN_START:
        minibatch = random.sample(replay_buffer, BATCH_SIZE)
        replay(mainDQN, minibatch)

    print("[Episode {:>5}] steps: {:>5} e: {:>5.2f}".format(episode, step_count, e))


    last_100_game_reward.append(step_count)
    if len(last_100_game_reward) == last_100_game_reward.maxlen:
        avg_reward = np.mean(last_100_game_reward)
        if avg_reward > 199.0:
            print("Game Cleared within {} episodes with avg reward {}".format(episode, avg_reward))
            break

In [10]:
if __name__ == "__main__":
  main()

TypeError: 'int' object is not iterable