In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from ReplayBuffer import PrioritizedReplayBuffer
from TicTacToe import TicTacToeEnv


KeyboardInterrupt



In [None]:
# Discount factor applied to the bootstrapped next-state value in the TD target.
gamma = 0.99

# Epsilon-greedy exploration schedule: epsilon starts at epsilon_max and the
# training loop decays it linearly, clamping at epsilon_min.
epsilon_min = 0.001
epsilon_max = 1.0
epsilon = epsilon_max
epsilon_interval = epsilon_max - epsilon_min  # total amount epsilon can shrink by

# Number of transitions requested from the replay buffer per training step.
batch_size = 32
# Width of the Q-network output layer and range of the random action draw.
num_actions = 9

# Environment instance used by the training loop.
env = TicTacToeEnv()

In [None]:
def create_q_model():
    """Build the Q-network: a (3, 3, 1) board tensor in, `num_actions` linear outputs out.

    The stack is Dense(64) -> Dense(32) -> Flatten -> Dense(16) -> Dense(num_actions),
    with ReLU on every hidden layer.

    NOTE(review): Keras Dense layers operate on the last axis of their input,
    so the two layers placed before Flatten transform each board cell
    independently; only the layers after Flatten combine information across
    cells. Confirm this is the intended architecture.
    """
    board = keras.layers.Input(shape=(3, 3, 1))

    # Two ReLU layers applied ahead of the flatten step.
    hidden = board
    for units in (64, 32):
        hidden = keras.layers.Dense(units, activation="relu")(hidden)

    flattened = keras.layers.Flatten()(hidden)
    hidden = keras.layers.Dense(16, activation="relu")(flattened)

    # Linear head: one unbounded value per action.
    q_values = keras.layers.Dense(num_actions, activation="linear")(hidden)

    return keras.Model(inputs=board, outputs=q_values)

# Online network: selects actions and receives the gradient updates.
model = create_q_model()
# Target network: supplies the bootstrapped next-state values and is refreshed
# from `model` periodically by the training loop.
model_target = create_q_model()
# Start the target network as an exact copy of the online network; without this
# the two networks begin with unrelated random weights until the first refresh.
model_target.set_weights(model.get_weights())

In [None]:
# --- Optimisation -----------------------------------------------------------
optimizer = keras.optimizers.Adam(learning_rate=2.5e-4, clipnorm=1.0)
# Using huber loss for stability
loss_function = keras.losses.Huber()

# --- Progress counters (mutated by the training loop) -----------------------
running_reward = 0
episode_count = 0
frame_count = 0

# --- Schedule ---------------------------------------------------------------
# Frames during which every action is drawn uniformly at random.
epsilon_random_frames = 5_000
# Frames over which epsilon is decayed; also the training loop's frame budget.
epsilon_greedy_frames = 200_000
# A gradient step is taken every this many frames.
update_after_actions = 4
# How often to update the target network
update_target_network = 10_000

# --- Replay memory ----------------------------------------------------------
# NOTE(review): arguments are presumably (capacity, state shape, action width,
# priority exponent) - confirm against ReplayBuffer.PrioritizedReplayBuffer.
memory = PrioritizedReplayBuffer(8192, (3, 3, 1), 1, 0.7)

In [None]:
class Evaluator:
    """Plays evaluation games between a Q-network and the environment's minimax
    player and tallies the outcomes.

    The network and the minimax player alternate who moves first from one game
    to the next, starting with the network.

    Attributes:
        num_games: number of games played per `play_full_game` call.
        pos: two counters indexed by `int(first)`; slot 1 counts games the
            network opened, slot 0 games the minimax player opened.
        exit_pole: outcome counters addressed the way `display_stats` reads
            them: index 0 = tie, 1 = win, -1 = lose, -2 = invalid.
        total_turns: turns summed over all finished games since the last reset.
        first: True when the network moves first in the current game.
        score: sum of the rewards returned by the environment in the current game.
        turns: number of moves made in the current game.
    """

    def __init__(self, num_games=2):
        """Create an evaluator that plays `num_games` games per evaluation run."""
        # Configuration rather than per-run state, so reset() leaves it alone.
        self.num_games = num_games
        self.reset()

    def reset(self):
        """Clear all counters and make the network the first mover again."""
        self.pos = [0, 0]
        self.exit_pole = [0, 0, 0, 0]
        self.total_turns = 0
        self.first = True
        self.score = 0
        self.turns = 0

    def update_stats(self):
        """Record the outcome of the game that just ended and swap the first mover.

        The accumulated score is used directly as the outcome slot when it is
        -2 or 0, or when the network moved first; otherwise its sign is
        flipped before indexing.
        NOTE(review): this presumably means rewards are expressed from the
        first player's point of view - confirm against TicTacToeEnv.
        """
        self.total_turns += self.turns
        # Cast so that float-valued rewards (e.g. 1.0) remain valid list indices.
        outcome = int(round(self.score))
        if outcome in (-2, 0) or self.first:
            self.exit_pole[outcome] += 1
        else:
            self.exit_pole[-outcome] += 1
        self.pos[int(self.first)] += 1
        self.first = not self.first

    def _apply_action(self, test_env, act):
        """Step `test_env` with `act`, update score/turn counters and print the board."""
        next_state, rw, terminal, info = test_env.step(act)
        self.score += rw
        self.turns += 1
        print()
        print(np.squeeze(next_state, axis=-1), info, act)
        print()
        return next_state, rw, terminal, info

    def play_net_turn(self, baby, test_env, init_state):
        """Let the network `baby` choose the arg-max action for `init_state` and play it.

        Returns the `(next_state, reward, terminal, info)` tuple from the environment.
        """
        action_p = baby(tf.expand_dims(init_state, 0), training=False)
        act = tf.argmax(action_p[0]).numpy()
        return self._apply_action(test_env, act)

    def play_minmax_turn(self, test_env):
        """Play the move returned by the environment's own `minimax` for its current state.

        Returns the `(next_state, reward, terminal, info)` tuple from the environment.
        """
        act = test_env.minimax(test_env.state)
        return self._apply_action(test_env, act)

    def display_stats(self):
        """Print the mean game length and the outcome counters."""
        print('##################################################')
        print('Mean Game turns: ', self.total_turns/self.num_games)
        print('Invalid: ', self.exit_pole[-2])
        print('Win: ', self.exit_pole[1])
        print('Tie: ', self.exit_pole[0])
        print('Lose: ', self.exit_pole[-1])
        print('##################################################')

    def play_full_game(self, baby):
        """Reset the counters, play `num_games` games with `baby` and print a summary."""
        print('**************************************************')
        self.reset()
        for _ in range(self.num_games):
            test_env = TicTacToeEnv()
            next_state = test_env.reset()
            self.turns = 0
            self.score = 0
            # update_stats() flips self.first, so capture who opens this game
            # before any move is made.
            net_to_move = self.first
            while True:
                if net_to_move:
                    next_state, _, terminal, _ = self.play_net_turn(baby, test_env, next_state)
                else:
                    next_state, _, terminal, _ = self.play_minmax_turn(test_env)
                if terminal:
                    self.update_stats()
                    break
                net_to_move = not net_to_move
        self.display_stats()
        print('**************************************************')

