In [6]:
# Real Time Dynamic Programming (RTDP) Algorithm

from collections import defaultdict

import numpy as np
from gym.envs.toy_text.frozen_lake import FrozenLakeEnv


MAP_20x20 = [
    'SFFFFFFFHFFFFFFHFHFF',
    'FFFFFFFFFFFHFFFFFHFF',
    'FFHFFHFHFFFFFFHFFFFH',
    'FFHFFHFFFFFFFFHFFHFF',
    'FFFHFFFFFFFFFFFFFFHF',
    'FFFFHFFFFFHFFFFHFFFH',
    'FFFFFFFHFHFFHFFFFFFF',
    'HFHFFFFFFFFFFHFFFFFF',
    'HFFFFFFFFHHFHFFHHFFF',
    'FFFFFFFFFHFHFFFFFFFF',
    'FFFFFFFFFFFFHFFFFFFH',
    'FFFFFFFHFFFFFFFFFFFH',
    'FFFFFFHFFFFFFFFFHHFF',
    'HFFHFFFHHFHFFFHHFFFF',
    'FFFFFFFFFHFHFFHHHFFF',
    'HFFFFFHFFFFFHFHFFFFF',
    'HFFFFFFFFFFFFFFFHFFH',
    'FHFFFFFFFHFFFFFFFFFF',
    'FFHFFFFFFFHFFFFHFHFF',
    'FFHFHFFFFFFFHHFFFFFG'
]


class RTDP:
    def __init__(self, env):
        self.env = env
        self.iterations = 17000
        self.gamma = 1.
        self.goal_cost = -1.
        self.hole_cost = .1
        self.default_cost = .1

        self.actions = [0, 1, 2, 3]

        # FrozenLakeEnv gives us a transition matrix, the states and actions.
        # But no costs. So let's set something up ourselves.
        self._init_cost()
        self.calc_policy()

    def calc_policy(self):
        # RTDP
        V = defaultdict(lambda: 0.)

        for i in range(self.iterations):
            if i % 500 == 0:
                print(i)
            obs = self.env.reset()
            while True:
                action = np.argmin(
                    [
                        self.cost[obs][a] + self.gamma * sum(
                            item[0] * V[item[1]]
                            for item in self.env.P[obs][a]
                        )
                        for a in self.actions
                    ]
                )
                V[obs] = self.cost[obs][action] + self.gamma * sum(
                    item[0] * V[item[1]]
                    for item in self.env.P[obs][action]
                )
                obs, reward, done, _ = self.env.step(action)
                if done:
                    cost_ = -1
                    if reward == 0.0:
                        cost_ = 1
                    V[obs] = cost_ + self.gamma * sum(
                        item[0] * V[item[1]]
                        for item in self.env.P[obs][action]
                    )
                    break

        print(' ')
        print(V)
        self.V = V

    def _init_cost(self):
        # TODO Code below kinda ugly.
        cost = defaultdict(dict)
        for state in range(self.env.observation_space.n):
            for action in self.actions:
                # why 1? its the action itself. Others are slippery outcomes.
                if len(self.env.P[state][action]) == 1:
                    if self.env.P[state][action][0][2] == 1.0:
                        cost[state][action] = self.goal_cost
                    else:
                        cost[state][action] = self.hole_cost
                    continue

                done = self.env.P[state][action][1][3]
                if done and self.env.P[state][action][1][2] == 0.0:
                    cost[state][action] = self.hole_cost
                else:
                    cost[state][action] = self.default_cost

                if done and self.env.P[state][action][1][2] == 1.0:
                    cost[state][action] = self.goal_cost

        self.cost = cost

    def policy(self, state):
        return np.argmin(
            [
                self.cost[state][a] + self.gamma * sum(
                    item[0] * self.V[item[1]]
                    for item in self.env.P[state][a]
                )
                for a in self.actions
            ]
        )


def evaluate(rtdp, env):
    rewards = 0.
    state = env.reset()
    done = False
    steps = 0
    while not done:
        state, reward, done, _ = env.step(rtdp.policy(state))
        rewards += reward
        steps += 1
        if steps > 1e4:
            break

    return rewards


