In [None]:
import os
import gym
import time
import random
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from keras.models import Sequential, load_model
from keras.layers import Dense
from collections import deque
%matplotlib inline

In [None]:
# Epsilon-greedy exploration schedule: the agent starts at EPSILON, the rate is
# multiplied by EPSILON_DECAY after each replay pass, and it is never allowed
# to drop below EXPLORATION_MIN.
EPSILON = 1.0
EXPLORATION_MIN = 0.01
EPSILON_DECAY = 0.995

# Learning hyperparameters: GAMMA discounts the bootstrapped future value and
# ALPHA is the learning rate handed to the Adam optimizer.
GAMMA = 0.9
ALPHA = 2e-3

# Replay memory: capacity of the transition buffer and the number of
# transitions sampled from it per replay pass.
MEMORY_SIZE = 10000
BATCH_SIZE = 20

#### Observation: ####
>Type: Box(4) <br>
>Num &emsp;&emsp; Observation &emsp;&emsp;&emsp;&emsp; Min &emsp;&emsp;&emsp;&emsp; Max <br>
>0   &emsp;&emsp;&emsp;&ensp; Cart Position &emsp;&emsp;&emsp;&emsp; -4.8 &emsp;&emsp;&emsp;&emsp; 4.8 <br>
>1   &emsp;&emsp;&emsp;&ensp; Cart Velocity &emsp;&emsp;&emsp;&emsp; -Inf &emsp;&emsp;&emsp;&emsp;&nbsp; Inf <br>
>2   &emsp;&emsp;&emsp;&ensp; Pole Angle &emsp;&emsp;&emsp;&emsp;&emsp; -24° &emsp;&emsp;&emsp;&ensp;&nbsp; 24° <br>
>3  &emsp;&emsp;&emsp;&ensp; Pole Velocity At Tip &emsp;&nbsp; -Inf &emsp;&emsp;&emsp;&emsp;&nbsp; Inf <br>

#### Action: ####
>Type: Discrete(2) <br>
>Num &emsp;&emsp; Action <br>
>0   &emsp;&emsp;&emsp;&ensp; Push cart to the left <br>
>1   &emsp;&emsp;&emsp;&ensp; Push cart to the right

#### Reward: ####
>Reward: +1 for every timestep the pole remains upright (i.e. the episode has not yet ended)

**Q Update:** $Q(s_{t}, a_{t}) = r_{t+1} + \gamma \max_{a} Q(s_{t+1}, a)$ <br>

In [None]:
class Agent:
    """Deep Q-learning agent with an experience-replay buffer.

    A small fully connected network maps one observation to one Q-value
    estimate per action. Actions are selected epsilon-greedily, and the
    network is fitted on transitions sampled at random from ``self.memory``.
    """

    def __init__(self, observation_space, action_space):
        """Build the Q-network and an empty replay memory.

        Args:
            observation_space: Number of features in a single observation.
            action_space: Number of discrete actions available.
        """
        self.exploration_rate = EPSILON
        self.action_space = action_space
        # Bounded buffer: once full, the oldest transitions are discarded.
        self.memory = deque(maxlen=MEMORY_SIZE)

        self.model = Sequential()
        self.model.add(Dense(16, input_shape=(observation_space,), activation='relu'))
        # Only the first layer needs an input shape; later layers infer theirs.
        self.model.add(Dense(32, activation='relu'))
        # The output must be linear: Q-values are unbounded regression targets
        # (reward + GAMMA * max Q). A softmax would squash them into [0, 1]
        # and force them to sum to one, so the MSE targets could never be met.
        self.model.add(Dense(self.action_space, activation='linear'))
        self.model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=ALPHA))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def load_weights(self, filepath):
        """Load network weights from ``filepath``."""
        self.model.load_weights(filepath)

    def save_weights(self, filepath):
        """Save the current network weights to ``filepath``."""
        self.model.save_weights(filepath)

    def random_action(self, state):
        """Return a uniformly random action index.

        ``state`` is unused; it is accepted so that every action selector
        shares the same signature.
        """
        return random.randrange(self.action_space)

    def optimal_action(self, state):
        """Return the action with the highest predicted Q-value for ``state``.

        ``state`` is passed straight to ``model.predict`` and row 0 of the
        result is used, so it is expected to be a batch holding one
        observation.
        """
        # verbose=0 keeps Keras from printing a progress bar on every call.
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def choose_action(self, state):
        """Pick an action epsilon-greedily using the current exploration rate."""
        if np.random.rand() < self.exploration_rate:
            return self.random_action(state)
        return self.optimal_action(state)

    def experience_replay(self):
        """Fit the network on a random batch of remembered transitions.

        Does nothing until at least ``BATCH_SIZE`` transitions are stored.
        For each sampled transition the target for the taken action is the
        reward, plus the discounted best next-state Q-value unless the
        transition ended the episode. The exploration rate is decayed once
        per call that actually trains, and is clamped at ``EXPLORATION_MIN``.
        """
        if len(self.memory) < BATCH_SIZE:
            return

        batch = random.sample(self.memory, BATCH_SIZE)
        for state, action, reward, next_state, done in batch:
            q_update = reward
            if not done:
                # Bootstrap from the best action value in the next state.
                q_update = reward + GAMMA * np.amax(self.model.predict(next_state, verbose=0))
            # Only the taken action's target changes; the other outputs keep
            # their current predictions so they contribute zero error.
            q_values = self.model.predict(state, verbose=0)
            q_values[0][action] = q_update
            self.model.fit(state, q_values, verbose=0)

        self.exploration_rate = max(self.exploration_rate * EPSILON_DECAY, EXPLORATION_MIN)

