In [43]:
import numpy as np
import sys
if "../" not in sys.path:
    sys.path.append("../") 
from keras.layers import Dense, Input
from keras.optimizers import Adam
from keras.models import Sequential, Model
from keras import backend as K
import gym

In [41]:
class PPO:
    """PPO agent whose policy and value heads share one Keras network.

    Transitions are buffered with ``append_sample`` and consumed by
    ``train_model``, which runs ``K_epoch`` gradient steps on the clipped
    surrogate objective plus a squared TD-error value loss.

    Args:
        state_size: Number of features in one observation.
        action_size: Number of discrete actions (width of the softmax head).
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.value_size = 1

        self.gamma = 0.98     # discount factor
        self._lambda = 0.95   # decay applied (with gamma) to the running advantage
        self.lr = 0.0005      # Adam learning rate
        self.epsilon = 0.1    # clipping range for the probability ratio
        self.K_epoch = 3      # gradient steps per collected batch
        self.T = 20           # rollout length read by the training loop
        self.model = self.build_model()
        # Rebinding the name on the instance replaces the bound method with the
        # compiled training function; kept so `agent.optimizer(...)` still works.
        self.optimizer = self.optimizer()

        self.data = []

    def build_model(self):
        """Build the network whose parameters are shared by policy and value.

        Returns:
            A Keras ``Model`` with outputs ``[policy_out, value_out]``: a
            softmax over ``action_size`` actions and a linear value of width
            ``value_size``.
        """
        input_layer = Input(shape=(self.state_size,))
        x = Dense(32, activation='relu',
                  kernel_initializer='he_uniform')(input_layer)
        x = Dense(16, activation='relu',
                  kernel_initializer='he_uniform')(x)
        x = Dense(8, activation='relu',
                  kernel_initializer='he_uniform')(x)
        policy_out = Dense(self.action_size, activation='softmax',
                           kernel_initializer='he_uniform')(x)
        value_out = Dense(self.value_size, activation='linear',
                          kernel_initializer='he_uniform')(x)

        return Model(inputs=input_layer, outputs=[policy_out, value_out])

    def optimizer(self):
        """Compile the PPO training step.

        Returns:
            A backend function called as
            ``train([state, action_one_hot, old_a_prob, advantage, td_target])``
            that applies one Adam update and returns no outputs.
        """
        old_a_prob = K.placeholder(shape=[None, ])
        advantage = K.placeholder(shape=[None, ])
        td_target = K.placeholder(shape=[None, ])
        # One-hot action mask; width follows the policy head instead of a literal 2.
        action = K.placeholder(shape=[None, self.action_size])
        a_prob = K.sum(action * self.model.output[0], axis=1)

        # a/b == exp(log(a)-log(b)); K.epsilon() keeps log() finite when a
        # probability underflows to zero.
        ratio = K.exp(K.log(a_prob + K.epsilon())
                      - K.log(old_a_prob + K.epsilon()))

        surr1 = ratio * advantage
        surr2 = K.clip(ratio, 1 - self.epsilon, 1 + self.epsilon) * advantage

        # Drop the trailing unit axis of the value head so it lines up with the
        # (batch,) td_target instead of broadcasting to (batch, batch).
        value = self.model.output[1][:, 0]
        td_error = td_target - value

        # Element-wise minimum of the two surrogates (not a global min over
        # both tensors), plus a squared-error value loss, averaged to a scalar.
        loss = K.mean(-K.minimum(surr1, surr2) + K.square(td_error))

        optimizer = Adam(lr=self.lr)
        updates = optimizer.get_updates(self.model.trainable_weights, [], loss)
        train = K.function([self.model.input, action, old_a_prob,
                            advantage, td_target], [],
                           updates=updates)

        return train

    def train_model(self):
        """Run ``K_epoch`` updates on the buffered transitions and clear the buffer.

        Does nothing when no transition has been stored.
        """
        if not self.data:
            return

        state, action, reward, next_state, old_a_prob, done_mask = \
            self.make_batch()

        state = np.reshape(np.array(state), [-1, self.state_size])
        next_state = np.reshape(np.array(next_state), [-1, self.state_size])
        action = np.array(action)
        reward = np.array(reward, dtype=float)
        old_a_prob = np.array(old_a_prob, dtype=float)
        done_mask = np.array(done_mask, dtype=float)

        for _ in range(self.K_epoch):
            # [:, 0] keeps one value per sample; indexing with [0] would pick
            # only the first sample's value for the whole batch.
            next_value = self.model.predict(next_state)[1][:, 0]
            value = self.model.predict(state)[1][:, 0]

            td_target = reward + self.gamma * next_value * done_mask
            delta = td_target - value

            # Discounted backward sum of TD errors, one advantage per step.
            advantage = np.zeros_like(delta)
            running = 0.0
            for t in reversed(range(len(delta))):
                running = self.gamma * self._lambda * running + delta[t]
                advantage[t] = running

            self.optimizer([state, action, old_a_prob, advantage, td_target])

    def append_sample(self, transition):
        """Buffer one transition.

        Args:
            transition: ``(state, action, reward, next_state, action_prob, done)``.
        """
        self.data.append(transition)

    def make_batch(self):
        """Unpack the buffer into per-field lists and empty it.

        Returns:
            ``(states, one_hot_actions, rewards, next_states, action_probs,
            done_masks)`` where each done mask is 0 for a terminal transition
            and 1 otherwise.
        """
        state_lst, action_lst, reward_lst, next_state_lst, \
            a_prob_lst, done_list = [], [], [], [], [], []

        for transition in self.data:
            state, action, reward, next_state, a_prob, done = transition

            # States are stored with a leading batch axis; keep the row only.
            state_lst.append(state[0])
            act = np.zeros(self.action_size)
            act[action] = 1
            action_lst.append(act)
            reward_lst.append(reward)
            next_state_lst.append(next_state[0])
            a_prob_lst.append(a_prob)
            done_mask = 0 if done else 1
            done_list.append(done_mask)

        self.data = []
        return state_lst, action_lst, reward_lst, next_state_lst, \
            a_prob_lst, done_list

    def get_action_prob(self, state):
        """Return the policy head's output for the first row of ``state``."""
        prob = self.model.predict(state)[0][0]
        return prob

