### **IMPORTS**

In [None]:
import tensorflow
import keras
from tensorflow.keras import Sequential
import numpy as np
import random
import pickle
from tensorflow.keras.layers import Dense
from tensorflow.keras import losses
from tensorflow.keras import optimizers
import matplotlib
import matplotlib.pyplot as plt

### **Versions**

In [None]:
import sys
print(sys.version)

3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]


In [None]:
print(tensorflow.__version__)
print(keras.__version__)
print(np.__version__)
print(matplotlib.__version__)


2.15.0
2.15.0
1.23.5
3.7.1


### ***POLICY***

In [None]:
class EpsilonGreedy:
    def __init__(self, epsilon):
        '''
            Initialise the epsilon value
        '''
        self.epsilon = epsilon
    def perform(self, q_value, action_space: list = None):
        '''
          get a random probability
          if prob <= epsilon then take random action
          else argmax q action
        '''
        prob = np.random.sample()
        if prob <= self.epsilon:
            if action_space is None:
                return np.random.randint(len(q_value))
            return np.random.choice(action_space)
        else:
            if action_space is None:
                return np.argmax(q_value)
            return max([[q_value[a], a] for a in action_space], key=lambda x: x[0])[1]

    def decay(self, decay_value, lower_bound):
        '''
           updating epsilon by decay factor  => exploration is reduced and exploitaion is preferred
        '''
        self.epsilon = max(self.epsilon * decay_value, lower_bound)


### Experience replay

In [None]:
class ExperienceReplay:
    def __init__(self, e_max: int):
        '''
            Initalize the experience replay buffer
        '''
        self.e_max = e_max
        self.memory = list()
        self.index = 0

    def add_experience(self, sample: list):
        '''
            sample is list containing action state with values
            If buffer is full then replace from front
        '''

        if len(self.memory) < self.e_max:
            self.memory.append(sample)
        else:
            self.memory[self.index] = sample
        self.index = (self.index + 1) % self.e_max

    def sample_experience(self, sample_size: int, cer_mode: bool):
        '''
            Take some random samples from experience replay buffer
            if cer mode is set last sample from random sample is replaced with last item of replay buffer
        '''
        samples = random.sample(self.memory, sample_size)
        if cer_mode:
            samples[-1] = self.memory[self.index - 1]
        s_batch, a_batch, r_batch, ns_batch, done_batch = map(np.array, zip(*samples))
        return s_batch, a_batch, r_batch, ns_batch, done_batch

    def get_size(self):
        '''
            Returns the size of experience replay buffer
        '''
        return len(self.memory)


### **DQN**

In [None]:
class DQN():
    def __init__(self, discount_factor: float, epsilon: float, e_min: int, e_max: int):
        '''
            parameters  : discount factor ,  epsilon , e_min , e_max
            e-max  :  maximum experienes to store in experience replay buffer
            initialized the training and target deep neural networks with Sequential API from tensorflow
        '''
        self.gamma = discount_factor
        self.epsilon_greedy = EpsilonGreedy(epsilon)
        self.e_min = e_min
        self.exp_replay = ExperienceReplay(e_max)
        self.training_network = Sequential()
        self.target_network = Sequential()
        self.cache = list()

    def observe(self, state, action_space: list = None):
        '''
            for the given state and action space q_value is predicted agent makes an action with max q _value
        '''
        q_value = self.training_network.predict(np.array([state])).ravel()
        if action_space is not None:
            return max([[q_value[a], a] for a in action_space], key=lambda x: x[0])[1]
        return np.argmax(q_value)

    def observe_on_training(self, state, action_space: list = None) -> int:
        '''
            action is taken based on greedy strategy
            cache is temporary list to store states and actions until final reward is obtained
        '''
        q_value = self.training_network.predict(np.array([state])).ravel()
        action = self.epsilon_greedy.perform(q_value, action_space)
        self.cache.extend([state, action])
        return action

    def take_reward(self, reward, next_state, done):
        '''
          Updating the experience replay buffer
        '''
        self.cache.extend([reward, next_state, done])
        self.exp_replay.add_experience(self.cache.copy())
        self.cache.clear()

    def train_network(self, sample_size: int, batch_size: int, epochs: int, verbose: int = 2, cer_mode: bool = False):
        if self.exp_replay.get_size() >= self.e_min:
            # state_samples, action_samples, reward_samples, next_state_samples, done_samples
            s_batch, a_batch, r_batch, ns_batch, done_batch = self.exp_replay.sample_experience(sample_size, cer_mode)
            states, q_values = self.replay(s_batch, a_batch, r_batch, ns_batch, done_batch)
            history = self.training_network.fit(states, q_values, epochs=epochs, batch_size=batch_size, verbose=verbose)
            return history.history['loss']

    def replay(self, states, actions, rewards, next_states, terminals):
        q_values = self.target_network.predict(np.array(states))  # get q value at state t by target network
        nq_values = self.target_network.predict(np.array(next_states))  # get q value at state t+1 by target network
        for i in range(len(states)):
            a = actions[i]
            done = terminals[i]
            r = rewards[i]
            if done:
                q_values[i][a] = r
            else:
                q_values[i][a] = r + self.gamma * np.max(nq_values[i])
        return states, q_values

    def update_target_network(self):
        self.target_network.set_weights(self.training_network.get_weights())


