Generic MDP Code


In [240]:
import numpy as np
import random
import ast

class MDP:
    """Tabular Markov decision process with stochastic transitions.

    ``transitions`` maps ``state -> action -> next_state -> [probability, reward]``.
    The actions available in a state are the keys of ``transitions[state]``.

    Args:
        states: All state labels.
        terminal_states: States in which an episode ends; ``step`` refuses to
            move out of them.
        transitions: Nested transition table described above.
        current_state: Initial state. When ``None`` a random non-terminal
            state is chosen.
        slippery_factor: Probability that, when ``is_slippery`` is set, the
            requested action is replaced by a uniformly random available one.
        is_slippery: Enables the slipping behaviour.
        cost_of_living: Amount subtracted from every transition reward.
    """

    def __init__(self, states, terminal_states, transitions, current_state=None, slippery_factor = 0.8, is_slippery = False, cost_of_living = 0.01 ):
        self.states = states
        self.terminal_states = terminal_states
        self.actions = {state: list(action) for state, action in transitions.items()}
        self.transitions = transitions
        self.observation_space = len(states)
        # Number of distinct action labels (previously this counted states).
        self.action_space = len(
            {action for actions in self.actions.values() for action in actions}
        )
        self.is_slippery = is_slippery
        self.slippery_factor = slippery_factor
        self.cost_of_living = cost_of_living
        # One generator per environment instead of a new one on every step.
        self._rng = np.random.default_rng()
        if current_state is None:
            self.current_state = self._pick_start_state()
        else:
            self.current_state = current_state

    def _pick_start_state(self):
        """Return a uniformly random non-terminal state."""
        candidates = [state for state in self.states if state not in self.terminal_states]
        if not candidates:
            raise ValueError("MDP has no non-terminal state to start from")
        return random.choice(candidates)

    def reset(self):
        """Move to a random non-terminal state and return it."""
        self.current_state = self._pick_start_state()
        return self.current_state

    def step(self, action):
        """Apply ``action`` in the current state.

        Returns:
            ``(next_state, reward, done)``. ``done`` is true when the new state
            is terminal or offers no actions. If the chosen action has no
            outcomes, the state becomes ``None`` and ``(None, 0, True)`` is
            returned.

        Raises:
            Exception: If the current state is terminal.
            ValueError: If ``action`` is not available in the current state.
        """
        if self.current_state in self.terminal_states:
            raise Exception("Already in a terminal state")
        available_actions = self.get_available_actions()
        if action not in available_actions:
            raise ValueError("Invalid action")

        if self.is_slippery and self._rng.random() < self.slippery_factor:
            action = random.choice(available_actions)
            print("Slipped")

        previous_state = self.current_state
        outcomes = self.transitions[previous_state][action]

        if not outcomes:
            print(f"No transitions available from this state({self.current_state}).")
            self.current_state = None
            return self.current_state, 0, True

        possible_states = list(outcomes)
        probabilities = [outcomes[state][0] for state in possible_states]

        next_state = random.choices(possible_states, weights=probabilities)[0]

        reward = outcomes[next_state][1] - self.cost_of_living

        # Report the state we actually left; the original referenced an
        # undefined/global ``current_state`` here.
        print(f"{previous_state} -> {action} -> {next_state} | Reward: {reward}")

        self.current_state = next_state

        done = next_state in self.terminal_states or not self.get_available_actions()
        return next_state, reward, done

    def get_available_actions(self):
        """Return the actions of the current state (empty if it has none)."""
        return self.actions.get(self.current_state, [])

    def get_possible_next_states(self):
        """Return every state reachable in one step from the current state."""
        possible_states = set()
        for outcomes in self.transitions.get(self.current_state, {}).values():
            possible_states.update(outcomes)
        return list(possible_states)



Assignment 1.3

In [225]:

# State labels of the three-state example MDP.
states1_3 = [
    'S0',
    'S1',
    'S2'
]

# Transition table: state -> action -> next_state -> [probability, reward].
transitions1_3 = {
    'S0': {
        'a0': {'S0': [0.5, 0], 'S2': [0.5, 0]},
        'a1': {'S2': [1, 0]}
    },
    'S1': {
        'a0': {'S0': [0.7, 5], 'S2': [0.2, 0], 'S1': [0.1, 0]},
        'a1': {'S1': [0.95, 0], 'S2': [0.05, 0]}
    },
    'S2': {
        'a1': {'S0': [0.3, -1], 'S2': [0.4, 0], 'S1': [0.3, 0]},
        'a0': {'S0': [0.4, 0], 'S2': [0.6, 0]}
    }
}


# The empty list means no state is declared terminal.
mdp1_3 = MDP(states1_3, [], transitions1_3)
mdp1_3.reset()

