In [1]:
#!pip install gym

## Q-Network

Neural Network 에 기반하여 DeepLearning 알고리즘을 사용해 푼다.  

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow.compat.v1 as tf

import random
import gym

In [2]:
# Create the CartPole-v1 environment used by the training and test cells below.
env = gym.make('CartPole-v1')

## Set The Variables for Network

Number of Each Network's input, output

In [3]:
# Layer sizes at the network boundary are read from the environment's spaces.
n_input = env.observation_space.shape[0] # length of one observation vector (4 for this env)
n_actions = env.action_space.n # number of discrete actions (2 for this env)
n_output = n_actions # the network emits one Q-value per action

# Widths of the two hidden layers.
n_hidden1 = 100
n_hidden2 = 50

Train 을 위한 Parameters (데이터 셋)

In [4]:
# Turn off eager execution before any placeholder is created.
tf.disable_eager_execution()
# x: batch of observations, n_input features per row (batch size left unspecified).
x = tf.placeholder(tf.float32, [None, n_input])
# y: regression targets for the network output, one column per action.
y = tf.placeholder(tf.float32, [None, n_output])

Network 에 대해 Weight 와 Bias 정의 (Hidden1, Hidden2, Output)

In [5]:
# Trainable parameters of the three dense layers, keyed by layer name.
# Every tensor is drawn from a normal distribution with stddev 0.01.
# Variables are created in the same order as before: all weights, then all biases.
weights = dict(
    hidden1=tf.Variable(tf.random_normal([n_input, n_hidden1], stddev=0.01)),
    hidden2=tf.Variable(tf.random_normal([n_hidden1, n_hidden2], stddev=0.01)),
    output=tf.Variable(tf.random_normal([n_hidden2, n_output], stddev=0.01)),
)

biases = dict(
    hidden1=tf.Variable(tf.random_normal([n_hidden1], stddev=0.01)),
    hidden2=tf.Variable(tf.random_normal([n_hidden2], stddev=0.01)),
    output=tf.Variable(tf.random_normal([n_output], stddev=0.01)),
)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.


## Define Network

Network 정의하기  
> Input -> Hidden(relu) -> Hidden(relu) -> Output

In [6]:
def Qnet(x, weights, biases):
    """Build the Q-network forward pass: two ReLU hidden layers and a linear output.

    Args:
        x: input tensor that is matrix-multiplied with ``weights['hidden1']``.
        weights: mapping with keys ``'hidden1'``, ``'hidden2'`` and ``'output'``.
        biases: mapping with the same three keys as ``weights``.

    Returns:
        The output-layer tensor (no activation applied).
    """
    activation = x
    # Both hidden layers share the same affine -> ReLU structure.
    for layer in ('hidden1', 'hidden2'):
        pre_activation = tf.add(tf.matmul(activation, weights[layer]), biases[layer])
        activation = tf.nn.relu(pre_activation)

    # The output layer stays linear.
    return tf.add(tf.matmul(activation, weights['output']), biases['output'])

Computation Graph 정의 하기

In [7]:
# Assemble the computation graph: prediction, loss, and the training op.
Qpred = Qnet(x, weights, biases)  # Q-network prediction for the input batch x
loss = tf.reduce_mean(tf.square(y - Qpred)) # mean squared error between the targets y and the predictions

LR = 0.001 # learning rate handed to Adam; smaller values give finer-grained updates
optm = tf.train.AdamOptimizer(LR).minimize(loss) # training op: running it applies one Adam step that reduces the loss

In [8]:
# Open a session and run the initializer for all global variables created so far.
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)

In [9]:
# Sanity check: push one observation through the (still untrained) network.
state = env.reset()
print(state)

# Add a leading batch dimension. n_input is used instead of a literal 4 so the
# shape always matches the placeholder x, as in the other cells.
state = np.reshape(state, [1, n_input])
print(state)

# One row of Q-values, one entry per action.
Q = sess.run(Qpred, feed_dict={x:state})
print(Q)