In [None]:
# tictactoe environment
class Tictactoe_v0:
    #constructor to create a empty board and define winning states
    def __init__(self):
        '''
        winning positions  -
        012 indicate the first row filled with same marks and similarly for other rows 345 678
        036 indicate the first column filled with same marks and simlarly for other columns 147 258
        048 & 246 indicate the diagonals

        player mark - can be 1 or -1  ( analogous to O AND X )
        '''
        self.board = [0] * 9
        self.winning_position = [[0, 1, 2], [3, 4, 5], [6, 7, 8],
                                [0, 3, 6], [1, 4, 7], [2, 5, 8],
                                [0, 4, 8], [6, 4, 2]]
        self.current_turn = 1
        self.player_mark = 1

    def reset(self, is_human_first):
        '''
        reset methods resets the tictactoe object
        parameters : is_human_first if not set allows agent to make move first
        '''
        self.board = [0] * 9
        self.current_turn = 1
        self.player_mark = 1 if is_human_first else -1
        if not is_human_first:
            self.env_act()
        return self.board.copy()


    def check_win(self):
        '''
            pst is position parameter which tuple of indices
            board[pst[0]] + board[pst[1]] + board[pst[2]] is permutation of combination of 1,0,-1
        '''
        for pst in self.winning_position:
            if str(self.board[pst[0]]) + str(self.board[pst[1]]) + str(self.board[pst[2]]) in ['111', '-1-1-1']:
                if self.current_turn == self.player_mark:
                    # win and done
                    return 1, True
                #lose and done
                return -1, True
        #all values of board filled with 1 or -1 tie situation
        if 0 not in self.board:
            return 0, True
        #game not done
        return 0, False

    def env_act(self):
        action = random.choice([i for i in range(len(self.board)) if self.board[i] == 0])
        for pst in self.winning_position:
            com = str(self.board[pst[0]]) + str(self.board[pst[1]]) + str(self.board[pst[2]])
            if com.replace('0', '') == str(self.current_turn) * 2:
                if self.board[pst[0]] == 0:
                    action = pst[0]
                elif self.board[pst[1]] == 0:
                    action = pst[1]
                else:
                    action = pst[2]
        if self.board[action] != 0:
            raise Exception('Invalid action')
        self.board[action] = self.current_turn
        reward, done = self.check_win()
        self.current_turn = self.current_turn * -1
        return reward, done

    def step(self, action):
        #action on index which is already filled
        if self.board[action] != 0:
            raise Exception('Invalid action')
        # fill the index with current turn
        self.board[action] = self.current_turn
        reward, done = self.check_win()
        # flip the current turn
        self.current_turn = self.current_turn * -1
        #if game over return reward and board
        if done:
            return self.board.copy(), reward, done, None
        #else take next step
        reward, done = self.env_act()
        return self.board.copy(), reward, done, None


In [None]:

'''
  Initialize the enviornment
'''
env = Tictactoe_v0()
agent = DQN(0.7, 0.4, 4096, 1048576) #initialize the dqn

# Training network
op1 = optimizers.RMSprop(learning_rate=0.00025)
agent.training_network.add(Dense(128, activation='relu', input_shape=(9,)))
agent.training_network.add(Dense(128, activation='relu'))
agent.training_network.add(Dense(9, activation='linear'))
agent.training_network.compile(optimizer=op1, loss=losses.mean_squared_error)

# Target Network
op2 = optimizers.RMSprop(learning_rate=0.00025)
agent.target_network.add(Dense(128, activation='relu', input_shape=(9,)))
agent.target_network.add(Dense(128, activation='relu'))
agent.target_network.add(Dense(9, activation='linear'))
agent.target_network.compile(optimizer=op2, loss=losses.mean_squared_error)

# update the target network
agent.update_target_network()

reward_records = list()
loss_records = list()
count = 0
tau = 500
record = 0

