In [None]:
import os
import random
from collections import deque

import cv2
import gym
import h5py
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
from IPython.display import clear_output
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Activation, Flatten, Dense

# Five hex colours used as the default seaborn colour cycle for any plots in this notebook.
monokai = ["#F92672", "#A6E22E", "#66D9EF", "#FD971F", "#272822"]

# Apply the palette and the plain "white" seaborn style globally.
sns.set_palette(monokai)
sns.set_style("white")

In [None]:
# Shared Gym environment; the estimator, policy, training loop and playback all read this global.
env = gym.make("PongDeterministic-v4")

In [None]:
# Print the action space and the shape of a raw observation for the environment.
print(f"Action Space: {env.action_space}")
print(f"Observation Space Shape: {env.observation_space.shape}")

# Last expression of the cell, so the action meanings are shown as the cell output.
env.unwrapped.get_action_meanings()

In [None]:
class Estimator:
    """Convolutional Q-value network with a separate target network.

    ``self.model`` is the online network that gets trained; ``self.target_model``
    is a structural clone whose weights are only changed by ``updateTarget``
    (and by ``load``). Both take inputs of shape ``(batch, 84, 84, 4)`` and
    output one value per action.
    """

    def __init__(self, num_actions=None):
        """Build and compile the online network, then clone it as the target.

        Args:
            num_actions: Width of the output layer. When omitted, the value is
                read from the notebook-level ``env`` (``env.action_space.n``).
        """
        if num_actions is None:
            num_actions = env.action_space.n

        initializer = tf.keras.initializers.VarianceScaling(scale=2.0)

        self.model = Sequential()

        # Three strided convolutions over the stacked 84x84x4 input.
        self.model.add(Conv2D(32, (8, 8), 4, input_shape=(84, 84, 4), kernel_initializer=initializer))
        self.model.add(Activation("relu"))
        self.model.add(Conv2D(64, (4, 4), 2, kernel_initializer=initializer))
        self.model.add(Activation("relu"))
        self.model.add(Conv2D(64, (3, 3), 1, kernel_initializer=initializer))
        self.model.add(Activation("relu"))

        self.model.add(Flatten())

        self.model.add(Dense(512, kernel_initializer=initializer))
        self.model.add(Activation("relu"))

        # Linear output layer: one value per action.
        self.model.add(Dense(num_actions, kernel_initializer=initializer))

        # `lr` is a deprecated alias of `learning_rate` and is not accepted by
        # newer Keras releases, so the supported keyword is used here.
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.0000625, epsilon=0.00015)
        self.model.compile(optimizer=self.optimizer,
                           loss="logcosh")

        self.model.summary()

        # The target network starts as an exact copy of the online network.
        self.target_model = tf.keras.models.clone_model(self.model)
        self.target_model.set_weights(self.model.get_weights())

    def preprocess(self, state):
        """Convert a sequence of RGB frames into a normalised network input.

        Every frame is converted to grayscale and resized to 84x84; the frames
        are then stacked along the last axis, reshaped to ``(-1, 84, 84, 4)``
        and divided by 255.0. The input sequence itself is not modified.

        Args:
            state: Sequence of RGB frames. The reshape target has four
                channels, so four frames are expected.

        Returns:
            Float array of shape ``(1, 84, 84, 4)`` for a four-frame state.
        """
        frames = [
            cv2.resize(cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY), (84, 84))
            for frame in state
        ]

        stacked = np.stack(frames, axis=2).reshape(-1, 84, 84, 4)

        return stacked / 255.0

    def predict(self, state):
        """Return the online network's output for a raw (unpreprocessed) state."""
        state = self.preprocess(state)
        prediction = self.model.predict(state)

        return prediction

    def update(self, s, a, y):
        """Run one training step that moves the value of action ``a`` towards ``y``.

        Args:
            s: Raw state (sequence of RGB frames).
            a: Index of the action whose value is being updated.
            y: Target value for that action.
        """
        state = self.preprocess(s)

        # Query the online network on the already-preprocessed input instead of
        # going through `self.predict`, which would preprocess `s` a second time.
        td_target = self.model.predict(state)
        # Only the chosen action's output is replaced; the remaining outputs
        # keep the network's own predictions as their training targets.
        td_target[0][a] = y

        self.model.train_on_batch(state, td_target)

    def predictTarget(self, state):
        """Return the target network's output for a raw (unpreprocessed) state."""
        state = self.preprocess(state)
        prediction = self.target_model.predict(state)

        return prediction

    def updateTarget(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def save(self, filename):
        """Save the online model to ``./models/<filename>``.

        The destination directory is created when missing so that a finished
        training run is not lost to a missing folder.
        """
        path = f"./models/{filename}"
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.model.save(path)

    def load(self, filename):
        """Load weights from ``./models/<filename>`` into both networks.

        The target network is synchronised afterwards; otherwise it would keep
        whatever weights it had before the load.
        """
        self.model.load_weights(f"./models/{filename}")
        self.updateTarget()

In [None]:
# Global Q-network wrapper used by the policy, the training loop and playback.
# Constructing it also prints the Keras model summary.
estimator = Estimator()

In [None]:
# Training stops starting new episodes once this many environment steps have been taken.
MAX_STEPS = 1000000
# Upper bound on the number of steps in a single training episode.
MAX_EPISODE_STEPS = 18000 

# Discount factor applied to the target network's best next-state value.
DISCOUNT = 0.99
# Number of transitions sampled from replay memory per training update.
BATCH_SIZE = 32

# Epsilon starts at EPSILON_INIT and is reduced by a fixed amount every training
# step, sized so that it reaches EPSILON_MIN after EPSILON_END steps.
EPSILON_INIT = 1.0
EPSILON_MIN = 0.1
EPSILON_END = 200000

# Maximum number of transitions kept in replay memory (oldest are dropped first).
REPLAY_MEMORY_SIZE = 100000
# Number of transitions collected before any training happens.
REPLAY_START_SIZE = 10000

# A training update is run every UPDATE_FREQ environment steps.
UPDATE_FREQ = 4
# The target network is synchronised every TARGET_NETWORK_UPDATE_FREQ environment steps.
TARGET_NETWORK_UPDATE_FREQ = 10000

In [None]:
def EpsilonGreedyPolicy(state, epsilon):
    """Return epsilon-greedy action probabilities for ``state``.

    Every action receives ``epsilon / n``; the action the global ``estimator``
    rates highest additionally receives ``1 - epsilon``.

    Args:
        state: Raw state (sequence of frames) passed to ``estimator.predict``.
        epsilon: Exploration rate.

    Returns:
        1-D array with one probability per action of the global ``env``.
    """
    n_actions = env.action_space.n
    probabilities = np.full(n_actions, epsilon / n_actions)

    greedy_action = np.argmax(estimator.predict(state))
    probabilities[greedy_action] += 1 - epsilon

    return probabilities

In [None]:
def _reset_frame_stack():
    """Reset the global ``env`` and return the first frame repeated four times."""
    first_frame = env.reset()
    return [first_frame for _ in range(4)]


def QLearning():
    """Train the global ``estimator`` on the global ``env`` with experience replay.

    The replay memory is first filled with ``REPLAY_START_SIZE`` transitions
    without training. Episodes are then played until ``MAX_STEPS`` environment
    steps have been taken; every ``UPDATE_FREQ`` steps a batch is sampled from
    replay memory and the estimator is updated one transition at a time, and
    every ``TARGET_NETWORK_UPDATE_FREQ`` steps the target network is synced.
    The model is saved as ``final.h5`` at the end.

    Returns:
        Tuple ``(num_steps, episode_rewards)``: the number of training steps
        taken and the list of per-episode (sign-clipped) reward totals.
    """
    num_steps = 0
    episode_rewards = []

    epsilon = EPSILON_INIT
    epsilon_gradient = (EPSILON_INIT - EPSILON_MIN) / EPSILON_END

    # Initialize replay memory D to capacity N
    replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

    # Warm-up: collect transitions before any training happens.
    state = _reset_frame_stack()

    for _ in range(REPLAY_START_SIZE):
        action_prob = EpsilonGreedyPolicy(state, epsilon)
        action = np.random.choice(env.action_space.n, p=action_prob)

        next_frame, reward, done, _ = env.step(action)
        next_state = state[1:] + [next_frame]
        reward = np.sign(reward)

        replay_memory.append([state, action, reward, next_state, done])

        clear_output(True)
        print(f"Replay Memory Size: ({len(replay_memory)}/{REPLAY_START_SIZE})")

        # Start from a fresh frame stack after a finished episode; otherwise
        # continue from the observed next state. (Assigning `next_state`
        # unconditionally would discard the reset.)
        state = _reset_frame_stack() if done else next_state

    while num_steps < MAX_STEPS:
        state = _reset_frame_stack()
        episode_reward = 0

        for _step in range(MAX_EPISODE_STEPS):
            action_prob = EpsilonGreedyPolicy(state, epsilon)
            action = np.random.choice(env.action_space.n, p=action_prob)

            next_frame, reward, done, _ = env.step(action)
            next_state = state[1:] + [next_frame]
            reward = np.sign(reward)

            num_steps += 1
            episode_reward += reward
            replay_memory.append([state, action, reward, next_state, done])

            # Linear decay, clamped so rounding cannot push epsilon below the floor.
            epsilon = max(EPSILON_MIN, epsilon - epsilon_gradient)

            # random.sample raises if asked for more items than are stored.
            if num_steps % UPDATE_FREQ == 0 and len(replay_memory) >= BATCH_SIZE:
                replay_batch = random.sample(replay_memory, BATCH_SIZE)

                for ss, aa, rr, ns, terminal in replay_batch:
                    # Terminal transitions have no future value to bootstrap from.
                    td_target = rr

                    if not terminal:
                        best_next_action_value = np.max(estimator.predictTarget(ns))
                        td_target = rr + DISCOUNT * best_next_action_value

                    estimator.update(ss, aa, td_target)

            if num_steps % TARGET_NETWORK_UPDATE_FREQ == 0:
                estimator.updateTarget()

            if done:
                # The outer loop resets the environment for the next episode.
                break

            state = next_state

        # Record the total even when the episode was cut off at
        # MAX_EPISODE_STEPS, so the report below always has an entry to show.
        episode_rewards.append(episode_reward)

        clear_output(True)
        print(f"[{len(episode_rewards)}] ({num_steps}/{MAX_STEPS}) Episode Reward: {episode_rewards[-1]} Epsilon: {epsilon}")

    estimator.save("final.h5")
    print("Model Saved")

    return num_steps, episode_rewards

In [None]:
# Run training; keeps the total step count and the list of per-episode rewards.
num_steps, episode_rewards = QLearning()

In [None]:
def playEnvOnce():
    """Render one episode played greedily by the global ``estimator``.

    At every step the action with the highest predicted value is taken (no
    exploration). The unclipped episode reward is printed when the episode
    ends. The global ``env`` is closed afterwards, including when the loop is
    interrupted or raises.
    """
    total_reward = 0

    try:
        first_frame = env.reset()
        state = [first_frame for _ in range(4)]

        while True:
            env.render()

            action = np.argmax(estimator.predict(state))

            next_frame, reward, done, _ = env.step(action)
            total_reward += reward

            if done:
                print(f"Total Reward: {total_reward}")
                break

            # Slide the four-frame window forward by one frame.
            state = state[1:] + [next_frame]
    finally:
        # Always release the environment's resources, even on an
        # exception or a manual interrupt of the cell.
        env.close()

In [None]:
# Render a single greedy episode using the estimator's current weights.
playEnvOnce()