# Knightworld Version 2

Exploring and understanding dynamic programming concepts in a simple grid world environment.

Based on the 4x4 Grid World code from Nimish Sanghi's book "Deep Reinforcement Learning with Python", where each move is a single step up, right, down or left, and the goal is either the top-left or the bottom-right corner.

We modify the environment to a larger 8x8 chess board, replace the moves with a chess knight's "L"-shaped move, and set the goal to one particular square (rather than any of the 4 corners, as in Knightworld-Version1).
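The squares are numbered 0 to 63 in row-major order, so a state number and a (row, col) pair are interchangeable. The following is a minimal sketch of that mapping using NumPy's `unravel_index` and `ravel_multi_index`; the names `BOARD_SHAPE`, `state_to_square` and `square_to_state` are illustrative and not part of the notebook's code.

```python
import numpy as np

BOARD_SHAPE = (8, 8)  # assumed board size, matching the 8x8 chess board


def state_to_square(state):
    """Convert a state number (0-63) into a (row, col) pair, row-major."""
    row, col = np.unravel_index(state, BOARD_SHAPE)
    return int(row), int(col)


def square_to_state(row, col):
    """Convert a (row, col) pair back into a state number (0-63)."""
    return int(np.ravel_multi_index((row, col), BOARD_SHAPE))


print(state_to_square(12))    # (1, 4): second row, fifth column
print(square_to_state(7, 7))  # 63: bottom-right corner
```

Row 0 is the top row of the rendered board and column 0 is the leftmost column.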

In [1]:
import numpy as np
import sys
# Note: toy_text.discrete is only available in older gym releases
from gym.envs.toy_text import discrete
from contextlib import closing
from io import StringIO

In [2]:
# Terminal state can be set to any square with number 0-63
TARGET = 12

In [3]:
class KnightworldEnv(discrete.DiscreteEnv):
    """
    An 8x8 Chess Board environment with a single Knight piece.
    The goal is to reach a specific square on the board in the fewest number of steps. 
    
    The terminal state is TARGET (defined in the previous cell).
    
    Actions are the allowed Knight moves in a chess game, i.e. "L"-shaped.
    8 possible actions numbered 0 to 7, in clockwise fashion, with 0 referring to 2 squares to the left and 1 square up.
    
    Actions going off the edge leave the Knight piece in current state (but still incur a reward of -1).
    
    Reward of -1 at each step until the Knight piece reaches the TARGET square,
    to incentivise the programme to look for the fewest number of steps.
    """
    
    metadata = {'render.modes': ['human', 'ansi']}
    
    def __init__(self):
        self.shape = (8, 8)
        self.nS = np.prod(self.shape)
        self.nA = 8
        self.terminal_state = TARGET
        
        P = {}
        for s in range(self.nS):
            position = np.unravel_index(s, self.shape)
            P[s] = {a: [] for a in range(self.nA)}
            P[s][0] = self._transition_prob(position, [-1,-2])
            P[s][1] = self._transition_prob(position, [-2,-1])
            P[s][2] = self._transition_prob(position, [-2,1])
            P[s][3] = self._transition_prob(position, [-1,2])
            P[s][4] = self._transition_prob(position, [1,2])
            P[s][5] = self._transition_prob(position, [2,1])
            P[s][6] = self._transition_prob(position, [2,-1])
            P[s][7] = self._transition_prob(position, [1,-2])
            
        # Initial state distribution is uniform
        isd = np.ones(self.nS) / self.nS
        
        # Expose model of environment for dynamic programming
        # Do not use for model-free learning algorithm
        self.P = P
        super(KnightworldEnv, self).__init__(self.nS, self.nA, P, isd)

    def _transition_prob(self, current, delta):
        """
        Model Transitions. Prob is always 1.0.
        :param current: Current position on the grid as (row, col)
        :param delta: Change in position for the transition
        :return: [(1.0, new_state, reward, done)]
        """
        # If stuck in terminal state
        current_state = np.ravel_multi_index(tuple(current), self.shape)
        if current_state == self.terminal_state:
            return [(1.0, current_state, 0, True)]

        temp_position = np.array(current) + np.array(delta)
        # Moves that would leave the board keep the Knight where it is
        off_board = (temp_position[0] < 0 or temp_position[0] > self.shape[0] - 1
                     or temp_position[1] < 0 or temp_position[1] > self.shape[1] - 1)
        if off_board:
            new_position = current
        else:
            new_position = temp_position
        new_state = np.ravel_multi_index(tuple(new_position), self.shape)

        is_done = new_state == self.terminal_state
        
        return [(1.0, new_state, -1, is_done)]

    def render(self, mode="human"):
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        for s in range(self.nS):
            position = np.unravel_index(s, self.shape)
            if self.s == s:
                output = " x "
            elif s == self.terminal_state:
                output = " T "
            else:
                output = " o "

            if position[1] == 0:
                output = output.lstrip()
            if position[1] == self.shape[1] - 1:
                output = output.rstrip()
                output += '\n'

            outfile.write(output)

        outfile.write('\n')

        # No need to return anything for human

        if mode != 'human':
            with closing(outfile):
                return outfile.getvalue()

In [4]:
# Visualizing the moves in the environment
kw_env = KnightworldEnv()
obs = kw_env.reset()
print("Initial Knight position on chess board:")
kw_env.render()

for j in range(10):
    # End if already on TARGET square
    if obs == TARGET:
        break
    action = np.random.choice(kw_env.nA)
    # Keep obs up to date so the TARGET check above can trigger
    obs, reward, done, info = kw_env.step(action)
    print(f"Taking step {j+1} in direction {action}:")
    kw_env.render()

Initial Knight position on chess board:
o  o  o  o  o  o  o  o
o  o  x  o  T  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o

Taking step 1 in direction 2:
o  o  o  o  o  o  o  o
o  o  x  o  T  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o

Taking step 2 in direction 6:
o  o  o  o  o  o  o  o
o  o  o  o  T  o  o  o
o  o  o  o  o  o  o  o
o  x  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o

Taking step 3 in direction 3:
o  o  o  o  o  o  o  o
o  o  o  o  T  o  o  o
o  o  o  x  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o

Taking step 4 in direction 7:
o  o  o  o  o  o  o  o
o  o  o  o  T  o  o  o
o  o  o  o  o  o  o  o
o  x  o  o  o  o  o  o
o  o  o 

Notice how the Knight (x) did not move in some of the steps when the action would have made it land outside the chess board.
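The stay-in-place behaviour can be reproduced outside the environment. This is a small standalone sketch, assuming the same action numbering as the class above (0 is 2 squares left and 1 up, then clockwise); `KNIGHT_DELTAS` and `knight_step` are hypothetical helper names, not part of the notebook.

```python
# (row change, col change) for actions 0-7, assumed to match the environment
KNIGHT_DELTAS = [(-1, -2), (-2, -1), (-2, 1), (-1, 2),
                 (1, 2), (2, 1), (2, -1), (1, -2)]


def knight_step(state, action, size=8):
    """Return the next state; off-board moves leave the state unchanged."""
    row, col = divmod(state, size)
    d_row, d_col = KNIGHT_DELTAS[action]
    new_row, new_col = row + d_row, col + d_col
    if 0 <= new_row < size and 0 <= new_col < size:
        return new_row * size + new_col
    return state


# State 10 is (row 1, col 2), the starting square in the run above
print(knight_step(10, 2))  # 10: two rows up would leave the board
print(knight_step(10, 6))  # 25: lands on (row 3, col 1)
```

These two calls correspond to steps 1 and 2 of the printed run.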

In [5]:
def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
    """
    Evaluate a policy given an environment and a full description of the environment's dynamics.
    Args:
        policy: [S, A] shaped matrix representing the policy. Random in our case
        env: OpenAI env. env.P -> transition dynamics of the environment.
            env.P[s][a] [(prob, next_state, reward, done)]
            env.nS is number of states
            env.nA is number of actions
        theta: stop evaluation once value function change is less than theta for all states
        discount_factor: Gamma discount factor
    Returns: Vector of length env.nS representing value function
    """
    # Start with a (all 0) value function
    V = np.zeros(env.nS)
    V_new = np.copy(V)
    while True:
        delta = 0
        # For each state, perform a "backup"
        for s in range(env.nS):
            v = 0
            # Look at possible next actions
            for a, pi_a in enumerate(policy[s]):
                # For each action, look at possible next states
                for prob, next_state, reward, done in env.P[s][a]:
                    # Calculate expected value as per backup diagram
                    v += pi_a * prob * (reward + discount_factor * V[next_state])
            # How much our value function changed (across any states)
            V_new[s] = v
            delta = max(delta, np.abs(V_new[s] - V[s]))
        V = np.copy(V_new)
        # Stop if change is below threshold
        if delta < theta:
            break
    return np.array(V)

In [6]:
# Evaluate a random policy where every action has equal probability regardless of state
np.round(policy_eval(np.ones((64,8)) / 8, KnightworldEnv()).reshape((8,8)), 2)

array([[-107.12, -103.94,  -75.94, -103.96, -104.88, -103.15,  -67.42,
         -98.63],
       [ -93.54, -107.17, -105.4 , -104.79,    0.  , -102.94, -104.17,
        -106.87],
       [-107.89, -100.84,  -91.14, -101.38, -106.59, -100.59,  -86.33,
         -93.68],
       [-106.21, -105.55, -109.14,  -92.35, -103.08,  -90.81, -108.88,
        -106.39],
       [-109.52, -105.07, -107.34, -105.3 , -108.42, -104.76, -105.55,
        -100.39],
       [-110.99, -111.19, -108.69, -109.54, -105.47, -109.  , -108.06,
        -111.44],
       [-111.44, -111.41, -110.08, -111.33, -110.03, -111.31, -107.41,
        -111.19],
       [-114.64, -113.01, -112.55, -111.22, -111.01, -111.01, -112.82,
        -113.68]])

A random policy does poorly: since every step costs -1, the values show that it needs over 100 steps on average to reach the target from most squares.
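As a cross-check on these numbers, the value function of the uniform random policy can also be obtained by solving the Bellman expectation equations as one linear system rather than by iterative sweeps. This is a sketch of that alternative technique, not the notebook's method; it rebuilds the transition model from scratch under the same assumptions (target square 12, off-board moves stay in place, reward -1 per step, discount 1.0), and `random_policy_values` is a hypothetical name.

```python
import numpy as np

# (row change, col change) for actions 0-7, assumed to match the environment
KNIGHT_DELTAS = [(-1, -2), (-2, -1), (-2, 1), (-1, 2),
                 (1, 2), (2, 1), (2, -1), (1, -2)]


def random_policy_values(target=12, size=8):
    """Solve (I - P) V = r for the uniform random policy with discount 1.0."""
    n = size * size
    P = np.zeros((n, n))  # transition probabilities between non-terminal states
    r = np.zeros(n)       # expected one-step reward
    for s in range(n):
        if s == target:
            continue  # terminal state: its equation reduces to V[target] = 0
        row, col = divmod(s, size)
        r[s] = -1.0
        for d_row, d_col in KNIGHT_DELTAS:
            new_row, new_col = row + d_row, col + d_col
            if not (0 <= new_row < size and 0 <= new_col < size):
                new_row, new_col = row, col  # off-board move: stay in place
            ns = new_row * size + new_col
            if ns != target:  # V[target] is 0, so it contributes nothing
                P[s, ns] += 1.0 / len(KNIGHT_DELTAS)
    return np.linalg.solve(np.eye(n) - P, r)


V = random_policy_values()
print(np.round(V.reshape((8, 8)), 2))
```

The result should agree with the iterative evaluation up to the tolerance set by `theta`.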

In [7]:
def policy_improvement(policy, V, env, discount_factor=1.0):
    """
    Improve a policy given an environment and a full description of its dynamics and state-values V.
    Args:
        policy: [S, A] shaped matrix representing policy
        V: current state-value for given policy
        env: OpenAI env (same as policy_eval)
        discount_factor: Gamma discount factor
    Returns:
        policy: [S, A] shaped matrix representing improved policy
        policy_changed: boolean which takes value 'True' if policy was changed
    """
    def argmax_a(arr):
        """Return idxs of all max values in array"""
        max_idx = []
        max_val = float('-inf')
        for idx, elem in enumerate(arr):
            if elem == max_val:
                max_idx.append(idx)
            elif elem > max_val:
                max_idx = [idx]
                max_val = elem
        return max_idx
    
    policy_changed = False
    Q = np.zeros([env.nS, env.nA])
    new_policy = np.zeros([env.nS, env.nA])
    
    # For each state, perform 'greedy improvement'
    for s in range(env.nS):
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[s][a]:
                # Calculate expected value as per backup diagram
                Q[s,a] += prob * (reward + discount_factor * V[next_state])
        # Get maximizing actions and set new policy for state s
        best_actions = argmax_a(Q[s])
        new_policy[s, best_actions] = 1.0 / len(best_actions)
    # Compare the full policy tables, not only the last state's row
    if not np.allclose(new_policy, policy):
        policy_changed = True
    return new_policy, policy_changed

In [8]:
# Policy Iteration
def policy_iteration(env, discount_factor=1.0, theta=0.00001):
    # Initialize a random policy
    policy = np.ones([env.nS, env.nA]) / env.nA
    while True:
        V = policy_eval(policy, env, discount_factor, theta)
        policy, changed = policy_improvement(policy, V, env, discount_factor)
        if not changed:
            V_optimal = policy_eval(policy, env, discount_factor, theta)
            print("Optimal Policy\n", np.round(policy,3))
            return np.array(V_optimal)

In [9]:
V = policy_iteration(KnightworldEnv())
print("\nOptimal V:\n", V.reshape((8,8)))

Optimal Policy
 [[0.    0.    0.    0.    0.5   0.5   0.    0.   ]
 [0.    0.    0.    0.    0.    1.    0.    0.   ]
 [0.    0.    0.    0.    1.    0.    0.    0.   ]
 [0.    0.    0.    0.    0.    0.    1.    0.   ]
 [0.    0.    0.    0.    0.25  0.25  0.25  0.25 ]
 [0.    0.    0.    0.    0.    1.    0.    0.   ]
 [0.    0.    0.    0.    0.    0.    0.    1.   ]
 [0.    0.    0.    0.    0.    0.    1.    0.   ]
 [0.    0.    0.    0.5   0.5   0.    0.    0.   ]
 [0.    0.    0.    0.333 0.333 0.    0.333 0.   ]
 [0.    0.    0.    0.    0.    1.    0.    0.   ]
 [0.2   0.    0.    0.2   0.2   0.2   0.    0.2  ]
 [0.125 0.125 0.125 0.125 0.125 0.125 0.125 0.125]
 [0.2   0.    0.    0.2   0.2   0.    0.2   0.2  ]
 [0.    0.    0.    0.    0.    0.    1.    0.   ]
 [0.5   0.    0.    0.    0.    0.    0.    0.5  ]
 [0.    0.    0.333 0.333 0.    0.333 0.    0.   ]
 [0.    0.    0.5   0.    0.5   0.    0.    0.   ]
 [0.    0.    0.    1.    0.    0.    0.    0.   ]
 [0.    0.5   0

The Knight can reach the target square within 4 moves regardless of where it starts from.
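This claim can be checked independently of dynamic programming. With a reward of -1 per move and a discount factor of 1.0, the optimal value of a square should equal minus its shortest knight-move distance to the target, so a breadth-first search from the target gives the same information. The sketch below assumes target square 12 and the same action numbering as the environment; `knight_distances` is a hypothetical helper name.

```python
from collections import deque

# (row change, col change) for actions 0-7, assumed to match the environment
KNIGHT_DELTAS = [(-1, -2), (-2, -1), (-2, 1), (-1, 2),
                 (1, 2), (2, 1), (2, -1), (1, -2)]


def knight_distances(target=12, size=8):
    """Breadth-first search: fewest knight moves from every square to target."""
    dist = [-1] * (size * size)  # -1 marks squares not reached yet
    dist[target] = 0
    queue = deque([target])
    while queue:
        s = queue.popleft()
        row, col = divmod(s, size)
        for d_row, d_col in KNIGHT_DELTAS:
            new_row, new_col = row + d_row, col + d_col
            if 0 <= new_row < size and 0 <= new_col < size:
                ns = new_row * size + new_col
                if dist[ns] == -1:
                    dist[ns] = dist[s] + 1
                    queue.append(ns)
    return dist


dist = knight_distances()
print(max(dist))  # 4: no square is more than 4 moves from the target
for row in range(8):
    print(dist[row * 8:(row + 1) * 8])
```

Knight moves are reversible, so searching outward from the target gives the distance to the target from each square.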

In [10]:
# Value Iteration
def value_iteration(env, discount_factor=1.0, theta=0.00001):
    """
    Carry out Value iteration given an environment and the full description of its dynamics
    Returns:
        policy: [S,A] shaped matrix representing optimal policy
        value: [S] length vector representing optimal value
    """
    def argmax_a(arr):
        """Return idxs of max element in array"""
        max_idx = []
        max_val = float('-inf')
        for idx, elem in enumerate(arr):
            if elem == max_val:
                max_idx.append(idx)
            elif elem > max_val:
                max_idx = [idx]
                max_val = elem
        return max_idx
    
    optimal_policy = np.zeros([env.nS, env.nA])
    V = np.zeros(env.nS)
    V_new = np.copy(V)
    while True:
        delta = 0
        # For each state, perform a "greedy backup"
        for s in range(env.nS):
            q = np.zeros(env.nA)
            # Look at possible next actions
            for a in range(env.nA):
                # For each action, look at possible next states to calculate q[s,a]
                for prob, next_state, reward, done in env.P[s][a]:
                    # Calculate value for each action as per backup diagram
                    if not done:
                        q[a] += prob * (reward + discount_factor * V[next_state])
                    else:
                        q[a] += prob * reward
            # Find max value over all possible actions and store updated state value
            V_new[s] = q.max()
            # How much our value function changed (across any states)
            delta = max(delta, np.abs(V_new[s] - V[s]))
        V = np.copy(V_new)
        # Stop if change is below threshold
        if delta < theta:
            break
    # V(s) has optimal values. Use these values and one step backup to calculate optimal policy
    for s in range(env.nS):
        q = np.zeros(env.nA)
        # Look at possible next actions
        for a in range(env.nA):
            # For each action, look at possible next states and calculate q[s,a]
            for prob, next_state, reward, done in env.P[s][a]:
                # Calculate the value for each action as per backup diagram
                if not done:
                    q[a] += prob * (reward + discount_factor * V[next_state])
                else:
                    q[a] += prob * reward
                    
        # Find the optimal actions
        best_actions = argmax_a(q)
        optimal_policy[s, best_actions] = 1.0 / len(best_actions)
    return optimal_policy, V

In [11]:
policy, V = value_iteration(KnightworldEnv())
print("Optimal Policy:\n", np.round(policy, 3))
print("\nOptimal V:\n", V.reshape((8,8)))

Optimal Policy:
 [[0.    0.    0.    0.    0.5   0.5   0.    0.   ]
 [0.    0.    0.    0.    0.    1.    0.    0.   ]
 [0.    0.    0.    0.    1.    0.    0.    0.   ]
 [0.    0.    0.    0.    0.    0.    1.    0.   ]
 [0.    0.    0.    0.    0.25  0.25  0.25  0.25 ]
 [0.    0.    0.    0.    0.    1.    0.    0.   ]
 [0.    0.    0.    0.    0.    0.    0.    1.   ]
 [0.    0.    0.    0.    0.    0.    1.    0.   ]
 [0.    0.    0.    0.5   0.5   0.    0.    0.   ]
 [0.    0.    0.    0.333 0.333 0.    0.333 0.   ]
 [0.    0.    0.    0.    0.    1.    0.    0.   ]
 [0.2   0.    0.    0.2   0.2   0.2   0.    0.2  ]
 [0.125 0.125 0.125 0.125 0.125 0.125 0.125 0.125]
 [0.2   0.    0.    0.2   0.2   0.    0.2   0.2  ]
 [0.    0.    0.    0.    0.    0.    1.    0.   ]
 [0.5   0.    0.    0.    0.    0.    0.    0.5  ]
 [0.    0.    0.333 0.333 0.    0.333 0.    0.   ]
 [0.    0.    0.5   0.    0.5   0.    0.    0.   ]
 [0.    0.    0.    1.    0.    0.    0.    0.   ]
 [0.    0.5   

Value iteration yields exactly the same results as policy iteration.
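The agreement is expected, since both methods search for a value function that satisfies the Bellman optimality equation. As a sketch in standard notation (the symbols $p$, $r$, $\gamma$ and $s'(s,a)$ are introduced here for illustration and do not appear in the code):

$$
V^*(s) = \max_a \sum_{s'} p(s' \mid s, a)\,\bigl[r(s, a, s') + \gamma\, V^*(s')\bigr]
$$

In this environment every transition is deterministic, the reward is $-1$ per move and $\gamma = 1$, so for every non-terminal square the equation reduces to

$$
V^*(s) = \max_a \bigl[-1 + V^*\bigl(s'(s,a)\bigr)\bigr], \qquad V^*(\text{TARGET}) = 0,
$$

where $s'(s,a)$ denotes the square reached from $s$ by action $a$ (or $s$ itself if the move would leave the board).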

In [12]:
# Visualize optimal moves
kw_env = KnightworldEnv()

# Try 5 different initial positions
for i in range(5):
    print(f" Session {i+1} ".center(40, "="))
    obs = kw_env.reset()
    print("Initial Knight position on chess board:")
    kw_env.render()

    # Move up to 4 actions
    for j in range(4):
        # End if already on the TARGET square
        if obs == TARGET:
            break
            
        action = np.argmax(policy[obs])
        obs, reward, done, info = kw_env.step(action)
        print(f"Taking step {j+1} in direction {action}:")
        kw_env.render()

Initial Knight position on chess board:
o  o  o  o  o  o  o  o
o  o  o  o  T  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  x  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o

Taking step 1 in direction 0:
o  o  o  o  o  o  o  o
o  o  o  o  T  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  x  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o

Taking step 2 in direction 0:
o  o  o  o  o  o  o  o
o  o  o  o  T  o  o  o
o  o  x  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o

Taking step 3 in direction 3:
o  o  o  o  o  o  o  o
o  o  o  o  x  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  o

Initial Knight position on chess board:
o  o  o  o  o  o  o  o
o  o  o  o  T  o  o  o
o  o  o  o  o  o  o  o
o  o  o  o  o  o  o  

## Conclusion
With well-written functions, it is easy to modify the game settings, such as changing the target from any of the 4 corners to one particular square on the chess board (which could be where the opponent's king is in an actual chess game).