# Imports

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
 

import copy
import random
import time
from json.encoder import INFINITY
import pandas as pd
import os
import math
from ayo_game import play, is_illegal_move, assign_reward, print_game_play, end_game
from agents import random_agent as ra
from agents import minimax_agent as ma
from agents import mcts_agent as mctsa
seed = 37

randint = random.randint
random.seed(seed)
value_model_file = 'latest_value_model.keras'
policy_model_file = 'latest_policy_model.keras'

# checkpoint_filepath = 'latest.keras'
# checkpoint_filepath = 'multi.keras'

# Agents

In [29]:
minimax_agent = ma.agent
random_agent = ra.agent

# Helpers

In [91]:
def format_state(state):
    x = []
    x.extend(state['board'])
    x.extend(state['reward'])
    x.append(state['current_player'])
    x.append(state['player_territory'][1])

    return x

In [92]:
def norm(val_1, val_2):
    diff = val_1 - val_2

    if diff < 0:
        return -1
    if diff > 0:
        return 1
    return 0

In [93]:
def custom_train_test_split(data, test_size=0.2, shuffle=True, random_state=None):
    """
    Custom function to split data into training and test sets.

    Parameters:
    - X: Input features (numpy array or list)
    - y: Target labels (numpy array or list)
    - test_size: Proportion of the data to include in the test split (default 0.2)
    - shuffle: Whether to shuffle the data before splitting (default True)
    - random_state: Seed for the random number generator (optional, for reproducibility)

    Returns:
    - X_train, X_test, y_train, y_test: Split training and testing data
    """
    X = np.array([d[0:17] for d in data])
    y = np.array([d[17] for d in data])

    # Set random seed if provided (to ensure reproducibility)
    if random_state is not None:
        np.random.seed(random_state)

    # Get the number of samples
    num_samples = len(X)

    # Shuffle the data if requested
    if shuffle:
        indices = np.random.permutation(num_samples)
        X = X[indices]
        y = y[indices]

    # Compute the split index
    split_index = int(num_samples * (1 - test_size))

    # Split the data into training and testing sets
    X_train, X_test = X[:split_index], X[split_index:]
    y_train, y_test = y[:split_index], y[split_index:]

    return X_train, X_test, y_train, y_test


In [94]:

def remove_duplicates(arr):
    # Convert the list of lists to a NumPy array
    np_arr = np.array(arr, dtype=object)
    
    # Create a set to keep track of unique first elements (list of numbers)
    seen = set()
    unique_arr = []

    for item in np_arr:
        # Convert the list of numbers (first element) to a tuple so it can be added to a set
        num_tuple = tuple(item[0])
        if num_tuple not in seen:
            unique_arr.append(item)
            seen.add(num_tuple)
    
    return np.array(unique_arr, dtype=object)

In [95]:
def add_data_to_csv(file_name, new_data, y_column=None):
    columns = ['Pit_1', 'Pit_2', 'Pit_3','Pit_4', 'Pit_5', 'Pit_6', 'Pit_7', 'Pit_8', 'Pit_9', 'Pit_10', 'Pit_11', 'Pit_12', 'player_1_reward', 'player_2_reward','last_rewarded_player','current_player', 'player_territory', y_column]  # Define columns (used only if file doesn't exist)
    """
    Adds new data to a CSV file by first converting the new data and the existing data to a DataFrame.

    Parameters:
    - file_name: The name of the CSV file (e.g., 'data.csv').
    - new_data: A list or tuple representing a new row (or multiple rows) to be added.
    - columns: A list of column names for the CSV file (used when the file does not already exist).
    """
    # Check if the CSV file already exists
    if os.path.exists(file_name):
        # Load existing CSV into a DataFrame
        df_existing = pd.read_csv(file_name)
    else:
        # If the file doesn't exist, create an empty DataFrame with the specified columns
        df_existing = pd.DataFrame(columns=columns)

    # Create a DataFrame from the new data
    # Check if new_data is a list of lists (multiple rows) or a single row
    if isinstance(new_data[0], (list, tuple)):
        df_new = pd.DataFrame(new_data, columns=columns)
    else:
        df_new = pd.DataFrame([new_data], columns=columns)

    # Append the new data to the existing DataFrame
    df_combined = pd.concat([df_existing, df_new], ignore_index=True)

    # Write the combined DataFrame back to the CSV file
    df_combined.to_csv(file_name, index=False)


