In [None]:
import copy

class CliffWalkingEnv:
    def  __init__(self, ncol=12, nrow=4):
        self.ncol = ncol # define the number of columns in the gridworld
        self.nrow = nrow # define the number of rows in the gridworld
        # transition matrix
        self.P = self.createP()

    def createP(self):
        P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
        # map the action index to the corresponding change in position
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # drop from the cliff or reach the final goal
                    if i == self.nrow - 1 and j > 0:
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0, True)]
                        continue
                    next_x = min(self.ncol - 1, max(0, j + change[a][0]))
                    next_y = min(self.nrow - 1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False
                    if next_y == self.row - 1 and next_x > 0:
                        done = True
                        if next_x != self.ncol - 1:
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        # the first entries of all P elements are definitive, that is one.
        return P

In [None]:
class PolicyIteration:
    def __init__(self, env, theta, gamma):
        self.env = env
        self.v = [0] * self.env.ncol * self.env.nrow
        self.pi = [[0.25] * 4 for i in range(self.env.ncol * self.env.nrow)]
        self.theta = theta
        self.gamma = gamma

    def policy_evaluation(self):
        cnt = 1
        while True:
            max_diff = 0
            new_v = [0] * self.env.ncol * self.env.nrow
            for s in range(self.env.ncol * self.env.nrow):
                qsa_list = []
                for a in range(4):
                    qsa = 0
                    for res in self.env.P[s][a]:
                        # in this scenaio, there is only one possible next state
                        # p is one.
                        p, next_state, r, done = res
                        # consider all succeeding states and their transition probabilities.
                        qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                    qsa_list.append(self.pi[s][a] * qsa)
                new_v[s] = sum(qsa_list)
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta: break
            cnt += 1
        print("Policy evaluation finished after %d iterations." % cnt)
    
    def policy_improvement(self):
        for s in range(self.env.ncol * self.env.nrow):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    p, next_state, r, done = res
                    qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq)
            # uniformly randomly take the optimal policy.
            self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
        print("Policy improvement finished.")
        return self.pi

    def policy_iteration(self):
        while True:
            self.policy_iteration()
            old_pi = copy.deepcopy(self.pi)
            new_pi = self.policy_improvement()
            # improve the policy until there is no possibility to improve.
            if old_pi == new_pi: break