Import Libraries

In [None]:
import copy
import math
import random

Classes and Definitions

In [None]:
class SimpleLeftRightMDP:
    state_set = [0, 1, 2, 3, 4, "exited"]

    def reward(self, state, action, state_prime):
        if state == 0 and action == "exit":
            return 10
        elif state == 4 and action == "exit":
            return 1
        else:
            return 0

    def transition_prob(self, state, action, state_prime):
        if state > 0 and action == "left" and state_prime == state - 1:
            return 1
        elif state < 4 and action == "right" and state_prime == state + 1:
            return 1
        elif state in [0, 4] and action == "exit" and state_prime == "exited":
            return 1
        else:
            return 0

    def successor_states(self, state, action):
        if state > 0 and action == "left":
            return [state - 1]
        elif state < 4 and action == "right":
            return [state + 1]
        elif state in [0, 4] and action == "exit":
            return ["exited"]
        else:
            return []

    def possible_actions(self, state):
        if state in [0, 4]:
            return ["exit"]
        if state == "exited":
            return []
        actions = []
        if state < 4:
            actions.append("right")
        if state > 0:
            actions.append("left")
        return actions


class DoubleBanditsMDP:
    state_set = ["won", "lost"]

    def reward(self, state, action, state_prime):
        if action == "red" and state_prime == "won":
            return 2
        elif action == "red" and state_prime == "lost":
            return 0
        elif action == "blue":
            return 1
        else:
            return 0

    def transition_prob(self, state, action, state_prime):
        if action == "red" and state_prime == "won":
            return 0.75
        elif action == "red" and state_prime == "lost":
            return 0.25
        elif action == "blue" and state_prime == "won":
            return 1
        else:
            return 0

    def successor_states(self, state, action):
        if action == "red":
            return ["won", "lost"]
        elif action == "blue":
            return ["won"]
        else:
            return []

    def possible_actions(self, state):
        return ["red", "blue"]


class OverheatingCarMDP:
    state_set = ["cool", "warm", "overheated"]

    def reward(self, state, action, state_prime):
        if state_prime == "overheated":
            return -10
        elif action == "slow":
            return 1
        elif action == "fast":
            return 2
        else:
            return 0

    def transition_prob(self, state, action, state_prime):
        probs = {
            ("cool", "slow", "cool"): 1,
            ("cool", "fast", "cool"): 0.5,
            ("cool", "fast", "warm"): 0.5,
            ("warm", "slow", "cool"): 0.5,
            ("warm", "slow", "warm"): 0.5,
            ("warm", "fast", "overheated"): 1
        }
        return probs.get((state, action, state_prime), 0)

    def successor_states(self, state, action):
        successors = {
            ("cool", "slow"): ["cool"],
            ("cool", "fast"): ["cool", "warm"],
            ("warm", "slow"): ["cool", "warm"],
            ("warm", "fast"): ["overheated"]
        }
        return successors.get((state, action), [])

    def possible_actions(self, state):
        return [] if state == "overheated" else ["slow", "fast"]


class FiveStateGridworldMDP:
    state_set = ["A", "B", "C", "D", "E", "exited"]

    def reward(self, state, action, state_prime):
        if action == "exit" and state == "A":
            return -10
        elif action == "exit" and state == "D":
            return 10
        else:
            return -1

    def transition_prob(self, state, action, state_prime):
        probs = {
            ("B", "right", "C"): 1,
            ("E", "up", "C"): 1,
            ("C", "right", "D"): 0.8,
            ("C", "right", "A"): 0.1,
            ("C", "right", "E"): 0.1,
            ("C", "left", "B"): 0.8,
            ("C", "left", "A"): 0.1,
            ("C", "left", "E"): 0.1,
            ("C", "up", "A"): 0.8,
            ("C", "up", "B"): 0.1,
            ("C", "up", "D"): 0.1,
            ("C", "down", "E"): 0.8,
            ("C", "down", "B"): 0.1,
            ("C", "down", "D"): 0.1,
            ("A", "exit", "exited"): 1,
            ("D", "exit", "exited"): 1,
        }
        return probs.get((state, action, state_prime), 0)

    def successor_states(self, state, action):
        if state == "B":
            return ["C"]
        elif state == "E":
            return ["C"]
        elif state == "C":
            return ["A", "B", "D", "E"]
        elif state == "A":
            return ["exited"]
        elif state == "D":
            return ["exited"]
        else:
            return []

    def possible_actions(self, state):
        if state == "B":
            return ["right"]
        elif state == "E":
            return ["up"]
        elif state == "C":
            return ["up", "right", "down", "left"]
        elif state == "A":
            return ["exit"]
        elif state == "D":
            return ["exit"]
        else:
            return []


def weighted_random(probs):
    random_val = random.uniform(0, 1)
    cumulative_val = 0
    for item, prob in probs.items():
        cumulative_val += prob
        if cumulative_val > random_val:
            return item


def run_episodes(mdp, num_episodes, sample_limit=100):
    episodes = []
    for i in range(num_episodes):
        # run the episode to completion, recording transitions seen (s, a, s', r)
        episode = []
        init_state = random.choice(mdp.state_set)
        state = init_state
        while len(episode) < sample_limit:
            actions = mdp.possible_actions(state)
            if not actions:
                break
            action = random.choice(mdp.possible_actions(state))
            probs = {}
            for state_prime in mdp.successor_states(state, action):
                probs[state_prime] = mdp.transition_prob(state, action, state_prime)
            state_prime = weighted_random(probs)
            episode.append((state, action, state_prime, mdp.reward(state, action, state_prime)))
            state = state_prime
        episodes.append(episode)
        print("EPISODE", i, episode)
    return episodes