# DRL

In [96]:
class PolicyModel:
    def __init__(self):
        self.num_actions=12
        self.policy_model_file = None
        self.X_shape=17
        self.model = Sequential([
            Dense(16, input_dim=self.X_shape, activation='relu'),  # First hidden layer
            Dense(32, activation='relu'),  # Second hidden layer
            Dense(64, activation='relu'),  # Second hidden layer
            Dense(32, activation='relu'),  # Second hidden layer
            Dense(16, activation='relu'),  # Second hidden layer
            Dense(self.num_actions, activation='softmax')  # Output layer for 12 possible actions
        ])


    def train(self, training_examples):
        if self.policy_model_file != None:
            self.model = load_model(self.policy_model_file)
                # Train the model (Assume more training data is available)
        # For example purposes, we'll use the same sample data multiple times
        X_train, X_test, y_train, y_test = custom_train_test_split(training_examples, test_size=0.3, shuffle=True, random_state=None)
        policy_model_file = f'policy_models/policy_model_{str(time.time())}.keras'
        

        # Convert target (y) to categorical (for classification)
        num_actions = self.num_actions # Assuming 12 possible actions (0 to 11)
        y_train = tf.keras.utils.to_categorical(y_train, num_classes=num_actions)
        y_test = tf.keras.utils.to_categorical(y_test, num_classes=num_actions)

        # Compile the model
        self.model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'], )

        callbacks = [EarlyStopping(patience=20, monitor='loss', verbose=0),
             ReduceLROnPlateau(monitor='val_accuracy',factor=0.01, min_Ir=0.00001, verbose=0),
             ModelCheckpoint(policy_model_file, verbose=0, save_best_only=True, save_weights_only=False)
             ]

        # Train the model (Assume more training data is available)
        # For example purposes, we'll use the same sample data multiple times
        self.model.fit(X_train, y_train, epochs=200, batch_size=64,  callbacks=callbacks, validation_data=(X_test,y_test))
        self.policy_model_file = policy_model_file



    def predict(self,data):
        if self.policy_model_file != None:
            self.model = load_model(self.policy_model_file)
        data = np.array(data)
        prediction = self.model.predict(data, verbose=None)
        return prediction[0]
    

class ValueModel:
    def __init__(self):
        self.X_shape= 17
        self.value_model_file = None
        self.model = Sequential([
            Dense(16, input_dim=self.X_shape, activation='relu'),  # First hidden layer
            Dense(32, activation='relu'),  # Second hidden layer
            Dense(64, activation='relu'),  # Second hidden layer
            Dense(32, activation='relu'),  # Second hidden layer
            Dense(16, activation='relu'),  # Second hidden layer
            Dense(1)  # Output layer for 12 possible actions
        ])


    def train(self, training_examples):
        if self.value_model_file != None:
            self.model = load_model(self.value_model_file)
             
        value_model_file = f'value_models/value_model_{str(time.time())}.keras'
        # Train the model (Assume more training data is available)
        # For example purposes, we'll use the same sample data multiple times
        X_train, X_test, y_train, y_test = custom_train_test_split(training_examples, test_size=0.3, shuffle=True, random_state=None)

        # Compile the model
        self.model.compile(optimizer='adam', loss='mean_squared_error', metrics=['accuracy'])

        callbacks = [EarlyStopping(patience=20, monitor='loss', verbose=0),
             ReduceLROnPlateau(monitor='val_accuracy',factor=0.01, min_Ir=0.00001, verbose=0),
             ModelCheckpoint(value_model_file, verbose=0, save_best_only=True, save_weights_only=False)
             ]

        self.model.fit(X_train, y_train, epochs=200, batch_size=64,  callbacks=callbacks, validation_data=(X_test,y_test))
        self.value_model_file = value_model_file


    def predict(self,data):
        if self.value_model_file != None:
            self.model = load_model(self.value_model_file)
        # Predict action for a new data point
        data = np.array(data)
        prediction = self.model.predict(data, verbose=None)
        return prediction[0][0]


In [97]:
value_model = ValueModel()
policy_model = PolicyModel()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


# MCTS

