In [1]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam, RMSprop
from collections import deque
import gym
import numpy as np
env = gym.make('CartPole-v0')
import random

In [2]:
#parameters
AGENT_MEMORY = 50000
UPDATE_AFTER_EPISODES = 5
HIDDEN_LAYERS= [64, 32]
BATCH_SIZE = 64
DISCOUNT = 0.99
LEARNING_RATE = 0.001
LOSS_FUNCTION = 'mse'
AVG_OF_LAST = 10
OPTIMIZER = Adam(lr=LEARNING_RATE)
RENDER_AFTER_EPISODES = 25
q_new = lambda q_max, reward: (reward + DISCOUNT * q_max)
TOTAL_EPISODES = 2500
MAX_EPSILON = 1
EPSILON_DECAY = 0.99975
MIN_EPSILON = 0.001

In [3]:
class Agent:
    def __init__(self):
        
        self.state = env.reset().tolist()
        self.done = False
        self.score = 0
        self.render = True
        self.epsilon = MAX_EPSILON
        self.average_reward = 0
        self.present_q_model = Sequential()
        self.present_q_model.add(Dense(HIDDEN_LAYERS[0],
                                       input_shape=(env.observation_space.low.size,),
                                       activation='relu'))
        for index in range(1, len(HIDDEN_LAYERS)):
            self.present_q_model.add(Dense(HIDDEN_LAYERS[index],activation='relu'))
        self.present_q_model.add(Dense(env.action_space.n,activation='linear'))
        self.present_q_model.compile(loss=LOSS_FUNCTION, optimizer=OPTIMIZER,
                                     metrics=['accuracy'])
        
        self.future_q_model = Sequential()
        self.future_q_model.add(Dense(HIDDEN_LAYERS[0],
                                      input_shape=(env.observation_space.low.size,),
                                      activation='relu'))
        for index in range(1, len(HIDDEN_LAYERS)):
            self.future_q_model.add(Dense(HIDDEN_LAYERS[index],activation='relu'))
        self.future_q_model.add(Dense(env.action_space.n,activation='linear'))
        self.future_q_model.compile(loss=LOSS_FUNCTION, optimizer=OPTIMIZER, 
                                    metrics=['accuracy'])

        self.future_q_model.set_weights(self.present_q_model.get_weights())
        
        self.memory = deque(maxlen=AGENT_MEMORY)
        self.episodes = 0
        print(self.present_q_model.summary())
        print(self.future_q_model.summary())
        
    def train(self):
        if len(self.memory) < BATCH_SIZE :
            return
        
        batch = random.sample(self.memory, BATCH_SIZE)
        present_q_values = self.present_q_model.predict([x[1] for x in batch]).tolist()
        future_q_values = self.future_q_model.predict([x[3] for x in batch]).tolist()
        for index, slot in enumerate(batch):
            if not slot[4]:
                q_future_max = np.max(future_q_values[index])
                qnew = q_new(q_future_max, slot[2])
            else:
                qnew = slot[2]
            
            present_q_values[index][slot[0]] = qnew
            
        X = [slot[1] for slot in batch]
        Y = present_q_values
        
        history = self.present_q_model.fit(X, Y, batch_size=BATCH_SIZE,
                                           shuffle=False, verbose = 0)
        
        if self.episodes % UPDATE_AFTER_EPISODES == 0:
            _ = self.future_q_model.set_weights(self.present_q_model.get_weights())
                
    def next_action(self):
        q_values = self.present_q_model.predict([self.state]).tolist()
        
        if np.random.random() > self.epsilon:
            action = np.argmax(q_values)
        else:
            action = np.random.randint(0, env.action_space.n)
            
        new_state, reward, done, info = env.step(action)
        self.score += 1
        
        if done:
            self.episodes +=1
            if self.episodes % RENDER_AFTER_EPISODES == 0:
                self.render = True
            else:
                self.render = False
                
            if self.episodes % AVG_OF_LAST == 0:
                self.avg_reward = self.score/AVG_OF_LAST
                print(f"Average Score = {self.avg_reward}")
                self.score = 0
                        
        if self.render:
            #env.render()
            None
            
            
        self.memory.append([action, self.state, reward, new_state.tolist(), done])
        self.state = new_state.tolist() if not done else env.reset().tolist()
        
        self.train()
        if self.epsilon > MIN_EPSILON:
            self.epsilon *= EPSILON_DECAY
            self.epsilon = max(MIN_EPSILON, self.epsilon)
        
        return done
        

In [None]:
agent = Agent()
for episode in range(TOTAL_EPISODES):
    #print (f"On Episode {episode}")
    while True:
        if agent.next_action():
            break

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 64)                320       
_________________________________________________________________
dense_1 (Dense)              (None, 32)                2080      
_________________________________________________________________
dense_2 (Dense)              (None, 2)                 66        
Total params: 2,466
Trainable params: 2,466
Non-trainable params: 0
_________________________________________________________________
None
Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_3 (Dense)              (None, 64)                320       
_________________________________________________________________
dense_4 (Dense)              (None, 32)                2080      
___________________________