# 1. Install dependencies


In [None]:
!pip install keras-rl
!pip install gym


# 2.1. Declare classes: AI Classes: Random AI, Minimax AI


In [None]:
import random

class RandomAI:

    def __init__(self, game):
        self.game = game

    def decide_turn(self):
        board = self.game.board
        assert sum([1 for cell in board.flatten() if cell == 0]) > 0, "no place to make a turn!!!"

        coords = [0,1,2]
        row = random.choice(coords)
        column = random.choice(coords)

        while board[row, column] != 0:
            row = random.choice(coords)
            column = random.choice(coords)

        return row, column



In [None]:
from math import inf as infinity

class MinimaxAI:

    def __init__(self, game):
        self.game = game
        self.random_ai = RandomAI(game)

    def decide_turn(self):
        turns_possible = self.empty_cells()
        if len(turns_possible) == 1:
            row, column = turns_possible[0]
        elif len(turns_possible) >= 7:
            row, column = self.random_ai.decide_turn()
        else:
            chosen_node = self.minimax(10, self.game.x_next)
            row, column, _ = chosen_node
        return row, column

    def empty_cells(self):
        state = self.game.board
        results = []
        for row in range(3):
            for column in range(3):
                if state[row, column] == 0:
                    results.append((row, column))

        return results

    def minimax(self,depth, X_turn):
        state = self.game.board
        score = self.game.evaluate(state)
        game_over = score is not None

        if depth == 0 or game_over:
            return [-1, -1, score]

        if X_turn:
            best = [-1, -1, -infinity]
        else:
            best = [-1, -1, +infinity]

        for cell in self.empty_cells():
            x, y = cell[0], cell[1]
            self.game.board[x, y] = 1 if X_turn else -1
            score = self.minimax(depth - 1, not X_turn)
            self.game.board[x, y] = 0
            score[0], score[1] = x, y

            if X_turn:
                if score[2] > best[2]:
                    best = score  # max value
            else:
                if score[2] < best[2]:
                    best = score  # min value

        return best

# 2.2. Environment class - the actual game implementation

In [None]:
# DEFINE THE TTT GAME CLASS

import numpy as np
import random
from functools import lru_cache
from gym.core import Env
from gym import spaces