In [98]:
def get_valid_actions_mct(state):
    board = state['board']
    territory = state['player_territory'][1]
    current_player = state['current_player']
    valid_actions = []
    for i,a in enumerate(board):
        if current_player == 0 and i < territory and a != 0:
                valid_actions.append(1)
                continue
        if current_player == 1 and i >= territory and a != 0:
                valid_actions.append(1)
                continue
        valid_actions.append(0)
    return valid_actions

In [100]:
class Node():
    def __init__(self,state,prior,parent_node=None):
        self.parent_node = parent_node
        self.prior = prior
        self.total_score = 0
        self.visit_count = 0
        self.expanded = False
        self.children = {}
        self.reward = state['reward']
        self.state = state

    def update_result(self, reward):
        self.total_score += reward
        self.visit_count += 1

    def printer(self):
        print('parent_node: ',self.parent_node)
        print('prior: ',self.prior)
        print('total_score: ',self.total_score)
        print('visit_count: ',self.visit_count)
        print('expanded: ',self.expanded)
        print('children: ',self.children)
        print('reward: ',self.reward)
        print('state: ',self.state)



def expand(node, action_probs):
    """
    We expand a node and keep track of the prior policy probability given by neural network
    """
    state = node.state
    for action, prob in enumerate(action_probs):
        if prob != 0:
            new_state, new_reward = play(state, action)
            reward = assign_reward(state['reward'], new_reward)
            new_state['reward'] = reward
            node.children[action] = Node(state=new_state,prior=prob,parent_node=node)

    node.expanded = True



def resources_left( start_time, duration):
    current_time = time.time()
    if current_time - start_time >= duration:
        return False
    
    return True


def ucb_score(parent, child):
    """
    The score for an action that would transition between the parent and child.
    """
    if child.visit_count == 0:
        return np.inf
    
    prior_score = child.prior * math.sqrt(math.log(parent.visit_count) / (child.visit_count))
    value_score = -(child.total_score / child.visit_count)

    return value_score + prior_score



def select(node):
    """
    Select the child with the highest UCB score.
    """
    best_score = -np.inf
    best_action = -1
    best_child = None

    for action, child in node.children.items():
        score = ucb_score(node, child)
        if score > best_score:
            best_score = score
            best_action = action
            best_child = child

    return best_action, best_child



def generate_action(state,agent_1, agent_2):
   if state['current_player'] == 0:
      func = agent_1['func']
      arg = agent_1['arg']
      return func(state, arg)

   if state['current_player'] == 1:
      func = agent_2['func']
      arg = agent_2['arg']
      return func(state, arg)



def simulate_game(state,  agent_1, agent_2,show=False):
    state = copy.deepcopy(state)
    reward = [0,0,0]
    path = []

    while True:
        action = generate_action(state, agent_1, agent_2)

        if is_illegal_move(state, action):
            continue

        state, new_reward = play(state, action)
        reward = assign_reward(reward, new_reward)

        if show:
            print_game_play(state, reward, action)

        if end_game(state):
            break
        path.append(action)

    return  (reward, path)



def best_child(node):
    actions = node.action_prob
    highest_visit = -INFINITY
    for action in actions:
        child = node.children[action]
        visit_count = child.visit_count
        if visit_count > highest_visit:
            highest_visit = visit_count
            best_action = action
    return best_action



def back_propagation(node, result):
    act_result = result * ((-1)**node.state['current_player'])
    node.update_result(act_result)

    parent_node = node.parent_node

    if parent_node == None:
        return

    back_propagation(parent_node, result)

# function for the result of the simulation
def rollout(node):

    state = node.state

    player_1 = {
    'func': random_agent,
    'arg': {}
    }

    player_2 = {
    'func': random_agent,
    'arg': {}
    }

    if end_game(state):
            return 0

    reward, path = simulate_game(state, player_1, player_2)

    return  norm(reward[0],reward[1])


