In [1]:
import unittest
import unittest.mock
import os
import sys
import subprocess
import psutil
from datetime import datetime
import numpy as np
import tensorflow as tf
from tf_agents.agents.ppo import ppo_agent
from tf_agents.environments import PyEnvironment, tf_py_environment, py_environment
from tf_agents.networks import actor_distribution_network, value_network
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.trajectories import from_transition, time_step as ts, policy_step
from tf_agents.specs import array_spec
import tensorflow_probability as tfp
from tf_agents.policies import tf_policy, py_tf_eager_policy, policy_saver, actor_policy
from tf_agents.policies.py_policy import PyPolicy
from tf_agents.utils import common
from tensorflow.keras.optimizers.legacy import Optimizer
from tensorflow.keras.optimizers.legacy import Adam

# Environment

The `Standard_Env` class defines the game environment for a Tic-Tac-Toe-like game, supporting features like action masks and probabilistic placement. Unit tests ensure the environment behaves correctly under various conditions.

In [2]:
import numpy as np
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts

class Standard_Env(py_environment.PyEnvironment):
    """Tic-tac-toe environment for reinforcement learning with customizable board size, win conditions, and probabilistic placement.

    Supports Standard (3x3), Random (3x3 with random placement), and Super Tic-Tac-Toe (12x12 cross-shaped) boards.
    Implements reward shaping with live/dead patterns and step penalties to facilitate agent learning.
    Inherits from PyEnvironment for TF-Agents compatibility.

    Args:
        board_size (tuple): Rows and columns of the board (e.g., (3, 3) for 3x3).
        win_condition (list): Number of pieces needed to win in [row, column, diagonal].
        unplayable_grids (np.ndarray, optional): 2D array marking unplayable squares (1=unplayable).
        rewards (dict, optional): Reward values for win, lose, tie, patterns, etc.
        def_level (int, optional): Defense level (default: 1, not used in this version).
        discount (float, optional): Discount factor for future rewards (default: 1.0).
        place_prob (float, optional): Probability of placing a piece in the chosen square (default: 0.5).
        show (bool, optional): If True, prints board and game state (default: False).
    """
    def __init__(self, board_size, win_condition, unplayable_grids=None, rewards=None, def_level=1, discount=1.0, place_prob=0.5, show=False):
        super().__init__()
        self._rows = board_size[0]
        self._cols = board_size[1]
        self._win_condition = win_condition
        self._unplayable_grids = unplayable_grids if unplayable_grids is not None else np.zeros((self._rows, self._cols))
        self._unplayable_actions = unplayable_grids.flatten() if unplayable_grids is not None else np.zeros(self._rows * self._cols)
        self._rewards = rewards if rewards is not None else {
            'win': 1.0, 'lose': -1.0, 'tie': 0.0, 'illegal': -1.0, 'forfeited': 0.0, 'step': -0.1,
            'row_live_3': 0.16, 'row_dead_3': 0.08, 'row_live_2': 0.04, 'row_dead_2': 0.02,
            'col_live_3': 0.16, 'col_dead_3': 0.08, 'col_live_2': 0.04, 'col_dead_2': 0.02,
            'diag_live_4': 0.16, 'diag_dead_4': 0.08, 'diag_live_3': 0.04, 'diag_dead_3': 0.02, 'diag_live_2': 0.01, 'diag_dead_2': 0.005
        }
        self._def_level = def_level
        self._discount = float(discount)
        self._place_prob = place_prob
        self._show = show
        
        # Define action spec: integer action for grid position
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=self._rows * self._cols - 1, name='action'
        )
        # Define observation spec: board state and legal action mask
        self._observation_spec = {
            'board': array_spec.ArraySpec(
                shape=(self._rows, self._cols, 2), dtype=np.float32, name='board'
            ),
            'action_mask': array_spec.ArraySpec(
                shape=(self._rows * self._cols,), dtype=np.float32, name='action_mask'
            )
        }
        
        self._board = None
        self._action_mask = None
        self._player = None
        self._winner = None
        self._step_count = 0
        self._board_display = None
        self._n_neighbor_row = self._win_condition[0] - 1
        self._n_neighbor_col = self._win_condition[1] - 1
        self._n_neighbor_diag = self._win_condition[2] - 1

    def action_spec(self):
        """Returns the action specification for the environment."""
        return self._action_spec

    def observation_spec(self):
        """Returns the observation specification for the environment."""
        return self._observation_spec

    def get_current_player(self):
        """Returns the current player (1 or 2)."""
        return self._player

    def get_winner(self):
        """Returns the winner (1, 2, or 0 for tie, None if ongoing)."""
        return self._winner

    def _get_observation(self, current_player=None):
        """Returns the current observation: board state and action mask.

        Args:
            current_player (int, optional): Player perspective (1 or 2). Defaults to current player.

        Returns:
            dict: Observation with 'board' (player/opponent grids) and 'action_mask' (legal actions).
        """
        if current_player is None:
            current_player = self._player
        board = self._board if current_player == 1 else self._board[:, :, [1, 0]]
        return {'board': board, 'action_mask': self._action_mask}

    def _reset(self):
        """Resets the environment to start a new episode.

        Initializes board, action mask, and player; sets random starting player.

        Returns:
            TimeStep: Restart step with initial observation.
        """
        self._board = np.zeros((self._rows, self._cols, 2), dtype=np.float32)
        self._action_mask = np.ones(self._rows * self._cols, dtype=np.float32)
        self._action_mask[self._unplayable_actions==1] = 0
        self._player = np.random.choice([1, 2])
        self._winner = None
        self._step_count = 0
        self._board_display = np.full((self._rows, self._cols), '.', dtype=object)
        self._board_display[self._unplayable_grids==1] = '#'
        return ts.restart(self._get_observation())

    def _consecutive(self, seq, win_con):
        """Checks if a sequence contains enough consecutive 1s to win.

        Args:
            seq (np.ndarray): Sequence of board values (0 or 1).
            win_con (int): Number of consecutive pieces needed to win.

        Returns:
            bool: True if sequence meets win condition.
        """
        count = 0
        for x in seq:
            count = count + 1 if x == 1 else 0
            if count >= win_con:
                return True
        return False

    def _get_reward_state(self, row, col):
        """Calculates reward and game state after a move.

        Checks for win, tie, or transition; computes pattern-based rewards (e.g., live/dead 3).

        Args:
            row (int): Row of the placed piece.
            col (int): Column of the placed piece.

        Returns:
            tuple: (reward, state) where reward is a float and state is 'termination' or 'transition'.
        """
        player_layer = self._player - 1
        opponent_layer = 1 - player_layer
        player_board = self._board[:, :, player_layer]
        opponent_board = self._board[:, :, opponent_layer]
       
        # Check row for win condition
        action_leftmost_index = max(0, col - self._n_neighbor_row)
        action_rightmost_index = min(self._cols - 1, col + self._n_neighbor_row)
        player_row = player_board[row, action_leftmost_index : action_rightmost_index + 1]
        if self._consecutive(player_row, self._win_condition[0]):
            self._winner = self._player
            return self._rewards['win'], "termination"
        
        # Check column for win condition
        action_top_index = max(0, row - self._n_neighbor_col)
        action_bottom_index = min(self._rows - 1, row + self._n_neighbor_col)
        player_col = player_board[action_top_index : action_bottom_index + 1, col]
        if self._consecutive(player_col, self._win_condition[1]):
            self._winner = self._player
            return self._rewards['win'], "termination"
        
        # Check main diagonal for win condition
        diag_indices = []
        for i in range(-self._n_neighbor_diag, self._n_neighbor_diag + 1):
            r, c = row + i, col + i
            if 0 <= r < self._rows and 0 <= c < self._cols:
                diag_indices.append((r, c))
        player_diag = np.array([player_board[r, c] for r, c in diag_indices])
        if len(player_diag) >= self._win_condition[2] and self._consecutive(player_diag, self._win_condition[2]):
            self._winner = self._player
            return self._rewards['win'], "termination"
        
        # Check anti-diagonal for win condition
        antidiag_indices = []
        for i in range(-self._n_neighbor_diag, self._n_neighbor_diag + 1):
            r, c = row + i, col - i
            if 0 <= r < self._rows and 0 <= c < self._cols:
                antidiag_indices.append((r, c))
        player_antidiag = np.array([player_board[r, c] for r, c in antidiag_indices])
        if len(player_antidiag) >= self._win_condition[2] and self._consecutive(player_antidiag, self._win_condition[2]):
            self._winner = self._player
            return self._rewards['win'], "termination"

        # Check for tie (no legal actions left)
        if np.all(self._action_mask == 0.0):
            self._winner = 0
            return self._rewards['tie'], "termination"

        # Compute pattern-based rewards for non-terminal state
        opponent_row = opponent_board[row, action_leftmost_index : action_rightmost_index + 1]
        unplayable_row = self._unplayable_grids[row, action_leftmost_index : action_rightmost_index + 1]
        center_index_row = min(col, self._n_neighbor_row)
        unblocked_player_row = self._get_unblocked_segment(player_row, opponent_row, unplayable_row, center_index_row)
        row_pattern_reward = self.get_pattern_reward(unblocked_player_row, 'row')

        opponent_col = opponent_board[action_top_index : action_bottom_index + 1, col]
        unplayable_col = self._unplayable_grids[action_top_index : action_bottom_index + 1, col]
        center_index_col = min(row, self._n_neighbor_col)
        unblocked_player_col = self._get_unblocked_segment(player_col, opponent_col, unplayable_col, center_index_col)
        col_pattern_reward = self.get_pattern_reward(unblocked_player_col, 'col')

        opponent_diag = np.array([opponent_board[r, c] for r, c in diag_indices])
        unplayable_diag = np.array([self._unplayable_grids[r, c] for r, c in diag_indices])
        center_index_diag = min(row, self._n_neighbor_diag)
        unblocked_player_diag = self._get_unblocked_segment(player_diag, opponent_diag, unplayable_diag, center_index_diag)
        diag_pattern_reward = self.get_pattern_reward(unblocked_player_diag, 'diag')

        opponent_antidiag = np.array([opponent_board[r, c] for r, c in antidiag_indices])
        unplayable_antidiag = np.array([self._unplayable_grids[r, c] for r, c in antidiag_indices])
        center_index_antidiag = min(row, self._n_neighbor_diag)
        unblocked_player_antidiag = self._get_unblocked_segment(player_antidiag, opponent_antidiag, unplayable_antidiag, center_index_antidiag)
        antidiag_pattern_reward = self.get_pattern_reward(unblocked_player_antidiag, 'diag')
        
        reward = row_pattern_reward + col_pattern_reward + diag_pattern_reward + antidiag_pattern_reward
        return reward, "transition"

    def _get_unblocked_segment(self, player_segment, opponent_segment, unplayable_segment, center_index):
        """Extracts the unblocked segment of a player’s pieces for pattern reward calculation.

        Args:
            player_segment (np.ndarray): Player’s piece sequence (0 or 1).
            opponent_segment (np.ndarray): Opponent’s piece sequence (0 or 1).
            unplayable_segment (np.ndarray): Unplayable squares (0 or 1).
            center_index (int): Index of the placed piece in the segment.

        Returns:
            np.ndarray: Unblocked segment of player’s pieces.
        """
        unblocked_mask = (opponent_segment == 0) & (unplayable_segment == 0)
        if center_index == 0:
            final_leftmost_index = 0
        else:
            unblocked_mask_left = unblocked_mask[:center_index]
            if np.all(unblocked_mask_left):
                final_leftmost_index = 0
            else:
                final_leftmost_index = np.where(~unblocked_mask_left)[0][-1] + 1
        
        if center_index == len(player_segment) - 1:
            final_rightmost_index = center_index
        else:
            unblocked_mask_right = unblocked_mask[center_index + 1:]
            if np.all(unblocked_mask_right):
                final_rightmost_index = len(player_segment) - 1
            else:
                final_rightmost_index = center_index + np.where(~unblocked_mask_right)[0][0]
        
        unblocked_segment = player_segment[final_leftmost_index : final_rightmost_index + 1]
        return unblocked_segment
    
    def get_pattern_reward(self, arr, arr_type):
        """Calculates reward for a pattern (e.g., live/dead 2/3) in a row, column, or diagonal.

        Args:
            arr (np.ndarray): Sequence of player’s pieces (0 or 1).
            arr_type (str): Type of sequence ('row', 'col', 'diag').

        Returns:
            float: Reward for the pattern, or 0 if no pattern is found.
        """
        if arr_type == 'row':
            win_cond = self._win_condition[0]
        elif arr_type == 'col':
            win_cond = self._win_condition[1]
        elif arr_type == 'diag':
            win_cond = self._win_condition[2]
        arr_len = len(arr)
        if arr_len < win_cond:
            return 0
        elif arr_len == win_cond:
            arr_sum = np.sum(arr)
            if arr_sum >= 2:
                if self._show:
                    print(f"Pattern: {arr_type}_dead_{int(arr_sum)}")
                return self._rewards[f"{arr_type}_dead_{int(arr_sum)}"]
            else:
                return 0
        else:
            # Scan for highest-value pattern (live/dead, number of pieces)
            scan_len = win_cond + 1
            pattern = (None, 0)
            for i in range(arr_len - scan_len + 1):
                subset = arr[i:i+scan_len]
                subset_sum = np.sum(subset)
                if subset[0] == 1 and subset[-1] == 1:
                    subset_sum -= 1
                live_dead = 'live' if subset[0] == 0 and subset[-1] == 0 else 'dead'
                if subset_sum < 2:
                    continue
                elif (subset_sum > pattern[1]) or (subset_sum == pattern[1] and live_dead == 'live'):
                    pattern = (live_dead, subset_sum)
                if pattern == ('live', win_cond-1):
                    break
            if pattern[0] is None:
                return 0
            else:
                if self._show:
                    print(f"Pattern: {arr_type}_{pattern[0]}_{int(pattern[1])}")
                return self._rewards[f"{arr_type}_{pattern[0]}_{int(pattern[1])}"]

    def _place_mark(self, row, col, player):
        """Places a piece on the board with probabilistic placement.

        With probability place_prob, places the piece in the chosen square; otherwise,
        attempts placement in a random adjacent square (1/16 per direction).

        Args:
            row (int): Target row for placement.
            col (int): Target column for placement.
            player (int): Player ID (1 or 2).

        Returns:
            tuple: (row, col) of placed piece, or None if placement fails.
        """
        player_layer = 0 if player == 1 else 1
        action = self.board_map_to_action(row, col)
        if self._action_mask[action] == 0.0:
            return None
        if np.random.random() < self._place_prob and self._action_mask[action] == 1.0:
            self._board[row, col, player_layer] = 1.0
            self._action_mask[action] = 0.0
            return (row, col)
        else:
            # Randomly select an adjacent square (1/16 probability each)
            directions = [
                (-1, -1), (-1, 0), (-1, 1),
                (0, -1),  (0, 1),
                (1, -1),  (1, 0),  (1, 1)
            ]
            di, dj = directions[np.random.randint(8)]
            new_row, new_col = row + di, col + dj

            if new_row < 0 or new_col < 0 or new_row >= self._rows or new_col >= self._cols:
                return None
            new_action = self.board_map_to_action(new_row, new_col)
            if self._action_mask[new_action] == 1.0:
                self._board[new_row, new_col, player_layer] = 1.0
                self._action_mask[new_action] = 0.0
                return (new_row, new_col)
            return None
    
    def _step(self, action):
        """Executes a single step in the environment.

        Applies the action, updates board and state, and returns reward and next step.

        Args:
            action (int): Action index (grid position).

        Returns:
            TimeStep: Transition or termination step with observation, reward, and discount.
        """
        if self._winner is not None:
            return self._reset()
        
        self._step_count += 1

        # Penalize illegal actions (occupied or unplayable squares)
        if self._action_mask[action] == 0.0:
            self._winner = 3 - self._player
            reward = self._rewards['illegal']
            return ts.termination(self._get_observation(), reward=reward)

        init_row, init_col = self.action_map_to_board(action)
        placement = self._place_mark(init_row, init_col, self._player)
        if placement is not None:
            row, col = placement
            self._board_display[row, col] = f"{'X' if self._player == 1 else 'O'}"
            self.print_board()
            reward, state = self._get_reward_state(row, col)
            if self._show:
                print()
            if state == "termination":
                if self._show:
                    if self._winner == 0:
                        print("Tie")
                    elif self._winner == 1:
                        print("Winner: P1")
                    else:
                        print("Winner: P2")
                return ts.termination(self._get_observation(), reward=reward)
            elif state == "transition":
                self._player = 3 - self._player
                return ts.transition(self._get_observation(), reward=reward+self._rewards['step'], discount=self._discount)
        else:
            self._player = 3 - self._player
            return ts.transition(self._get_observation(), reward=self._rewards['forfeited']+self._rewards['step'], discount=self._discount)

    def action_map_to_board(self, action):
        """Maps an action index to board coordinates.

        Args:
            action (int): Action index.

        Returns:
            tuple: (row, col) coordinates.
        """
        row = action // self._cols
        col = action % self._cols
        return row, col

    def board_map_to_action(self, row, col):
        """Maps board coordinates to an action index.

        Args:
            row (int): Row coordinate.
            col (int): Column coordinate.

        Returns:
            int: Action index.
        """
        return row * self._cols + col

    def print_board(self):
        """Prints the current board state if show is True."""
        if self._show:
            print(f"\nRound {self._step_count} - Player {self._player} ( {'X' if self._player == 1 else 'O'} )")
            board_display = [' '.join(row) for row in self._board_display]
            print('\n'.join(board_display))