class TicTacToe(Env):

    def __init__(self):
        self.board = np.zeros([3,3], dtype=np.int8)
        self.x_next = True if random.random() > 0.5 else False

        # by design, our AI plays x and the hard-coded AI plays for o
        self.x_ai = None
        self.o_ai = MinimaxAI(self)

        # action space consists of 9 distinct actions - trying to place your shape to each of 9 cells.
        self.action_space = spaces.Discrete(9)

        # observation space is the state of the board, as numpy array with possible values of
        # -1 - cell with 'o',
        # 0  - empty cell,
        # 1  - cell with 'x'
        self.observation_space = spaces.Box(low=-1, high=1, dtype=np.int8, shape=(3,3))


    def reset(self):
        """
        Resets the board for the next game. Starting player is random.
        """
        self.board = np.zeros([3, 3])
        self.x_next = True if random.random() > 0.5 else False
        if not self.x_next:
            row, column = self.o_ai.decide_turn()
            assert self.try_make_turn(row, column)

        return self.board

    def step(self, action):
        """
        TODO this method is not complete. its up to you to finish it.
        Building blocks to be used:
            1. valid_turn = self.try_make_turn(row, column) 
                valid turn means we tried to place our shape in an empty cell.
                
            2. result = self.evaluate(self.board)
                result is either None if game is not over, or one of -1, 0, 1
                
            3. row, column = self.o_ai.decide_turn()
                it is only the decision of the opponent ai, the turn is not made yet
            
            
        :param action: a discrete action from 0 to 8. Cells are enumerated from left to right,
        with columns continuing from top to bottom, like below:

         0 | 1 | 2
        -  + - + -
         3 | 4 | 5
        -  + - + -
         6 | 7 | 8

        if invalid action is used, board does not change, but negative reward is returned.
        :return: observation, reward, done?, info (empty)
        """
        assert self.x_next # verify it is your turn

        # extract row and column from your actions

        row = action // 3
        column = action % 3

        # Info is an empty dictionary. a dictionary is required by the keras-rl specs.
        info = {}

        # use the row and column to try to make a step

        # if the turn is not a valid one, the game ends and we lose it.
        # The reward should be negative and greater than a game lost normally.

        # check it your turn has ended the game.
        # if not, opponent should make his turn, and we check again if the game is over.

        #if the game is over, we return the observation, reward = self.evaluate(self.board), done = True and the info.

        # if the game is not over, we return the observation, reward = 0, done = False and the info.

        # TODO default return - remove it when you implement a real one.
        return np.zeros([3,3]), 0, False, info


    def try_make_turn(self, row, column):
        """
        The current player according to the boolean self.x_next tries to make a turn
        by placing their shape on the crossection of the crossection of :param row and :param column
        :return: True if it was a valid turn, False if such turn is not allowed (the cell is not empty).
        """

        if self.board[row,column] == 0:

            self.board[row, column] = 1 if self.x_next else -1
            self.x_next = not self.x_next
            return True

        else:
            return False

    @staticmethod
    def evaluate(board):
        """
        takes a :param board as the input ([3,3] numpy array),
        :return:
            None if the game is not over,
            1 if x won,
            -1 if o won,
            0 if its a draw
        """
        board_as_tuple = tuple(tuple(board[row]) for row in range(3))
        return TicTacToe._evaluate(board_as_tuple)

    def play_ai_game(self):
        while self.evaluate(self.board) is None:
            ai = self.x_ai if self.x_next else self.o_ai
            row, column = ai.decide_turn()
            valid_turn = self.try_make_turn(row, column)
            if not valid_turn:
                # any AI that makes invalid turn immediately loses the game.
                return -1 if self.x_next else 1
        return self.evaluate(self.board)

    @property
    def board_as_tuple(self):
        return tuple(tuple(self.board[row]) for row in range(3))


    @staticmethod
    @lru_cache(maxsize=2**16)
    def _evaluate(board):

        sums = []
        # we collect totals of all row, columns and diagonals. Any of those must have a value of
        # either 3 or -3 if there are x x x or o o o in this sequence.
        sums += [sum(board[row]) for row in range(3)]
        sums += [sum([board[i][column] for i in range(3)]) for column in range(3)]
        sum_main_diag = sum([board[i][i] for i in range(3)])
        sum_opp_diag = sum([board[i][2 - i] for i in range(3)])

        sums.append(sum_main_diag)
        sums.append(sum_opp_diag)
        
        n_steps = sum(abs(elem) for row in board for elem in row)

        if 3 in sums:
            return 1 - n_steps / 100
        elif -3 in sums:
            return -1 + n_steps / 100
        else:
            n_empty = sum([1 for row in range(3) for cell in board[row] if cell == 0])
            if n_empty == 0:
                return 0
            else:
                return None


    def play_vs_human(self):
        print("Welcome to Tic-Tac-Toe. Your opponent is a minimax AI, who makes first turn randomly. \n"
              "Use numbers from 0 to 8 to make turns. Below is the map of numbers to cells.")
        print("""
         0 | 1 | 2
        -  + - + -
         3 | 4 | 5
        -  + - + -
         6 | 7 | 8""")
        "You play for x, your opponent plays for o"
        self.reset()
        done = False
        while not done:
            try:
                self.render()
                action = int(input(">>> "))
            except:
                print("could not parse your input. Please try again. Use numbers from 0 to 8 to make turns.")
            else:
                row = action // 3
                column = action % 3

                valid_turn = self.try_make_turn(row, column)
                if not valid_turn:
                    print("Unfortunately you can't overwrite existing shapes. \n"
                          "You have made an invalid turn and therefore lost the game.")
                    break

                result = self.evaluate(self.board)

                if result is not None:
                    self.render()
                    if result == 1:
                        print("Congratulations! you have won the game!")
                    elif result == 0:
                        print("The game has ended in a draw.")
                    break

                row, column = self.o_ai.decide_turn()
                assert self.try_make_turn(row, column)
                result = self.evaluate(self.board)

                if result is not None:
                    self.render()
                    if result == -1:
                        print("Minimax AI has won the game.")
                    elif result == 0:
                        print("The game has ended in a draw.")
                    break


        print("Thanks for playing Tic-Tac-Toe. Have a nice day and come back any time!")


    def render(self, mode="Human"):
        shapes = {-1: 'o', 0: ' ', 1: 'x'}
        print(f"{shapes[self.board[0,0]]} | {shapes[self.board[0,1]]} | {shapes[self.board[0,2]]}")
        print('- + - + -')
        print(f"{shapes[self.board[1,0]]} | {shapes[self.board[1,1]]} | {shapes[self.board[1,2]]}")
        print('- + - + -')
        print(f"{shapes[self.board[2,0]]} | {shapes[self.board[2,1]]} | {shapes[self.board[2,2]]}")