def run():
    env = FrozenLakeEnv(desc=MAP_20x20)
    rtdp = RTDP(env)
    tot_rewards = 0.
    eval_iter = int(1e4)
    for i in range(eval_iter):
        if i % 100 == 0:
            print(i)
        tot_rewards += evaluate(rtdp, env)
    print(tot_rewards / eval_iter)


run()


0
500
1000
1500
2000
2500
3000
3500
4000
4500
5000
5500
6000
6500
7000
7500
8000
8500
9000
9500
10000
10500
11000
11500
12000
12500
13000
13500
14000
14500
15000
15500
16000
16500
 
defaultdict(<function RTDP.calc_policy.<locals>.<lambda> at 0x000001FC70E48860>, {0: -3269.4303487248294, 20: -3268.650864422044, 1: -3269.804213892321, 21: -3269.490035687288, 2: -3270.806927816323, 41: 28.831525618161045, 22: -3270.4363880254195, 42: 30.0, 23: -3271.7328631729642, 40: 28.871108518445215, 3: -3272.144216943732, 61: 28.726295007686165, 60: 28.653230974425426, 81: 28.404280351816663, 62: 41.0, 80: 28.29804965688937, 100: -1173.206644783156, 101: -1162.929653685433, 82: 32.411381658562966, 102: -1132.490087563446, 83: 60.0, 122: -1341.13900674379, 103: -916.2068605885343, 123: -1675.4304942021572, 104: 60.0, 143: -1815.197640881998, 124: -1888.986746578832, 142: 37.0, 163: -1823.705989979966, 144: -1963.556447629739, 164: -1971.7966494624543, 145: -2040.8407177295146, 125: -2029.1719747469415

In [14]:
# Simple Example of Real Time Dynamic Programming (RTDP) with value iteration algorithm and policy

import numpy as np
import gym

# Create FrozenLake environment
env = gym.make('FrozenLake-v1')

# Set hyperparameters
gamma = 0.99
theta = 1e-6

# Initialize value function V with zeros
V = np.zeros(env.observation_space.n)

# Value Iteration algorithm
while True:
    delta = 0
    for s in range(env.observation_space.n):
        v = V[s]
        # Calculate value for each action in the current state
        q_values = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for p, s_next, r, done in env.P[s][a]:
                q_values[a] += p * (r + gamma * V[s_next])
        # Update value function for the current state
        V[s] = np.max(q_values)
        delta = max(delta, abs(v - V[s]))
    if delta < theta:
        break

# Print optimal value function
print("Optimal value function:")
print(V)

# Print optimal policy
policy = np.zeros(env.observation_space.n, dtype=int)
for s in range(env.observation_space.n):
    q_values = np.zeros(env.action_space.n)
    for a in range(env.action_space.n):
        for p, s_next, r, done in env.P[s][a]:
            q_values[a] += p * (r + gamma * V[s_next])
    policy[s] = np.argmax(q_values)
print("Optimal policy:")
print(policy)

Optimal value function:
[0.54201404 0.49878743 0.47067727 0.45683193 0.5584404  0.
 0.35834012 0.         0.59179013 0.64307363 0.61520214 0.
 0.         0.74171617 0.86283528 0.        ]
Optimal policy:
[0 3 3 3 0 0 0 0 3 1 0 0 0 2 1 0]


In [13]:
# Second Example of RTDP

import gym
import numpy as np

env = gym.make('FrozenLake-v1')

def rtdp(env, gamma=0.9, theta=0.0001):
    V = np.zeros(env.observation_space.n)
    while True:
        delta = 0
        for s in range(env.observation_space.n):
            v = V[s]
            q_values = np.zeros(env.action_space.n)
            for a in range(env.action_space.n):
                for prob, next_state, reward, done in env.P[s][a]:
                    q_values[a] += prob * (reward + gamma * V[next_state])
            V[s] = np.max(q_values)
            delta = max(delta, abs(v - V[s]))
        if delta < theta:
            break
    return V

optimal_value_function = rtdp(env)
print(optimal_value_function)


[0.06848032 0.06111567 0.07422254 0.05560469 0.09153995 0.
 0.11212558 0.         0.14522151 0.24737863 0.29954442 0.
 0.         0.37986011 0.63898452 0.        ]