[ 0.02669913  0.00936279 -0.04171726 -0.00492669]
[[ 0.02669913  0.00936279 -0.04171726 -0.00492669]]
[[-0.01127372  0.00346072]]


## Train Using Q Network

1. setting Variables.. (Not Q-table -> Will update Weight and Biases)


2. Get a Sample of States..


3. Train With Q-Network While Episode done
    1. Sample State에 대한 Q-Network의 예측값을 저장한다. 이때, 예측값은 두 Action에 대한 Q-value
    2. Exploitation VS Exploration 으로 현 State에 따른 다음 Action을 결정
    3. 결정된 Action에 따른 state, reward, done, 을 저장
    4. Q-function에 따라 다음 State에 대한 Q 값을 저장한다. 
    5. 최적화기 를 통해서 weight and Bias 업데이트 한다.

In [10]:
gamma = 0.9    # discount factor applied to the bootstrapped next-state value
epsilon = 0.1  # exploration probability; constant, so it is set once outside the loops

for episode in range(501):

    done = False
    state = env.reset()

    count = 0  # number of steps taken in this episode

    while not done:

        count += 1
        # n_input instead of a literal 4 keeps the shape tied to the placeholder x.
        state = np.reshape(state, [1, n_input])
        Q = sess.run(Qpred, feed_dict = {x:state})

        # Epsilon-greedy: explore with probability epsilon, otherwise act greedily.
        if np.random.uniform() < epsilon:
            a = env.action_space.sample()
        else:
            a = np.argmax(Q)

        next_state, reward, done, info = env.step(a)

        # An episode can also end because the environment's step limit was hit.
        # Gym's TimeLimit wrapper reports that through info['TimeLimit.truncated'];
        # if the key is missing this evaluates to False and every `done` is
        # treated as a failure, exactly as before.
        timed_out = bool(info.get('TimeLimit.truncated', False))

        if done and not timed_out:
            # The pole fell / cart left the track: large penalty, nothing to bootstrap from.
            reward = -200
            Q[0][a] = reward
        else:
            # Regular step (or time-limit cut-off): one-step TD target
            # r + gamma * max_a' Q(s', a'). Only the taken action's entry is
            # replaced, so the other output contributes zero error.
            next_state = np.reshape(next_state, [1, n_input])
            next_Q = sess.run(Qpred, feed_dict = {x:next_state})
            Q[0][a] = reward + gamma*np.max(next_Q)

        # One optimizer step on this single transition.
        sess.run(optm, feed_dict = {x: state, y: Q})
        state = next_state

    if episode % 100 == 0:
        print("Episode: {} steps: {}".format(episode, count))

env.close()

Episode: 0 steps: 10
Episode: 100 steps: 14
Episode: 200 steps: 58
Episode: 300 steps: 40
Episode: 400 steps: 83
Episode: 500 steps: 18


## Test Q Network

In [13]:
# Play one episode greedily with the trained Q-network and render it.
state = env.reset()

done = False

try:
    while not done:
        env.render()

        state = np.reshape(state, [1, n_input])
        Q = sess.run(Qpred, feed_dict={x:state})
        action = np.argmax(Q)  # purely greedy: no exploration while testing

        next_state, reward, done, _ = env.step(action)
        state = next_state
finally:
    # Close the environment even if the loop is interrupted or raises, so the
    # render window is not left open.
    env.close()

## DQN

DQN Process 
![DQN](resource/DQN.png)
Buffer (Experience Replay)를 사용하여 업데이트 한다. -> 더 효율적. 



In [14]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow.compat.v1 as tf

import random
from collections import deque
import gym
%matplotlib inline

In [15]:
# Create a fresh CartPole-v1 environment for the DQN section.
env = gym.make('CartPole-v1')

In [16]:
# Layer sizes at the network boundary are read from the environment's spaces.
n_input = env.observation_space.shape[0] # length of one observation vector (4 for this env)
n_actions = env.action_space.n # number of discrete actions (2 for this env)
n_output = n_actions # the network emits one Q-value per action

# Widths of the two hidden layers.
n_hidden1 = 100
n_hidden2 = 50