In [None]:
# TRY PLAYING VS MINIMAX
ttt = TicTacToe()
ttt.play_vs_human()

## 3.1. Create a benchmark
We first create a benchmark by testing a random agent playing against Minimax AI

In [None]:
# TEST HOW GOOD RANDOM AI IS

ttt = TicTacToe()

x_ai = RandomAI(ttt)
ttt.x_ai = x_ai

o_ai = MinimaxAI(ttt)
ttt.o_ai = o_ai


counters = {-1:0, 0:0, 1:0}
n_games = int(1e4)

def run_trials():
    for i in range(n_games):
        if i % 50 == 0:
            print(i, "out of", n_games)
            print(f"stats: out of {i} games, x has won {counters[1]} times, o - {counters[-1]} times, and there were "
                  f"{counters[0]} draws.")

        ttt.reset()
        result = ttt.play_ai_game()
        if result < 0:
            result = -1
        elif result > 0:
            result = 1
        counters[result] += 1



run_trials()



## 3.2. Train the reinforcement learning agent
Next, we train a neural network to play against Minimax AI

In [None]:
# TRAIN DQN FOR TIC-TAC-TOE

from __future__ import division
import argparse

from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten
from keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.policy import LinearAnnealedPolicy, EpsGreedyQPolicy
from rl.memory import SequentialMemory
from rl.callbacks import FileLogger, ModelIntervalCheckpoint



# Get the environment and extract the number of actions.
from gym.wrappers import TimeLimit
env = TimeLimit(TicTacToe(), max_episode_steps=10)
nb_actions = env.action_space.n

# Next, we build our model. We use the same model that was described by Mnih et al. (2015).
WINDOW_LENGTH = 4
input_shape = (3,3)
input_shape = (WINDOW_LENGTH,) + input_shape

def build_model():
    model = Sequential()
    model.add(Flatten(input_shape=input_shape))
    model.add(Dense(32, activation='relu'))
    model.add(Dense(nb_actions))
    print(model.summary())
    return model


model = build_model()

# Finally, we configure and compile our agent. You can use every built-in Keras optimizer and
# even the metrics!
memory = SequentialMemory(limit=1000000, window_length=WINDOW_LENGTH)

# Select a policy. We use eps-greedy action selection, which means that a random action is selected
# with probability eps. We anneal eps from 1.0 to 0.1 over the course of 1M steps. This is done so that
# the agent initially explores the environment (high eps) and then gradually sticks to what it knows
# (low eps). We also set a dedicated eps value that is used during testing. Note that we set it to 0.05
# so that the agent still performs some random actions. This ensures that the agent cannot get stuck.
policy = LinearAnnealedPolicy(EpsGreedyQPolicy(), attr='eps', value_max=1., value_min=.1, value_test=.05,
                              nb_steps=1000000)