# Roll out a uniformly random policy for at most 10 steps.
for i in range(10):
    # NOTE(review): MDP.step's trace print appears to read a module-level
    # `current_state`; confirm before removing or renaming this assignment.
    current_state = mdp1_3.current_state
    available_actions = mdp1_3.get_available_actions()
    action = random.choice(available_actions)
    new_state, reward, done = mdp1_3.step(action)

    if done:
        print("Reached a terminal state.")
        break


Assignment 2.1

In [174]:
# Five states labelled '1'..'5' laid out on a line; both ends are terminal.
states2_1 = [str(index) for index in range(1, 6)]

terminal_states2_1 = [states2_1[0], states2_1[-1]]

# transitions2_1 = {
#     '1' : {
#         'r' : {'2' : [1, 0]}
#     },
#     '2' : {
#         'l' : {'1' : [1, -1]},
#         'r' : {'3' : [1, 0]}
#     },
#     '3' : {
#         'l' : {'2' : [1, 0]},
#         'r' : {'4' : [1, 0]}
#     },
#     '4' : {
#         'l' : {'3' : [1, 0]},
#         'r' : {'5' : [1, 1]}
#     },
#     '5' : {
#         'l' : {'4' : [1, 0]}
#     }
# }

def create_transitions(num_states):
    """Build the transition table of a 1-D chain with ``num_states`` cells.

    States are labelled ``'1'`` .. ``str(num_states)``. Every move is
    deterministic (probability 1). Stepping right into the last state yields
    reward 1, stepping left into state ``'1'`` yields -1, all other moves 0.

    Returns:
        ``{state: {action: {next_state: [probability, reward]}}}`` with
        actions ``'r'`` (right) and ``'l'`` (left) where they exist.
    """
    transitions = {}
    for state in range(1, num_states + 1):
        moves = {}
        if state < num_states:
            right_reward = 1 if state + 1 == num_states else 0
            moves['r'] = {str(state + 1): [1, right_reward]}
        if state > 1:
            left_reward = -1 if state - 1 == 1 else 0
            moves['l'] = {str(state - 1): [1, left_reward]}
        transitions[str(state)] = moves
    return transitions

# Build the 5-state chain and wrap it in a slippery MDP.
num_states = 5
transitions2_1 = create_transitions(num_states)

mdp2_1 = MDP(states2_1, terminal_states2_1, transitions2_1, slippery_factor=0.6, is_slippery=True, cost_of_living=0.1)
mdp2_1.reset()

# Follow a uniformly random policy until step() reports the episode is done.
while True:
    # NOTE(review): MDP.step's trace print appears to read a module-level
    # `current_state`; confirm before removing or renaming this assignment.
    current_state = mdp2_1.current_state
    available_actions = mdp2_1.get_available_actions()
    action = random.choice(available_actions)
    print(f"Chosen action:", action)
    new_state, reward, done = mdp2_1.step(action)

    if done:
        print("Reached a terminal state.")
        break


Assignment 2.2

In [207]:

def create_states(h, w):
    """Return the labels of an ``h`` x ``w`` grid in row-major order.

    The cell in 0-based row ``i`` and 1-based column ``j`` is labelled
    ``str(j + w * i)``, so a 3 x 5 grid yields ``'1'`` .. ``'15'``.
    """
    return [
        str(column + w * row)
        for row in range(h)
        for column in range(1, w + 1)
    ]

def create_transitions(h,w):
    """Build the deterministic transition table of an ``h`` x ``w`` grid.

    Cells are labelled row-major from ``'1'`` (0-based row ``i``, 1-based
    column ``j`` -> ``str(j + w * i)``). Actions are ``'u'``, ``'d'``, ``'l'``
    and ``'r'``; moves that would leave the grid are omitted.

    The reward of a move depends only on the cell being entered:
      * the last cell of the bottom row gives +1,
      * the first cell of the bottom row gives 0,
      * every other bottom-row cell gives -1,
      * every cell above the bottom row gives 0.

    Previously a right move was rewarded -1 on every row, which disagreed
    with the down/left rules; it now follows the same per-cell rule.

    Returns:
        ``{state: {action: {next_state: [probability, reward]}}}``.
    """
    bottom_row = h - 1

    def entry_reward(row, column):
        # Reward for arriving in (row, column); the goal check comes first so
        # that a one-column grid still treats its bottom cell as the goal.
        if row != bottom_row:
            return 0
        if column == w:
            return 1
        if column == 1:
            return 0
        return -1

    def label(row, column):
        return str(column + w * row)

    transitions = {}
    for i in range(h):
        for j in range(1, w + 1):
            moves = {}
            if i != 0:
                moves['u'] = {label(i - 1, j): [1, entry_reward(i - 1, j)]}
            if i != bottom_row:
                moves['d'] = {label(i + 1, j): [1, entry_reward(i + 1, j)]}
            if j != 1:
                moves['l'] = {label(i, j - 1): [1, entry_reward(i, j - 1)]}
            if j != w:
                moves['r'] = {label(i, j + 1): [1, entry_reward(i, j + 1)]}
            transitions[label(i, j)] = moves

    return transitions


