<a href="https://colab.research.google.com/github/Akhilez/ml_gallery/blob/master/ml_py/MLGallery/reinforcement/tictactoe/TicTacToe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from abc import ABCMeta, abstractmethod
import copy
import os
import torch
import numpy as np
import random
import json
from sklearn.metrics import confusion_matrix, accuracy_score

In [0]:
!rm -rf data
!mkdir -p data/PolicyGrad

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [0]:
class DataManager:

    def __init__(self, file_name='data/data.json', max_size=100):
        self.file_name = file_name
        self.max_size = max_size
        self.data = None

    def write(self):
        with open(self.file_name, 'w') as data:
            data.write(json.dumps({'matches': self.data}))

    def enqueue(self, matches):
        if self.data is not None:
            matches.extend(self.data)
        self.data = matches[:self.max_size]

    def get(self):
        if self.data is None:
            if os.path.exists(self.file_name):
                with open(self.file_name, 'r') as data:
                    data_string = data.read()
                    if len(data_string) > 0:
                        self.data = json.loads(data_string)['matches']
            else:
                print(f"Path {self.file_name} does not exist")
        return self.data

    def clear(self):
        self.data = None
        try:
            with open(self.file_name, 'w') as data:
                data.write('')
        except FileNotFoundError:
            return

    def __enter__(self):
        pass

    def __exit__(self, type, value, traceback):
        self.write()


In [0]:
class Frame:
    X = 'X'
    O = 'O'

    win_lines = [
        [[0, 0], [1, 1], [2, 2]],  # [\]
        [[0, 2], [1, 1], [2, 0]],  # [/]
        [[0, 0], [1, 0], [2, 0]],  # [|  ]
        [[0, 1], [1, 1], [2, 1]],  # [ | ]
        [[0, 2], [1, 2], [2, 2]],  # [  |]
        [[0, 0], [0, 1], [0, 2]],  # [```]
        [[1, 0], [1, 1], [1, 2]],  # [---]
        [[2, 0], [2, 1], [2, 2]],  # [...]
    ]

    def __init__(self):
        self.matrix = self.generate_empty_canvas()

    def insert(self, player, row, column):
        self.matrix[row][column] = player.character

    def print_canvas(self):
        output = '\n\t0\t1\t2\n'
        for i in range(3):
            output += f'{i}\t'
            for j in range(3):
                value = self.matrix[i][j]
                value = value if value is not None else ' '
                output += f'{value}\t'
            output += '\n'
        output += '\n'
        print(output)

    def check_winner(self, player1, player2):
        for win_line in Frame.win_lines:
            num1 = self.matrix[win_line[0][0]][win_line[0][1]]
            num2 = self.matrix[win_line[1][0]][win_line[1][1]]
            num3 = self.matrix[win_line[2][0]][win_line[2][1]]

            if num1 is not None and num1 == num2 and num2 == num3:
                return player1 if player1.character == num1 else player2

    def is_canvas_filled(self):
        for row in self.matrix:
            for column in row:
                if column is None:
                    return False
        return True

    @staticmethod
    def generate_empty_canvas():
        return [
            [None, None, None],
            [None, None, None],
            [None, None, None]
        ]

    @staticmethod
    def flip(matrix):
        flipped = copy.deepcopy(matrix)
        for i in range(3):
            for j in range(3):
                if matrix[i][j] is None:
                    continue
                if matrix[i][j] == Frame.X:
                    flipped[i][j] = Frame.O
                else:
                    flipped[i][j] = Frame.X
        return flipped

    @staticmethod
    def categorize_inputs(my_list):
        categories = {None: [0, 0, 1], 'X': [1, 0, 0], 'O': [0, 1, 0]}
        all_list = []
        for frame in my_list:
            category_list = []
            for position in frame:
                category_list.append(categories[position])
            all_list.append(category_list)
        return all_list

    @staticmethod
    def categorize_outputs(my_list):
        """
        :param my_list: outputs in the form: [1, 2]
        :return: Class of an output from 0-8, Ex: [1, 2] => 5
        """
        cat_list = []
        for lst in my_list:
            cat_list.append(lst[0] * 3 + lst[1])
        return Frame.to_one_hot(cat_list)

    @staticmethod
    def to_one_hot(array):
        """
        :param array: like [1, 0, 3]
        :return: [[0, 1, 0, 0], [1, 0, 0, 0], [0, 0, 0, 1]]
        """
        import numpy as np
        a = np.array(array)
        b = np.zeros((a.size, a.max() + 1))
        b[np.arange(a.size), a] = 1
        return b