In [None]:
# Per-episode total rewards; the training loop appends to this list and trims
# it to the most recent 100 entries before averaging.
episode_reward_history = []

In [None]:
# DQN training loop: the online network plays against the environment's
# minimax player, transitions go into the prioritized replay buffer, and the
# network is trained on sampled batches.
eval_tool = Evaluator()
while frame_count < epsilon_greedy_frames:
    state = env.reset()
    episode_reward = 0
    # Stays None unless at least one training step runs during this episode.
    rewards_sample = None
    while True:
        frame_count += 1

        # Epsilon-greedy selection; purely random during the warm-up frames.
        if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
            action = np.random.choice(num_actions)
        else:
            state_tensor = tf.expand_dims(state, 0)
            action_probs = model(state_tensor, training=False)
            action = tf.argmax(action_probs[0]).numpy()

        # Linear epsilon decay, clamped at epsilon_min.
        epsilon -= epsilon_interval / epsilon_greedy_frames
        epsilon = max(epsilon, epsilon_min)

        # Agent move: store the transition exactly as observed.
        state_next, reward, done, _ = env.step(action)
        episode_reward += reward
        memory.push(state, action, reward, state_next, done)

        if not done:
            # Opponent (minimax) reply. Its transition is stored with negated
            # boards and reward, as the original code did.
            # NOTE(review): negation presumably flips the board to the
            # opponent's point of view - confirm against TicTacToeEnv.
            # The stored transition must run from the board *before* the reply
            # (state_next) to the board *after* it (state) and carry the
            # opponent's own action, not the agent's.
            opponent_action = env.minimax(env.state)
            state, reward, done, _ = env.step(opponent_action)
            episode_reward += reward
            memory.push(-state_next, opponent_action, -reward, -state, done)

        if frame_count % update_after_actions == 0 and frame_count > batch_size:
            (
                state_sample,
                action_sample,
                rewards_sample,
                state_next_sample,
                done_sample,
                weights,
                indexes,
            ) = memory.pop(batch_size, 0.4)

            # Flatten to 1-D float32 so the target below is (batch,) whatever
            # trailing singleton axes the buffer returns.
            rewards_flat = tf.reshape(tf.cast(rewards_sample, tf.float32), [-1])
            done_flat = tf.reshape(tf.cast(done_sample, tf.float32), [-1])
            weights_flat = tf.reshape(tf.cast(weights, tf.float32), [-1])

            # Direct call instead of .predict(): avoids the per-call overhead
            # of the predict loop for a small in-memory batch.
            future_rewards = model_target(state_next_sample, training=False)

            # TD target. Terminal transitions keep their actual reward and
            # drop the bootstrap term (no forced -1, which would overwrite
            # win/tie/invalid-move rewards).
            updated_q_values = rewards_flat + gamma * tf.reduce_max(
                future_rewards, axis=1
            ) * (1.0 - done_flat)

            actions = tf.reshape(tf.cast(action_sample, dtype=tf.int32), [-1])
            masks = tf.one_hot(actions, num_actions)

            with tf.GradientTape() as tape:
                q_values = model(state_sample)
                q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
                # Keras losses average over the last axis before applying
                # sample_weight. Giving targets/predictions a trailing axis of
                # size 1 keeps one loss value per sample, so each importance
                # weight scales its own sample.
                loss = loss_function(
                    tf.expand_dims(updated_q_values, axis=-1),
                    tf.expand_dims(q_action, axis=-1),
                    sample_weight=weights_flat,
                )

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            # New priorities are the absolute TD errors of the sampled batch.
            memory.update_priorities_variant(indexes, tf.math.abs(updated_q_values - q_action))

        if frame_count % update_target_network == 0:
            model_target.set_weights(model.get_weights())

        if frame_count % 5000 == 0:
            eval_tool.play_full_game(model)

        if done:
            break

    if rewards_sample is not None:
        print(frame_count, episode_reward, epsilon, sum(rewards_sample))

    # Running mean over the most recent 100 episodes.
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)
    episode_count += 1

    if running_reward == 0 and len(episode_reward_history) == 100:  # Condition to consider the task solved
        print("Solved at episode {}!".format(episode_count))
        break