# **Knots And Crosses**

Github: https://github.com/StrikerXYZ/Knots-And-Crosses




Environment:

In [10]:
import gym
import numpy as np
import random

class KnotsNCrosses(gym.Env):

  def __init__(self):
    self.done = False;
    self.observation = np.full((9), [' '],dtype=str);
    self.transitionTable = {}
    self.transitionTable[tuple(self.observation.copy())] = self.getDataForState(self.observation)
    self.valueTable = {}
    self.valueTable[tuple(self.observation.copy())] = [0.0, 0.0]
    self.policyTable = {}
    self.policyTable[tuple(self.observation.copy())] = [0.0, 0.0]
    self.playerTurn = random.choice([0,1])
    self.performValueIter = False

  def reset(self, isValueIter):
    self.done = False;
    self.observation = np.full((9), [' '],dtype=str);
    self.playerTurn = random.choice([0,1])
    self.performValueIter = isValueIter
    return self.observation;

  def getDataForState(self, observation):
    if not tuple(observation) in self.transitionTable:
      self.transitionTable[tuple(observation.copy())] = self.makeDataForAllActions(observation)
    return self.transitionTable[tuple(observation)]

  def makeDataForAllActions(self, observation):
    actions = self.getPossibleActions(observation)
    values = {}
    for i, action in enumerate(actions):
      values[action] = [self.getStateAfterAction(observation, action, 0), self.getStateAfterAction(observation, action, 1)]
    return values

  def getPossibleActions(self, observation):
    actions = []
    for i in range(9):
      if (observation[i] == ' '):
        actions.append(i)
    return actions

  def getStateAfterAction(self, observation, action, playerIndex):
    if (observation[action] == ' '):
      obs = observation.copy()
      obs[action] = self.playerMark(playerIndex);
      return obs
    return observation

  def playerMark(self, playerIndex):
    if (playerIndex == 0):
      return 'X'
    else:
      return 'O'

  def opponentMark(self, playerIndex):
    return self.playerMark(self.getNextPlayerIndex(playerindex))

  def getRewardForState(self, observation, playerIndex):
    if self.checkHorizontal(observation, playerIndex) or \
    self.checkVertical(observation, playerIndex) or \
    self.checkDiagonal(observation, playerIndex):
      return 2.0;

    opponentIndex = self.getNextPlayerIndex(playerIndex)
    if self.checkHorizontal(observation, opponentIndex) or \
    self.checkVertical(observation, opponentIndex) or \
    self.checkDiagonal(observation, opponentIndex):
      return -2.0;
    
    if len(self.getPossibleActions(observation)) == 0:
      return 1.0
    
    return 0.0;

  def checkHorizontal(self, observation, playerIndex):
    mark = self.playerMark(playerIndex)
    for i in range(3):
      count = 0
      for j in range(3):
        if observation[i*3 + j] == mark:
          count += 1
      if count == 3:
        return True
    return False
  
  def checkVertical(self, observation, playerIndex):
    mark = self.playerMark(playerIndex)
    for i in range(3):
      count = 0
      for j in range(3):
        if observation[j*3 + i] == mark:
          count += 1
      if count == 3:
        return True
    return False

  def checkDiagonal(self, observation, playerIndex):
    mark = self.playerMark(playerIndex)
    
    if observation[0] == mark and \
    observation[4] == mark and \
    observation[8] == mark:
      return True

    if observation[2] == mark and \
    observation[4] == mark and \
    observation[6] == mark:
      return True

    return False

  def policyIteration(self, playerIndex):
    isStable = False
    while (not isStable):
      self.policyEvaluation(playerIndex)
      isStable = self.policyImprovement(playerIndex)
    isStable = False

  def policyEvaluation(self, playerIndex):
    theta = 0.001
    gamma = 0.5
    while (True):
      diff = 0.0
      for s in self.transitionTable:
        value = self.getValueForObservation(np.asarray(s), playerIndex)
        newValue = self.getSumActionValueForObservation(np.asarray(s), playerIndex) + gamma * value
        self.valueTable[s][playerIndex] = newValue
        diff = max(diff, abs(newValue - value))
      if (diff < theta):
        break
      
  def policyImprovement(self, playerIndex):
    isStable = True
    for s in self.transitionTable:
      policy = self.getPolicyForObservation(np.asarray(s), playerIndex)
      newpolicy = self.getBestPolicyForObservation(np.asarray(s), playerIndex)
      self.policyTable[s][playerIndex] = newpolicy
      if (newpolicy != policy):
        isStable = False
    return isStable

  def valueIteration(self, playerIndex):
    gamma = 0.5
    for s in self.transitionTable:
      value = self.getValueForObservation(np.asarray(s), playerIndex)
      newValue = self.getSumActionValueForObservation(np.asarray(s), playerIndex) + gamma * value
      self.valueTable[s][playerIndex] = newValue
      policy = self.getPolicyForObservation(np.asarray(s), playerIndex)
      bestPolicy = self.getBestPolicyForObservation(np.asarray(s), playerIndex)
      self.policyTable[s][playerIndex] = bestPolicy

  def get_action_value(self, playerIndex):

    # Initialise Q 
    Q = 0  
    for s in mdp.get_all_states():
      # Compute Q using the equation above
      Q = Q + mdp.get_transition_prob(state, action, s)*(mdp.get_reward(state, action, s) + \
                                                               gamma*state_values[s]) 
    return Q

  def getValueForObservation(self, observation, playerIndex):
    if not tuple(observation) in self.valueTable:
      self.valueTable[tuple(observation)] = [self.getRewardForState(observation, 0), self.getRewardForState(observation, 1)]
    return self.valueTable[tuple(observation)][playerIndex]

  def getSumActionValueForObservation(self, observation, playerIndex):
    actions = self.transitionTable[tuple(observation)].copy()
    sumValue = 0
    for s in actions.values():
      value = self.getValueForObservation(s[playerIndex], playerIndex)
      sumValue += value
    return sumValue
    
  def getPolicyForObservation(self, observation, playerIndex):
    if not tuple(observation) in self.policyTable:
      firstAction = list(self.makeDataForAllActions(observation))[0]
      self.policyTable[tuple(observation)] = [firstAction, firstAction]
    return self.policyTable[tuple(observation)][playerIndex]

  def getBestPolicyForObservation(self, observation, playerIndex):
    actions = self.transitionTable[tuple(observation)]
    maxValue = None
    for a in actions:
      s = actions[a][playerIndex]
      v = self.getValueForObservation(s, playerIndex)
      if (not maxValue) or (maxValue < v):
        maxValue = v
        policy = a
    return policy

  def getNextPlayerIndex(self, playerIndex):
    return (playerIndex + 1) % 2

  def getRandomPolicy(self, observation):
    return np.random.choice(self.getPossibleActions(observation))

  def evaluateState(self, observation, playerIndex):
    if not self.done and not tuple(observation) in self.transitionTable:
      self.transitionTable[tuple(observation.copy())] = self.getDataForState(observation)

  def step(self, epsilon):
    if self.done:
      return

    if self.performValueIter:
      self.valueIteration(self.playerTurn)
    else:
      self.policyIteration(self.playerTurn)
    
    explore = np.random.uniform(0,1) < epsilon
    if explore:
      action = self.getRandomPolicy(self.observation)
    else:
      action = self.getPolicyForObservation(self.observation, self.playerTurn)

    self.observation = self.getStateAfterAction(self.observation, action, self.playerTurn)
    (done, winner) = self.isDone(self.observation)
    (rewardP1, rewardP2) = (0,0)

    if (done):
      rewardP1 = self.getRewardForState(self.observation, 0)
      rewardP2 = self.getRewardForState(self.observation, 1)
    else:
      self.evaluateState(self.observation, self.playerTurn)
      self.playerTurn = self.getNextPlayerIndex(self.playerTurn)

    self.done = done

    return done, winner, rewardP1, rewardP2;

  def isDone(self, observation):
    if self.getRewardForState(observation, 0) == 2:
      return (True, 1)
    if self.getRewardForState(observation, 1) == 2:
      return (True, 2)
    if len(self.getPossibleActions(observation)) == 0:
      return (True, 0)
    return (False, 0)

  def render(self):
        print(f'   |   |   ');
        print(f' {self.observation[0]} | {self.observation[1]} | {self.observation[2]} ');
        print(f'___|___|___');
        print(f'   |   |   ');
        print(f' {self.observation[3]} | {self.observation[4]} | {self.observation[5]} ');
        print(f'___|___|___');
        print(f'   |   |   ');
        print(f' {self.observation[6]} | {self.observation[7]} | {self.observation[8]} ');
        print(f'   |   |   \n');