class Match:

    def __init__(self, player_1, player_2, match_id=None):
        self.frame = Frame()
        self.current_player = player_1
        self.other_player = player_2
        self.winner = None
        self.inserts = []
        self.id = match_id

    def start(self, print_frame=True):
        if print_frame:
            print(f"Match ID: {self.id}")
        while True:
            if print_frame:
                self.frame.print_canvas()
                print(f'Current player = {self.current_player}')

            self.insert(self.current_player.get_positions(self.frame))

            winner = self.frame.check_winner(self.current_player, self.other_player)
            if winner is not None or self.frame.is_canvas_filled():
                if print_frame:
                    self.frame.print_canvas()
                self.winner = winner
                if print_frame:
                    self.print_winner(winner)
                self.update_scores(winner)
                return
            self.switch_players()

    def insert(self, positions):
        self.inserts.append({
            'current': self.current_player.character,
            'position': [positions[0], positions[1]],
            'frame': copy.deepcopy(self.frame.matrix)
        })
        self.frame.insert(self.current_player, positions[0], positions[1])

    @staticmethod
    def update_scores(winner):
        if winner is not None:
            winner.score += 1

    def summary(self):
        inserts = self.get_all_inserts()  # self.get_best_inserts()
        # successful_inserts = self.remove_current_character_attribute(successful_inserts)
        return {
            'inserts': inserts,
            'id': self.id,
            'winner': None if self.winner is None else self.winner.character}

    def get_best_inserts(self):
        """
        Criteria to decide which inserts were the best:
        - Remove opportunity given
        - Remove missed opportunities
        - Add winner's inserts
        """
        best_inserts = []
        for insert in self.inserts:
            frame = Frame.flip(insert['frame']) if insert['current'] == Frame.O else copy.deepcopy(insert['frame'])
            new_insert = copy.deepcopy(insert)
            new_insert['frame'] = frame
            if Match.is_best_position(frame, insert['position']):
                new_insert['best'] = True
            else:
                new_insert['best'] = False
            best_inserts.append(new_insert)
        return best_inserts

    def get_all_inserts(self):
        all_inserts = []
        for insert in self.inserts:
            # frame = Frame.flip(insert['frame']) if insert['current'] == Frame.O else copy.deepcopy(insert['frame'])
            new_insert = copy.deepcopy(insert)
            # new_insert['frame'] = frame
            all_inserts.append(new_insert)
        return all_inserts

    @staticmethod
    def print_winner(winner):
        if winner is None:
            print('Draw!')
        else:
            print(f'{winner} won!')

    def switch_players(self):
        switcher = self.current_player
        self.current_player = self.other_player
        self.other_player = switcher

    @staticmethod
    def remove_current_character_attribute(inserts):
        for insert in inserts:
            del insert['current']
        return inserts

    @staticmethod
    def is_best_position(frame, current_position):
        """
        Steps:
          - X win opportunity:
            - Get opportunity position for X
            - if the position exists:
              - if position == current_position, then position makes sense. Return True.
          - O win opportunity:
            - Get opportunity position for O
            - if the position exists:
              - if position == current_position, then position makes sense, return True.
          - Return True.
        :param frame: X_Matrix - A frame matrix where the next player is always X
        :param current_position: Position that was selected for X
        :return: Returns True if the position made sense to win the match.
        """
        x_opportunity = Match.get_opportunity(frame, Frame.X)
        if x_opportunity is not None:
            return x_opportunity == current_position
        o_opportunity = Match.get_opportunity(frame, Frame.O)
        if o_opportunity is not None:
            return o_opportunity == current_position
        return True

    @staticmethod
    def get_opportunity(frame, character):
        for win_line in Frame.win_lines:
            num_chars = sum(frame[position[0]][position[1]] == character for position in win_line)
            if num_chars == 2:
                none_pos = [position for position in win_line if frame[position[0]][position[1]] is None]
                if len(none_pos) == 1:
                    return none_pos[0]

    @staticmethod
    def get_loose_opportunity(frame, character):
        for win_line in Frame.win_lines:
            num_chars = sum(frame[position[0]][position[1]] == character for position in win_line)
            if num_chars == 1:
                none_pos = [position for position in win_line if frame[position[0]][position[1]] is None]
                if len(none_pos) == 2:
                    return none_pos[random.randint(0, 1)]