In [44]:
if __name__=="__main__":
    # Train the PPO agent on CartPole-v1, updating every `agent.T` steps and
    # stopping once the mean score of the last 10 episodes exceeds 490.
    env = gym.make('CartPole-v1')
    # Read the sizes from the environment instead of repeating 4 and 2.
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = PPO(state_size, action_size)

    global_step = 0
    scores, episodes = [], []
    EPISODES = 2000
    for e in range(EPISODES):
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            #env.render()
            # Collect at most agent.T transitions, then run one training pass.
            for t in range(agent.T):
                global_step += 1

                a_prob = agent.get_action_prob(state)
                action = np.random.choice(action_size, p=a_prob)
                next_state, reward, done, _ = env.step(action)
                next_state = np.reshape(next_state, [1, state_size])
                # Penalise termination unless it is the 500th step.
                reward = reward if not done or score == 499 else -100
                agent.append_sample((state, action, reward,
                                    next_state, a_prob[action], done))

                score += reward
                state = next_state

                if done:
                    break

            agent.train_model()

        # Per-episode bookkeeping. This must run once per episode, not once per
        # T-step chunk; otherwise the +100 compensation is added repeatedly and
        # the `score == 499` check above stops matching.
        score = score if score == 500.0 else score + 100
        scores.append(score)
        episodes.append(e)
        score = round(score, 2)
        print("episode: ", e, " score: ", score, " time_step: ", global_step)

        # Stop when the mean score of the last 10 episodes exceeds 490.
        # `break` ends training without raising SystemExit inside the notebook.
        if np.mean(scores[-10:]) > 490:
            break

KeyboardInterrupt: 

In [5]:
# Shape check: subtracting a (1, 1) nested list from a (3, 1) column keeps the
# (3, 1) shape, and summing over axis 1 collapses it to (3,).
b = np.arange(1, 4).reshape(3, 1)
print(b.shape)
c = [[0.4]]
(b - c).sum(axis=1).shape

(3, 1)


(3,)

In [25]:
# Stand-alone two-headed network (softmax policy + linear value) used by the
# experiments in the following cells.
input_layer = Input(shape=(4,))

# Three ReLU hidden layers: 32 -> 16 -> 8 units.
x = Dense(32, kernel_initializer='he_uniform', activation='relu')(input_layer)
x = Dense(16, kernel_initializer='he_uniform', activation='relu')(x)
x = Dense(8, kernel_initializer='he_uniform', activation='relu')(x)

# Both heads branch off the last hidden layer.
policy_out = Dense(2, kernel_initializer='he_uniform', activation='softmax')(x)
value_out = Dense(1, kernel_initializer='he_uniform', activation='linear')(x)

model1 = Model(inputs=input_layer, outputs=[policy_out, value_out])

In [26]:
# Create a CartPole environment and reshape its initial observation into a
# single-row batch of 4 features.
env = gym.make('CartPole-v1')
state = np.reshape(env.reset(), [1, 4])

In [30]:
# `[1]` selects the value head (second model output) and `[0]` its first row,
# a length-1 array; multiplying by a 4-element list broadcasts that single
# value across all four positions, as the displayed result shows.
model1.predict(state)[1][0] * [1, 0, 1, 0]

array([0.02617052, 0.        , 0.02617052, 0.        ])