In [3]:
class StandardEnvTest(unittest.TestCase):
    def test_initialization(self):
        """Test environment initialization and specs."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3])
        self.assertEqual(env._rows, 3)
        self.assertEqual(env._cols, 3)
        self.assertEqual(env._win_condition, [3, 3, 3])
        self.assertIsInstance(env._unplayable_grids, np.ndarray)
        self.assertEqual(env._unplayable_grids.shape, (3, 3))
        self.assertIsInstance(env._unplayable_actions, np.ndarray)
        self.assertEqual(env._unplayable_actions.shape, (9,))
        self.assertIsInstance(env._rewards, dict)
        self.assertIn('win', env._rewards)
        self.assertIn('row_live_3', env._rewards)
        self.assertIn('action', env.action_spec().name)
        self.assertIn('board', env.observation_spec())
        self.assertIn('action_mask', env.observation_spec())
        self.assertEqual(env.action_spec().maximum, 3*3 - 1)
        self.assertEqual(env.observation_spec()['board'].shape, (3, 3, 2))
        self.assertEqual(env.observation_spec()['action_mask'].shape, (3*3,))

        unplayable = np.array([[0, 0, 0], [0, 1, 0], [0, 0, 0]])
        env_unplayable = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], unplayable_grids=unplayable)
        self.assertEqual(np.sum(env_unplayable._unplayable_actions), 1)
        self.assertEqual(env_unplayable._unplayable_actions[env_unplayable.board_map_to_action(1, 1)], 1)
        env_unplayable.reset()
        expected_mask = np.ones(9, dtype=np.float32)
        expected_mask[env_unplayable.board_map_to_action(1, 1)] = 0.0
        np.testing.assert_array_equal(env_unplayable._action_mask, expected_mask)

    def test_reset(self):
        """Test environment reset state."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3])
        initial_time_step = env.reset()

        self.assertEqual(initial_time_step.step_type, ts.StepType.FIRST)
        self.assertEqual(initial_time_step.reward, 0.0)
        self.assertEqual(initial_time_step.discount, env._discount)
        self.assertIn('board', initial_time_step.observation)
        self.assertIn('action_mask', initial_time_step.observation)

        np.testing.assert_array_equal(env._board, np.zeros((3, 3, 2)))
        np.testing.assert_array_equal(env._action_mask, np.ones(9))
        self.assertIn(env._player, [1, 2])
        self.assertIsNone(env._winner)
        self.assertEqual(env._step_count, 0)

    def test_specs(self):
        """Test action and observation specs."""
        env = Standard_Env(board_size=(4, 5), win_condition=[4, 4, 4])
        action_spec = env.action_spec()
        obs_spec = env.observation_spec()

        self.assertIsInstance(action_spec, array_spec.BoundedArraySpec)
        self.assertEqual(action_spec.shape, ())
        self.assertEqual(action_spec.dtype, np.int32)
        self.assertEqual(action_spec.minimum, 0)
        self.assertEqual(action_spec.maximum, 4*5 - 1)
        self.assertEqual(action_spec.name, 'action')

        self.assertIsInstance(obs_spec, dict)
        self.assertIn('board', obs_spec)
        self.assertIn('action_mask', obs_spec)

        board_spec = obs_spec['board']
        self.assertIsInstance(board_spec, array_spec.ArraySpec)
        self.assertEqual(board_spec.shape, (4, 5, 2))
        self.assertEqual(board_spec.dtype, np.float32)
        self.assertEqual(board_spec.name, 'board')

        mask_spec = obs_spec['action_mask']
        self.assertIsInstance(mask_spec, array_spec.ArraySpec)
        self.assertEqual(mask_spec.shape, (4*5,))
        self.assertEqual(mask_spec.dtype, np.float32)
        self.assertEqual(mask_spec.name, 'action_mask')

    def test_coordinate_mapping(self):
        """Test action index to board coordinates and vice versa."""
        env = Standard_Env(board_size=(3, 4), win_condition=[3, 3, 3])
        self.assertEqual(env.action_map_to_board(0), (0, 0))
        self.assertEqual(env.action_map_to_board(3), (0, 3))
        self.assertEqual(env.action_map_to_board(4), (1, 0))
        self.assertEqual(env.action_map_to_board(7), (1, 3))
        self.assertEqual(env.action_map_to_board(8), (2, 0))
        self.assertEqual(env.action_map_to_board(11), (2, 3))

        self.assertEqual(env.board_map_to_action(0, 0), 0)
        self.assertEqual(env.board_map_to_action(0, 3), 3)
        self.assertEqual(env.board_map_to_action(1, 0), 4)
        self.assertEqual(env.board_map_to_action(1, 3), 7)
        self.assertEqual(env.board_map_to_action(2, 0), 8)
        self.assertEqual(env.board_map_to_action(2, 3), 11)

        for r in range(3):
            for c in range(4):
                action = env.board_map_to_action(r, c)
                mapped_row, mapped_col = env.action_map_to_board(action)
                self.assertEqual((r, c), (mapped_row, mapped_col))

    def test_consecutive(self):
        """Test the _consecutive helper function."""
        env = Standard_Env(board_size=(1, 5), win_condition=[3, 0, 0])
        self.assertFalse(env._consecutive(np.array([0, 0, 0, 0, 0]), 3))
        self.assertFalse(env._consecutive(np.array([1, 1, 0, 1, 1]), 3))
        self.assertTrue(env._consecutive(np.array([0, 1, 1, 1, 0]), 3))
        self.assertTrue(env._consecutive(np.array([1, 1, 1, 0, 0]), 3))
        self.assertTrue(env._consecutive(np.array([0, 0, 1, 1, 1]), 3))
        self.assertTrue(env._consecutive(np.array([1, 1, 1, 1, 1]), 3))
        self.assertTrue(env._consecutive(np.array([1, 1, 1, 1, 1]), 5))
        self.assertFalse(env._consecutive(np.array([1, 1, 1, 1, 1]), 6))

    def test_get_observation(self):
        """Test observation generation, especially player perspective."""
        env = Standard_Env(board_size=(2, 2), win_condition=[2, 2, 2])
        env.reset()
        env._board = np.array([[[1, 0], [0, 1]], [[0, 1], [1, 0]]], dtype=np.float32)
        env._action_mask = np.array([1, 0, 0, 1], dtype=np.float32)

        env._player = 1
        obs_p1 = env._get_observation()
        np.testing.assert_array_equal(obs_p1['board'], env._board)
        np.testing.assert_array_equal(obs_p1['action_mask'], env._action_mask)

        env._player = 2
        obs_p2 = env._get_observation()
        expected_board_p2 = np.array([[[0, 1], [1, 0]], [[1, 0], [0, 1]]], dtype=np.float32)
        np.testing.assert_array_equal(obs_p2['board'], expected_board_p2)
        np.testing.assert_array_equal(obs_p2['action_mask'], env._action_mask)

    def test_get_reward_state_win(self):
        """Test win condition detection with pattern rewards."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3])
        env.reset()

        # Row win
        env._player = 1
        env._board[0, :, 0] = 1.0
        reward, state = env._get_reward_state(0, 2)
        self.assertEqual(reward, env._rewards['win'])
        self.assertEqual(state, "termination")
        self.assertEqual(env._winner, 1)

        # Column win
        env.reset()
        env._player = 2
        env._board[:, 1, 1] = 1.0
        reward, state = env._get_reward_state(2, 1)
        self.assertEqual(reward, env._rewards['win'])
        self.assertEqual(state, "termination")
        self.assertEqual(env._winner, 2)

        # Diagonal win
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0
        env._board[1, 1, 0] = 1.0
        env._board[2, 2, 0] = 1.0
        reward, state = env._get_reward_state(2, 2)
        self.assertEqual(reward, env._rewards['win'])
        self.assertEqual(state, "termination")
        self.assertEqual(env._winner, 1)

        # Anti-diagonal win
        env.reset()
        env._player = 2
        env._board[0, 2, 1] = 1.0
        env._board[1, 1, 1] = 1.0
        env._board[2, 0, 1] = 1.0
        reward, state = env._get_reward_state(2, 0)
        self.assertEqual(reward, env._rewards['win'])
        self.assertEqual(state, "termination")
        self.assertEqual(env._winner, 2)

        # Non-winning move with pattern reward
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0
        env._board[0, 1, 0] = 1.0
        reward, state = env._get_reward_state(0, 1)
        self.assertEqual(reward, env._rewards['row_dead_2'])
        self.assertEqual(state, "transition")
        self.assertIsNone(env._winner)

    def test_get_reward_state_tie(self):
        """Test tie condition detection."""
        env = Standard_Env(board_size=(2, 2), win_condition=[3, 3, 3])
        env.reset()
        env._board = np.zeros((2, 2, 2), dtype=np.float32)
        env._action_mask = np.zeros(4, dtype=np.float32)

        env._player = 1
        reward, state = env._get_reward_state(0, 0)
        self.assertEqual(reward, env._rewards['tie'])
        self.assertEqual(state, "termination")
        self.assertEqual(env._winner, 0)

    def test_place_mark_prob_1(self):
        """Test _place_mark when place_prob is 1.0 (direct placement)."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], place_prob=1.0)
        env.reset()
        initial_mask = env._action_mask.copy()

        env._player = 1
        placement = env._place_mark(1, 1, env._player)
        self.assertEqual(placement, (1, 1))
        self.assertEqual(env._board[1, 1, 0], 1.0)
        self.assertEqual(env._action_mask[env.board_map_to_action(1, 1)], 0.0)
        expected_mask = initial_mask.copy()
        expected_mask[env.board_map_to_action(1, 1)] = 0.0
        np.testing.assert_array_equal(env._action_mask, expected_mask)
        self.assertEqual(env._board[1, 1, 1], 0.0)

        env._player = 2
        placement = env._place_mark(0, 2, env._player)
        self.assertEqual(placement, (0, 2))
        self.assertEqual(env._board[0, 2, 1], 1.0)
        self.assertEqual(env._action_mask[env.board_map_to_action(0, 2)], 0.0)
        self.assertEqual(env._board[0, 2, 0], 0.0)

        # Test illegal placement (occupied square)
        env.reset()
        env._board[1, 0, 0] = 1.0  # Occupy (1, 0)
        env._action_mask[env.board_map_to_action(1, 0)] = 0.0
        env._player = 1
        placement = env._place_mark(1, 0, env._player)
        self.assertIsNone(placement)

    @unittest.mock.patch('numpy.random.random')
    @unittest.mock.patch('numpy.random.randint')
    def test_place_mark_prob_0_5_adjacent_forced(self, mock_randint, mock_random):
        """Test _place_mark with place_prob=0.5, forcing adjacent placement."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], place_prob=0.5)
        env.reset()
        initial_mask = env._action_mask.copy()

        target_row, target_col = 1, 1
        target_action = env.board_map_to_action(target_row, target_col)
        forced_adjacent_square = (1, 0)
        forced_adjacent_action = env.board_map_to_action(*forced_adjacent_square)
        directions = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
        forced_direction_index = directions.index((0, -1))  # (1,1) to (1,0)
        mock_randint.return_value = forced_direction_index
        mock_random.return_value = 0.6  # Force adjacent logic

        # Block all other adjacent squares
        for dr, dc in directions:
            adj_row, adj_col = target_row + dr, target_col + dc
            if (adj_row, adj_col) != forced_adjacent_square and 0 <= adj_row < 3 and 0 <= adj_col < 3:
                env._action_mask[env.board_map_to_action(adj_row, adj_col)] = 0.0

        env._player = 1
        placement = env._place_mark(target_row, target_col, env._player)

        self.assertEqual(placement, forced_adjacent_square)
        self.assertEqual(env._board[1, 0, 0], 1.0)
        self.assertEqual(env._action_mask[forced_adjacent_action], 0.0)
        self.assertEqual(env._action_mask[target_action], 1.0)

    @unittest.mock.patch('numpy.random.random')
    def test_place_mark_prob_0_5_direct_forced(self, mock_random):
        """Test _place_mark with place_prob=0.5, forcing direct placement."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], place_prob=0.5)
        env.reset()

        target_row, target_col = 1, 1
        target_action = env.board_map_to_action(target_row, target_col)
        self.assertEqual(env._action_mask[target_action], 1.0)

        mock_random.return_value = 0.4  # Force direct logic
        env._player = 1
        placement = env._place_mark(target_row, target_col, env._player)

        self.assertEqual(placement, (target_row, target_col))
        self.assertEqual(env._board[target_row, target_col, 0], 1.0)
        self.assertEqual(env._action_mask[target_action], 0.0)

    def test_step_illegal_move(self):
        """Test _step for illegal moves."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3])
        env.reset()
        initial_player = env._player
        opponent_player = 3 - initial_player
        illegal_action = env.board_map_to_action(0, 0)

        env._action_mask[illegal_action] = 0.0
        expected_mask_before_step = env._action_mask.copy()

        time_step = env.step(illegal_action)

        self.assertEqual(time_step.step_type, ts.StepType.LAST)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['illegal']))
        self.assertEqual(env._winner, opponent_player)
        self.assertEqual(env._player, initial_player)
        np.testing.assert_array_equal(env._board, np.zeros((3, 3, 2)))
        np.testing.assert_array_equal(time_step.observation['action_mask'], expected_mask_before_step)
        self.assertEqual(env._step_count, 1)

    def test_step_win_prob_1(self):
        """Test _step for a winning move with place_prob=1.0."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], rewards={'win': 10.0, 'step': -0.1}, place_prob=1.0)
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0
        env._board[0, 1, 0] = 1.0
        env._action_mask[env.board_map_to_action(0, 0)] = 0.0
        env._action_mask[env.board_map_to_action(0, 1)] = 0.0

        winning_action = env.board_map_to_action(0, 2)
        time_step = env.step(winning_action)

        self.assertEqual(time_step.step_type, ts.StepType.LAST)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['win']))
        self.assertEqual(env._winner, 1)
        self.assertEqual(env._board[0, 2, 0], 1.0)
        self.assertEqual(env._action_mask[winning_action], 0.0)
        self.assertEqual(env._step_count, 1)
        self.assertEqual(env._player, 1)

    @unittest.mock.patch('numpy.random.random')
    def test_step_win_prob_0_5_direct_forced(self, mock_random):
        """Test _step for a winning move with place_prob=0.5, forcing direct placement."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 0, 0], rewards={'win': 10.0, 'step': -0.1}, place_prob=0.5)
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0
        env._board[0, 1, 0] = 1.0
        env._action_mask[env.board_map_to_action(0, 0)] = 0.0
        env._action_mask[env.board_map_to_action(0, 1)] = 0.0

        mock_random.return_value = 0.4  # Force direct placement
        winning_action = env.board_map_to_action(0, 2)
        time_step = env.step(winning_action)

        self.assertEqual(time_step.step_type, ts.StepType.LAST)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['win']))
        self.assertEqual(env._winner, 1)
        self.assertEqual(env._board[0, 2, 0], 1.0)
        self.assertEqual(env._action_mask[winning_action], 0.0)
        self.assertEqual(env._step_count, 1)
        self.assertEqual(env._player, 1)

    @unittest.mock.patch('numpy.random.random')
    @unittest.mock.patch('numpy.random.randint')
    def test_step_win_prob_0_5_adjacent_forced(self, mock_randint, mock_random):
        """Test _step for a move that would win but lands adjacent due to place_prob=0.5."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 0, 0], rewards={'win': 10.0, 'step': -0.1}, place_prob=0.5)
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0
        env._board[0, 1, 0] = 1.0
        env._action_mask[env.board_map_to_action(0, 0)] = 0.0
        env._action_mask[env.board_map_to_action(0, 1)] = 0.0
        winning_action = env.board_map_to_action(0, 2)

        # Force placement to (1, 2)
        forced_adjacent_square = (1, 2)
        forced_adjacent_action = env.board_map_to_action(*forced_adjacent_square)
        directions = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
        forced_direction_index = directions.index((1, 0))  # (0,2) to (1,2)
        mock_randint.return_value = forced_direction_index
        mock_random.return_value = 0.6  # Force adjacent placement

        # Block other adjacent squares
        for dr, dc in directions:
            adj_row, adj_col = 0 + dr, 2 + dc
            if (adj_row, adj_col) != forced_adjacent_square and 0 <= adj_row < 3 and 0 <= adj_col < 3:
                env._action_mask[env.board_map_to_action(adj_row, adj_col)] = 0.0

        time_step = env.step(winning_action)

        self.assertEqual(time_step.step_type, ts.StepType.MID)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['step']))
        self.assertIsNone(env._winner)
        self.assertEqual(env._board[1, 2, 0], 1.0)
        self.assertEqual(env._action_mask[forced_adjacent_action], 0.0)
        self.assertEqual(env._action_mask[winning_action], 1.0)
        self.assertEqual(env._step_count, 1)
        self.assertEqual(env._player, 2)

    def test_step_tie_prob_1(self):
        """Test _step for a tie with place_prob=1.0."""
        env = Standard_Env(board_size=(2, 2), win_condition=[3, 3, 3], rewards={'tie': 5.0, 'step': -0.1}, place_prob=1.0)
        env.reset()
        actions_to_fill = [(0, 0), (0, 1), (1, 0)]
        players_to_fill = [1, 2, 1]
        for (r, c), p in zip(actions_to_fill, players_to_fill):
            action = env.board_map_to_action(r, c)
            env._board[r, c, p-1] = 1.0
            env._action_mask[action] = 0.0

        env._player = 2
        final_action = env.board_map_to_action(1, 1)
        self.assertEqual(env._action_mask[final_action], 1.0)
        self.assertEqual(env._step_count, 0)

        time_step = env.step(final_action)

        self.assertEqual(time_step.step_type, ts.StepType.LAST)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['tie']))
        self.assertEqual(env._winner, 0)
        np.testing.assert_array_equal(env._action_mask, np.zeros(4))
        self.assertEqual(env._step_count, 1)
        self.assertEqual(env._player, 2)

    @unittest.mock.patch('numpy.random.random')
    def test_step_tie_prob_0_5_direct_forced(self, mock_random):
        """Test _step for a tie with place_prob=0.5, forcing direct placement."""
        env = Standard_Env(board_size=(2, 2), win_condition=[3, 3, 3], rewards={'tie': 5.0, 'step': -0.1}, place_prob=0.5)
        env.reset()
        actions_to_fill = [(0, 0), (0, 1), (1, 0)]
        players_to_fill = [1, 2, 1]
        for (r, c), p in zip(actions_to_fill, players_to_fill):
            action = env.board_map_to_action(r, c)
            env._board[r, c, p-1] = 1.0
            env._action_mask[action] = 0.0

        env._player = 2
        final_action = env.board_map_to_action(1, 1)
        self.assertEqual(env._action_mask[final_action], 1.0)
        self.assertEqual(env._step_count, 0)

        mock_random.return_value = 0.4  # Force direct placement
        time_step = env.step(final_action)

        self.assertEqual(time_step.step_type, ts.StepType.LAST)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['tie']))
        self.assertEqual(env._winner, 0)
        np.testing.assert_array_equal(env._action_mask, np.zeros(4))
        self.assertEqual(env._step_count, 1)
        self.assertEqual(env._player, 2)

    def test_step_transition_prob_1(self):
        """Test _step for a non-terminal move with place_prob=1.0."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], rewards={'step': -0.1, 'row_dead_2': 0.02}, place_prob=1.0)
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0  # Set up for row_dead_2
        env._action_mask[env.board_map_to_action(0, 0)] = 0.0

        valid_action = env.board_map_to_action(0, 1)
        self.assertEqual(env._action_mask[valid_action], 1.0)
        self.assertEqual(env._step_count, 0)

        time_step = env.step(valid_action)

        self.assertEqual(time_step.step_type, ts.StepType.MID)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['row_dead_2'] + env._rewards['step']))
        self.assertEqual(time_step.discount, env._discount)
        self.assertEqual(env._player, 2)
        self.assertEqual(env._board[0, 1, 0], 1.0)
        self.assertEqual(env._board[0, 1, 1], 0.0)
        self.assertEqual(env._action_mask[valid_action], 0.0)
        self.assertEqual(env._step_count, 1)

    @unittest.mock.patch('numpy.random.random')
    def test_step_transition_prob_0_5_direct_forced(self, mock_random):
        """Test _step for a non-terminal move with place_prob=0.5, forcing direct placement."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], rewards={'step': -0.1, 'row_dead_2': 0.02}, place_prob=0.5)
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0  # Set up for row_dead_2
        env._action_mask[env.board_map_to_action(0, 0)] = 0.0

        valid_action = env.board_map_to_action(0, 1)
        self.assertEqual(env._action_mask[valid_action], 1.0)
        self.assertEqual(env._step_count, 0)

        mock_random.return_value = 0.4  # Force direct placement
        time_step = env.step(valid_action)

        self.assertEqual(time_step.step_type, ts.StepType.MID)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['row_dead_2'] + env._rewards['step']))
        self.assertEqual(time_step.discount, env._discount)
        self.assertEqual(env._player, 2)
        self.assertEqual(env._board[0, 1, 0], 1.0)
        self.assertEqual(env._board[0, 1, 1], 0.0)
        self.assertEqual(env._action_mask[valid_action], 0.0)
        self.assertEqual(env._step_count, 1)

    @unittest.mock.patch('numpy.random.random')
    @unittest.mock.patch('numpy.random.randint')
    def test_step_transition_prob_0_5_adjacent_forced(self, mock_randint, mock_random):
        """Test _step for a non-terminal move with place_prob=0.5, forcing adjacent placement."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], rewards={'step': -0.1, 'row_dead_2': 0.02}, place_prob=0.5)
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0  # Set up for row_dead_2
        env._action_mask[env.board_map_to_action(0, 0)] = 0.0

        target_row, target_col = 0, 2
        target_action = env.board_map_to_action(target_row, target_col)
        forced_adjacent_square = (0, 1)
        forced_adjacent_action = env.board_map_to_action(*forced_adjacent_square)
        directions = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
        forced_direction_index = directions.index((0, -1))  # (0,2) to (0,1)
        mock_randint.return_value = forced_direction_index
        mock_random.return_value = 0.6  # Force adjacent placement

        # Block other adjacent squares
        for dr, dc in directions:
            adj_row, adj_col = target_row + dr, target_col + dc
            if (adj_row, adj_col) != forced_adjacent_square and 0 <= adj_row < 3 and 0 <= adj_col < 3:
                env._action_mask[env.board_map_to_action(adj_row, adj_col)] = 0.0

        time_step = env.step(target_action)

        self.assertEqual(time_step.step_type, ts.StepType.MID)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['row_dead_2'] + env._rewards['step']))
        self.assertEqual(time_step.discount, env._discount)
        self.assertEqual(env._player, 2)
        self.assertEqual(env._board[0, 1, 0], 1.0)
        self.assertEqual(env._board[target_row, target_col, 0], 0.0)
        self.assertEqual(env._action_mask[forced_adjacent_action], 0.0)
        self.assertEqual(env._action_mask[target_action], 1.0)
        self.assertEqual(env._step_count, 1)
    
    @unittest.mock.patch('numpy.random.random')
    def test_step_forfeit_prob_0_5(self, mock_random):
        """Test _step for a forfeited move with place_prob=0.5."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], rewards={'forfeited': 0.0, 'step': -0.1}, place_prob=0.5)
        env.reset()
        initial_player = env._player

        target_row, target_col = 1, 1
        target_action = env.board_map_to_action(target_row, target_col)
        env._action_mask[target_action] = 1.0  # Ensure target is valid
        # Block all adjacent squares
        for dr, dc in [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]:
            adj_row, adj_col = target_row + dr, target_col + dc
            if 0 <= adj_row < 3 and 0 <= adj_col < 3:
                env._action_mask[env.board_map_to_action(adj_row, adj_col)] = 0.0

        mock_random.return_value = 0.6  # Force adjacent placement
        time_step = env.step(target_action)

        self.assertEqual(time_step.step_type, ts.StepType.MID)
        self.assertTrue(np.isclose(time_step.reward.item(), env._rewards['forfeited'] + env._rewards['step']))
        self.assertIsNone(env._winner)
        self.assertEqual(env._player, 3 - initial_player)
        np.testing.assert_array_equal(env._board, np.zeros((3, 3, 2)))
        self.assertEqual(env._step_count, 1)

    def test_reset_after_termination(self):
        """Test _step after a terminal state triggers reset."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3], rewards={'win': 10.0, 'step': -0.1, 'forfeited': 0.0}, place_prob=1.0)
        env.reset()
        env._player = 1
        env._board[0, 0, 0] = 1.0
        env._board[0, 1, 0] = 1.0
        env._action_mask[env.board_map_to_action(0, 0)] = 0.0
        env._action_mask[env.board_map_to_action(0, 1)] = 0.0
        winning_action = env.board_map_to_action(0, 2)

        time_step_terminal = env.step(winning_action)
        self.assertEqual(time_step_terminal.step_type, ts.StepType.LAST)
        self.assertEqual(env._winner, 1)

        time_step_after_terminal = env.step(0)
        self.assertEqual(time_step_after_terminal.step_type, ts.StepType.FIRST)
        self.assertEqual(time_step_after_terminal.reward, 0.0)
        self.assertIsNone(env._winner)
        self.assertEqual(env._step_count, 0)
        np.testing.assert_array_equal(env._board, np.zeros((3, 3, 2)))
        np.testing.assert_array_equal(env._action_mask, np.ones(9))

    def test_first_step_type(self):
        """Test that the first step returns StepType.MID if non-terminal."""
        env = Standard_Env(board_size=(3, 3), win_condition=[3, 3, 3])
        env.reset()
        initial_player = env._player

        time_step = env.step(0)
        self.assertEqual(time_step.step_type, ts.StepType.MID)
        self.assertEqual(env._step_count, 1)
        self.assertEqual(env._player, 3 - initial_player)

if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False, verbosity=2)


test_consecutive (__main__.StandardEnvTest.test_consecutive)
Test the _consecutive helper function. ... ok
test_coordinate_mapping (__main__.StandardEnvTest.test_coordinate_mapping)
Test action index to board coordinates and vice versa. ... ok
test_first_step_type (__main__.StandardEnvTest.test_first_step_type)
Test that the first step returns StepType.MID if non-terminal. ... ok
test_get_observation (__main__.StandardEnvTest.test_get_observation)
Test observation generation, especially player perspective. ... ok
test_get_reward_state_tie (__main__.StandardEnvTest.test_get_reward_state_tie)
Test tie condition detection. ... ok
test_get_reward_state_win (__main__.StandardEnvTest.test_get_reward_state_win)
Test win condition detection with pattern rewards. ... ok
test_initialization (__main__.StandardEnvTest.test_initialization)
Test environment initialization and specs. ... ok
test_place_mark_prob_0_5_adjacent_forced (__main__.StandardEnvTest.test_place_mark_prob_0_5_adjacent_forced)
Te

# Optimizer

The `ScheduledAdamOptimizer` class extends TensorFlow's `Optimizer` to incorporate a dynamic learning rate schedule. It wraps the standard Adam optimizer, updating the learning rate based on the current training step, suitable for reinforcement learning tasks like PPO training.

In [4]:
class ScheduledAdamOptimizer(Optimizer):
    """Optimizer that applies the Adam algorithm with a dynamic learning rate schedule.

    Args:
        learning_rate_schedule: Callable that takes the current step and returns the learning rate.
        **kwargs: Additional arguments passed to the base Adam optimizer.
    """
    def __init__(self, learning_rate_schedule, **kwargs):
        super().__init__(name='Adam', **kwargs)
        self._learning_rate_schedule = learning_rate_schedule
        self._current_step = tf.Variable(0, dtype=tf.int64, trainable=False)  # Tracks optimization steps
        self._optimizer = Adam(learning_rate=self._learning_rate_schedule(self._current_step), **kwargs)  # Internal Adam optimizer

    def _resource_apply_dense(self, grad, var, apply_state=None):
        """Applies gradients to dense variables using the internal Adam optimizer."""
        return self._optimizer._resource_apply_dense(grad, var, apply_state)

    def _resource_apply_sparse(self, grad, var, indices, apply_state=None):
        """Applies gradients to sparse variables using the internal Adam optimizer."""
        return self._optimizer._resource_apply_sparse(grad, var, indices, apply_state)

    def get_config(self):
        """Returns the configuration of the optimizer."""
        config = super().get_config()
        config.update({
            'learning_rate_schedule': self._learning_rate_schedule
        })
        return config

    @property
    def learning_rate(self):
        """Evaluates and returns the current learning rate based on the schedule."""
        return self._learning_rate_schedule(self._current_step)

    def _create_slots(self, var_list):
        """Creates slots for the internal Adam optimizer."""
        self._optimizer._create_slots(var_list)

    def _prepare_local(self, var_device, var_dtype, apply_state):
        """Prepares local state, including the current learning rate."""
        apply_state = super()._prepare_local(var_device, var_dtype, apply_state)
        apply_state[('learning_rate',)] = self.learning_rate
        return apply_state

    def apply_gradients(self, grads_and_vars, name=None, **kwargs):
        """Applies gradients to variables and increments the step counter."""
        self._current_step.assign_add(1)  # Increment step count
        return self._optimizer.apply_gradients(grads_and_vars, name, **kwargs)

# Utility Functions

Utility functions for managing actions and trajectories in the reinforcement learning environment. These include selecting valid actions for Player 2 and preparing trajectories for the replay buffer.

In [5]:
def get_p2_action(action_mask):
    """Select a random valid action for Player 2 based on the action mask.

    Args:
        action_mask: Array indicating valid actions (1 for valid, 0 for invalid).

    Returns:
        int: A randomly selected valid action.

    Raises:
        ValueError: If no valid actions are available.
    """
    action_mask_np = action_mask[0] if action_mask.ndim > 1 else action_mask  # Flatten if needed
    valid_actions = np.where(action_mask_np == 1)[0]  # Get indices of valid actions
    if len(valid_actions) == 0:
        raise ValueError("No valid actions available for P2")
    return np.random.choice(valid_actions)  # Randomly choose a valid action

def prepare_trajectory_for_buffer(traj):
    """Prepare a trajectory for adding to the replay buffer by adjusting tensor shapes.

    Args:
        traj: Trajectory data with potentially unbatched tensors.

    Returns:
        Trajectory with tensors reshaped for batch compatibility.
    """
    traj = tf.nest.map_structure(lambda t: tf.squeeze(t, axis=0) if t.shape.rank > 0 else t, traj)  # Remove batch dimension
    batched_traj = tf.nest.map_structure(
        lambda t: tf.expand_dims(t, axis=0) if t.shape.rank > 0 else tf.expand_dims(t, axis=0) if t.shape.rank == 0 else t,
        traj
    )  # Add batch dimension
    return batched_traj

# Logging

The `PrintLogger` class redirects standard output and error streams to both the terminal and a specified log file, ensuring all training logs are persisted.

In [6]:
class PrintLogger:
    """Redirects stdout/stderr to both terminal and a log file.

    Args:
        terminal: Original output stream (e.g., sys.stdout).
        log_file: Path to the log file.

    Raises:
        OSError: If the log file cannot be opened.
    """
    def __init__(self, terminal, log_file):
        self.terminal = terminal
        try:
            self.log = open(log_file, 'a')  # Open log file in append mode
        except OSError as e:
            raise OSError(f"Failed to open log file {log_file}: {e}") from e
    
    def _write_to_log(self, message):
        """Writes a message to the log file if it is open."""
        if not self.log.closed:
            self.log.write(message)
            self.log.flush()

    def write(self, message):
        """Writes a message to both terminal and log file."""
        self.terminal.write(message)
        self._write_to_log(message)
    
    def flush(self):
        """Flushes both terminal and log file streams."""
        self.terminal.flush()
        if not self.log.closed:
            self.log.flush()

    def close(self):
        """Closes the log file if it is open."""
        if not self.log.closed:
            self.log.close()

# Policy Classes

Policy classes for reinforcement learning, including `MaskedPolicy` for probabilistic action selection with action masks and `DeterministicMaskedPolicy` for deterministic action selection based on the highest valid action probability.

In [7]:
# Policy Classes
class MaskedPolicy(PyPolicy):
    """Wraps a base policy to mask invalid actions using an action mask.

    Args:
        base_policy: The underlying policy to wrap.
        verbose: If True, prints debugging information about actions and probabilities.
    """
    def __init__(self, base_policy, verbose=False):
        super().__init__(
            time_step_spec=base_policy.time_step_spec,
            action_spec=base_policy.action_spec,
            policy_state_spec=base_policy.policy_state_spec
        ) 
        self._base_policy = base_policy
        self._verbose = verbose

    def _action(self, timestep, policy_state, seed=None):
        """Generates an action by masking invalid actions in the base policy's distribution.

        Args:
            timestep: Current time step containing observation with action mask.
            policy_state: Current state of the policy.
            seed: Optional seed for sampling.

        Returns:
            PolicyStep: Contains the sampled action, state, and info.
        """
        dist_step = self._base_policy.distribution(timestep, policy_state)
        logits = dist_step.action.logits
        action_mask = timestep.observation['action_mask']

        masked_logits = logits + (action_mask - 1) * 1e10  # Apply mask to logits
        dist = tfp.distributions.Categorical(logits=masked_logits)

        action = dist.sample()
        if self._verbose:
            probs = tf.nn.softmax(logits).numpy()  # Compute probabilities before masking
            print(f"Action Mask: {action_mask.numpy()[0]}")
            print(f"Before Masking - Logits: {logits.numpy()[0]}")
            print(f"Before Masking - Probs: {probs[0]}")
            masked_probs = tf.nn.softmax(masked_logits).numpy()  # Compute probabilities after masking
            print(f"After Masking - Logits: {masked_logits.numpy()[0]}")
            print(f"After Masking - Probs: {masked_probs[0]}")
            print(f"After Masking - Action: {action[0]}")

        return policy_step.PolicyStep(action, dist_step.state, dist_step.info)

class DeterministicMaskedPolicy(PyPolicy):
    """Wraps a base policy to select the best valid action deterministically.

    Args:
        base_policy: The underlying policy to wrap.
        actor_network: Network used to compute action logits.
        verbose: If True, prints debugging information about actions and probabilities.
    """
    def __init__(self, base_policy, actor_network, verbose=False):
        super().__init__(
            time_step_spec=base_policy.time_step_spec,
            action_spec=base_policy.action_spec,
            policy_state_spec=base_policy.policy_state_spec
        )
        self._base_policy = base_policy
        self._actor_network = actor_network
        self._verbose = verbose

    def _action(self, timestep, policy_state, seed=None):
        """Generates a deterministic action by selecting the best valid action.

        Args:
            timestep: Current time step containing observation with action mask.
            policy_state: Current state of the policy.
            seed: Optional seed (unused in deterministic policy).

        Returns:
            PolicyStep: Contains the chosen action, state, and empty info.
        """
        observation = timestep.observation
        batched_observation = {
            key: tf.expand_dims(value, axis=0) if value.shape.rank == 1 else value
            for key, value in observation.items()
        }
        dist, _ = self._actor_network(batched_observation, timestep.step_type, policy_state)
        logits = dist.logits
        action_mask = timestep.observation['action_mask']

        masked_logits = logits + (action_mask - 1) * 1e10  # Apply mask to logits
        action = tf.argmax(masked_logits, axis=-1, output_type=tf.int32)  # Select action with highest logit

        if self._verbose:
            probs = tf.nn.softmax(logits).numpy()  # Compute probabilities before masking
            print(f"Action Mask (Eval): {action_mask.numpy()[0]}")
            print(f"Before Masking - Logits (Eval): {logits.numpy()[0]}")
            print(f"Before Masking - Probs (Eval): {probs[0]}")
            masked_probs = tf.nn.softmax(masked_logits).numpy()  # Compute probabilities after masking
            print(f"After Masking - Logits (Eval): {masked_logits.numpy()[0]}")
            print(f"After Masking - Probs (Eval): {masked_probs[0]}")
            print(f"After Masking - Action (Eval): {action[0]}")

        return policy_step.PolicyStep(action, policy_state, ())

# Episode Collection

The `EpisodeCollector` class manages the collection of training episodes for reinforcement learning, supporting both random and greedy opponents.

In [8]:
class EpisodeCollector:
    """Collects episodes for training using random or greedy opponents.

    Args:
        env: Environment to interact with.
        rewards: Dictionary of reward values for win, lose, tie, and step.
        def_level: Defensive level for reward adjustment.
        deterministic_policy: Optional policy for greedy opponent.
    """
    def __init__(self, env, rewards, def_level, deterministic_policy=None):
        self.env = env
        self.rewards = rewards
        self.def_level = def_level
        self.deterministic_policy = deterministic_policy

    def collect_random(self, policy, replay_buffer, num_episodes, verbose=False):
        """Collects episodes with a random opponent and stores trajectories in the replay buffer.

        Args:
            policy: Policy for Player 1.
            replay_buffer: Buffer to store trajectories.
            num_episodes: Number of episodes to collect.
            verbose: If True, prints trajectory and observation details.
        """
        for _ in range(num_episodes):
            timestep = self.env.reset()
            if self.env._env._envs[0].get_current_player() == 2:
                p2_action = get_p2_action(timestep.observation['action_mask'])  # Get random P2 action
                timestep = self.env.step(p2_action)
                timestep = timestep._replace(step_type=tf.constant(0, dtype=tf.int32))  # Set step type
            
            p1_policy_state = policy.get_initial_state(batch_size=1)
            while not timestep.is_last():
                action_step = policy.action(timestep, p1_policy_state)
                p1_next_timestep = self.env.step(action_step.action)
                p1_reward = p1_next_timestep.reward

                if self.env._env._envs[0].get_winner() == 1:
                    p1_observation = self.env._env._envs[0]._get_observation(current_player=1)  # Get P1 observation
                    p1_next_timestep = p1_next_timestep._replace(observation=p1_observation)  # Update observation
                    traj = from_transition(timestep, action_step, p1_next_timestep)  # Create trajectory
                    if verbose:
                        print("P1 traj:", traj)
                        print("p1_next_timestep obs:", p1_next_timestep.observation, "\n")
                    replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))
                    break
                
                if p1_next_timestep.is_last():
                    p1_observation = self.env._env._envs[0]._get_observation(current_player=1)  # Get P1 observation
                    p1_next_timestep = p1_next_timestep._replace(observation=p1_observation)  # Update observation
                    traj = from_transition(timestep, action_step, p1_next_timestep)  # Create trajectory
                    if verbose:
                        print("P1 traj:", traj)
                        print("p1_next_timestep obs:", p1_next_timestep.observation, "\n")
                    replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))
                    break
                
                p1_policy_state = action_step.state
                p2_action = get_p2_action(p1_next_timestep.observation['action_mask'])  # Get random P2 action
                p2_next_timestep = self.env.step(p2_action)

                if self.env._env._envs[0].get_winner() == 2:
                    p2_observation = self.env._env._envs[0]._get_observation(current_player=1)  # Get P1 observation
                    p2_next_timestep = p2_next_timestep._replace(
                        observation=p2_observation,
                        reward=tf.constant(self.rewards['lose'], dtype=tf.float32),
                        step_type=tf.constant(2, dtype=tf.int32)
                    )  # Update timestep
                    traj = from_transition(timestep, action_step, p2_next_timestep)  # Create trajectory
                    if verbose:
                        print("P1 traj:", traj)
                        print("p2_next_timestep obs:", p2_next_timestep.observation, "\n")
                    replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))
                    break
                
                if p2_next_timestep.is_last():
                    p2_observation = self.env._env._envs[0]._get_observation(current_player=1)  # Get P1 observation
                    p2_next_timestep = p2_next_timestep._replace(
                        observation=p2_observation,
                        reward=tf.constant(self.rewards['tie'], dtype=tf.float32),
                        step_type=tf.constant(2, dtype=tf.int32)
                    )  # Update timestep
                    traj = from_transition(timestep, action_step, p2_next_timestep)  # Create trajectory
                    if verbose:
                        print("P1 traj:", traj)
                        print("p2_next_timestep obs:", p2_next_timestep.observation, "\n")
                    replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))
                    break
                
                p2_pattern_reward = p2_next_timestep.reward - self.rewards['step']  # Compute P2 pattern reward
                adjusted_reward = p1_reward - self.def_level * p2_pattern_reward  # Adjust reward
                p2_next_timestep = p2_next_timestep._replace(reward=adjusted_reward)  # Update reward
                traj = from_transition(timestep, action_step, p2_next_timestep)  # Create trajectory
                if verbose:
                    print("P1 traj:", traj)
                    print("p2_next_timestep obs:", p2_next_timestep.observation, "\n")
                replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))
                timestep = p2_next_timestep  # Update timestep

    def collect_selfplay(self, policy, replay_buffer, num_episodes, verbose=False):
        """Collects episodes with a greedy opponent using DeterministicMaskedPolicy.

        Args:
            policy: Policy for Player 1.
            replay_buffer: Buffer to store trajectories.
            num_episodes: Number of episodes to collect.
            verbose: If True, prints trajectory and observation details.
        """
        for _ in range(num_episodes):
            timestep = self.env.reset()  # Reset environment
            p2_policy_state = self.deterministic_policy.get_initial_state(batch_size=1)  # Initialize P2 policy state
            if self.env._env._envs[0].get_current_player() == 2:
                p2_action_step = self.deterministic_policy.action(timestep, p2_policy_state)  # Get P2 action
                p2_action = p2_action_step.action
                p2_policy_state = p2_action_step.state
                timestep = self.env.step(p2_action)  # Step environment
                timestep = timestep._replace(step_type=tf.constant(0, dtype=tf.int32))  # Set step type
            
            p1_policy_state = policy.get_initial_state(batch_size=1)  # Initialize P1 policy state
            while not timestep.is_last():
                action_step = policy.action(timestep, p1_policy_state)  # Get P1 action
                p1_next_timestep = self.env.step(action_step.action)  # Step environment
                p1_reward = p1_next_timestep.reward  # Get P1 reward

                if self.env._env._envs[0].get_winner() == 1:
                    p1_observation = self.env._env._envs[0]._get_observation(current_player=1)  # Get P1 observation
                    p1_next_timestep = p1_next_timestep._replace(observation=p1_observation)  # Update observation
                    traj = from_transition(timestep, action_step, p1_next_timestep)  # Create trajectory
                    if verbose:
                        print("P1 traj:", traj)
                        print("p1_next_timestep obs:", p1_next_timestep.observation, "\n")
                    replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))  # Add to buffer
                    break
                
                if p1_next_timestep.is_last():
                    p1_observation = self.env._env._envs[0]._get_observation(current_player=1)  # Get P1 observation
                    p1_next_timestep = p1_next_timestep._replace(observation=p1_observation)  # Update observation
                    traj = from_transition(timestep, action_step, p1_next_timestep)  # Create trajectory
                    if verbose:
                        print("P1 traj:", traj)
                        print("p1_next_timestep obs:", p1_next_timestep.observation, "\n")
                    replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))  # Add to buffer
                    break
                
                p1_policy_state = action_step.state  # Update P1 policy state
                p2_action_step = self.deterministic_policy.action(p1_next_timestep, p2_policy_state)  # Get P2 action
                p2_action = p2_action_step.action
                p2_policy_state = p2_action_step.state
                p2_next_timestep = self.env.step(p2_action)  # Step environment

                if self.env._env._envs[0].get_winner() == 2:
                    p2_observation = self.env._env._envs[0]._get_observation(current_player=1)  # Get P1 observation
                    p2_next_timestep = p2_next_timestep._replace(
                        observation=p2_observation,
                        reward=tf.constant(self.rewards['lose'], dtype=tf.float32),
                        step_type=tf.constant(2, dtype=tf.int32)
                    )  # Update timestep
                    traj = from_transition(timestep, action_step, p2_next_timestep)  # Create trajectory
                    if verbose:
                        print("P1 traj:", traj)
                        print("p2_next_timestep obs:", p2_next_timestep.observation, "\n")
                    replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))  # Add to buffer
                    break
                
                if p2_next_timestep.is_last():
                    p2_observation = self.env._env._envs[0]._get_observation(current_player=1)  # Get P1 observation
                    p2_next_timestep = p2_next_timestep._replace(
                        observation=p2_observation,
                        reward=tf.constant(self.rewards['tie'], dtype=tf.float32),
                        step_type=tf.constant(2, dtype=tf.int32)
                    )  # Update timestep
                    traj = from_transition(timestep, action_step, p2_next_timestep)  # Create trajectory
                    if verbose:
                        print("P1 traj:", traj)
                        print("p2_next_timestep obs:", p2_next_timestep.observation, "\n")
                    replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))  # Add to buffer
                    break
                
                p2_pattern_reward = p2_next_timestep.reward - self.rewards['step']  # Compute P2 pattern reward
                adjusted_reward = p1_reward - self.def_level * p2_pattern_reward  # Adjust reward
                p2_next_timestep = p2_next_timestep._replace(reward=adjusted_reward)  # Update reward
                traj = from_transition(timestep, action_step, p2_next_timestep)  # Create trajectory
                if verbose:
                    print("P1 traj:", traj)
                    print("p2_next_timestep obs:", p2_next_timestep.observation, "\n")
                replay_buffer.add_batch(prepare_trajectory_for_buffer(traj))  # Add to buffer
                timestep = p2_next_timestep  # Update timestep

# Policy Evaluation

The `PolicyEvaluator` class evaluates a policy's performance by playing against random or greedy opponents, tracking metrics like average reward, win rate, and step count.

In [9]:
class PolicyEvaluator:
    """Evaluates a policy against random or greedy opponents.

    Args:
        env: Environment to interact with.
        rewards: Dictionary of reward values for win, lose, tie, and step.
        def_level: Defensive level for reward adjustment.
        deterministic_policy: Optional policy for greedy opponent.
    """
    def __init__(self, env, rewards, def_level, deterministic_policy=None):
        self.env = env  # Store environment
        self.rewards = rewards  # Store reward configuration
        self.def_level = def_level  # Store defensive level
        self.deterministic_policy = deterministic_policy  # Store deterministic policy

    def evaluate_random(self, policy, num_episodes, verbose=False):
        """Evaluates a policy against a random opponent.

        Args:
            policy: Policy for Player 1.
            num_episodes: Number of episodes to evaluate.
            verbose: If True, prints reward and termination details.

        Returns:
            tuple: Average reward, win rate, loss rate, tie rate, average steps, P1 illegal rate, P2 illegal rate.
        """
        total_reward = 0.0
        wins = 0
        losses = 0
        ties = 0
        step_counts = 0
        p1_illegals = 0
        p2_illegals = 0
        
        for _ in range(num_episodes):
            timestep = self.env.reset()  # Reset environment
            policy_state = policy.get_initial_state(batch_size=1)  # Initialize policy state
            episode_reward = 0.0
            
            while not timestep.is_last():
                if self.env._env._envs[0].get_current_player() == 1:
                    action_step = policy.action(timestep, policy_state)  # Get P1 action
                    next_timestep = self.env.step(action_step.action)  # Step environment
                    p1_reward = next_timestep.reward  # Get P1 reward
                    
                    if next_timestep.is_last():
                        episode_reward += p1_reward.numpy()[0]  # Accumulate reward
                        if verbose:
                            print(f"P1 terminates. P1 reward = {p1_reward.numpy()[0]}")
                    else:
                        p2_action = get_p2_action(next_timestep.observation['action_mask'])  # Get random P2 action
                        p2_next_timestep = self.env.step(p2_action)  # Step environment
                        p2_reward = p2_next_timestep.reward  # Get P2 reward
                        winner = self.env._env._envs[0].get_winner()  # Check winner
                        if winner == 2:
                            episode_reward += self.rewards['lose']  # Accumulate loss reward
                            if verbose:
                                print(f"P2 terminates. P1 reward = {self.rewards['lose']}")
                        elif winner == 0:
                            episode_reward += self.rewards['tie']  # Accumulate tie reward
                            if verbose:
                                print(f"P2 terminates. P1 reward = {self.rewards['tie']}")
                        elif winner is None:
                            p2_pattern_reward = p2_reward.numpy()[0] - self.rewards['step']  # Compute P2 pattern reward
                            mid_reward = p1_reward.numpy()[0] - self.def_level * p2_pattern_reward  # Adjust reward
                            episode_reward += mid_reward  # Accumulate adjusted reward
                            if verbose:
                                print(f"Mid. P1 reward = {p1_reward.numpy()[0]} - {self.def_level} * {p2_pattern_reward} = {mid_reward}")
                        else:
                            raise ValueError("Random P2 makes illegal move!")
                        timestep = p2_next_timestep  # Update timestep
                        continue
                    
                    policy_state = action_step.state  # Update policy state
                else:
                    p2_action = get_p2_action(timestep.observation['action_mask'])  # Get random P2 action
                    next_timestep = self.env.step(p2_action)  # Step environment
                
                timestep = next_timestep  # Update timestep
            if verbose:
                print(f"P1 episode reward = {episode_reward}")

            if timestep.is_last():
                winner = self.env._env._envs[0].get_winner()  # Check winner
                current_player = self.env._env._envs[0].get_current_player()  # Check current player
                if winner == 1 and current_player == 1:
                    wins += 1  # Increment wins
                elif winner == 2 and current_player == 2:
                    losses += 1  # Increment losses
                elif winner == 0:
                    ties += 1  # Increment ties
                elif winner == 2 and current_player == 1:
                    p1_illegals += 1  # Increment P1 illegal moves
                elif winner == 1 and current_player == 2:
                    p2_illegals += 1  # Increment P2 illegal moves
            
            total_reward += episode_reward  # Accumulate total reward
            step_counts += self.env._env._envs[0]._step_count  # Accumulate step count
        return (total_reward / num_episodes, wins / num_episodes,
                losses / num_episodes, ties / num_episodes, step_counts / num_episodes,
                p1_illegals / num_episodes, p2_illegals / num_episodes)  # Return averages

    def evaluate_selfplay(self, policy, num_episodes, verbose=False):
        """Evaluates a policy against a greedy opponent using DeterministicMaskedPolicy.

        Args:
            policy: Policy for Player 1.
            num_episodes: Number of episodes to evaluate.
            verbose: If True, prints reward and termination details.

        Returns:
            tuple: Average reward, win rate, loss rate, tie rate, average steps, P1 illegal rate, P2 illegal rate.
        """
        total_reward = 0.0
        wins = 0
        losses = 0
        ties = 0
        step_counts = 0
        p1_illegals = 0
        p2_illegals = 0
        
        for _ in range(num_episodes):
            timestep = self.env.reset()  # Reset environment
            policy_state = policy.get_initial_state(batch_size=1)  # Initialize P1 policy state
            p2_policy_state = self.deterministic_policy.get_initial_state(batch_size=1)  # Initialize P2 policy state
            episode_reward = 0.0
            
            while not timestep.is_last():
                if self.env._env._envs[0].get_current_player() == 1:
                    action_step = policy.action(timestep, policy_state)  # Get P1 action
                    next_timestep = self.env.step(action_step.action)  # Step environment
                    p1_reward = next_timestep.reward  # Get P1 reward
                    
                    if next_timestep.is_last():
                        episode_reward += p1_reward.numpy()[0]  # Accumulate reward
                        if verbose:
                            print(f"P1 terminates. P1 reward = {p1_reward.numpy()[0]}")
                    else:
                        p2_action_step = self.deterministic_policy.action(next_timestep, p2_policy_state)  # Get P2 action
                        p2_action = p2_action_step.action
                        p2_policy_state = p2_action_step.state
                        p2_next_timestep = self.env.step(p2_action)  # Step environment
                        p2_reward = p2_next_timestep.reward  # Get P2 reward
                        winner = self.env._env._envs[0].get_winner()  # Check winner
                        if winner == 2:
                            episode_reward += self.rewards['lose']  # Accumulate loss reward
                            if verbose:
                                print(f"P2 terminates. P1 reward = {self.rewards['lose']}")
                        elif winner == 0:
                            episode_reward += self.rewards['tie']  # Accumulate tie reward
                            if verbose:
                                print(f"P2 terminates. P1 reward = {self.rewards['tie']}")
                        elif winner is None:
                            p2_pattern_reward = p2_reward.numpy()[0] - self.rewards['step']  # Compute P2 pattern reward
                            mid_reward = p1_reward.numpy()[0] - self.def_level * p2_pattern_reward  # Adjust reward
                            episode_reward += mid_reward  # Accumulate adjusted reward
                            if verbose:
                                print(f"Mid. P1 reward = {p1_reward.numpy()[0]} - {self.def_level} * {p2_pattern_reward} = {mid_reward}")
                        else:
                            raise ValueError("Greedy P2 makes illegal move!")
                        timestep = p2_next_timestep  # Update timestep
                        continue
                    
                    policy_state = action_step.state  # Update P1 policy state
                else:
                    p2_action_step = self.deterministic_policy.action(timestep, p2_policy_state)  # Get P2 action
                    p2_action = p2_action_step.action
                    p2_policy_state = p2_action_step.state
                    next_timestep = self.env.step(p2_action)  # Step environment
                
                timestep = next_timestep  # Update timestep
            if verbose:
                print(f"P1 episode reward = {episode_reward}")

            if timestep.is_last():
                winner = self.env._env._envs[0].get_winner()  # Check winner
                current_player = self.env._env._envs[0].get_current_player()  # Check current player
                if winner == 1 and current_player == 1:
                    wins += 1  # Increment wins
                elif winner == 2 and current_player == 2:
                    losses += 1  # Increment losses
                elif winner == 0:
                    ties += 1  # Increment ties
                elif winner == 2 and current_player == 1:
                    p1_illegals += 1  # Increment P1 illegal moves
                elif winner == 1 and current_player == 2:
                    p2_illegals += 1  # Increment P2 illegal moves
            
            total_reward += episode_reward  # Accumulate total reward
            step_counts += self.env._env._envs[0]._step_count  # Accumulate step count
        return (total_reward / num_episodes, wins / num_episodes,
                losses / num_episodes, ties / num_episodes, step_counts / num_episodes,
                p1_illegals / num_episodes, p2_illegals / num_episodes)  # Return averages

# PPO Training

The `PPOTrainer` class orchestrates the Proximal Policy Optimization (PPO) training pipeline, including environment setup, agent training, episode collection, evaluation, and logging.

In [10]:
class PPOTrainer:
    """Manages the PPO training pipeline, including environment, agent, and metrics.

    Args:
        config: Dictionary containing training configuration parameters.
        resume_from_checkpoint: Optional path to checkpoint for resuming training.
    """
    def __init__(self, config, resume_from_checkpoint=None):
        self.config = config
        self.save_dir = config['train_log_dir']
        self.checkpoint_dir = os.path.join(self.save_dir, 'checkpoint')
        self.metrics_file = os.path.join(self.save_dir, 'metrics.npz')
        self.policy_dir = os.path.join(self.save_dir, 'actor_network')
        self.policy_v1_dir = os.path.join(self.save_dir, 'policy_v1')
        self.policy_v2_dir = os.path.join(self.save_dir, 'policy_v2')
        self.num_iterations = config['phase1_iterations'] + config['phase2_iterations']
        
        # Validate and create directories
        try:
            os.makedirs(self.save_dir, exist_ok=True)
            os.makedirs(self.checkpoint_dir, exist_ok=True)
            os.makedirs(self.policy_dir, exist_ok=True)
            os.makedirs(self.policy_v1_dir, exist_ok=True)
            os.makedirs(self.policy_v2_dir, exist_ok=True)
            if not os.access(self.save_dir, os.W_OK):
                raise PermissionError(f"No write permission for directory: {self.save_dir}")
            if not os.access(self.checkpoint_dir, os.W_OK):
                raise PermissionError(f"No write permission for directory: {self.checkpoint_dir}")
            if not os.access(self.policy_dir, os.W_OK):
                raise PermissionError(f"No write permission for directory: {self.policy_dir}")
            if not os.access(self.policy_v1_dir, os.W_OK):
                raise PermissionError(f"No write permission for directory: {self.policy_v1_dir}")
            if not os.access(self.policy_v2_dir, os.W_OK):
                raise PermissionError(f"No write permission for directory: {self.policy_v2_dir}")
        except OSError as e:
            raise OSError(f"Failed to set up directories: {e}") from e

        # Initialize training and evaluation environments
        self.train_env = tf_py_environment.TFPyEnvironment(
            Standard_Env(
                board_size=config['board_size'],
                win_condition=config['win_condition'],
                unplayable_grids=config['unplayable_grids'],
                rewards=config['rewards'],
                def_level=config['def_level'],
                place_prob=config['place_prob'],
                show=False
            )
        )
        self.eval_env = tf_py_environment.TFPyEnvironment(
            Standard_Env(
                board_size=config['board_size'],
                win_condition=config['win_condition'],
                unplayable_grids=config['unplayable_grids'],
                rewards=config['rewards'],
                def_level=config['def_level'],
                place_prob=config['place_prob'],
                show=True
            )
        )

        # Initialize neural networks for actor and value functions
        preprocessing_layers = {
            'board': tf.keras.Sequential([
                tf.keras.layers.Conv2D(16, 2, activation='relu', padding='same'),
                tf.keras.layers.Flatten()
            ]),
            'action_mask': tf.keras.layers.Dense(16, activation='relu')
        }
        self.actor_net = actor_distribution_network.ActorDistributionNetwork(
            self.train_env.observation_spec(),
            self.train_env.action_spec(),
            preprocessing_layers=preprocessing_layers,
            preprocessing_combiner=tf.keras.layers.Concatenate(),
            fc_layer_params=config['fc_layer_params']
        )
        self.value_net = value_network.ValueNetwork(
            self.train_env.observation_spec(),
            preprocessing_layers=preprocessing_layers,
            preprocessing_combiner=tf.keras.layers.Concatenate(),
            fc_layer_params=config['fc_layer_params']
        )

        # Set up learning rate schedule and optimizer
        learning_rate_schedule = tf.keras.optimizers.schedules.PolynomialDecay(
            initial_learning_rate=config['initial_learning_rate'],
            decay_steps=config['decay_steps'],
            end_learning_rate=config['end_learning_rate'],
            power=1.0
        )
        optimizer = ScheduledAdamOptimizer(learning_rate_schedule=learning_rate_schedule)

        # Initialize PPO agent
        self.agent = ppo_agent.PPOAgent(
            self.train_env.time_step_spec(),
            self.train_env.action_spec(),
            actor_net=self.actor_net,
            value_net=self.value_net,
            optimizer=optimizer,
            num_epochs=5,
            entropy_regularization=0.2,
            importance_ratio_clipping=0.2
        )
        self.agent.initialize()

        self.replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
            data_spec=self.agent.collect_data_spec,
            batch_size=self.train_env.batch_size,
            max_length=10000
        )

        self.masked_policy = MaskedPolicy(self.agent.collect_policy, verbose=False)
        self.deterministic_masked_policy = DeterministicMaskedPolicy(self.agent.policy, self.actor_net, verbose=False)
        self.policy_v1 = None

        self.collector = EpisodeCollector(self.train_env, config['rewards'], config['def_level'], deterministic_policy=self.deterministic_masked_policy)
        self.evaluator = PolicyEvaluator(self.eval_env, config['rewards'], config['def_level'], deterministic_policy=self.deterministic_masked_policy)

        self.train_summary_phase1 = tf.summary.create_file_writer(os.path.join(self.save_dir, 'phase1'))
        self.train_summary_phase2 = tf.summary.create_file_writer(os.path.join(self.save_dir, 'phase2'))
        self.checkpointer = common.Checkpointer(
            ckpt_dir=self.checkpoint_dir,
            max_to_keep=1,
            agent=self.agent,
            actor_net=self.actor_net,
            value_net=self.value_net,
            global_step=self.agent.train_step_counter
        )

        # Initialize metrics dictionary for tracking training and evaluation
        self.metrics = {
            'train_losses': [],
            'random_avg_rewards_phase1': [],
            'random_win_rates_phase1': [],
            'random_tie_rates_phase1': [],
            'random_lose_rates_phase1': [],
            'random_avg_step_counts_phase1': [],
            'random_avg_rewards_phase2': [],
            'random_win_rates_phase2': [],
            'random_tie_rates_phase2': [],
            'random_lose_rates_phase2': [],
            'random_avg_step_counts_phase2': [],
            'selfplay_avg_rewards_phase2': [],
            'selfplay_win_rates_phase2': [],
            'selfplay_tie_rates_phase2': [],
            'selfplay_lose_rates_phase2': [],
            'selfplay_avg_step_counts_phase2': []
        }

        if resume_from_checkpoint:
            self.load_from_checkpoint(resume_from_checkpoint)

    def save_metrics(self):
        """Saves training metrics to a .npz file."""
        try:
            np.savez(self.metrics_file, **self.metrics)
            print(f"Metrics saved to {self.metrics_file}")
        except Exception as e:
            print(f"Failed to save metrics: {e}")

    def save_policy(self, policy_dir, policy_name):
        """Saves the actor network to the specified directory.

        Args:
            policy_dir: Directory to save the policy.
            policy_name: Name of the policy for logging.
        """
        try:
            tf.saved_model.save(self.actor_net, policy_dir)
            print(f"{policy_name} saved to {policy_dir}")
        except Exception as e:
            print(f"Failed to save {policy_name}: {e}")

    def log_train_loss(self, step, train_loss, phase):
        """Logs training loss to TensorBoard for the specified phase.

        Args:
            step: Current training step.
            train_loss: Loss value to log.
            phase: Training phase (1 or 2).
        """
        summary_writer = self.train_summary_phase1 if phase == 1 else self.train_summary_phase2
        prefix = 'phase1' if phase == 1 else 'phase2'
        with summary_writer.as_default():
            tf.summary.scalar(f'{prefix}/train/loss', train_loss, step=step)
        summary_writer.flush()

    def log_eval_metrics(self, step, phase):
        """Logs evaluation metrics to TensorBoard for the specified phase.

        Args:
            step: Current training step.
            phase: Training phase (1 or 2).
        """
        summary_writer = self.train_summary_phase1 if phase == 1 else self.train_summary_phase2
        prefix = 'phase1' if phase == 1 else 'phase2'
        with summary_writer.as_default():
            if phase == 1 and self.metrics['random_avg_rewards_phase1']:
                tf.summary.scalar(f'{prefix}/eval/random_avg_reward', self.metrics['random_avg_rewards_phase1'][-1], step=step)
                tf.summary.scalar(f'{prefix}/eval/random_win_rate', self.metrics['random_win_rates_phase1'][-1], step=step)
                tf.summary.scalar(f'{prefix}/eval/random_tie_rate', self.metrics['random_tie_rates_phase1'][-1], step=step)
                tf.summary.scalar(f'{prefix}/eval/random_loss_rate', self.metrics['random_lose_rates_phase1'][-1], step=step)
                tf.summary.scalar(f'{prefix}/eval/random_avg_step_count', self.metrics['random_avg_step_counts_phase1'][-1], step=step)
            if phase == 2:
                if self.metrics['random_avg_rewards_phase2']:
                    tf.summary.scalar(f'{prefix}/eval/random_avg_reward', self.metrics['random_avg_rewards_phase2'][-1], step=step)
                    tf.summary.scalar(f'{prefix}/eval/random_win_rate', self.metrics['random_win_rates_phase2'][-1], step=step)
                    tf.summary.scalar(f'{prefix}/eval/random_tie_rate', self.metrics['random_tie_rates_phase2'][-1], step=step)
                    tf.summary.scalar(f'{prefix}/eval/random_loss_rate', self.metrics['random_lose_rates_phase2'][-1], step=step)
                    tf.summary.scalar(f'{prefix}/eval/random_avg_step_count', self.metrics['random_avg_step_counts_phase2'][-1], step=step)
                if self.metrics['selfplay_avg_rewards_phase2']:
                    tf.summary.scalar(f'{prefix}/eval/selfplay_avg_reward', self.metrics['selfplay_avg_rewards_phase2'][-1], step=step)
                    tf.summary.scalar(f'{prefix}/eval/selfplay_win_rate', self.metrics['selfplay_win_rates_phase2'][-1], step=step)
                    tf.summary.scalar(f'{prefix}/eval/selfplay_tie_rate', self.metrics['selfplay_tie_rates_phase2'][-1], step=step)
                    tf.summary.scalar(f'{prefix}/eval/selfplay_lose_rate', self.metrics['selfplay_lose_rates_phase2'][-1], step=step)
                    tf.summary.scalar(f'{prefix}/eval/selfplay_avg_step_count', self.metrics['selfplay_avg_step_counts_phase2'][-1], step=step)
        summary_writer.flush()

    def trigger_system_sleep(self):
        """Triggers system sleep on macOS."""
        try:
            subprocess.run("pmset sleepnow", shell=True)
        except Exception as e:
            print(f"Failed to trigger system sleep: {e}")

    def train(self):
        """Executes the PPO training loop with two phases: random opponent and self-play."""
        def handle_crash(exctype, value, traceback):
            print(f"\nKernel crashed with {exctype}: {value}")
            print("Saving policy and metrics...")
            self.save_metrics()
            self.save_policy(self.policy_v1_dir, "Policy V1")
            self.save_policy(self.policy_v2_dir, "Policy V2")
            sys.__excepthook__(exctype, value, traceback)

        sys.excepthook = handle_crash  # Override default exception handler for crashes

        try:
            for i in range(self.config['phase1_iterations']):
                print(f"Phase 1 - Iteration {i+1}/{self.config['phase1_iterations']}")
                self.collector.collect_random(
                    self.masked_policy,
                    self.replay_buffer,
                    self.config['num_collect_episodes'],
                    verbose=False
                )
                experience = self.replay_buffer.gather_all()
                weights_before = self.actor_net.get_weights()
                train_loss = self.agent.train(experience).loss
                weights_after = self.actor_net.get_weights()
                weight_diff = [np.any(w_before != w_after) for w_before, w_after in zip(weights_before, weights_after)]
                print(f"Weights updated: {any(weight_diff)}")
                self.replay_buffer.clear()
                
                print(f"Loss: {train_loss.numpy():.4f}")
                self.metrics['train_losses'].append(train_loss.numpy())
                self.log_train_loss(i, train_loss.numpy(), phase=1)
                
                print(f"Iteration {i+1} memory: {psutil.Process().memory_info().rss / 1024**2:.2f} MB")
                
                if (i + 1) % self.config['eval_interval'] == 0:
                    random_metrics = self.evaluator.evaluate_random(
                        self.deterministic_masked_policy,
                        self.config['num_eval_episodes'],
                        verbose=True
                    )
                    print("Random Opponent:")
                    print(f"Avg Reward: {random_metrics[0]:.2f}")
                    print(f"Win Rate: {random_metrics[1]:.2%}, Lose Rate: {random_metrics[2]:.2%}, "
                          f"Tie Rate: {random_metrics[3]:.2%}, Avg Step Count: {random_metrics[4]}, "
                          f"P1 Illegal Rate: {random_metrics[5]:.2%}, P2 Illegal Rate: {random_metrics[6]:.2%}")

                    self.metrics['random_avg_rewards_phase1'].append(random_metrics[0])
                    self.metrics['random_win_rates_phase1'].append(random_metrics[1])
                    self.metrics['random_tie_rates_phase1'].append(random_metrics[3])
                    self.metrics['random_lose_rates_phase1'].append(random_metrics[2])
                    self.metrics['random_avg_step_counts_phase1'].append(random_metrics[4])

                    self.log_eval_metrics(i, phase=1)

            self.save_policy(self.policy_v1_dir, "Policy V1")

            # Create and initialize Policy V1 for self-play in Phase 2
            policy_v1_actor_net = actor_distribution_network.ActorDistributionNetwork(
                self.train_env.observation_spec(),
                self.train_env.action_spec(),
                preprocessing_layers={
                    'board': tf.keras.Sequential([
                        tf.keras.layers.Conv2D(16, 2, activation='relu', padding='same'),
                        tf.keras.layers.Flatten()
                    ]),
                    'action_mask': tf.keras.layers.Dense(16, activation='relu')
                },
                preprocessing_combiner=tf.keras.layers.Concatenate(),
                fc_layer_params=config['fc_layer_params']
            )

            policy_v1_actor_net.create_variables(training=False)
            policy_v1_actor_net.set_weights(self.actor_net.get_weights())

            policy_v1_base_policy = actor_policy.ActorPolicy(
                time_step_spec=self.train_env.time_step_spec(),
                action_spec=self.train_env.action_spec(),
                actor_network=policy_v1_actor_net,
                clip=True
            )

            self.policy_v1 = DeterministicMaskedPolicy(policy_v1_base_policy, policy_v1_actor_net, verbose=False)
            self.collector.deterministic_policy = self.policy_v1
            self.evaluator.deterministic_policy = self.policy_v1

            for i in range(self.config['phase1_iterations'], self.num_iterations):
                print(f"Phase 2 - Iteration {i+1}/{self.num_iterations}")
                num_random_episodes = int(self.config['num_collect_episodes'] * self.config['random_proportion'])
                num_selfplay_episodes = self.config['num_collect_episodes'] - num_random_episodes
                
                if num_random_episodes > 0:
                    self.collector.collect_random(
                        self.masked_policy,
                        self.replay_buffer,
                        num_random_episodes,
                        verbose=False
                    )

                if num_selfplay_episodes > 0:
                    self.collector.collect_selfplay(
                        self.masked_policy,
                        self.replay_buffer,
                        num_selfplay_episodes,
                        verbose=False
                    )

                experience = self.replay_buffer.gather_all()
                weights_before = self.actor_net.get_weights()
                train_loss = self.agent.train(experience).loss
                weights_after = self.actor_net.get_weights()
                weight_diff = [np.any(w_before != w_after) for w_before, w_after in zip(weights_before, weights_after)]
                print(f"Weights updated: {any(weight_diff)}")
                self.replay_buffer.clear()
                
                print(f"Loss: {train_loss.numpy():.4f}")
                self.metrics['train_losses'].append(train_loss.numpy())
                self.log_train_loss(i, train_loss.numpy(), phase=2)
                
                print(f"Iteration {i+1} memory: {psutil.Process().memory_info().rss / 1024**2:.2f} MB")
                
                if (i + 1) % self.config['eval_interval'] == 0:
                    random_metrics = self.evaluator.evaluate_random(
                        self.deterministic_masked_policy,
                        self.config['num_eval_episodes'],
                        verbose=True
                    )
                    print("Random Opponent:")
                    print(f"Avg Reward: {random_metrics[0]:.2f}")
                    print(f"Win Rate: {random_metrics[1]:.2%}, Lose Rate: {random_metrics[2]:.2%}, "
                          f"Tie Rate: {random_metrics[3]:.2%}, Avg Step Count: {random_metrics[4]}, "
                          f"P1 Illegal Rate: {random_metrics[5]:.2%}, P2 Illegal Rate: {random_metrics[6]:.2%}")

                    selfplay_metrics = self.evaluator.evaluate_selfplay(
                        self.deterministic_masked_policy,
                        self.config['num_eval_episodes'],
                        verbose=True
                    )
                    print("Self-play (Policy V1):")
                    print(f"Avg Reward: {selfplay_metrics[0]:.2f}")
                    print(f"Win Rate: {selfplay_metrics[1]:.2%}, Lose Rate: {selfplay_metrics[2]:.2%}, "
                          f"Tie Rate: {selfplay_metrics[3]:.2%}, Avg Step Count: {selfplay_metrics[4]}, "
                          f"P1 Illegal Rate: {selfplay_metrics[5]:.2%}, P2 Illegal Rate: {selfplay_metrics[6]:.2%}")

                    self.metrics['random_avg_rewards_phase2'].append(random_metrics[0])
                    self.metrics['random_win_rates_phase2'].append(random_metrics[1])
                    self.metrics['random_tie_rates_phase2'].append(random_metrics[3])
                    self.metrics['random_lose_rates_phase2'].append(random_metrics[2])
                    self.metrics['random_avg_step_counts_phase2'].append(random_metrics[4])
                    self.metrics['selfplay_avg_rewards_phase2'].append(selfplay_metrics[0])
                    self.metrics['selfplay_win_rates_phase2'].append(selfplay_metrics[1])
                    self.metrics['selfplay_tie_rates_phase2'].append(selfplay_metrics[3])
                    self.metrics['selfplay_lose_rates_phase2'].append(selfplay_metrics[2])
                    self.metrics['selfplay_avg_step_counts_phase2'].append(selfplay_metrics[4])

                    self.log_eval_metrics(i, phase=2)

            self.save_policy(self.policy_v2_dir, "Policy V2")
            self.save_metrics()
            self.train_summary_phase1.close()
            self.train_summary_phase2.close()
            self.trigger_system_sleep()
        except KeyboardInterrupt:
            print("\nTraining interrupted. Saving policy and metrics...")
            self.save_metrics()
            self.save_policy(self.policy_v1_dir, "Policy V1")
            self.save_policy(self.policy_v2_dir, "Policy V2")
            self.train_summary_phase1.close()
            self.train_summary_phase2.close()
        except Exception as e:
            print(f"Unexpected error: {e}")
            self.save_metrics()
            self.save_policy(self.policy_v1_dir, "Policy V1")
            self.save_policy(self.policy_v2_dir, "Policy V2")
            self.train_summary_phase1.close()
            self.train_summary_phase2.close()
            raise
        finally:
            print("Closing PrintLogger...")
            if isinstance(sys.stdout, PrintLogger):
                sys.stdout.close()
            if isinstance(sys.stderr, PrintLogger):
                sys.stderr.close()

# 3x3_No random_trained solely against random agent

In [11]:
board_size = (3, 3)
win_condition = [3, 3, 3]
def_level = 0.5
place_prob = 1
unplayable_grids = np.zeros(board_size)
rewards = {
    'win': 1.0, 'lose': -1.0, 'tie': 0.0, 'illegal': -1.0, 'forfeited': 0.0, 'step': -0.1,
    'row_dead_2': 0.1,
    'col_dead_2': 0.1,
    'diag_dead_2': 0.1
}

config = {
    'board_size': board_size,
    'win_condition': win_condition,
    'unplayable_grids': unplayable_grids,
    'rewards': rewards,
    'def_level': def_level,
    'place_prob': place_prob,
    'phase1_iterations': 1000,
    'phase2_iterations': 1000,
    'num_collect_episodes': 10,
    'random_proportion': 1,
    'num_eval_episodes': 50,
    'eval_interval': 50,
    'initial_learning_rate': 5e-4,
    'end_learning_rate': 1e-4,
    'decay_steps': 2000,
    'fc_layer_params': (64,),
    'train_log_dir': os.path.join('logs/test_3x3_no_random', f"PPO trained against random policy_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
}

log_file = os.path.join(config['train_log_dir'], 'terminal_output.log')
try:
    os.makedirs(config['train_log_dir'], exist_ok=True)
    if not os.access(config['train_log_dir'], os.W_OK):
        raise PermissionError(f"No write permission for directory: {config['train_log_dir']}")
    sys.stdout = PrintLogger(sys.stdout, log_file)
    sys.stderr = PrintLogger(sys.stderr, log_file)
    print("PrintLogger initialized. Starting training...")
    
    # Optionally resume from a checkpoint
    resume_checkpoint = None
    trainer = PPOTrainer(config, resume_from_checkpoint=resume_checkpoint)
    trainer.train()
except OSError as e:
    print(f"Failed to initialize PrintLogger: {e}", file=sys.__stdout__)
    sys.exit(1)



PrintLogger initialized. Starting training...
Phase 1 - Iteration 1/10
Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=True)` instead.


Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=True)` instead.


Weights updated: True
Loss: 46.2992
Iteration 1 memory: 436.08 MB
Phase 1 - Iteration 2/10
Weights updated: True
Loss: 1.3466
Iteration 2 memory: 437.44 MB
Phase 1 - Iteration 3/10
Weights updated: True
Loss: 0.6466
Iteration 3 memory: 438.83 MB
Phase 1 - Iteration 4/10
Weights updated: True
Loss: 1.7794
Iteration 4 memory: 439.66 MB
Phase 1 - Iteration 5/10
Weights updated: True
Loss: 1.2629
Iteration 5 memory: 440.25 MB

Round 1 - Player 2 ( O )
. . .
. . O
. . .


Round 2 - Player 1 ( X )
. . .
. . O
X . .


Round 3 - Player 2 ( O )
. . .
. O O
X . .
Pattern: row_dead_2

Mid. P1 reward = -0.10000000149011612 - 0.5 * 0.1 = -0.1500000014901161

Round 4 - Player 1 ( X )
. X .
. O O
X . .


Round 5 - Player 2 ( O )
. X O
. O O
X . .
Pattern: col_dead_2

Mid. P1 reward = -0.10000000149011612 - 0.5 * 0.1 = -0.1500000014901161

Round 6 - Player 1 ( X )
X X O
. O O
X . .
Pattern: col_dead_2


Round 7 - Player 2 ( O )
X X O
. O O
X . O

Winner: P2
P2 terminates. P1 reward = -1.0
P1 episode r

INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained against random policy_20250512_180146/policy_v1/assets


Policy V1 saved to logs/test_3x3_no_random/PPO trained against random policy_20250512_180146/policy_v1
Phase 2 - Iteration 11/20
Weights updated: True
Loss: 1.2822
Iteration 11 memory: 460.92 MB
Phase 2 - Iteration 12/20
Weights updated: True
Loss: 1.0064
Iteration 12 memory: 461.72 MB
Phase 2 - Iteration 13/20
Weights updated: True
Loss: 1.5719
Iteration 13 memory: 462.27 MB
Phase 2 - Iteration 14/20
Weights updated: True
Loss: 0.8784
Iteration 14 memory: 462.75 MB
Phase 2 - Iteration 15/20
Weights updated: True
Loss: 1.3771
Iteration 15 memory: 463.27 MB

Round 1 - Player 1 ( X )
. . .
. X .
. . .


Round 2 - Player 2 ( O )
. . .
. X .
. . O

Mid. P1 reward = -0.10000000149011612 - 0.5 * -1.4901161138336505e-09 = -0.10000000074505806

Round 3 - Player 1 ( X )
. . .
. X .
X . O
Pattern: diag_dead_2


Round 4 - Player 2 ( O )
. . .
. X .
X O O

Mid. P1 reward = 0.0 - 0.5 * -1.4901161138336505e-09 = 7.450580569168253e-10

Round 5 - Player 1 ( X )
. . X
. X .
X O O

Winner: P1
P1 termina

INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained against random policy_20250512_180146/policy_v2/assets


Policy V2 saved to logs/test_3x3_no_random/PPO trained against random policy_20250512_180146/policy_v2
Metrics saved to logs/test_3x3_no_random/PPO trained against random policy_20250512_180146/metrics.npz
Sleeping now...
Closing PrintLogger...




PrintLogger initialized. Starting training...
Phase 1 - Iteration 1/1000
Weights updated: True
Loss: 57.1400
Iteration 1 memory: 476.31 MB
Phase 1 - Iteration 2/1000
Weights updated: True
Loss: 1.4176
Iteration 2 memory: 477.11 MB
Phase 1 - Iteration 3/1000
Weights updated: True
Loss: 1.0379
Iteration 3 memory: 477.83 MB
Phase 1 - Iteration 4/1000
Weights updated: True
Loss: 0.7200
Iteration 4 memory: 478.50 MB
Phase 1 - Iteration 5/1000
Weights updated: True
Loss: 1.1816
Iteration 5 memory: 479.12 MB
Phase 1 - Iteration 6/1000
Weights updated: True
Loss: 1.4076
Iteration 6 memory: 479.62 MB
Phase 1 - Iteration 7/1000
Weights updated: True
Loss: 1.6659
Iteration 7 memory: 480.11 MB
Phase 1 - Iteration 8/1000
Weights updated: True
Loss: 1.1001
Iteration 8 memory: 480.59 MB
Phase 1 - Iteration 9/1000
Weights updated: True
Loss: 1.3779
Iteration 9 memory: 481.06 MB
Phase 1 - Iteration 10/1000
Weights updated: True
Loss: 1.4923
Iteration 10 memory: 481.53 MB
Phase 1 - Iteration 11/1000
Wei

INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180425/policy_v1/assets


Policy V1 saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180425/policy_v1
INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180425/policy_v2/assets


INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180425/policy_v2/assets


Policy V2 saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180425/policy_v2
Closing PrintLogger...




PrintLogger initialized. Starting training...
Phase 1 - Iteration 1/10
Weights updated: True
Loss: 51.6583
Iteration 1 memory: 493.91 MB
Phase 1 - Iteration 2/10
Weights updated: True
Loss: 1.7028
Iteration 2 memory: 494.53 MB
Phase 1 - Iteration 3/10
Weights updated: True
Loss: 1.1683
Iteration 3 memory: 495.27 MB
Phase 1 - Iteration 4/10
Weights updated: True
Loss: 1.2776
Iteration 4 memory: 495.84 MB
Phase 1 - Iteration 5/10
Weights updated: True
Loss: 1.1655
Iteration 5 memory: 496.41 MB
Phase 1 - Iteration 6/10
Weights updated: True
Loss: 1.3747
Iteration 6 memory: 496.91 MB
Phase 1 - Iteration 7/10
Weights updated: True
Loss: 1.6819
Iteration 7 memory: 497.48 MB
Phase 1 - Iteration 8/10
Weights updated: True
Loss: 1.7542
Iteration 8 memory: 497.95 MB
Phase 1 - Iteration 9/10
Weights updated: True
Loss: 1.4530
Iteration 9 memory: 498.44 MB
Phase 1 - Iteration 10/10
Weights updated: True
Loss: 1.3151
Iteration 10 memory: 498.95 MB
INFO:tensorflow:Assets written to: logs/test_3x3_no

INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/policy_v1/assets


Policy V1 saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/policy_v1
Phase 2 - Iteration 11/20
Weights updated: True
Loss: 1.4749
Iteration 11 memory: 502.91 MB
Phase 2 - Iteration 12/20
Weights updated: True
Loss: 1.6755
Iteration 12 memory: 503.66 MB
Phase 2 - Iteration 13/20
Weights updated: True
Loss: 1.4029
Iteration 13 memory: 504.48 MB
Phase 2 - Iteration 14/20

Training interrupted. Saving policy and metrics...
Metrics saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/metrics.npz
INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/policy_v1/assets


INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/policy_v1/assets


Policy V1 saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/policy_v1
INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/policy_v2/assets


INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/policy_v2/assets


Policy V2 saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180446/policy_v2
Closing PrintLogger...


# 3x3_No random_agent trained by self-play

In [None]:
board_size = (3, 3)
win_condition = [3, 3, 3]
def_level = 0.5
place_prob = 1
unplayable_grids = np.zeros(board_size)
rewards = {
    'win': 1.0, 'lose': -1.0, 'tie': 0.0, 'illegal': -1.0, 'forfeited': 0.0, 'step': -0.1,
    'row_dead_2': 0.1,
    'col_dead_2': 0.1,
    'diag_dead_2': 0.1
}

config = {
    'board_size': board_size,
    'win_condition': win_condition,
    'unplayable_grids': unplayable_grids,
    'rewards': rewards,
    'def_level': def_level,
    'place_prob': place_prob,
    'phase1_iterations': 1000,
    'phase2_iterations': 1000,
    'num_collect_episodes': 10,
    'random_proportion': 0.75,
    'num_eval_episodes': 50,
    'eval_interval': 50,
    'initial_learning_rate': 5e-4,
    'end_learning_rate': 1e-4,
    'decay_steps': 2000,
    'fc_layer_params': (64,),
    'train_log_dir': os.path.join('logs/test_3x3_no_random', f"PPO trained by self-play_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
}

log_file = os.path.join(config['train_log_dir'], 'terminal_output.log')
try:
    os.makedirs(config['train_log_dir'], exist_ok=True)
    if not os.access(config['train_log_dir'], os.W_OK):
        raise PermissionError(f"No write permission for directory: {config['train_log_dir']}")
    sys.stdout = PrintLogger(sys.stdout, log_file)
    sys.stderr = PrintLogger(sys.stderr, log_file)
    print("PrintLogger initialized. Starting training...")
    
    # Optionally resume from a checkpoint
    resume_checkpoint = None
    trainer = PPOTrainer(config, resume_from_checkpoint=resume_checkpoint)
    trainer.train()
except OSError as e:
    print(f"Failed to initialize PrintLogger: {e}", file=sys.__stdout__)
    sys.exit(1)



PrintLogger initialized. Starting training...
Phase 1 - Iteration 1/10
Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=True)` instead.


Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=True)` instead.


Weights updated: True
Loss: 67.0096
Iteration 1 memory: 435.77 MB
Phase 1 - Iteration 2/10
Weights updated: True
Loss: 1.6868
Iteration 2 memory: 437.22 MB
Phase 1 - Iteration 3/10
Weights updated: True
Loss: 1.0518
Iteration 3 memory: 438.19 MB
Phase 1 - Iteration 4/10
Weights updated: True
Loss: 1.5383
Iteration 4 memory: 439.19 MB
Phase 1 - Iteration 5/10
Weights updated: True
Loss: 1.5510
Iteration 5 memory: 440.05 MB
Phase 1 - Iteration 6/10
Weights updated: True
Loss: 1.3818
Iteration 6 memory: 440.62 MB
Phase 1 - Iteration 7/10
Weights updated: True
Loss: 1.3638
Iteration 7 memory: 441.25 MB
Phase 1 - Iteration 8/10
Weights updated: True
Loss: 1.1466
Iteration 8 memory: 441.73 MB
Phase 1 - Iteration 9/10
Weights updated: True
Loss: 1.5619
Iteration 9 memory: 442.44 MB
Phase 1 - Iteration 10/10
Weights updated: True
Loss: 1.2079
Iteration 10 memory: 443.34 MB
INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180529/policy_v1/assets


INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180529/policy_v1/assets


Policy V1 saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180529/policy_v1
Phase 2 - Iteration 11/20
Weights updated: True
Loss: 1.3029
Iteration 11 memory: 459.45 MB
Phase 2 - Iteration 12/20
Weights updated: True
Loss: 1.2683
Iteration 12 memory: 460.33 MB
Phase 2 - Iteration 13/20
Weights updated: True
Loss: 1.2252
Iteration 13 memory: 460.98 MB
Phase 2 - Iteration 14/20
Weights updated: True
Loss: 1.4602
Iteration 14 memory: 461.59 MB
Phase 2 - Iteration 15/20
Weights updated: True
Loss: 1.2661
Iteration 15 memory: 462.08 MB
Phase 2 - Iteration 16/20
Weights updated: True
Loss: 1.2976
Iteration 16 memory: 462.66 MB
Phase 2 - Iteration 17/20
Weights updated: True
Loss: 1.2963
Iteration 17 memory: 463.20 MB
Phase 2 - Iteration 18/20
Weights updated: True
Loss: 1.2033
Iteration 18 memory: 463.70 MB
Phase 2 - Iteration 19/20
Weights updated: True
Loss: 1.2687
Iteration 19 memory: 464.23 MB
Phase 2 - Iteration 20/20
Weights updated: True
Loss: 1.0359
Iteration 20 memo

INFO:tensorflow:Assets written to: logs/test_3x3_no_random/PPO trained by self-play_20250512_180529/policy_v2/assets


Policy V2 saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180529/policy_v2
Metrics saved to logs/test_3x3_no_random/PPO trained by self-play_20250512_180529/metrics.npz
Sleeping now...
Closing PrintLogger...


# 3x3_With random_trained solely against random agent

In [None]:
board_size = (3, 3)
win_condition = [3, 3, 3]
def_level = 0.5
place_prob = 0.5
unplayable_grids = np.zeros(board_size)
rewards = {
    'win': 1.0, 'lose': -1.0, 'tie': 0.0, 'illegal': -1.0, 'forfeited': 0.0, 'step': -0.1,
    'row_dead_2': 0.1,
    'col_dead_2': 0.1,
    'diag_dead_2': 0.1
}

config = {
    'board_size': board_size,
    'win_condition': win_condition,
    'unplayable_grids': unplayable_grids,
    'rewards': rewards,
    'def_level': def_level,
    'place_prob': place_prob,
    'phase1_iterations': 1000,
    'phase2_iterations': 1000,
    'num_collect_episodes': 10,
    'random_proportion': 1,
    'num_eval_episodes': 50,
    'eval_interval': 50,
    'initial_learning_rate': 5e-4,
    'end_learning_rate': 1e-4,
    'decay_steps': 2000,
    'fc_layer_params': (64,),
    'train_log_dir': os.path.join('logs/test_3x3_with_random', f"PPO trained against random policy_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
}

log_file = os.path.join(config['train_log_dir'], 'terminal_output.log')
try:
    os.makedirs(config['train_log_dir'], exist_ok=True)
    if not os.access(config['train_log_dir'], os.W_OK):
        raise PermissionError(f"No write permission for directory: {config['train_log_dir']}")
    sys.stdout = PrintLogger(sys.stdout, log_file)
    sys.stderr = PrintLogger(sys.stderr, log_file)
    print("PrintLogger initialized. Starting training...")
    
    # Optionally resume from a checkpoint
    resume_checkpoint = None
    trainer = PPOTrainer(config, resume_from_checkpoint=resume_checkpoint)
    trainer.train()
except OSError as e:
    print(f"Failed to initialize PrintLogger: {e}", file=sys.__stdout__)
    sys.exit(1)

# 3x3_With random_trained by self-play

In [None]:
board_size = (3, 3)
win_condition = [3, 3, 3]
def_level = 0.5
place_prob = 0.5
unplayable_grids = np.zeros(board_size)
rewards = {
    'win': 1.0, 'lose': -1.0, 'tie': 0.0, 'illegal': -1.0, 'forfeited': 0.0, 'step': -0.1,
    'row_dead_2': 0.1,
    'col_dead_2': 0.1,
    'diag_dead_2': 0.1
}

config = {
    'board_size': board_size,
    'win_condition': win_condition,
    'unplayable_grids': unplayable_grids,
    'rewards': rewards,
    'def_level': def_level,
    'place_prob': place_prob,
    'phase1_iterations': 1000,
    'phase2_iterations': 1000,
    'num_collect_episodes': 10,
    'random_proportion': 0.75,
    'num_eval_episodes': 50,
    'eval_interval': 50,
    'initial_learning_rate': 5e-4,
    'end_learning_rate': 1e-4,
    'decay_steps': 2000,
    'fc_layer_params': (64,),
    'train_log_dir': os.path.join('logs/test_3x3_with_random', f"PPO trained by self-play_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
}

log_file = os.path.join(config['train_log_dir'], 'terminal_output.log')
try:
    os.makedirs(config['train_log_dir'], exist_ok=True)
    if not os.access(config['train_log_dir'], os.W_OK):
        raise PermissionError(f"No write permission for directory: {config['train_log_dir']}")
    sys.stdout = PrintLogger(sys.stdout, log_file)
    sys.stderr = PrintLogger(sys.stderr, log_file)
    print("PrintLogger initialized. Starting training...")
    
    # Optionally resume from a checkpoint
    resume_checkpoint = None
    trainer = PPOTrainer(config, resume_from_checkpoint=resume_checkpoint)
    trainer.train()
except OSError as e:
    print(f"Failed to initialize PrintLogger: {e}", file=sys.__stdout__)
    sys.exit(1)

# 12x12_With random_trained solely against random agent

In [None]:
board_size = (12, 12)
win_condition = [4, 4, 5]
def_level = 0.5
place_prob = 0.5
unplayable_grids = np.zeros(board_size)
corner_size = 4
unplayable_grids[:corner_size, :corner_size] = 1
unplayable_grids[:corner_size, -corner_size:] = 1
unplayable_grids[-corner_size:, :corner_size] = 1
unplayable_grids[-corner_size:, -corner_size:] = 1

rewards = {
    'win': 1.0, 'lose': -1.0, 'tie': 0.0, 'illegal': -1.0, 'forfeited': 0.0, 'step': -0.06,
    'row_live_3': 0.16, 'row_dead_3': 0.08, 'row_live_2': 0.04, 'row_dead_2': 0.02,
    'col_live_3': 0.16, 'col_dead_3': 0.08, 'col_live_2': 0.04, 'col_dead_2': 0.02,
    'diag_live_4': 0.16, 'diag_dead_4': 0.08, 'diag_live_3': 0.04, 'diag_dead_3': 0.02, 'diag_live_2': 0.01, 'diag_dead_2': 0.005
}

config = {
    'board_size': board_size,
    'win_condition': win_condition,
    'unplayable_grids': unplayable_grids,
    'rewards': rewards,
    'def_level': def_level,
    'place_prob': place_prob,
    'phase1_iterations': 1000,
    'phase2_iterations': 1000,
    'num_collect_episodes': 10,
    'random_proportion': 1,
    'num_eval_episodes': 50,
    'eval_interval': 50,
    'initial_learning_rate': 5e-4,
    'end_learning_rate': 1e-4,
    'decay_steps': 2000,
    'fc_layer_params': (64,),
    'train_log_dir': os.path.join('logs/test_12x12_with_random', f"PPO trained against random policy_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
}

log_file = os.path.join(config['train_log_dir'], 'terminal_output.log')
try:
    os.makedirs(config['train_log_dir'], exist_ok=True)
    if not os.access(config['train_log_dir'], os.W_OK):
        raise PermissionError(f"No write permission for directory: {config['train_log_dir']}")
    sys.stdout = PrintLogger(sys.stdout, log_file)
    sys.stderr = PrintLogger(sys.stderr, log_file)
    print("PrintLogger initialized. Starting training...")
    
    # Optionally resume from a checkpoint
    resume_checkpoint = None
    trainer = PPOTrainer(config, resume_from_checkpoint=resume_checkpoint)
    trainer.train()
except OSError as e:
    print(f"Failed to initialize PrintLogger: {e}", file=sys.__stdout__)
    sys.exit(1)

# 12x12_With random_trained by self-play

In [11]:
board_size = (12, 12)
win_condition = [4, 4, 5]
def_level = 0.5
place_prob = 0.5
unplayable_grids = np.zeros(board_size)
corner_size = 4
unplayable_grids[:corner_size, :corner_size] = 1
unplayable_grids[:corner_size, -corner_size:] = 1
unplayable_grids[-corner_size:, :corner_size] = 1
unplayable_grids[-corner_size:, -corner_size:] = 1

rewards = {
    'win': 1.0, 'lose': -1.0, 'tie': 0.0, 'illegal': -1.0, 'forfeited': 0.0, 'step': -0.06,
    'row_live_3': 0.16, 'row_dead_3': 0.08, 'row_live_2': 0.04, 'row_dead_2': 0.02,
    'col_live_3': 0.16, 'col_dead_3': 0.08, 'col_live_2': 0.04, 'col_dead_2': 0.02,
    'diag_live_4': 0.16, 'diag_dead_4': 0.08, 'diag_live_3': 0.04, 'diag_dead_3': 0.02, 'diag_live_2': 0.01, 'diag_dead_2': 0.005
}

config = {
    'board_size': board_size,
    'win_condition': win_condition,
    'unplayable_grids': unplayable_grids,
    'rewards': rewards,
    'def_level': def_level,
    'place_prob': place_prob,
    'phase1_iterations': 1000,
    'phase2_iterations': 1000,
    'num_collect_episodes': 10,
    'random_proportion': 0.75,
    'num_eval_episodes': 50,
    'eval_interval': 50,
    'initial_learning_rate': 5e-4,
    'end_learning_rate': 1e-4,
    'decay_steps': 2000,
    'fc_layer_params': (64,),
    'train_log_dir': os.path.join('logs/test_12x12_with_random', f"PPO trained by self-play_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
}

log_file = os.path.join(config['train_log_dir'], 'terminal_output.log')
try:
    os.makedirs(config['train_log_dir'], exist_ok=True)
    if not os.access(config['train_log_dir'], os.W_OK):
        raise PermissionError(f"No write permission for directory: {config['train_log_dir']}")
    sys.stdout = PrintLogger(sys.stdout, log_file)
    sys.stderr = PrintLogger(sys.stderr, log_file)
    print("PrintLogger initialized. Starting training...")
    
    # Optionally resume from a checkpoint
    resume_checkpoint = None
    trainer = PPOTrainer(config, resume_from_checkpoint=resume_checkpoint)
    trainer.train()
except OSError as e:
    print(f"Failed to initialize PrintLogger: {e}", file=sys.__stdout__)
    sys.exit(1)



PrintLogger initialized. Starting training...
Phase 1 - Iteration 1/1000
Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=True)` instead.


Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=True)` instead.


Weights updated: True
Loss: 221.1087
Iteration 1 memory: 519.23 MB
Phase 1 - Iteration 2/1000
Weights updated: True
Loss: 14.2152
Iteration 2 memory: 533.59 MB
Phase 1 - Iteration 3/1000
Weights updated: True
Loss: 13.3386
Iteration 3 memory: 546.30 MB
Phase 1 - Iteration 4/1000
Weights updated: True
Loss: 11.8281
Iteration 4 memory: 555.42 MB
Phase 1 - Iteration 5/1000
Weights updated: True
Loss: 12.3907
Iteration 5 memory: 562.61 MB
Phase 1 - Iteration 6/1000
Weights updated: True
Loss: 14.5515
Iteration 6 memory: 575.38 MB
Phase 1 - Iteration 7/1000
Weights updated: True
Loss: 12.7034
Iteration 7 memory: 577.70 MB
Phase 1 - Iteration 8/1000

Training interrupted. Saving policy and metrics...
Metrics saved to logs/test_12x12_with_random/PPO trained by self-play_20250512_181046/metrics.npz
INFO:tensorflow:Assets written to: logs/test_12x12_with_random/PPO trained by self-play_20250512_181046/policy_v1/assets


INFO:tensorflow:Assets written to: logs/test_12x12_with_random/PPO trained by self-play_20250512_181046/policy_v1/assets


Policy V1 saved to logs/test_12x12_with_random/PPO trained by self-play_20250512_181046/policy_v1
INFO:tensorflow:Assets written to: logs/test_12x12_with_random/PPO trained by self-play_20250512_181046/policy_v2/assets


INFO:tensorflow:Assets written to: logs/test_12x12_with_random/PPO trained by self-play_20250512_181046/policy_v2/assets


Policy V2 saved to logs/test_12x12_with_random/PPO trained by self-play_20250512_181046/policy_v2
Closing PrintLogger...