class Game:

    def __init__(self, player1, player2):
        """
        :param player1: Player 1 (human|random|dense)
        :param player2: Player 2 (human|random|dense)
        """
        self.player_1, self.player_2 = player1, player2
        self.matches = []
        self.current_match = None
        self.num_matches = 0

    def start(self, epochs=None, print_frame=True):
        while epochs is None or epochs > 0:
            match = Match(self.player_1, self.player_2, self.num_matches)
            self.current_match = match

            match.start(print_frame)
            self.num_matches += 1

            if print_frame:
                self.print_scores()
            match_summary = match.summary()
            self.matches.append(match_summary)

            if epochs is None:
                if not self.choose_to_replay():
                    print("Closing the game. Bye!")
                    epochs = 0
            else:
                epochs -= 1

    @staticmethod
    def choose_to_replay():
        choice = input("Replay? (y/n):").lower()
        return choice == 'y'

    def print_scores(self):
        print(f"Scores:\n\t{self.player_1}: {self.player_1.score}")
        print(f"\t{self.player_2}: {self.player_2.score}")

    def filter_draw_matches(self):
        return [match for match in self.matches if match.winner is not None]

    def swap_players(self):
        temp = self.player_1
        self.player_1 = self.player_2
        self.player_2 = temp

In [0]:
class Player(metaclass=ABCMeta):
    TYPE = 'default'

    def __init__(self, name, character=None):
        self.name = name
        self.score = 0
        self.character = self.get_character(character)

    @abstractmethod
    def get_positions(self, frame):
        pass

    @staticmethod
    def get_character(character):
        if character is None:
            while True:
                character = input('Enter player 1 character (X or O): ').upper()
                if character == Frame.X or character == Frame.O:
                    break
                print(f'Please enter either {Frame.X} or {Frame.O}')
        return character

    def __str__(self):
        return f'{self.name} ({self.character})'

    def __eq__(self, other):
        return self.character == other.name

In [0]:
class RandomPlayer(Player):
    TYPE = 'random'

    def get_positions(self, frame):
        return self.get_random_position(frame.matrix)

    @staticmethod
    def get_random_position(frame):
        positions = []
        for i in range(3):
            for j in range(3):
                if frame[i][j] is None:
                    positions.append((i, j))
        if len(positions) > 0:
            random_index = random.randint(0, len(positions) - 1)
            return positions[random_index]

In [0]:
class PerfectPlayer(Player):

    TYPE = 'static'

    def get_positions(self, frame):
        """
        Conditions to get_position:
          - Check if you have opportunity
            - Place X where X has a win_line
          - Check if opponent has opportunity:
            - Place X where O has a win_line
          - Pick random position.
        """
        frame = frame.matrix if self.character == Frame.X else Frame.flip(frame.matrix)
        position = Match.get_opportunity(frame, Frame.X)
        if position is None:
            position = Match.get_opportunity(frame, Frame.O)
            if position is None:
                position = Match.get_loose_opportunity(frame, Frame.X)
                if position is None:
                    position = RandomPlayer.get_random_position(frame)
        return position