def experiment(environment, numEpisodes, numIterations, isValueIteration):
  epsilon = 0.5
  decay = 0.99
  minEpsilon = 0.1

  (player1, player2, draws, player1Reward, player2Reward) = (0, 0, 0, 0, 0)
  for i in range(numEpisodes):
    print('Episode:', i)
    (done, winner, rewardP1, rewardP2) = runEpisode(environment, numIterations, epsilon, isValueIteration)
    
    if winner == 0: 
      draws += 1
    if winner == 1: 
      player1 += 1 
    if winner == 2: 
      player2 += 1
    
    player1Reward += rewardP1
    player2Reward += rewardP2

    epsilon = max(minEpsilon, epsilon * decay)
  print("Player1: ", player1, "Player2:", player2, "Wins:", player1 + player2, " Draws:", draws)
  print("Player1 Rewards: ", player1Reward, " Player2 Rewards:", player2Reward)
  print("Player1 Average: ", player1Reward/numIterations, " Player2 Average:", player2Reward/numIterations)

def runEpisode(environment, numIterations, epsilon, isValueIteration):
  environment.reset(isValueIteration)
  for _ in range(numIterations):
    (done, winner, rewardP1, rewardP2) = environment.step(epsilon)
    if done:
      environment.render()
      if winner != 0:
        print("winner: player", winner, '\n')
      else:
        print("draw\n")
      print("rewards:", rewardP1, rewardP2, '\n')
      return done, winner, rewardP1, rewardP2;