In [17]:
# Turn off eager execution before any placeholder is created.
tf.disable_eager_execution()
# x: batch of observations, n_input features per row (batch size left unspecified).
x = tf.placeholder(tf.float32, [None, n_input])
# y: regression targets for the network output, one column per action.
y = tf.placeholder(tf.float32, [None, n_output])

In [18]:
# Parameters of the DQN's three dense layers, keyed by layer name.
# Each tensor is initialised from a normal distribution with stddev 0.01.
# Creation order is unchanged: the three weight matrices first, then the biases.
weights = {}
weights['hidden1'] = tf.Variable(tf.random_normal([n_input, n_hidden1], stddev=0.01))
weights['hidden2'] = tf.Variable(tf.random_normal([n_hidden1, n_hidden2], stddev=0.01))
weights['output'] = tf.Variable(tf.random_normal([n_hidden2, n_output], stddev=0.01))

biases = {}
biases['hidden1'] = tf.Variable(tf.random_normal([n_hidden1], stddev=0.01))
biases['hidden2'] = tf.Variable(tf.random_normal([n_hidden2], stddev=0.01))
biases['output'] = tf.Variable(tf.random_normal([n_output], stddev=0.01))

In [19]:
def Qnet(x, weights, biases):
    """Return the network output for ``x``: Input -> ReLU -> ReLU -> linear.

    Args:
        x: input tensor fed to the first layer.
        weights: dict holding the ``'hidden1'``, ``'hidden2'`` and ``'output'`` matrices.
        biases: dict holding the bias vectors under the same three keys.

    Returns:
        Tensor produced by the final (linear) layer.
    """
    def affine(inputs, layer):
        # inputs @ W[layer] + b[layer]
        return tf.add(tf.matmul(inputs, weights[layer]), biases[layer])

    first_hidden = tf.nn.relu(affine(x, 'hidden1'))
    second_hidden = tf.nn.relu(affine(first_hidden, 'hidden2'))
    return affine(second_hidden, 'output')

In [20]:
# Assemble the DQN computation graph: prediction, loss, and the training op.
Qpred = Qnet(x, weights, biases)  # Q-values predicted for the input batch x
loss = tf.reduce_mean(tf.square(y - Qpred))  # mean squared error against the targets y

LR = 0.001  # learning rate handed to Adam
optm = tf.train.AdamOptimizer(LR).minimize(loss)  # training op: one Adam step on the loss

In [21]:
# Open a session for the DQN graph and run the initializer for all global variables.
# NOTE(review): this rebinds `sess`; the session created earlier in the notebook
# is not closed anywhere in the visible cells - confirm whether it should be.
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)

## Experience Replay

- Learning from batches of consecutive samples is problematic! (연속적인 sample을 가지고 Batch 프로세스를 진행하면 문제점이 생긴다.)
    - Samples are correlated -> inefficient learning..! (샘플이 서로 연관되어 있다.)
    - Current Q-network parameters determine the next training samples.. 현재 State에 대한 Network parameter들이 다음 State를 결정하기 때문에 결과가 치우칠 수 있다. -> Can lead to bad feedback loops


- Address these problems using experience replay
    - Continually update a replay memory table of transitions..! 매 State의 전이마다 상황을 저장한다.
    - Random minibatch of transition을 추출하여 Q-network 을 Train 한다. 
    

- Stabilize! and Improve DQN Training procedure

## Train Using Q Network

1. setting Variables.. and Define replay_buffer(list) (Not Q-table -> Will update Weight and Biases)


2. Get a Sample of States..


3. Train With Q-Network While Episode done
    1. Sample State에 대한 Q-Network의 예측값을 저장한다. 이때, 예측값은 두 Action에 대한 Q-value
    2. Exploitation VS Exploration 으로 현 State에 따른 다음 Action을 결정
    3. 결정된 Action에 따른 state, reward, done, 을 replay_buffer에 리스트로 저장
    4. n번째 에피소드 마다! minibatch를 랜덤으로 뽑아내어 Q-Network을 Train 한다.