In [0]:
class PolicyGradPlayer(Player):
    TYPE = 'policy_grad'

    def __init__(self, name, character=None):
        super().__init__(name, character)
        self.weights_1 = self.load_params('weights_1', shape=(50, 27)).to(device)
        self.biases_1 = self.load_params('biases_1', shape=50).to(device)

        self.weights_3 = self.load_params('weights_3', shape=(50, 50)).to(device)
        self.biases_3 = self.load_params('biases_3', shape=50).to(device)

        self.weights_2 = self.load_params('weights_2', shape=(9, 50)).to(device)
        self.biases_2 = self.load_params('biases_2', shape=9).to(device)

        self.model = self

        self.learning_rates_1 = self.get_dynamic_learning_rates(len(self.weights_1), initial=0.5, decay=0.9)
        self.learning_rates_2 = self.get_dynamic_learning_rates(len(self.weights_2), initial=0.5, decay=0.9)
        self.learning_rates_3 = self.get_dynamic_learning_rates(len(self.weights_3), initial=0.5, decay=0.9)

    def get_positions(self, frame):
        if self.flip():
            return RandomPlayer.get_random_position(frame.matrix)
        frame = frame.matrix if self.character == Frame.X else Frame.flip(frame.matrix)
        frame_one_hot = self.get_one_hot_frame(frame)

        position_one_hot = self.forward(frame_one_hot)

        output = self.get_max_index(position_one_hot[0], frame)
        return [int(output // 3), int(output % 3)]

    def train(self, epochs, data_manager):
        for epoch in range(epochs):
            self.clear_grads()
            total_loss = 0
            for match in data_manager.data:
                if match['winner'] is not None:
                    x, y = self.get_mini_batch(match)
                    y_hat = self.forward(x)
                    loss = self.backward(y, y_hat)

                    total_loss += loss
            if epoch % 20 == 0:
                print(f'Loss = {total_loss}')

    def forward(self, x):
        self.clear_grads()
        h1 = x.mm(self.weights_1.T) + self.biases_1
        h2 = h1.mm(self.weights_3.T) + self.biases_3
        y_hat = torch.softmax(h2.mm(self.weights_2.T) + self.biases_2, dim=1)
        return y_hat

    def backward(self, y, y_hat):
        loss = torch.nn.functional.binary_cross_entropy(y_hat, y)
        loss.backward()

        # print(f'\n---\nReal y = {y}\ny_hat = {y_hat}')

        self.weights_1 = self.weights_1 - self.learning_rates_1 * self.weights_1.grad
        self.biases_1 = self.biases_1 - self.learning_rates_1.T * self.biases_1.grad
        
        self.weights_3 = self.weights_3 - self.learning_rates_3 * self.weights_3.grad
        self.biases_3 = self.biases_3 - self.learning_rates_3.T * self.biases_3.grad

        self.weights_2 = self.weights_2 - self.learning_rates_2 * self.weights_2.grad
        self.biases_2 = self.biases_2 - self.learning_rates_2.T * self.biases_2.grad

        self.clear_grads()
        return loss

    def get_mini_batch(self, match):
        """
        :param match: dict of inserts[current, position, frame], winner and id
        """
        x, y = [], []
        match = copy.deepcopy(match)
        inserts = match['inserts']
        winner_character = match['winner']
        winners_inserts = [insert for insert in inserts if insert['current'] == winner_character]
        if winner_character == Frame.O:
            for insert in winners_inserts:
                insert['frame'] = Frame.flip(insert['frame'])
        for insert in winners_inserts:
            x.append(self.get_one_hot_frame(insert['frame']).reshape(27))
            y.append(self.get_one_hot_position(insert['position'][0], insert['position'][1]))
        return torch.stack(x).to(device), torch.stack(y).to(device)

    def clear_grads(self):
        self.weights_1 = self.weights_1.detach().requires_grad_()
        self.biases_1 = self.biases_1.detach().requires_grad_()

        self.weights_2 = self.weights_2.detach().requires_grad_()
        self.biases_2 = self.biases_2.detach().requires_grad_()

        self.weights_3 = self.weights_3.detach().requires_grad_()
        self.biases_3 = self.biases_3.detach().requires_grad_()

    def load_params(self, name, shape):
        weights_1_path = f'data/{self.name}/{name}.pt'
        if os.path.exists(weights_1_path):
            print(f"Loaded {name} parameters successfully")
            return torch.load(weights_1_path)
        else:
            print(f"Cannot load parameters {name} from {weights_1_path}")
            return self.get_new_weights(shape)

    def show_confusion_matrix(self, data_manager):
        """
        For all the matches in the dataset, gimme the real real y and predicted y

        :return:
        """
        print("Confusion matrix")
        with torch.no_grad():
            real_ys = []
            predicted_ys = []
            for match in data_manager.data:
                if match['winner'] is not None:
                    x, y = self.get_mini_batch(match)
                    y_hat = self.forward(x)
                    for i in range(len(y_hat)):
                        y_hat_argmax = int(self.get_max_index(y_hat[i], match['inserts'][i]['frame']))
                        y_argmax = int(self.get_max_index(y[i], match['inserts'][i]['frame']))
                        real_ys.append(y_argmax)
                        predicted_ys.append(y_hat_argmax)
            matrix = confusion_matrix(real_ys, predicted_ys)
            print(matrix)
            accuracy = accuracy_score(real_ys, predicted_ys)
            print(f'Accuracy: {accuracy}')


    def save_params(self, path=None):
        if path is None:
            os.makedirs(f'data/{self.name}', exist_ok=True)
            path = f'data/{self.name}'
        torch.save(self.weights_1, f'{path}/weights_1.pt')
        torch.save(self.biases_1, f'{path}/biases_1.pt')

        torch.save(self.weights_2, f'{path}/weights_2.pt')
        torch.save(self.biases_2, f'{path}/biases_2.pt')

        torch.save(self.weights_3, f'{path}/weights_3.pt')
        torch.save(self.biases_3, f'{path}/biases_3.pt')

    @staticmethod
    def get_new_weights(shape):
        range_min = -0.5
        range_max = 0.5
        random_tensor = torch.rand(shape, requires_grad=True, dtype=torch.float32)
        scaled_tensor = (range_min - range_max) * random_tensor + range_max
        return scaled_tensor

    @staticmethod
    def get_dynamic_learning_rates(length, initial=0.5, decay=0.8):
        lrs = []
        lr = initial
        for i in range(length):
            lrs.append(lr)
            lr = lr * decay
        lrs.reverse()
        return torch.tensor([lrs]).T.to(device)

    @staticmethod
    def flip():
        flipped = np.random.rand()
        return flipped > 0.8

    @staticmethod
    def get_one_hot_frame(frame):
        return torch.tensor(Frame.categorize_inputs(frame), dtype=torch.float32).reshape(1, 27).to(device)

    @staticmethod
    def get_one_hot_position(i, j):
        position = torch.zeros(9)
        position[i*3 + j] = 1
        return position

    @staticmethod
    def get_max_index(output, frame):
        i = 0
        while i < len(output):
            i += 1
            max_index = output.argmax()
            indices = [max_index // 3, max_index % 3]
            if frame[indices[0]][indices[1]] is None:
                return max_index
            output[max_index] = -1
        return -1

In [45]:
dense_player = PolicyGradPlayer('PolicyGrad', Frame.X)

Loaded weights_1 parameters successfully
Loaded biases_1 parameters successfully
Loaded weights_3 parameters successfully
Loaded biases_3 parameters successfully
Loaded weights_2 parameters successfully
Loaded biases_2 parameters successfully


In [41]:
game = Game(
    dense_player,
    # RandomPlayer('Random', Frame.O)
    PerfectPlayer('Static', Frame.O)
)

data_manager = DataManager(max_size=10000)
data_manager.get()

train = True

for i in range(100):
    print(f'\n----\nEpoch: {i}')
    game.start(100, print_frame=(not train))
    data_manager.enqueue(game.matches)
    if train:
        dense_player.train(10, data_manager)
    dense_player.show_confusion_matrix(data_manager)
    game.matches.clear()
    game.swap_players()

    if train:
        dense_player.save_params()
        data_manager.write()



----
Epoch: 0
Loss = 347.0382080078125
Confusion matrix
[[327  37   6   0 121  13   5  14 107]
 [  9 142  22  25 128  17  10   1 145]
 [ 72   4 223   4 174   6  76  14 147]
 [ 18   0  36 247  57   7  42   5 158]
 [ 19   0   4   0 648  10  15  47 198]
 [ 39  11  13   1  70 168  13  11  95]
 [ 97   2   5  43 148   6 457   0 117]
 [ 11   0  12   0  67   2  25 200  97]
 [ 30   3   6   4   8   4  10   2 397]]
Accuracy: 0.5075894470545718

----
Epoch: 1
Loss = 370.3204345703125


KeyboardInterrupt: ignored

In [42]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
drive_path =path = f"/content/gdrive/My Drive/Projects/weights/tictactoe"
dense_player.save_params(drive_path)