<a href="https://colab.research.google.com/github/hinsley/RL-depot/blob/master/temporal-difference/TDc4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TDc4

Temporal difference learning model for Connect 4

---

Uses value function approximation inspired by [this paper](https://link.springer.com/content/pdf/10.1007/s10994-012-5280-0.pdf)

State features: 1x1, 2x2, 3x3, 4x4, and 5x5 local board configurations, each with location-dependent and location-independent weight vectors. Horizontal reflection for weight sharing is planned but not yet implemented (see the TODO in `Agent.evaluate`).

In [0]:
import math
import numpy as np
import requests
from collections import Counter
from multiprocessing import Pool
from random import choice, choices
from time import time
from typing import Dict, List, Optional, Tuple, Union

In [0]:
PositionState = int # Content of a single board cell; one of the three VALUE_* constants below.
VALUE_BLANK = 0
VALUE_X = 1
VALUE_O = 2

GameState = np.ndarray # Whole board, 6 x 7 (Rows x Cols); row 0 is printed first, so it is the top row.

LocalFeature = np.ndarray # Square window cut out of the board, from 1 x 1 up to 5 x 5 (Rows x Cols).

EncodedFeature = int # Base-3 code of a window's cells (see encode_feature). Does not include any indication of local feature size.

# Indexed by feature size, then row, then column of the upper-leftmost position.
# Row and column are both -1 for location-independent features.
EncodedFeatureVector = Dict[int, Dict[int, Dict[int, List[EncodedFeature]]]]

# Same three-level indexing as EncodedFeatureVector (size, row, column; -1 / -1
# for location-independent features), mapping each encoded feature to its weight.
WeightVector = Dict[int, Dict[int, Dict[int, Dict[EncodedFeature, float]]]]

# [0.0, 1.0] for values produced by the sigmoid in Agent.evaluate. Note that
# eval_state reuses this alias for its raw reward, which is -1.0, 0.0 or 1.0.
StateValue = float

In [0]:
def pprint(state: GameState):
  """
  Prints a board as text: a top border, one numbered line per row with the
  cells separated by spaces, a bottom border, and the column letters A-G.
  """
  glyphs = {
      VALUE_BLANK: " ",
      VALUE_X: "X",
      VALUE_O: "O",
  }

  print(" __ _ _ _ _ _ __")
  for row_number, row in enumerate(state, start=1):
    cells = " ".join(glyphs[cell] for cell in row)
    print(f"{row_number}|{cells}|")
  print(" -- - - - - - --")
  print("  A B C D E F G ")

In [0]:
def empty_game_state() -> GameState:
  """ Builds the starting position: a 6-row by 7-column board of blank cells. """
  board_shape = (6, 7) # (rows, cols)
  return np.full(shape=board_shape, fill_value=VALUE_BLANK)

In [0]:
def eval_state(state: GameState, check_game_over: bool=False) -> Union[StateValue, bool]:
  """
  Scores a board from X's point of view, or reports whether the game has ended.

  Four-in-a-row lines are searched before the full-board test, so a winning
  move that also fills the last blank cell is scored as a win, not as a draw.
  Lines are searched in a fixed order: horizontal, vertical, "\\" diagonal,
  "/" diagonal; the first line found decides the result.

  Args:
    state: Board to inspect.
    check_game_over: When True, return a bool instead of a reward.

  Returns:
    With check_game_over=False: 1.0 if a line of four X pieces is found, -1.0
    if a line of four O pieces is found, and 0.0 otherwise (either a draw or a
    game that is still in progress).
    With check_game_over=True: True if a line of four exists or no blank cell
    remains, False otherwise.
  """
  def line_reward(owner) -> StateValue:
    # +1.0 rewards X for winning, -1.0 punishes X for losing.
    return 1.0 if owner == VALUE_X else -1.0

  def check_horizontal_wins(board) -> StateValue:
    for row in board:
      for col in range(len(row) - 3):
        if row[col] in [VALUE_X, VALUE_O] and (row[col:col+4] == row[col]).all():
          return line_reward(row[col])
    return 0.0 # No line in this orientation.

  def check_diagonal_wins(board) -> StateValue:
    # Walks every top-left corner from which a 4-long "\" diagonal fits.
    for row in range(len(board) - 3):
      for col in range(len(board[row]) - 3):
        owner = board[row, col]
        if owner in [VALUE_X, VALUE_O] and all(board[row + offset, col + offset] == owner for offset in range(1, 4)):
          return line_reward(owner)
    return 0.0 # No line in this orientation.

  # Vertical lines are horizontal lines of the transposed board, and "/"
  # diagonals are "\" diagonals of the left-right mirrored board.
  searches = (
      (check_horizontal_wins, state),
      (check_horizontal_wins, state.T),
      (check_diagonal_wins, state),
      (check_diagonal_wins, np.flip(state, axis=1)),
  )

  for search, board in searches:
    reward = search(board)
    if reward != 0.0:
      return True if check_game_over else reward

  # Nobody has a line: the game is over only if the board is full (a draw).
  if check_game_over:
    return bool(np.all(state != VALUE_BLANK))

  return 0.0 # Draw, or game still in progress.

In [0]:
def active_features(state: GameState) -> List[Tuple[int, int, int, LocalFeature]]:
  """
  Lists every square window of the board, for window sizes 1 through 5.

  Each window is reported twice, back to back: first as a location-independent
  feature (row and column given as -1), then as a location-dependent feature
  carrying the row and column of its top-left cell. Every entry is a tuple of
  (window size, row, column, window), where the window is a slice of `state`.
  """
  n_rows, n_cols = state.shape[0], state.shape[1]
  collected = []

  for size in range(1, 6): # Windows are size x size.
    for top in range(n_rows - size + 1):
      for left in range(n_cols - size + 1):
        window = state[top : top + size, left : left + size]
        collected.extend([
            (size, -1, -1, window), # Location-independent entry.
            (size, top, left, window), # Location-dependent entry.
        ])

  return collected

In [0]:
def encode_feature(feature: LocalFeature) -> EncodedFeature:
  """
  Encodes a local feature so that it is hashable for tabular indexing.

  The cells are read in row-major order and treated as the digits of a base-3
  number, least significant digit first. Every cell therefore gets its own
  power of three, so two features of the same shape never share a code (the
  shape itself is not part of the code).

  Returns a plain Python int. Cell values are converted to Python ints before
  multiplying, so the result cannot overflow a fixed-width NumPy integer type.
  """
  return sum(int(cell) * 3 ** position for position, cell in enumerate(feature.flat))

In [0]:
def drop_piece(state: GameState, column: int, x_move: bool) -> GameState:
  """
  Returns a copy of the board with one piece dropped into the given column.

  The piece lands in the lowest blank cell of the column, i.e. directly above
  the topmost occupied cell, or in the bottom row if the column is empty. The
  input board is left untouched.

  Args:
    state: Board before the move.
    column: Index of the column to play.
    x_move: True to place an X piece, False to place an O piece.

  Raises:
    ValueError: If the column has no blank cell left. Without this check the
      landing row would be -1 and the bottom piece would be overwritten.
  """
  drop_row = state.shape[0] - 1 # In case entire column is empty.

  for row in range(state.shape[0]):
    if state[row][column] != VALUE_BLANK:
      drop_row = row - 1
      break

  if drop_row < 0:
    raise ValueError(f"Column {column} is full.")

  new_state = state.copy()
  new_state[drop_row][column] = VALUE_X if x_move else VALUE_O

  return new_state

In [0]:
def encode_game_state(state: GameState) -> EncodedFeatureVector:
  """
  Groups the encoded active features of a board by feature size, row and column.

  The result always contains the location-independent slot (row -1, column -1)
  for sizes 1 through 5; other slots are created as features are encountered.
  Each slot holds the list of encoded features found there, duplicates included.
  """
  encoding = {size: {-1: {-1: []}} for size in range(1, 6)}

  for size, row, col, local_feature in active_features(state):
    code = encode_feature(local_feature)
    by_row = encoding.setdefault(size, {})
    by_col = by_row.setdefault(row, {})
    by_col.setdefault(col, []).append(code)

  return encoding

In [0]:
# Weight table read and updated by every Agent. The row/column keys of the
# location-independent weights (-1 / -1) can already be created up front.
weight_vector: WeightVector = {size: {-1: {-1: {}}} for size in range(1, 6)}

In [0]:
class Agent():
  """
  Temporal-difference learner that plays one side of the game.

  All agents read and update the module-level weight_vector, so the X and O
  agents share what they learn. A state value is an estimate of how good the
  state is for X, in [0.0, 1.0]: the X agent picks moves that maximize it and
  the O agent picks moves that minimize it.
  """

  x_player: bool # X player will try to maximize reward, O player will try to minimize it.
  _prev_encoded_feature_vector: EncodedFeatureVector # Features of the state the next TD update adjusts.
  _prev_state_value: float # Value that state had when it was stored.

  def __init__(self, x_player: bool):
    self.x_player = x_player
    self.reset_game_state()

  def reset_game_state(self):
    """ Starts a new game: the next TD update is made relative to an empty board. """
    self._remember(empty_game_state())

  def _remember(self, state: GameState, state_value: Optional[float]=None):
    """ Stores the features and value of a state as the baseline of the next TD update. """
    self._prev_encoded_feature_vector = encode_game_state(state)
    self._prev_state_value = self.evaluate(state) if state_value is None else state_value

  def evaluate(self, state: GameState) -> float:
    """
    Returns the value of a state for X, in [0.0, 1.0].

    Finished games are scored exactly: 1.0 for an X win, 0.0 for an O win and
    0.5 for a draw. Any other state is scored as the sigmoid of the summed
    weights of its active features. Weights seen for the first time are added
    to weight_vector with a value of 0.0.
    """
    if eval_state(state, check_game_over=True):
      # eval_state reports -1.0 / 0.0 / 1.0; rescale so that terminal values
      # live on the same [0.0, 1.0] scale as the sigmoid output below.
      return (eval_state(state, check_game_over=False) + 1) / 2

    accumulator: float = 0.0

    for local_size, row, col, local_feature in active_features(state):
      # TODO: Implement mirroring / player reversal.
      weights = weight_vector.setdefault(local_size, {}).setdefault(row, {}).setdefault(col, {})
      accumulator += weights.setdefault(encode_feature(local_feature), 0.0)

    # Sigmoid to squash to [0.0, 1.0]. Written in two branches so that
    # math.exp is only ever called with a non-positive argument and cannot
    # overflow for large negative sums.
    if accumulator >= 0:
      return 1 / (1 + math.exp(-accumulator))
    squashed = math.exp(accumulator)
    return squashed / (1 + squashed)

  def td_update(self, new_state: GameState, learning_rate: float=0.03) -> float:
    """
    Moves the weights of the previously stored state toward the value of
    new_state, then stores new_state as the baseline of the next update.

    Each weight changes by learning_rate * count / signal_power * TD error,
    where count is how often the feature occurred in the stored state,
    signal_power is the sum of all squared counts, and the TD error is the
    value of new_state minus the stored value.

    This function could be sped up a lot by cacheing state value, among other
    things.

    Returns:
      The value of new_state, computed before the weights are changed.
    """
    new_state_value = self.evaluate(new_state)
    td_error = new_state_value - self._prev_state_value

    # Count the occurrences of every encoded feature in each (size, row, col) slot.
    slot_counts: List[Tuple[int, int, int, Counter]] = []
    for local_size, rows in self._prev_encoded_feature_vector.items():
      for row, cols in rows.items():
        for col, encoded_features in cols.items():
          slot_counts.append((local_size, row, col, Counter(encoded_features)))

    # Used for normalization to make learning rate invariant under feature variation.
    signal_power = sum(count ** 2 for *_, counts in slot_counts for count in counts.values())

    for local_size, row, col, counts in slot_counts:
      weights = weight_vector.setdefault(local_size, {}).setdefault(row, {}).setdefault(col, {})
      for encoded_feature, count in counts.items():
        weights[encoded_feature] = weights.get(encoded_feature, 0) + learning_rate * count / signal_power * td_error

    # Store values needed for next update.
    self._remember(new_state, new_state_value)

    return new_state_value

  def _rollout(self, initial_state: GameState, time_allowance: float) -> int:
    """
    Plays practice games from initial_state until the time allowance is used up.

    The games are played by two temporary agents and exist only for the weight
    updates they cause. This agent's side moves first, because initial_state is
    a position in which it is this agent's turn. The clock is only checked
    between games, so the last game may overrun the allowance.

    Returns:
      The number of games played to completion.
    """
    if eval_state(initial_state, check_game_over=True):
      return 0 # Nothing to play out.

    deadline = time() + time_allowance
    mover = Agent(self.x_player)
    replier = Agent(not self.x_player)

    rollouts = 0
    while time() < deadline:
      # The side to move continues from this agent's own stored baseline; for
      # the opponent, initial_state is the state its own last move produced.
      mover._prev_encoded_feature_vector = self._prev_encoded_feature_vector
      mover._prev_state_value = self._prev_state_value
      replier._remember(initial_state)

      rollout_state = initial_state
      players = (mover, replier)
      ply = 0
      while not eval_state(rollout_state, check_game_over=True):
        # think_time=0.0 keeps the practice games from starting rollouts of their own.
        rollout_state = players[ply % 2].best_move(rollout_state, think_time=0.0)
        ply += 1
      rollouts += 1

    return rollouts

  def best_move(self, state: GameState, think_time: float=0.0, epsilon: float=0.15) -> GameState:
    """
    Chooses and plays a move for this agent, learning from the result.

    With probability epsilon a random legal column is played (exploration).
    Otherwise every legal move is scored with evaluate and one of the best
    resulting states is picked at random (exploitation). Scoring candidates
    never changes the weights; exactly one TD update is made, toward the state
    that was actually chosen.

    Args:
      state: Board in which it is this agent's turn.
      think_time: Seconds to spend on practice rollouts before choosing.
        Values of 0 or less skip the rollouts.
      epsilon: Probability of playing a random move.

    Returns:
      The board after this agent's move.

    Raises:
      ValueError: If every column is full.
    """
    legal_columns = [column for column in range(state.shape[1]) if state[0, column] == VALUE_BLANK]
    if not legal_columns:
      raise ValueError("No legal move: every column is full.")

    if think_time > 0:
      self._rollout(state, think_time)

    # Exploitation-Exploration selection.
    if choices([True, False], weights=[epsilon, 1.0-epsilon])[0]:
      # Explore.
      new_state = drop_piece(state, choice(legal_columns), self.x_player)
    else:
      # Exploit.
      candidates = [drop_piece(state, column, self.x_player) for column in legal_columns]
      values = [self.evaluate(candidate) for candidate in candidates]
      optimum_value = max(values) if self.x_player else min(values)
      new_state = choice([candidate for candidate, value in zip(candidates, values) if value == optimum_value])

    self.td_update(new_state)

    return new_state

In [0]:
#@title Reset knowledge

# Discards every learned weight, leaving only the empty location-independent
# slot (row -1, column -1) for each feature size.
weight_vector: WeightVector = {size: {-1: {-1: {}}} for size in range(1, 6)}

In [0]:
def optimal_move(state: GameState, x_move: bool) -> int:
  """
  Asks the Connect 4 move service at kevinalbs.com to score the moves available
  in a position and returns the column with the highest score.

  The board is sent as one string of cell values, row by row, preceded by a row
  of seven zeros (presumably because the service expects a 7-row board - TODO
  confirm). The reply is read as a JSON object mapping column to score.

  Args:
    state: Board to analyze.
    x_move: True if X is to move (sent as player 1), False for O (player 2).

  Returns:
    The index of the highest-scoring column.

  Raises:
    requests.RequestException: If the request times out, fails, or the service
      answers with an HTTP error status.
  """
  board_rows = ["0" * 7] + ["".join(str(cell) for cell in row) for row in state]
  response = requests.get(
      "http://kevinalbs.com/connect4/back-end/index.php/getMoves",
      params={
          "board_data": "".join(board_rows),
          "player": "1" if x_move else "2",
      },
      timeout=10, # Without a timeout a stalled connection would hang the notebook.
  )
  response.raise_for_status()
  scores = response.json()
  return int(max(scores, key=scores.get))

In [119]:
#@title Training by Self-Play

# Plays the X and O agents against each other. Both agents learn while they
# play, because Agent.best_move performs a TD update for every move it makes.

x_player = Agent(True)
o_player = Agent(False)

games =   500#@param {type: "number"}
think_time = 0#@param {type: "number"}
show_games = True #@param {type: "boolean"}

start_time = time()
for game in range(games):
  # Every game starts from an empty board, so the agents must not carry the
  # final position of the previous game into their first TD update.
  x_player.reset_game_state()
  o_player.reset_game_state()

  game_state = empty_game_state()
  if show_games:
    pprint(game_state)
  while True:
    if show_games:
      print()
    game_state = x_player.best_move(game_state, think_time=think_time, epsilon=0.2)
    # game_state = drop_piece(game_state, optimal_move(game_state, True), True)
    if show_games:
      pprint(game_state)
    if eval_state(game_state, check_game_over=True):
      print(f"Game {game+1:,} of {games:,} ({(game+1)/games:.3%}): X wins!")
      break
    if show_games:
      print()
    game_state = o_player.best_move(game_state, think_time=think_time, epsilon=0.2)
    # game_state = drop_piece(game_state, optimal_move(game_state, False), False)
    if show_games:
      pprint(game_state)
    if eval_state(game_state, check_game_over=True):
      # The reward decides between the two ways the game can end here: 0.0
      # means the board filled up without a line, anything else is O's line.
      outcome = "Draw!" if eval_state(game_state) == 0.0 else "O wins!"
      print(f"Game {game+1:,} of {games:,} ({(game+1)/games:.3%}): {outcome}")
      break

time_elapsed = time() - start_time

print(f"Played {games:,} games in {time_elapsed:,.2f} seconds.")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
2|    X   O X  |
3|X   O   X O  |
4|X   X   O X  |
5|O   X   O X O|
6|O O X   X O X|
 -- - - - - - --
  A B C D E F G 

 __ _ _ _ _ _ __
1|    O   O X  |
2|O   X   O X  |
3|X   O   X O  |
4|X   X   O X  |
5|O   X   O X O|
6|O O X   X O X|
 -- - - - - - --
  A B C D E F G 

 __ _ _ _ _ _ __
1|X   O   O X  |
2|O   X   O X  |
3|X   O   X O  |
4|X   X   O X  |
5|O   X   O X O|
6|O O X   X O X|
 -- - - - - - --
  A B C D E F G 

 __ _ _ _ _ _ __
1|X   O   O X  |
2|O   X   O X  |
3|X   O   X O  |
4|X   X   O X O|
5|O   X   O X O|
6|O O X   X O X|
 -- - - - - - --
  A B C D E F G 

 __ _ _ _ _ _ __
1|X   O   O X  |
2|O   X   O X  |
3|X   O   X O X|
4|X   X   O X O|
5|O   X   O X O|
6|O O X   X O X|
 -- - - - - - --
  A B C D E F G 

 __ _ _ _ _ _ __
1|X   O   O X  |
2|O   X   O X O|
3|X   O   X O X|
4|X   X   O X O|
5|O   X   O X O|
6|O O X   X O X|
 -- - - - - - --
  A B C D E F G 

 __ _ _ _ _ _ __
1|X   O   O X X|
2|O   X   O

In [121]:
#@title Play as O

# Run this cell once per human move. On the first run X (the agent) opens the
# game; on every later run the human's piece is dropped into `column` and the
# agent replies.
#
# Note: Control flow completely does not work for player to play as X yet. Need
# to totally rethink this.

column = "C" #@param ["A", "B", "C", "D", "E", "F", "G"]
think_time =  20#@param {type: "number"}

col_i = ord(column) - ord("A")

# `turn` and `playing_as_x` persist between runs of this cell. The very first
# run finds `turn` undefined and starts a new game.
try:
  turn
except NameError:
  turn = 1
  playing_as_x = False

print(f"Turn {turn}")

if turn == 1:
  # New game: make sure the agents do not learn from the previous game's last position.
  x_player.reset_game_state()
  o_player.reset_game_state()
  game_state = empty_game_state()

  if playing_as_x:
    game_state = drop_piece(game_state, col_i, x_move=playing_as_x)
  else:
    game_state = x_player.best_move(game_state, think_time=think_time, epsilon=0.01)

  print(f"Win Probability: {x_player.evaluate(game_state):.2%}")
  pprint(game_state)

  turn += 1
elif not playing_as_x and game_state[0, col_i] != VALUE_BLANK:
  # Dropping into a full column would overwrite a piece; ask for another column instead.
  print(f"Column {column} is full. Pick another column and run this cell again.")
else:
  if playing_as_x:
    game_state = o_player.best_move(game_state, think_time=think_time, epsilon=0.01)
  else:
    game_state = drop_piece(game_state, col_i, x_move=playing_as_x)

  if eval_state(game_state, check_game_over=True):
    # The game ended on O's move: a reward of 0.0 means the board filled up
    # without a line (draw, which can only happen after O's turn); any other
    # reward means O completed a line.
    pprint(game_state)
    turn = 1
    print("Draw!" if eval_state(game_state) == 0.0 else "O wins!")
  else:
    if playing_as_x:
      game_state = drop_piece(game_state, col_i, x_move=playing_as_x)
    else:
      game_state = x_player.best_move(game_state, think_time=think_time, epsilon=0.0)

    print(f"Win Probability: {x_player.evaluate(game_state):.2%}")
    pprint(game_state)

    if eval_state(game_state, check_game_over=True):
      turn = 1
      print("X wins!")
    else:
      turn += 1

Turn 2
Win Probability: 22.70%
 __ _ _ _ _ _ __
1|             |
2|             |
3|             |
4|    X        |
5|    O        |
6|    X        |
 -- - - - - - --
  A B C D E F G 


In [0]:
# Keep a reference to the current position so it can be restored later.
# drop_piece returns a modified copy, so later moves do not alter this board.
save_state = game_state

In [0]:
# Go back to the position stored in save_state.
game_state = save_state

In [0]:
# Print the current position.
pprint(game_state)

In [0]:
# Display the weight table as the cell's output.
weight_vector

# Algorithm

- [X] Start with a game state $s_t$.
- [X] For the afterstate of each possible action:
  - [X] Enumerate the local shape features.
  - [X] Look up each local shape feature in the weights table.
    - [X] Implement location dependence.
  - [X] Sum the weights of all active features.
  - [X] Apply the logistic sigmoid function to this linear sum. The result of this calculation is the associated action's afterstate value.
    - $V(s) = \sigma(\phi(s) \cdot \theta^{LI} + \phi(s) \cdot \theta^{LD})$
- [X] Select the next action which results in the greatest afterstate value $V(s_{t+1})$.
  - [X] Other player tries to minimize $V(s_{t+1})$ instead of maximize it.
  - [X] $\epsilon$-greedy selection
- [X] Save what you need so that you can perform the following update after your opponent moves and creates the new state $s_{t+2}$:
  - [X] $\Delta\theta^{LD} = \Delta\theta^{LI} = \alpha \frac{\phi(s_t)}{||\phi(s_t)||^2}(V(s_{t+2})-V(s_t))$
  - [X] Save $\phi(s_t)$ and $V(s_t)$, as well as a reference to $\theta_t$ so that it can be updated by $\Delta\theta$.