# 3 x 5 grid: start in cell '1', the last generated state is the only terminal.
h = 3
w = 5
states2_2 = create_states(h, w)
current_state2_2 = '1'
terminal_states2_2 = [states2_2[-1]]
transitions2_2 = create_transitions(h, w)


mdp2_2 = MDP(states2_2, terminal_states2_2,transitions2_2, current_state=current_state2_2, is_slippery=True, slippery_factor = 0.5, cost_of_living = 0.01)

# Roll out a uniformly random policy for at most 50 steps.
for i in range(50):
    # NOTE(review): MDP.step's trace print appears to read a module-level
    # `current_state`; confirm before removing or renaming this assignment.
    current_state = mdp2_2.current_state
    available_actions = mdp2_2.get_available_actions()
    action = random.choice(available_actions)
    print("Actions chosen:", action)
    new_state, reward, done = mdp2_2.step(action)

    if done:
        print("Reached a terminal state.")
        break

Assignment 2.4


In [243]:

def create_states():
    """Return the state labels of the key-and-door corridor.

    Each label is ``str((position, key))``. Positions 1-4 exist with key
    values 0 and 1, position 5 only with key 1, followed by the extra
    state ``(6, 1)``.
    """
    states = []
    for position in range(1, 6):
        key_values = (1,) if position == 5 else (0, 1)
        states.extend(str((position, key)) for key in key_values)
    states.append(str((6, 1)))
    return states


def create_transitions(states):
    """Build the deterministic transition table for the key-and-door corridor.

    Each state is the string form of a ``(position, key)`` tuple. All moves
    have probability 1:
      * positions 2 and 3 can move left/right keeping ``key``; moving left
        from position 2 costs -1;
      * position 3 can additionally move up to ``'(6, 1)'``;
      * position 6 moves down to ``'(3, 1)'``;
      * position 4 can move left, and with ``key == 1`` right to
        ``'(5, 1)'`` for reward 1.
    States at any other position get an empty action dict.

    Returns:
        ``{state: {action: {next_state: [probability, reward]}}}``.
    """
    transitions = {}
    for state in states:
        parsed = ast.literal_eval(state)
        position, key = parsed[0], parsed[1]
        moves = {}

        if position > 1 and position < 4:
            moves['l'] = {str((position - 1, key)): [1, -1 if position == 2 else 0]}
            moves['r'] = {str((position + 1, key)): [1, 0]}
            if position == 3:
                moves['u'] = {'(6, 1)': [1, 0]}
        elif position == 4:
            moves['l'] = {str((position - 1, key)): [1, 0]}
            if key == 1:
                moves['r'] = {'(5, 1)': [1, 1]}
        elif position == 6:
            # A small reward on this step might be a good idea.
            moves['d'] = {'(3, 1)': [1, 0]}

        transitions[state] = moves

    return transitions


states2_4 = create_states()

# Both position-1 states and the exit (5, 1) end the episode.
# The exit label previously lacked its closing parenthesis ('(5, 1'), so it
# never matched a real state and '(5, 1)' was not treated as terminal.
terminal_states2_4 = ['(1, 0)',
                      '(1, 1)',
                      '(5, 1)']

initial_state2_4 = '(3, 0)'

transitions2_4 = create_transitions(states2_4)

mdp2_4 = MDP(states2_4, terminal_states2_4, transitions2_4, current_state=initial_state2_4, slippery_factor=0.3, is_slippery=True, cost_of_living=0.01)

# Roll out a uniformly random policy for at most 50 steps.
for i in range(50):
    # NOTE(review): MDP.step's trace print appears to read a module-level
    # `current_state`; confirm before removing or renaming this assignment.
    current_state = mdp2_4.current_state
    available_actions = mdp2_4.get_available_actions()
    action = random.choice(available_actions)
    print("Actions chosen:", action)
    new_state, reward, done = mdp2_4.step(action)

    if done:
        print("Reached a terminal state.")
        break

Generic QAgent