env = KnotsNCrosses()

Value Stats:

In [11]:
def displayForState(s):
  d = str(s[0] + s[1] + s[2] + '\n' \
          + s[3] + s[4] + s[5] + '\n' \
          + s[6] + s[7] + s[8])
  d = d.replace(' ', '.')
  return d

def displayValues(env):
  for s in env.valueTable.keys():
    player1value = env.valueTable[s][0]
    player2value = env.valueTable[s][1]

    player1policy, player2policy = -1, -1
    if (s in env.policyTable):
      player1policy= env.policyTable[s][0]
      player2policy= env.policyTable[s][1]

    #function to reduce bloat
    if player1value != 0 and player2value != 0 and player1policy != -1:
      print("\n")
      print(displayForState(s))
      print("player 1 value: ", player1value, "policy: ", player1policy)
      print("player 2 value: ", player2value, "policy: ", player2policy)



#Policy Iteration

In [12]:
#policy iteration
experiment(env, 100, 10, False)

Episode: 0
   |   |   
 X |   | O 
___|___|___
   |   |   
   |   | O 
___|___|___
   |   |   
 X |   | O 
   |   |   

winner: player 2 

rewards: -2.0 2.0 

Episode: 1
   |   |   
   | X |   
___|___|___
   |   |   
 O | O | O 
___|___|___
   |   |   
   | X | X 
   |   |   

winner: player 2 

rewards: -2.0 2.0 

Episode: 2
   |   |   
   |   | X 
___|___|___
   |   |   
   | O | X 
___|___|___
   |   |   
 O | O | X 
   |   |   

winner: player 1 

rewards: 2.0 -2.0 

Episode: 3
   |   |   
   | O | O 
___|___|___
   |   |   
   | O | X 
___|___|___
   |   |   
 X | O | X 
   |   |   

winner: player 2 

rewards: -2.0 2.0 

Episode: 4
   |   |   
 X | X | X 
___|___|___
   |   |   
 O | O |   
___|___|___
   |   |   
   | X | O 
   |   |   

winner: player 1 

rewards: 2.0 -2.0 

Episode: 5
   |   |   
 X |   |   
___|___|___
   |   |   
 X |   | O 
___|___|___
   |   |   
 X | O | O 
   |   |   

winner: player 1 

rewards: 2.0 -2.0 

Episode: 6
   |   |   
 O | O | O 
___|___|___

In [13]:
#policy iteration
displayValues(env)



X.O
...
...
player 1 value:  16.0 policy:  6
player 2 value:  8.0 policy:  8