In [26]:
n_buffer = 5000
n_batch = 10  # number of transitions sampled per gradient step

# Bounded FIFO replay memory: once n_buffer transitions are stored, appending a
# new one silently drops the oldest (replaces the manual list.pop(0)).
replay_buffer = deque(maxlen=n_buffer)

train_every = 10  # run a training phase when episode % train_every == 1
n_updates = 50    # gradient steps per training phase

gamma = 0.9    # discount factor applied to the bootstrapped next-state value
epsilon = 0.1  # exploration probability; constant, so it is set once outside the loops

for episode in range(2001):

    done = False
    state = env.reset()

    count = 0  # number of steps taken in this episode

    while not done:

        count += 1
        # n_input instead of a literal 4 keeps the shape tied to the placeholder x.
        state = np.reshape(state, [1, n_input])
        Q = sess.run(Qpred, feed_dict = {x:state})

        # Epsilon-greedy: explore with probability epsilon, otherwise act greedily.
        if np.random.uniform() < epsilon:
            a = env.action_space.sample()
        else:
            a = np.argmax(Q)

        next_state, reward, done, info = env.step(a)

        # An episode can also end because the environment's step limit was hit.
        # Gym's TimeLimit wrapper reports that through info['TimeLimit.truncated'];
        # if the key is missing this evaluates to False and every `done` is
        # treated as a failure, exactly as before.
        timed_out = bool(info.get('TimeLimit.truncated', False))
        terminal = done and not timed_out

        if terminal:
            reward = -200  # large penalty for dropping the pole

        # Store the transition; `terminal` tells the learner whether to bootstrap.
        replay_buffer.append((state, a, reward, next_state, terminal))

        state = next_state

    # Periodic learning phase on random minibatches drawn from the replay memory.
    if episode % train_every == 1 and len(replay_buffer) >= n_batch:
        for _ in range(n_updates):
            minibatch = random.sample(replay_buffer, n_batch)
            b_states, b_actions, b_rewards, b_next_states, b_terminals = zip(*minibatch)

            # Stack the batch once and evaluate the network on all samples
            # together instead of issuing two sess.run calls per sample.
            x_stack = np.reshape(b_states, [n_batch, n_input])
            next_stack = np.reshape(b_next_states, [n_batch, n_input])
            actions = np.asarray(b_actions, dtype=np.int64)
            rewards = np.asarray(b_rewards, dtype=np.float32)
            terminals = np.asarray(b_terminals, dtype=bool)

            # Start from the current predictions so that only the entry of the
            # action actually taken contributes to the loss.
            y_stack = sess.run(Qpred, feed_dict = {x: x_stack})
            next_Q = sess.run(Qpred, feed_dict = {x: next_stack})

            # Terminal transitions use the reward alone; the rest use the
            # one-step TD target r + gamma * max_a' Q(s', a').
            td_target = np.where(terminals, rewards, rewards + gamma*np.max(next_Q, axis=1))
            y_stack[np.arange(n_batch), actions] = td_target

            sess.run(optm, feed_dict = {x: x_stack, y: y_stack})

    if episode % 200 == 1:
        print("Episode: {} steps: {}".format(episode, count))

env.close()

Episode: 1 steps: 106
Episode: 201 steps: 127
Episode: 401 steps: 152
Episode: 601 steps: 129
Episode: 801 steps: 310
Episode: 1001 steps: 410
Episode: 1201 steps: 191
Episode: 1401 steps: 162
Episode: 1601 steps: 186
Episode: 1801 steps: 194


## Test (DQN)

In [32]:
# Play one episode greedily with the trained DQN and render it.
state = env.reset()

done = False

try:
    while not done:
        env.render()

        state = np.reshape(state, [1, n_input])
        Q = sess.run(Qpred, feed_dict={x:state})
        action = np.argmax(Q)  # purely greedy: no exploration while testing

        next_state, reward, done, _ = env.step(action)
        state = next_state
finally:
    # Close the environment even if the loop is interrupted or raises, so the
    # render window is not left open.
    env.close()