In [112]:
class QAgent:
    """Tabular Q-learning agent for an ``MDP``-like environment.

    The environment must provide ``states``, ``actions`` (mapping each state
    to its available actions), ``reset()`` returning the start state,
    ``step(action)`` returning ``(next_state, reward, done)`` and
    ``get_available_actions()`` for the environment's current state.

    Attributes:
        mdp: The wrapped environment.
        q_table: ``{state: {action: value}}`` estimates, zero-initialised.
    """

    def __init__(self, mdp):
        self.mdp = mdp
        self.q_table = self._fresh_q_table()

    def _fresh_q_table(self):
        """Return a zero-initialised ``{state: {action: 0}}`` table."""
        env = self.mdp
        return {state: {action: 0 for action in env.actions[state]} for state in env.states}

    def _greedy_action(self, state):
        """Return the highest-valued action in ``state``, or ``None`` if it has none."""
        action_values = self.q_table.get(state)
        if not action_values:
            return None
        return max(action_values, key=action_values.get)

    def _random_action(self, state):
        """Return a uniformly random action of ``state``, or ``None`` if it has none."""
        actions = list(self.mdp.actions.get(state, ()))
        if not actions:
            return None
        return random.choice(actions)

    def _rollout(self, choose_action, episodes, max_steps, verbose=False):
        """Run ``episodes`` episodes with ``choose_action(state)`` and sum the rewards.

        An episode ends after ``max_steps`` steps, when the environment
        reports it is done, or when the current state offers no action.
        """
        env = self.mdp
        total_reward = 0

        for _ in range(episodes):
            state = env.reset()

            for _ in range(max_steps):
                action = choose_action(state)
                if action is None:
                    break

                new_state, reward, terminated = env.step(action)
                total_reward += reward

                if verbose:
                    print(f"{state} -> {action} -> {new_state} | Reward: {reward}")

                state = new_state

                # Stepping again from a finished episode would make the
                # environment raise.
                if terminated:
                    break

        return total_reward

    def train(self,
              episodes=400,
              learning_rate=0.1,
              discount_factor=0.9,
              cost_of_living=0.01,
              max_steps=20):
        """Learn ``q_table`` from scratch with epsilon-greedy Q-learning.

        Epsilon starts at 1 and decreases linearly, reaching 0 after about
        90% of the episodes. From then on the learning rate is multiplied by
        0.1 after every episode.

        Args:
            episodes: Number of training episodes; nothing is learned for
                values <= 0.
            learning_rate: Initial step size of the Q update.
            discount_factor: Weight of the bootstrapped next-state value.
            cost_of_living: Unused; kept for backward compatibility (the
                environment's ``step`` supplies the reward).
            max_steps: Maximum number of steps per episode.
        """
        env = self.mdp

        self.q_table = self._fresh_q_table()

        if episodes <= 0:
            return

        epsilon = 1
        epsilon_decay = 1 / (episodes * 0.9)
        random_number_generator = np.random.default_rng()

        for _ in range(episodes):
            state = env.reset()

            for _ in range(max_steps):
                # A start state without actions cannot be stepped from.
                if not self.q_table.get(state):
                    break

                if random_number_generator.random() < epsilon:
                    action = random.choice(env.get_available_actions())
                else:
                    action = self._greedy_action(state)

                new_state, reward, terminated = env.step(action)

                # Do not bootstrap from a finished episode or from a state
                # with no actions (max() over an empty dict would raise).
                next_values = self.q_table.get(new_state)
                if terminated or not next_values:
                    future_value = 0
                else:
                    future_value = max(next_values.values())

                target = reward + discount_factor * future_value
                td_error = target - self.q_table[state][action]
                self.q_table[state][action] += learning_rate * td_error

                state = new_state

                if terminated:
                    break

            epsilon = max(epsilon - epsilon_decay, 0)

            if epsilon == 0:
                learning_rate = learning_rate * 0.1

    def run(self, episodes = 1, max_steps=20):
        """Follow the greedy policy, printing every step; return the total reward."""
        return self._rollout(self._greedy_action, episodes, max_steps, verbose=True)

    def evaluate_QAgent(self, episodes = 1, max_steps=20):
        """Follow the greedy policy silently; return the total reward."""
        return self._rollout(self._greedy_action, episodes, max_steps)

    def run_random_agent(self, episodes=1, max_steps=20):
        """Follow a uniformly random policy; return the total reward."""
        return self._rollout(self._random_action, episodes, max_steps)

    def compare_agents(self, episodes=100):
        """Print the total reward of the greedy and the random policy over ``episodes`` episodes each."""
        q_agent_rewards = self.evaluate_QAgent(episodes)
        random_agent_rewards = self.run_random_agent(episodes)

        print(f"Q-Agent Total Rewards over {episodes} episodes: {q_agent_rewards}")
        print(f"Random Agent Total Rewards over {episodes} episodes: {random_agent_rewards}")
    
# Train a Q-learning agent on the Assignment 1.3 MDP.
agent = QAgent(mdp1_3)

agent.train(episodes=1000)

# Replay the learned greedy policy once.
agent.run(episodes=1)

# Print the summed rewards of the greedy policy and of a random policy.
agent.compare_agents(episodes=1000)