..O
...
..X
player 1 value:  8.0 policy:  0
player 2 value:  8.0 policy:  6


X.O
...
..O
player 1 value:  24.0 policy:  6
player 2 value:  4.0 policy:  5


X.O
...
X..
player 1 value:  4.0 policy:  3
player 2 value:  8.0 policy:  8


X.O
...
..X
player 1 value:  4.0 policy:  4
player 2 value:  32.0 policy:  7


X.O
...
X.O
player 1 value:  12.0 policy:  5
player 2 value:  4.0 policy:  5


...
O..
..X
player 1 value:  8.0 policy:  6
player 2 value:  8.0 policy:  0


...
.O.
..X
player 1 value:  8.0 policy:  7
player 2 value:  8.0 policy:  7


...
...
.OX
player 1 value:  16.0 policy:  4
player 2 value:  16.0 policy:  1


.X.
.OO
..X
player 1 value:  8.0 policy:  7
player 2 value:  4.0 policy:  3


.X.
.OO
.XX
player 1 value:  4.0 policy:  6
player 2 value:  4.0 policy:  3


...
.O.
.OX
player 1 value:  56.0 policy:  6
player 2 value:  4.0 policy:  1


...
.O.
.XX
player 1 value:  4.0 policy:  6
player 2 va

#Value Iteration

In [14]:
#value iteration
experiment(env, 100, 10, True)

Episode: 0
   |   |   
 O | O | X 
___|___|___
   |   |   
   | O | X 
___|___|___
   |   |   
 X |   | O 
   |   |   

winner: player 2 

rewards: -2.0 2.0 

Episode: 1
   |   |   
 X | O | O 
___|___|___
   |   |   
 O | O | X 
___|___|___
   |   |   
 X | X | O 
   |   |   

draw

rewards: 1.0 1.0 

Episode: 2
   |   |   
 X |   |   
___|___|___
   |   |   
 X | O |   
___|___|___
   |   |   
 X | O | O 
   |   |   

winner: player 1 

rewards: 2.0 -2.0 

Episode: 3
   |   |   
   | O |   
___|___|___
   |   |   
   | O |   
___|___|___
   |   |   
 X | X | X 
   |   |   

winner: player 1 

rewards: 2.0 -2.0 

Episode: 4
   |   |   
 X |   | O 
___|___|___
   |   |   
 X | O | O 
___|___|___
   |   |   
   | X | O 
   |   |   

winner: player 2 

rewards: -2.0 2.0 

Episode: 5
   |   |   
 X | O | X 
___|___|___
   |   |   
 X | X | O 
___|___|___
   |   |   
 O | O | O 
   |   |   

winner: player 2 

rewards: -2.0 2.0 

Episode: 6
   |   |   
 O |   |   
___|___|___
   |   |   
 

In [15]:
#value iteration
displayValues(env)



X.O
...
...
player 1 value:  16.0 policy:  6
player 2 value:  8.0 policy:  8


..O
...
X..
player 1 value:  8.0 policy:  0
player 2 value:  8.0 policy:  1


..O
...
..X
player 1 value:  8.0 policy:  0
player 2 value:  8.0 policy:  6


X.O
...
..O
player 1 value:  32.0 policy:  6
player 2 value:  4.0 policy:  5


X.O
...
X..
player 1 value:  4.0 policy:  3
player 2 value:  8.0 policy:  8


X.O
...
..X
player 1 value:  4.0 policy:  4
player 2 value:  32.0 policy:  7


X.O
X..
..O
player 1 value:  4.0 policy:  6
player 2 value:  4.0 policy:  5


X.O
...
X.O
player 1 value:  12.0 policy:  5
player 2 value:  4.0 policy:  5


.O.
...
..X
player 1 value:  8.0 policy:  7
player 2 value:  8.0 policy:  7


...
O..
..X
player 1 value:  8.0 policy:  6
player 2 value:  8.0 policy:  0


...
.O.
..X
player 1 value:  8.0 policy:  7
player 2 value:  8.0 policy:  7


...
...
.OX
player 1 value:  24.0 policy:  2
player 2 value:  16.0 policy:  1


...
..O
.XX
player 1 value:  4.0 policy:  6
player 2 val