# The trade-off between exploration and exploitation is difficult and an on-going research topic.
# If you want, you can experiment with the parameters or use a different policy. Another popular one
# is Boltzmann-style exploration:
# policy = BoltzmannQPolicy(tau=1.)
# Feel free to give it a try!
dqn = DQNAgent(model=model, nb_actions=nb_actions, policy=policy, memory=memory, nb_steps_warmup=50000, gamma=.99,
               target_model_update=10000,
               train_interval=4, delta_clip=1.)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])

print("about to enter the flags branching.")


# Okay, now it's time to learn something! We capture the interrupt exception so that training
# can be prematurely aborted. Notice that you can the built-in Keras callbacks!
weights_filename = 'dqn_ttt_weights.h5f'
checkpoint_weights_filename = 'dqn_ttt_weights_{step}.h5f'
callbacks = [ModelIntervalCheckpoint(checkpoint_weights_filename, interval=50000)]
print("about to fit the dqn!")
# dqn.load_weights(weights_filename)
dqn.fit(env, callbacks=callbacks, nb_steps=int(1e5), log_interval=10000, verbose=1)

# After training is done, we save the final weights one more time.
dqn.save_weights(weights_filename, overwrite=True)

# Finally, evaluate our algorithm for 10 episodes.
dqn.test(env, nb_episodes=1, visualize=False)


## 3.3. Reinforcement Learning agent class to use the same API as other AIs
NeuralAI class - an agent API using the neural network we have trained.

In [None]:
# DEFINE AGENT BASED ON THE DQN NET

from keras.models import Sequential
from keras.layers import Dense, Flatten
from keras.optimizers import Adam
from rl.memory import SequentialMemory
from rl.agents.dqn import DQNAgent


WINDOW_LENGTH = 4
input_shape = (3, 3)
input_shape = (WINDOW_LENGTH,) + input_shape
nb_actions = 9


class NeuralAI:

    def __init__(self, game, weights_path=None):
        self.game = game
        model = self.build_model()
        dqn = DQNAgent(model=model, nb_actions=nb_actions, gamma=.99,
                       memory=SequentialMemory(limit=10, window_length=WINDOW_LENGTH))

        dqn.compile(Adam(lr=1e-3), metrics=['mae'])
        import sys
        sys.path.append("../")
        weights_filename = weights_path or 'dqn_ttt_weights.h5f'
        dqn.load_weights(weights_filename)
        self.dqn = dqn

    def decide_turn(self):
        board = self.game.board
        action = self.dqn.forward(board)
        row = action // 3
        column = action % 3

        return row, column

    @staticmethod
    def build_model():
        model = Sequential()
        model.add(Flatten(input_shape=input_shape))
        model.add(Dense(32, activation='relu'))
        model.add(Dense(nb_actions))
        return model

## 3.4. Benchmark the trained agent

The network is trained now. We benchmark it to compare it with random AI and Minimax AI; we also analyse how much computation do Minimax and the neural network agent require.

In [None]:
# TEST THE NEURAL AI

ttt = TicTacToe()

x_ai = NeuralAI(ttt)
ttt.x_ai = x_ai

o_ai = MinimaxAI(ttt)
ttt.o_ai = o_ai


counters = {-1:0, 0:0, 1:0}
n_games = int(1e4)

def run_trials():
    for i in range(n_games):
        if i % 50 == 0:
            print(i, "out of", n_games)
            print(f"stats: out of {i} games, x has won {counters[1]} times, o - {counters[-1]} times, and there were "
                  f"{counters[0]} draws.")

        ttt.reset()
        result = ttt.play_ai_game()
        if result < 0:
            result = -1
        elif result > 0:
            result = 1
        counters[result] += 1




from cProfile import Profile
profiler = Profile()
profiler.runcall(run_trials)
profiler.print_stats('cumulative')