def mcts(state,think_time):
    start_time = time.time()
    root = Node(state,state['current_player'])
    data = [format_state(root.state)]
    alpha = 0.7
    action_probs = policy_model.predict(data)
    valid_actions = np.array(get_valid_actions_mct(state))
    action_probs = action_probs *valid_actions
    action_probs /= np.sum(action_probs)
    expand(root, action_probs)
    i = 0    
    
    while resources_left(start_time, think_time):
        node = root
        while len(node.children) > 0:
            _, child = select(node)
            if child == None:
                break
            node = child

        data = [format_state(node.state)]
        if node.visit_count != 0:
            action_probs = policy_model.predict(data)
            valid_actions = np.array(get_valid_actions_mct(node.state))
            action_probs = action_probs * valid_actions
            action_probs /= np.sum(action_probs)
            expand(node, action_probs)
    
        value_reward = value_model.predict(data)
        mcts_reward = norm(node.reward[0], node.reward[1])
        value = ((1 - alpha)*mcts_reward)+(alpha * value_reward)
        back_propagation(node, value)
        i += 1

    return root

def select_action(node, temperature):
        """
        Select action according to the visit count distribution and the temperature.
        """
        visit_counts = np.array([child.visit_count for child in node.children.values()])
        actions = [action for action in node.children.keys()]
        if temperature == 0:
            action = actions[np.argmax(visit_counts)]
        elif temperature == float("inf"):
            action = np.random.choice(actions)
        else:
            # See paper appendix Data Generation
            visit_count_distribution = visit_counts ** (1 / temperature)
            visit_count_distribution = visit_count_distribution / sum(visit_count_distribution)
            action = np.random.choice(actions, p=visit_count_distribution)

        return action

def agent(state, arg):
    think_time = arg['think_time']
    temperature = arg['temperature']
    root = mcts(state, think_time)
    return select_action(root, temperature)

In [87]:
state = {
   'board' :[4, 4, 4, 4, 4, 4, 4, 4, 0, 4, 4, 0],
   'current_player': 1,
   'player_territory': (0,6),
   'reward':[0,0,0],
}
# state ={'board': [0, 0, 1, 0, 3, 0, 9, 2, 1, 10, 1, 9], 'current_player': 1, 'player_territory': (0, 6), 'reward': [8, 4, 0]}

root = mcts(state, 60)
action = select_action(root, 0)
action

[0.48648649 0.11411411 0.02402402 0.37537538]


6

In [86]:
root.printer()
child = root.children[7]
child.printer()