In [None]:
if __name__ == '__main__':
    for episode in range(10000):
        state = env.reset(1)
        done = False
        print(episode, '-------------------------------------------------------', 'current epsilon: ', agent.epsilon_greedy.epsilon)
        while not done:
            action = agent.observe_on_training(state, [i for i in range(len(state)) if state[i] == 0])
            state, reward, done, _ = env.step(action)
            print(state, done)
            record += reward
            # print(episode, '-----------------------------------', reward)
            agent.take_reward(reward, state, done)
            hist = agent.train_network(64 ,64, 1, 2, cer_mode=True)
            loss_records.append(hist)
            count += 1
            # time to update the target network
            if count % tau == 0:
                agent.update_target_network()
        reward_records.append(record)
        agent.epsilon_greedy.decay(0.99999, 0.1)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
9730 ------------------------------------------------------- current epsilon:  0.36291333550855226
[1, 0, -1, 0, 0, 0, 0, 0, 0] False
1/1 - 0s - loss: 0.0039 - 7ms/epoch - 7ms/step
[1, 0, -1, 0, -1, 0, 1, 0, 0] False
1/1 - 0s - loss: 0.0023 - 13ms/epoch - 13ms/step
[1, 0, -1, 1, -1, 0, 1, 0, 0] True
1/1 - 0s - loss: 0.0016 - 8ms/epoch - 8ms/step
9731 ------------------------------------------------------- current epsilon:  0.3629097063751972
[0, 0, 0, 0, -1, 0, 0, 1, 0] False
1/1 - 0s - loss: 0.0011 - 10ms/epoch - 10ms/step
[-1, 0, 0, 0, -1, 0, 0, 1, 1] False
1/1 - 0s - loss: 0.0027 - 7ms/epoch - 7ms/step
[-1, 0, 0, 0, -1, 0, 1, 1, 1] True
1/1 - 0s - loss: 0.0033 - 10ms/epoch - 10ms/step
9732 ------------------------------------------------------- current epsilon:  0.36290607727813345
[0, 0, 1, 0, 0, 0, 0, -1, 0] False
1/1 - 0s - loss: 0.0024 - 10ms/epoch - 10ms/step
[0, 1, 1, 0, 0, 0, -1, -1, 0] False
1/1 - 0s - loss: 0.

In [None]:
def print_board(board):
    mark = {1:'X', -1:'O', 0:' '}
    for i in range(3):
        print(end=' | ')
        for j in range(3):
            print(mark[board[3*i + j]], end=' | ')
        print('\n')
    print('------------------------')

In [None]:
#Dictionary to store wins , draws , looses
result_vs_random = {1:0, 0:0, -1:0}

#initialize the game
env_test = Tictactoe_v0()

#no of tests
for ep in range(1000):
    # game not over
    done = False
    reward = 0
    # reset the last state in game
    state = env_test.reset(1)
    is_first_move = True
    print('game ' + str(ep) + ' start ---------------------')
    while not done:
        if is_first_move:
            action = random.choice([i for i in range(len(state)) if state[i] == 0]) #random player move
            is_first_move = False # make first move and set it to false
        else:
            action = agent.observe(state, [i for i in range(len(state)) if state[i] == 0])
        print_board(state)
        state, reward, done, _ = env_test.step(action)
    result_vs_random[reward] += 1
    print_board(state)
    print('game ' + str(ep) + ' end ---------------------', reward)

game 0 start ---------------------
 |   |   |   | 

 |   |   |   | 

 |   |   |   | 

------------------------
 |   |   |   | 

 |   | X |   | 

 |   | O |   | 

------------------------
 |   |   |   | 

 |   | X | O | 

 | X | O |   | 

------------------------
 |   |   | X | 

 |   | X | O | 

 | X | O |   | 

------------------------
game 0 end --------------------- 1
game 1 start ---------------------
 |   |   |   | 

 |   |   |   | 

 |   |   |   | 

------------------------
 |   |   | X | 

 |   | O |   | 

 |   |   |   | 

------------------------
 |   | X | X | 

 |   | O |   | 

 | O |   |   | 

------------------------
 | X | X | X | 

 |   | O |   | 

 | O |   |   | 

------------------------
game 1 end --------------------- 1
game 2 start ---------------------
 |   |   |   | 

 |   |   |   | 

 |   |   |   | 

------------------------
 |   |   |   | 

 | O |   |   | 

 |   |   | X | 

------------------------
 |   |   |   | 

 | O |   | X | 

 | O |   | X | 

--------------

In [None]:
print('Result:')
print('win: ', result_vs_random[1])
print('tie: ', result_vs_random[0])
print('lose: ', result_vs_random[-1])