def find_value_function(mdp, num_iterations, transition_probs, rewards):
    state_set = mdp.state_set
    value_function = {state: 0 for state in state_set}

    for i in range(num_iterations):
        new_value_function = {}
        for state in state_set:
            action_values = []
            for action in mdp.possible_actions(state):
                value = sum([
                    transition_probs.get((state, action), {}).get(state_prime, 0) *
                    (rewards.get((state, action, state_prime), [0])[0] +
                    (mdp.discount_factor * value_function.get(state_prime, 0)))
                    for state_prime in mdp.successor_states(state, action)
                ])
                action_values.append(value)
            new_value_function[state] = max(action_values or [0])
        value_function = new_value_function
    return value_function


def extract_policy(mdp, value_function, transition_probs, rewards):
    policy = {}
    for state in mdp.state_set:
        scored_actions = []
        for action in mdp.possible_actions(state):
            action_value = sum([
                transition_probs.get((state, action), {}).get(state_prime, 0) *
                (rewards.get((state, action, state_prime), [0])[0] +
                (mdp.discount_factor * value_function.get(state_prime, 0)))
                for state_prime in mdp.successor_states(state, action)
            ])
            scored_actions.append((action, action_value))
        policy[state] = max(scored_actions or [("do_nothing", 0)], key=lambda entry: entry[1])[0]
    return policy

def learn_model(mdp, episodes):
    learned_transition_probs = {}
    learned_rewards = {}

    for episode in episodes:
        for step in episode:
            (state, action, state_prime, reward) = step
            # Update transition counts
            state_prime_counts_by_state_action = learned_transition_probs.get((state, action), {})
            prev_count = state_prime_counts_by_state_action.get(state_prime, 0)
            state_prime_counts_by_state_action[state_prime] = prev_count + 1
            learned_transition_probs[(state, action)] = state_prime_counts_by_state_action

            # Update rewards seen
            rewards_by_state_action_state_prime = learned_rewards.get((state, action, state_prime), [])
            rewards_by_state_action_state_prime.append(reward)
            learned_rewards[(state, action, state_prime)] = rewards_by_state_action_state_prime

    # Normalize transition counts to probabilities
    for state_action, state_prime_counts in learned_transition_probs.items():
        total_count = sum(state_prime_counts.values())
        for state_prime in state_prime_counts:
            state_prime_counts[state_prime] /= total_count

    # Average the rewards
    for state_action_state_prime, rewards in learned_rewards.items():
        learned_rewards[state_action_state_prime] = [sum(rewards) / len(rewards)]

    return learned_transition_probs, learned_rewards

def learn_q_table(mdp, episodes, learning_rate):
 q_table={}
 for episode in episodes:
   for sample in episode:
    (state,action,state_prime,reward)=sample
    new_q_table=copy.deepcopy(q_table)
    next_actions=mdp.possible_actions(state_prime)
    successor_q_values=[
    q_table.get((state_prime,action_prime),0)
     for action_prime in next_actions
 ]
    best_successor_q_value=max(successor_q_values or [0])
    sample_value=reward+(mdp.discount_factor*best_successor_q_value) #expected goodness of current sample
    old_q=q_table.get((state,action),0)
    new_q_table[(state,action)]=old_q+(learning_rate*(sample_value-old_q))
    q_table=new_q_table
    print("Q",q_table)
 print("best_successor_q_value", best_successor_q_value, reward)
 print("current new_q_table", new_q_table)
 print("Q",q_table)
 return q_table

def extract_policy_from_q_table(mdp, q_table):
 policy={}
 for state in mdp.state_set:
  best_action="do_nothing"
  for action in mdp.possible_actions(state):
   if q_table.get((state,action),-math.inf)>q_table.get((state,best_action),-math.inf):
      best_action=action
      policy[state]=best_action
 return policy

# To run model-based RL
mdp = DoubleBanditsMDP()
num_iterations = 10
num_episodes = 1000
mdp.discount_factor = 0.9
episodes = run_episodes(mdp, num_episodes)
(learned_transition_probs, learned_rewards) = learn_model(mdp, episodes)
value_function = find_value_function(mdp, num_iterations, learned_transition_probs, learned_rewards)
policy = extract_policy(mdp, value_function, learned_transition_probs, learned_rewards)
print("MDP    :", mdp.__class__.__name__)
print("T^     :", learned_transition_probs)
print("R^     :", learned_rewards)
print("VALUE  :", value_function)
print("POLICY :", policy)
"""
# To run Q-learning
mdp = SimpleLeftRightMDP()
num_episodes = 1000
learning_rate = 0.5
mdp.discount_factor = 0.9
episodes = run_episodes(mdp, 1000)
q_table = learn_q_table(mdp, episodes, learning_rate)
policy = extract_policy_from_q_table(mdp, q_table)
print("MDP    :", mdp.__class__.__name__)
print("QTABLE :", q_table)
print("POLICY :", policy)
"""

EPISODE 0 [('lost', 'red', 'won', 2), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'red', 'lost', 0), ('lost', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'red', 'won', 2), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'red', 'won', 2), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'red', 'lost', 0), ('lost', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'blue', 'won', 1), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'red', 'won', 2), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'red', 'won', 2), ('won', 'red', 'won', 2), ('won', 'blue', 'won', 1), ('won', 'blue', 'won', 1), (

'\n to run Q-learning\nmdp = SimpleLeftRightMDP()\nnum_episodes = 1000\nlearning_rate = 0.5\nmdp.discount_factor = 0.9\nepisodes = run_episodes(mdp, 1000)\nq_table = learn_q_table(mdp, episodes, learning_rate)\npolicy = extract_policy_from_q_table(mdp, q_table)\nprint("MDP    :", mdp.__class__.__name__)\nprint("QTABLE :", q_table)\nprint("POLICY :", policy)\n'