parent_node:  None
prior:  1
total_score:  4.643788965931161
visit_count:  57
expanded:  True
children:  {6: <__main__.Node object at 0x00000295845B6450>, 7: <__main__.Node object at 0x000002958B3BFB10>, 9: <__main__.Node object at 0x00000294F041CA10>, 10: <__main__.Node object at 0x00000294DA626F10>}
reward:  [0, 0, 0]
state:  {'board': [4, 4, 4, 4, 4, 4, 4, 4, 0, 4, 4, 0], 'current_player': 1, 'player_territory': (0, 6), 'reward': [0, 0, 0]}
parent_node:  <__main__.Node object at 0x000002958AEF8310>
prior:  0.42504381653475654
total_score:  -2.8071979399770504
visit_count:  28
expanded:  True
children:  {0: <__main__.Node object at 0x000002958456E450>, 1: <__main__.Node object at 0x00000294A4D91D90>, 2: <__main__.Node object at 0x000002947FCA6CD0>, 3: <__main__.Node object at 0x00000294F09AFD50>, 4: <__main__.Node object at 0x00000294812AA650>, 5: <__main__.Node object at 0x0000029585800290>}
reward:  [0, 0, 0]
state:  {'board': [4, 4, 4, 4, 4, 4, 4, 0, 1, 5, 5, 1], 'current_player':

# Test

In [105]:
def execute_episode(state,  agent_1, agent_2,show=False):
    state = copy.deepcopy(state)
    state['reward'] = [0,0,0]
    reward = [0,0,0]
    path = []
    train_example_policy = []
    train_example_value = []

    while True:
        action = generate_action(state, agent_1, agent_2)
        
        if is_illegal_move(state, action):
            continue

        train_example_policy.append([*format_state(state), action])
        state, new_reward = play(state, action)
        reward = assign_reward(reward, new_reward)
        state['reward'] = reward
        path.append(action)

        if show:
            print_game_play(state, reward, action)

        if end_game(state) or len(path)> 20:
            value = norm(reward[0],reward[1])
            for hist_state in train_example_policy:
                player_value = value * ((-1) ** (hist_state[15]))
                train_example_value.append([*hist_state[0:17], player_value])
                
            break

    return  train_example_value, train_example_policy, path


In [106]:
def generate_training_data(num_of_eps):
    value_data = []
    policy_data = []

    state = {
   'board' :[4,4,4,4,4,4,4,4,4,4,4,4],
   'current_player': 0,
   'player_territory': (0,6)
    }
    
    agent_1 = {
    'func': agent,
    'arg': {
        'think_time': 5,
        'temperature': 1,
    },
    'name': 'mcts_agent',
    'elo': 1200
    }

    agent_2 = {
    'func': agent,
    'arg': {
        'think_time': 5,
        'temperature': 1,
    },
    'name': 'mcts_agent',
    'elo': 1200
    }

    for i in range(num_of_eps):
        state['current_player'] = i%2
        state_value_data, state_policy_data, path = execute_episode(state,  agent_1, agent_2)
        add_data_to_csv(file_name='value_data.csv', new_data=state_value_data, y_column='value')
        add_data_to_csv(file_name='policy_data.csv', new_data=state_policy_data, y_column='policy')
        value_data.extend(state_value_data)
        policy_data.extend(state_policy_data)
        
        print('path_len: ', len(path))
        print('path: ', path)
        print(((i+1)/num_of_eps) * 100,'%')
    
        
    return value_data, policy_data


In [27]:
def test_play(num_of_eps):
    value_data = []
    policy_data = []

    state = {
   'board' :[4,4,4,4,4,4,4,4,4,4,4,4],
   'current_player': 0,
   'player_territory': (0,6)
    }
    
    agent_1 = {
    'func': agent,
    'arg': {
        'think_time': 20,
        'temperature': 0,
    },
    'name': 'mcts_agent',
    'elo': 1200
    }

    agent_2 = {
    'func': minimax_agent,
    'arg': {
        'max_dept': 3,
    },
    'name': 'minimax_agent',
    'elo': 1200
    }

    for i in range(num_of_eps):
        state['current_player'] = i%2
        state_value_data, state_policy_data, path = execute_episode(state,  agent_1, agent_2, True)
        print('path_len: ', len(path))
        print('path: ', path)
        value_data.extend(state_value_data)
        policy_data.extend(state_policy_data)
        
    return value_data, policy_data


In [None]:
num = 100
print(value_model.value_model_file,
policy_model.policy_model_file)
for _ in range(num):
    value_data, policy_data = generate_training_data(1)
    # value_model.train(value_data)
    # policy_model.train(policy_data)
    # test_play(1)
    

None None


  action_probs /= np.sum(action_probs)


path_len:  21
path:  [2, 8, 2, 7, 5, 7, 3, 6, 0, 7, 3, 6, 5, 6, 4, 11, 5, 6, 0, 10, 3]
100.0 %


  action_probs /= np.sum(action_probs)


In [17]:
model = value_model
data = [
    [4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 1, 6],# 0
    [0,2,3,0,1,0,0,0,1,0,16,1,0,6],# 1
    [0,0,2,7,2,7,7,1,2,0,8,8,0,6],# 1
]
# data = [[0, 0, 3, 11, 1, 1, 0, 1, 2, 11, 1, 1, 1, 6]]
# data = np.array(data)
p = model.predict(data)
print(p)

-0.23476975


In [None]:
# 2.0497162
# -1.2499301

    def train(self, training_examples):
        # Train the model (Assume more training data is available)
        # For example purposes, we'll use the same sample data multiple times
        X_train, X_test, y_train, y_test = custom_train_test_split(training_examples, test_size=0.3, shuffle=True, random_state=None)

        callbacks = [EarlyStopping(patience=20, monitor='loss', verbose=0),
             ReduceLROnPlateau(monitor='val_accuracy',factor=0.01, min_Ir=0.00001, verbose=0),
             ModelCheckpoint('latest_value_model.keras', verbose=0, save_best_only=True, save_weights_only=False)
             ]

        self.model.fit(X_train, y_train, epochs=200, batch_size=64,  callbacks=callbacks, validation_data=(X_test,y_test))


    def predict(self,data):
        # Predict action for a new data point
        data = np.array(data)
        prediction = self.model.predict(data, verbose=None)
        return prediction[0][0]