In [5]:
import numpy as np
from keras.layers import Dense, Activation
from keras.models import Sequential, load_model
from keras.optimizers import Adam



In [6]:
# Experience Replay
class ReplayBuffer():
    """Fixed-capacity ring buffer of (state, action, reward, next state, terminal) rows.

    The effective capacity is ``int(memory_size / 1.5)``, not ``memory_size``
    itself. Once the buffer is full, new transitions overwrite the oldest ones.
    """

    def __init__(self,memory_size=100000, input_shape=(9,)):
        """Allocate zero-filled storage for every transition field.

        Args:
            memory_size: Requested size; the stored capacity is this value
                divided by 1.5 and truncated to an int.
            input_shape: Shape of a single state observation.
        """
        capacity = int(memory_size/1.5)
        self.memory_size = capacity
        self.input_shape = input_shape
        # Total number of transitions ever stored; modulo the capacity it
        # gives the next slot to (over)write.
        self.memory_counter = 0

        per_state_shape = (capacity, *input_shape)
        self.state_memory = np.zeros(per_state_shape, dtype=np.float32)
        self.new_state_memory = np.zeros(per_state_shape, dtype=np.float32)
        self.action_memory = np.zeros(capacity, dtype=np.int32)
        self.reward_memory = np.zeros(capacity, dtype=np.float32)
        # Holds 1 for non-terminal transitions and 0 for terminal ones.
        self.terminal_memory = np.zeros(capacity, dtype=np.float32)

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the next slot, wrapping around when full."""
        slot = self.memory_counter % self.memory_size
        self.state_memory[slot] = state
        self.new_state_memory[slot] = state_
        self.reward_memory[slot] = reward
        self.action_memory[slot] = action
        # Stored inverted so it can be multiplied straight into a TD target.
        self.terminal_memory[slot] = 1-int(done)
        self.memory_counter += 1

    def sample_buffer(self, batch_size):
        """Draw ``batch_size`` distinct stored transitions uniformly at random.

        Only slots that have been written are candidates. Because sampling is
        without replacement, numpy raises ``ValueError`` when fewer than
        ``batch_size`` transitions are available.

        Returns:
            Tuple ``(states, actions, rewards, next_states, terminal)`` where
            ``terminal`` carries the stored 1/0 "not done" flags.
        """
        filled = min(self.memory_counter, self.memory_size)
        picks = np.random.choice(filled, batch_size, replace=False)

        return (
            self.state_memory[picks],
            self.action_memory[picks],
            self.reward_memory[picks],
            self.new_state_memory[picks],
            self.terminal_memory[picks],
        )

In [7]:
import tensorflow as tf
import numpy as np

import random
import numpy as np
import tensorflow as tf
# To copy weights of local to target network
import copy

class Env:
    """Tic-tac-toe board used as the training environment.

    The board is a flat numpy array of 9 cells, shown as a 3x3 grid by
    ``get_board``. A value of -1 marks an empty cell; any other value is the
    mark of the player who moved there.
    """

    def __init__(self):
        # Every row, column and diagonal, as triples of flat board indices.
        self.winning_combos = (
        [6, 7, 8], [3, 4, 5], [0, 1, 2], [0, 3, 6], [1, 4, 7], [2, 5, 8],
        [0, 4, 8], [2, 4, 6],)
        self.corners = [0,2,6,8]
        self.sides = [1,3,5,7]
        self.middle = 4
        self.board = np.array([-1.0] * 9)
        # List of free cell indices, in the same form step() and set_board() use.
        self.allowedAction = self._free_cells()

    def _free_cells(self):
        """Return the indices of all empty cells as a list."""
        return list(np.where(self.board == -1)[0])

    def get_marker(self):
        """Return the fixed pair of marker values ``(1.0, 0.0)``."""
        return (1.0,0.0)

    def set_board(self, board):
        """Replace the board and refresh ``allowedAction``.

        The board is passed through ``np.asarray`` so that a plain list works
        too; an ndarray argument is kept as the same object.
        """
        self.board = np.asarray(board)
        self.allowedAction = self._free_cells()
        return self.board

    def reset(self):
        """Start a new game: empty board, all nine cells allowed.

        Returns:
            The live board array (later moves mutate it in place).
        """
        self.board = np.array([-1.0] * 9)
        self.allowedAction = self._free_cells()
        return self.board

    def get_board(self):
        """Return the board reshaped to 3x3."""
        return self.board.reshape(3,3)

    def step(self, action, mark):
        """Place ``mark`` on cell ``action`` and score the resulting position.

        The move is not validated: an occupied cell is silently overwritten.

        Returns:
            ``(board, allowed_actions, reward, over)``. ``board`` is the live
            board array, not a copy. ``reward`` is 100 when ``mark`` completed
            a line, 10 when the board filled up without a win, 0 otherwise.
        """
        over = False
        reward = 0

        self.make_move(self.board, action, mark)

        if self.is_winner(self.board, mark):
            reward = 100
            over = True
        elif self.is_board_full():
            # Draw.
            reward = 10
            over = True

        self.allowedAction = self._free_cells()

        return self.board,self.allowedAction, reward, over

    def is_winner(self, board, mark):
        """Return True when ``mark`` occupies all three cells of any line."""
        for combo in self.winning_combos:
            if (board[combo[0]] == board[combo[1]] == board[combo[2]] == mark):
                return True
        return False

    def get_winning_combo(self, board):
        """Return the first completed line, or ``[None, None, None]``.

        A line of three empty cells does not count as completed.
        """
        for combo in self.winning_combos:
            first = board[combo[0]]
            if first != -1.0 and first == board[combo[1]] == board[combo[2]]:
                return [combo[0], combo[1], combo[2]]
        return [None, None, None]

    def is_space_free(self, board, index):
        "checks for free space of the board"
        return board[index] == -1.0

    def is_board_full(self):
        "checks if the board is full"
        # Every cell is checked, including index 0.
        for i in range(len(self.board)):
            if self.is_space_free(self.board, i):
                return False
        return True

    def make_move(self,board,index, mark):
        """Write ``mark`` into ``board[index]`` in place."""
        board[index] =  mark

    def choose_random_move(self, move_list):
        """Pick a random free cell from ``move_list``, or None if none is free."""
        free_moves = [index for index in move_list
                      if self.is_space_free(self.board, index)]
        if free_moves:
            return random.choice(free_moves)
        return None



In [14]:
class Game:
    """Self-play tic-tac-toe trainer with two epsilon-greedy Q-networks.

    ``model`` plays mark 0 (player 1) and learns from ``buffer``;
    ``model_adv`` plays mark 1 (player 2) and learns from ``buffer_adv``.
    On every turn a player only moves with probability ``TURN_PROBABILITY``,
    otherwise the turn is skipped.
    """

    GAMMA = 0.9             # discount factor of the Q-learning target
    TURN_PROBABILITY = 0.9  # chance that a player actually moves on its turn

    def __init__(self, epsilon=1., min_epsilon=0.1, epsilon_decay=5e-4):
        """Create the environment, both networks and both replay buffers.

        Args:
            epsilon: Initial probability of choosing a random move.
            min_epsilon: Lower bound that epsilon decays towards.
            epsilon_decay: Amount subtracted from epsilon on every decay step.
        """
        self.epsilon = epsilon
        self.batch_size = 50
        self.min_epsilon = min_epsilon
        self.epsilon_decay = epsilon_decay
        self.game = Env()
        self.model = self._build_model()
        self.model_adv = self._build_model()
        self.buffer = ReplayBuffer(input_shape = (9,) , memory_size = 100000)
        self.buffer_adv = ReplayBuffer(input_shape = (9,) , memory_size = 100000)

    @staticmethod
    def _build_model():
        """Build and compile one 9-input / 9-output network."""
        model = Sequential()
        model.add(Dense(4, input_dim=9, activation='relu'))
        model.add(Dense(4, activation='relu'))
        # NOTE(review): sigmoid followed by softmax bounds every output to
        # (0, 1), while the MSE targets built in learn() contain raw rewards.
        # A linear output layer is probably what is wanted - confirm before
        # changing, since it alters training results.
        model.add(Dense(9, activation='sigmoid'))
        model.add(Activation('softmax'))
        model.compile(loss='mse', optimizer='adam')
        return model

    def _decay_epsilon(self):
        """Lower epsilon by one decay step without going below min_epsilon."""
        self.epsilon = max(self.epsilon-self.epsilon_decay, self.min_epsilon)

    def _turn_taken(self):
        """Randomly decide whether the current player moves this turn."""
        p = self.TURN_PROBABILITY
        return bool(np.random.choice([True,False], p = [p, 1-p]))

    def take_action(self, allowed_actions, state, model=None):
        """Choose a move epsilon-greedily and decay epsilon by one step.

        Args:
            allowed_actions: Non-empty sequence of free cell indices.
            state: Flat board; it is wrapped into a batch of one for predict.
            model: Network used for the greedy choice; defaults to
                ``self.model``.

        Returns:
            The chosen cell index as an int.
        """
        if model is None:
            model = self.model

        explore = np.random.random() < self.epsilon
        self._decay_epsilon()

        allowed = np.asarray(allowed_actions, dtype=int)
        if explore:
            return int(np.random.choice(allowed))

        q_values = model.predict(np.array([state]), verbose=0)[0]
        # argmax over the allowed cells only, so the greedy choice is valid
        # for any Q-values, including all-negative ones.
        return int(allowed[np.argmax(q_values[allowed])])

    def play_with_self(self, rounds=10000):
        """Train both networks by letting them play each other.

        Args:
            rounds: Number of games to play.
        """
        for i in range(rounds):
            if i%1000 == 0:
                print("rounds: ", i)

            # Env hands out its live board and mutates it in place, so every
            # state kept for the replay buffers is an explicit snapshot.
            state = np.copy(self.game.reset())
            allowed_actions = np.arange(9)
            done = False

            while not done:
                # Player 1 (a skipped turn leaves the board unchanged)
                p1_play = self._turn_taken()
                state_ = state
                if p1_play:
                    action = self.take_action(allowed_actions, state)
                    board,allowed_actions,r,done = self.game.step(action=action,mark=0)
                    state_ = np.copy(board)

                # Player 2
                p2_play = self._turn_taken()
                state2_ = state_
                if p2_play:
                    if not done:
                        action2 = self.take_action(allowed_actions, state_, model=self.model_adv)
                        board,allowed_actions,r2,done = self.game.step(action=action2,mark=1)
                        state2_ = np.copy(board)
                        self.buffer_adv.store_transition(state_, action2, r2, state2_, done)
                    else:
                        # done is only True here when player 1 just ended the
                        # game; the adversary records it with negated reward.
                        self.buffer_adv.store_transition(state, action, -r, state_, done)

                if p1_play:
                    self.buffer.store_transition(state, action, r, state_, done)

                self.learn()
                state = state2_

    def _fit_batch(self, model, buffer):
        """Run one Q-learning update of ``model`` on a batch from ``buffer``.

        Returns:
            False when the buffer holds fewer than ``batch_size`` transitions
            (nothing is sampled or trained), True otherwise.
        """
        if buffer.memory_counter < self.batch_size:
            return False

        state, action, rewards, state_, not_done = buffer.sample_buffer(self.batch_size)

        # Start from the current predictions so that only the entry of the
        # action actually taken contributes to the loss.
        q_target = np.copy(model.predict(state, verbose=0))
        q_next = model.predict(state_, verbose=0)

        batch_index = np.arange(len(action), dtype=np.int32)
        # not_done is 0 for terminal transitions, which drops the bootstrap term.
        q_target[batch_index, action] = rewards + self.GAMMA*np.max(q_next, axis=1)*not_done

        model.train_on_batch(state, q_target)
        return True

    def learn(self):
        """Update each network from its own replay buffer, then decay epsilon.

        Each network is skipped until its buffer holds at least ``batch_size``
        transitions. Epsilon decays only when the main network was trained.
        """
        trained = self._fit_batch(self.model, self.buffer)
        self._fit_batch(self.model_adv, self.buffer_adv)

        if trained:
            self._decay_epsilon()

    def play(self):
        """Play one interactive game: the network is mark 0, the user mark 1.

        The same random turn skipping as in ``play_with_self`` applies to both
        sides. The user is asked again until a free cell index is entered.
        """
        state = np.copy(self.game.reset())
        allowed_actions = np.arange(9)
        done = False

        while not done:
            # Player 1
            if self._turn_taken():
                action = self.take_action(allowed_actions, state)
                board,allowed_actions,_,done = self.game.step(action=action,mark=0)
                state = np.copy(board)

            # Player 2 (only while the game is still running)
            if not done and self._turn_taken():
                free_cells = [int(cell) for cell in allowed_actions]
                print('Game: ')
                print(self.game.get_board())
                print("Enter your value among :")
                print(free_cells)

                action2 = None
                while action2 is None:
                    try:
                        choice = int(input('enter value'))
                    except ValueError:
                        choice = None
                    if choice in free_cells:
                        action2 = choice
                    else:
                        print("Enter the right value in integer from 0 to 8")

                board,allowed_actions,_,done = self.game.step(action=action2,mark=1)
                state = np.copy(board)
            
            



In [9]:
# Build a fresh agent with the default exploration settings.
game = Game()
# Self-play training; prints a "rounds:" progress line every 1000 games.
# The output recorded below this cell ends in a KeyboardInterrupt.
game.play_with_self()
# Interactive game against the network; the user's moves are read via input().
game.play()

rounds:  0
rounds:  1000


KeyboardInterrupt: 

In [13]:
# Scratch check of the turn-skip draw used in Game: True with probability 0.9.
np.random.choice([True,False], p = [0.9,0.1])

True