In [None]:
class Environment:
    """Couples a Gym environment with an ``Agent`` and runs episodes on it."""

    def __init__(self, environment_name):
        """Create the Gym environment and an agent sized to its spaces.

        Args:
            environment_name: Environment id passed to ``gym.make``.
        """
        self.env = gym.make(environment_name)
        self.observation_space = self.env.observation_space.shape[0]
        self.action_space = self.env.action_space.n
        self.agent = Agent(self.observation_space, self.action_space)

    def _as_batch(self, observation):
        """Reshape one observation into a ``(1, observation_space)`` batch."""
        return np.reshape(observation, [1, self.observation_space])

    def _reset(self):
        """Reset the environment and return the initial state as a batch."""
        result = self.env.reset()
        # Newer Gym versions return (observation, info); older ones return the
        # bare observation array.
        if isinstance(result, tuple):
            result = result[0]
        return self._as_batch(result)

    def _step(self, action):
        """Apply ``action`` and return ``(next_state, reward, done)``.

        Accepts both the 4-tuple ``step`` result of older Gym versions and the
        5-tuple ``(obs, reward, terminated, truncated, info)`` of newer ones;
        in the latter case either flag ends the episode.
        """
        result = self.env.step(action)
        if len(result) == 5:
            next_state, reward, terminated, truncated, _ = result
            done = bool(terminated or truncated)
        else:
            next_state, reward, done, _ = result
        return self._as_batch(next_state), reward, done

    def learn_policy(self, state):
        """Take one epsilon-greedy step, store it and train the agent.

        Returns:
            Tuple ``(next_state, reward, done)``.
        """
        action = self.agent.choose_action(state)
        next_state, reward, done = self._step(action)
        self.agent.remember(state, action, reward, next_state, done)
        self.agent.experience_replay()
        return next_state, reward, done

    def act_random(self, state):
        """Render the environment and take one uniformly random step.

        Returns:
            Tuple ``(next_state, reward, done)``.
        """
        self.env.render()
        return self._step(self.agent.random_action(state))

    def act_optimal(self, state):
        """Render the environment and take one greedy step.

        Returns:
            Tuple ``(next_state, reward, done)``.
        """
        self.env.render()
        return self._step(self.agent.optimal_action(state))

    def evaluation(self, function, save_weights=False, load_weights=False, filepath='', episodes=100):
        """Run ``function`` for a number of episodes and collect total rewards.

        Args:
            function: Step callable taking the current state and returning
                ``(next_state, reward, done)``, e.g. ``self.learn_policy``.
            save_weights: If true, save the agent's weights to ``filepath``
                after all episodes have finished.
            load_weights: If true, load the agent's weights from ``filepath``
                before the first episode.
            filepath: Weights file used by ``save_weights``/``load_weights``.
            episodes: Number of episodes to run.

        Returns:
            List with the total reward of each episode, in order.
        """
        if load_weights:
            self.agent.load_weights(filepath)

        reward_data = []
        for _ in range(episodes):
            state = self._reset()
            done = False
            total_reward = 0
            while not done:
                state, reward, done = function(state)
                total_reward += reward
            reward_data.append(total_reward)

        self.env.close()
        if save_weights:
            self.agent.save_weights(filepath)
        return reward_data

    def plot(self, optimal_action, random_action):
        """Plot per-episode total rewards of the two runs on one figure.

        Args:
            optimal_action: Rewards per episode from the greedy run.
            random_action: Rewards per episode from the random run.
        """
        plt.plot(optimal_action, label='Optimal Action')
        plt.plot(random_action, label='Random Action')
        plt.title('Total Cumulative Reward')
        plt.legend()
        plt.xlabel('Episodes')
        plt.ylabel('Rewards')
        plt.show()

In [None]:
# Experiment driver: compare a random policy against the trained DQN policy.
# The order matters — the baseline is recorded first, then the agent is
# trained, and only afterwards is the greedy policy evaluated.
if __name__ == '__main__':
    environment = Environment('CartPole-v1')
    # Baseline: per-episode rewards when acting uniformly at random.
    random_action = environment.evaluation(environment.act_random)
    # Training run: learn_policy stores transitions and fits the network;
    # the resulting weights are written to 'cartpole_dqn.h5' afterwards.
    environment.evaluation(environment.learn_policy, save_weights=True, filepath='cartpole_dqn.h5')
    # Greedy run using the weights still held in memory by the agent.
    optimal_action = environment.evaluation(environment.act_optimal)
    environment.plot(optimal_action, random_action)