In [143]:
import numpy as np
import random

class MDP:
    """Tabular Markov decision process with an optional "slippery" action model.

    ``transitions`` maps ``state -> action -> next_state -> [probability, reward]``.
    All sampling goes through the standard-library ``random`` module, so calling
    ``random.seed(...)`` beforehand makes a run reproducible.

    Attributes:
        states: the state collection passed to the constructor.
        terminal_states: states in which ``step`` refuses to act.
        transitions: the transition table passed to the constructor.
        actions: ``state -> list of action labels``; every entry of ``states``
            is present, mapped to ``[]`` when it has no outgoing transitions.
        observation_space: number of states.
        action_space: number of distinct action labels across all states.
        is_slippery / slippery_factor: when ``is_slippery`` is true, each step
            replaces the requested action with a uniformly random available
            action with probability ``slippery_factor``.
        cost_of_living: stored on the instance; ``step`` does not apply it to
            rewards.
        current_state: the state the process is currently in (``None`` after a
            step through an action that has no outcomes).
    """

    def __init__(self, states, terminal_states, transitions, current_state=None, slippery_factor=0.8, is_slippery=False, cost_of_living=0.01):
        self.states = states
        self.terminal_states = terminal_states
        self.transitions = transitions
        self.actions = {state: list(acts) for state, acts in transitions.items()}
        # Make sure every declared state has an entry so lookups on states without
        # outgoing transitions yield an empty action list instead of a KeyError.
        for state in states:
            self.actions.setdefault(state, [])
        self.observation_space = len(states)
        # Count distinct action labels, not the number of states that have actions.
        self.action_space = len({act for acts in self.actions.values() for act in acts})
        self.is_slippery = is_slippery
        self.slippery_factor = slippery_factor
        self.cost_of_living = cost_of_living
        if current_state is None:
            self.reset()
        else:
            self.current_state = current_state

    def reset(self):
        """Move to a uniformly random non-terminal state and return it.

        Raises:
            IndexError: if every state is terminal (nothing to choose from).
        """
        available_states = [state for state in self.states if state not in self.terminal_states]
        self.current_state = random.choice(available_states)
        return self.current_state

    def step(self, action):
        """Apply ``action`` in the current state.

        Returns:
            ``(next_state, reward, done)``. ``done`` is true when the next state
            is terminal or has no available actions. If the executed action has
            no outcomes, the current state becomes ``None`` and
            ``(None, 0, True)`` is returned.

        Raises:
            Exception: if the current state is terminal.
            ValueError: if ``action`` is not available in the current state.
        """
        if self.current_state in self.terminal_states:
            raise Exception("Already in a terminal state")
        available = self.get_available_actions()
        if action not in available:
            raise ValueError("Invalid action")

        # Slipping swaps the requested action for a random available one
        # (which may happen to be the same action).
        if self.is_slippery and random.random() < self.slippery_factor:
            action = random.choice(available)
            print("Slipped")

        outcomes = self.transitions[self.current_state][action]

        if not outcomes:
            print(f"No transitions available from this state({self.current_state}).")
            self.current_state = None
            return self.current_state, 0, True

        possible_states = list(outcomes)
        probabilities = [outcomes[state][0] for state in possible_states]

        next_state = random.choices(possible_states, weights=probabilities)[0]
        reward = outcomes[next_state][1]

        self.current_state = next_state

        done = next_state in self.terminal_states or not self.get_available_actions()
        return next_state, reward, done

    def get_available_actions(self):
        """Return the action labels of the current state (``[]`` if it has none)."""
        return self.actions.get(self.current_state, [])

    def get_possible_next_states(self):
        """Return every state reachable in one step, in first-seen order."""
        reachable = {}
        for action in self.get_available_actions():
            # A dict keeps insertion order, giving a stable result across runs.
            reachable.update(dict.fromkeys(self.transitions[self.current_state][action]))
        return list(reachable)



# Food-acquisition MDP: three states, with 'You have Food' as the only terminal state.
states1_2 = [
    'You have Food',
    'You dont have Food',
    'Neighbour suspect you'
]

terminal_states1_2 = [
    'You have Food'
]

# state -> action -> next_state -> [probability, reward]
transitions1_2 = {
    # Listed for completeness: MDP.step() refuses to act from a terminal state
    # and MDP.reset() never starts in one.
    'You have Food': {
        'Eat own food': {'You dont have Food': [1.0, 1]},
        'Take neighbour Food': {'Neighbour suspect you': [0.2, -1], 'You have Food': [0.8, 1]}
    },
    'You dont have Food': {
        'Buy Food': {'You have Food': [1.0, -2]},
        'Take neighbour Food': {'Neighbour suspect you': [0.5, -1], 'You have Food': [0.5, 1]}
    },
    'Neighbour suspect you': {
        'Buy Food': {'You have Food': [1.0, -2]},
        'Take neighbour Food': {'Neighbour suspect you': [0.9, -5], 'You have Food': [0.1, 1]}
    }
}


# Slippery variant: with probability 0.9 the requested action is replaced by a random one.
mdp1_2 = MDP(states1_2,terminal_states1_2, transitions1_2, is_slippery=True, slippery_factor=0.9)
current_state = mdp1_2.reset()

print(mdp1_2.get_possible_next_states())

print("Initial State:", current_state)

available_actions = mdp1_2.get_available_actions()

print("Available Actions:", available_actions)

# Pick uniformly among however many actions the state offers, rather than
# assuming there are exactly two.
action_to_take = random.choice(available_actions)

print("Chose action:", action_to_take)

new_state, reward, done = mdp1_2.step(action_to_take)
print(f"Action Taken: {action_to_take}, New State: {new_state}, Reward: {reward}, Finish: {done}")


['Neighbour suspect you', 'You have Food']
Initial State: You dont have Food
Available Actions: ['Buy Food', 'Take neighbour Food']
Chose action: Buy Food
Slipped
Action Taken: Buy Food, New State: You have Food, Reward: -2, Finish: True


In [139]:
# Play one episode on mdp1_2 with uniformly random actions, logging each
# transition until the environment reports that the episode is over.
mdp1_2.reset()

done = False
while not done:
    current_state = mdp1_2.current_state
    available_actions = mdp1_2.get_available_actions()
    action = random.choice(available_actions)
    new_state, reward, done = mdp1_2.step(action)

    print(f"{current_state} -> {action} -> {new_state} | Reward: {reward}")

print("Reached a terminal state.")

Neighbour suspect you -> Take neighbour Food -> You have Food | Reward: 1
Reached a terminal state.


In [75]:
# Three-state MDP without terminal states.
states1_3 = ['S0', 'S1', 'S2']

# state -> action -> next_state -> [probability, reward]
transitions1_3 = {
    'S0': {
        'a0': {'S0': [0.5, 0], 'S2': [0.5, 0]},
        'a1': {'S2': [1, 0]},
    },
    'S1': {
        'a0': {'S0': [0.7, 5], 'S2': [0.2, 0], 'S1': [0.1, 0]},
        'a1': {'S1': [0.95, 0], 'S2': [0.05, 0]},
    },
    'S2': {
        'a1': {'S0': [0.3, -1], 'S2': [0.4, 0], 'S1': [0.3, 0]},
        'a0': {'S0': [0.4, 0], 'S2': [0.6, 0]},
    },
}

mdp1_3 = MDP(states=states1_3, terminal_states=[], transitions=transitions1_3)
mdp1_3.reset()

# Take at most ten uniformly random actions, logging each transition.
for i in range(10):
    current_state = mdp1_3.current_state
    available_actions = mdp1_3.get_available_actions()
    action = random.choice(available_actions)
    new_state, reward, done = mdp1_3.step(action)

    print(f"{current_state} -> {action} -> {new_state} | Reward: {reward}")

    if not done:
        continue
    print("Reached a terminal state.")
    break


S2 -> a0 -> S0 | Reward: 0
S0 -> a0 -> S2 | Reward: 0
S2 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S1 | Reward: 0
S1 -> a1 -> S1 | Reward: 0
S1 -> a0 -> S0 | Reward: 5
S0 -> a0 -> S2 | Reward: 0
S2 -> a1 -> S0 | Reward: -1


In [114]:
# Five-cell corridor; both end cells ('1' and '5') are terminal.
states2_1 = ['1', '2', '3', '4', '5']

terminal_states2_1 = ['1', '5']

# state -> action ('l' = left, 'r' = right) -> next_state -> [probability, reward]
transitions2_1 = {
    '1': {'r': {'2': [1, 0]}},
    '2': {'l': {'1': [1, -1]}, 'r': {'3': [1, 0]}},
    '3': {'l': {'2': [1, 0]}, 'r': {'4': [1, 0]}},
    '4': {'l': {'3': [1, 0]}, 'r': {'5': [1, 1]}},
    '5': {'l': {'4': [1, 0]}},
}

mdp2_1 = MDP(
    states2_1,
    terminal_states2_1,
    transitions2_1,
    slippery_factor=0.2,
    is_slippery=True,
    cost_of_living=0.1,
)

mdp2_1.reset()

# Take at most ten uniformly random actions, stopping early when the episode ends.
for i in range(10):
    current_state = mdp2_1.current_state
    available_actions = mdp2_1.get_available_actions()
    action = random.choice(available_actions)
    new_state, reward, done = mdp2_1.step(action)

    print(f"{current_state} -> {action} -> {new_state} | Reward: {reward}")

    if not done:
        continue
    print("Reached a terminal state.")
    break

3 -> l -> 2 | Reward: 0
2 -> r -> 3 | Reward: 0
3 -> l -> 2 | Reward: 0
2 -> l -> 1 | Reward: -1
Reached a terminal state.


In [112]:
class QAgent:
    """Tabular Q-learning agent for an MDP-like environment.

    The environment must expose ``states``, ``actions`` (``state -> action
    labels``), ``reset()``, ``get_available_actions()`` and ``step(action)``
    returning ``(next_state, reward, terminated)``.

    Attributes:
        mdp: the wrapped environment.
        q_table: ``state -> action -> estimated value``, initialised to 0.
    """

    def __init__(self, mdp):
        self.mdp = mdp
        self.q_table = self._fresh_q_table()

    def _fresh_q_table(self):
        """Build a zero-initialised Q-table covering every state of the environment."""
        return {
            state: {action: 0 for action in self.mdp.actions.get(state, ())}
            for state in self.mdp.states
        }

    def _greedy_action(self, state):
        """Return the highest-valued action for ``state`` (first one wins ties)."""
        values = self.q_table[state]
        return max(values, key=values.get)

    def _greedy_episode(self, max_steps, verbose=False):
        """Play one greedy episode of at most ``max_steps`` steps; return its reward sum."""
        env = self.mdp
        state = env.reset()
        episode_reward = 0
        for _ in range(max_steps):
            action = self._greedy_action(state)
            new_state, reward, terminated = env.step(action)
            episode_reward += reward
            if verbose:
                print(f"{state} -> {action} -> {new_state} | Reward: {reward}")
            state = new_state
            if terminated:
                # Stepping again from a finished episode would raise in the environment.
                break
        return episode_reward

    def train(self,
              episodes=400,
              learning_rate=0.1,
              discount_factor=0.9,
              cost_of_living=0.01,
              max_steps=20):
        """Learn ``q_table`` from scratch with epsilon-greedy Q-learning.

        Args:
            episodes: number of training episodes; nothing is learned when <= 0.
            learning_rate: initial step size of the Q update.
            discount_factor: weight of the bootstrapped next-state value.
            cost_of_living: accepted for interface compatibility; not used by
                the update.
            max_steps: maximum number of steps per episode.

        Epsilon starts at 1 and decreases linearly, reaching 0 after roughly
        90% of the episodes.
        """
        env = self.mdp
        self.q_table = self._fresh_q_table()
        if episodes <= 0:
            return

        epsilon = 1
        epsilon_decay = 1 / (episodes * 0.9)
        random_number_generator = np.random.default_rng()

        for _ in range(episodes):
            state = env.reset()

            for _ in range(max_steps):
                if random_number_generator.random() < epsilon:
                    action = random.choice(env.get_available_actions())
                else:
                    action = self._greedy_action(state)

                new_state, reward, terminated = env.step(action)

                # A finished episode has no future value; skipping the lookup also
                # avoids max() on an action-less state or indexing a None state.
                if terminated:
                    future_value = 0
                else:
                    future_value = max(self.q_table[new_state].values(), default=0)

                target = reward + discount_factor * future_value
                td_error = target - self.q_table[state][action]
                self.q_table[state][action] += learning_rate * td_error

                state = new_state

                if terminated:
                    break

            epsilon = max(epsilon - epsilon_decay, 0)

            # NOTE(review): once epsilon is 0 the learning rate shrinks tenfold after
            # every remaining episode (compounding), which quickly freezes learning.
            # Kept as in the original; confirm this is the intended schedule.
            if epsilon == 0:
                learning_rate = learning_rate * 0.1

    def run(self, episodes=1, max_steps=20):
        """Play ``episodes`` greedy episodes, printing each transition.

        Returns:
            The reward summed over all episodes.
        """
        total_reward = 0
        for _ in range(episodes):
            total_reward += self._greedy_episode(max_steps, verbose=True)
        return total_reward

    def evaluate_QAgent(self, episodes=1, max_steps=20):
        """Play ``episodes`` greedy episodes silently and return the summed reward."""
        total_reward = 0
        for _ in range(episodes):
            total_reward += self._greedy_episode(max_steps)
        return total_reward

    def run_random_agent(self, episodes=1, max_steps=20):
        """Play ``episodes`` episodes with uniformly random actions; return the summed reward."""
        total_reward = 0
        for _ in range(episodes):
            state = self.mdp.reset()

            for _ in range(max_steps):
                action = random.choice(list(self.mdp.actions[state]))
                state, reward, terminated = self.mdp.step(action)
                total_reward += reward
                if terminated:
                    break

        return total_reward

    def compare_agents(self, episodes=100, max_steps=20):
        """Print the total reward of the greedy and the random policy over ``episodes`` episodes each."""
        q_agent_rewards = self.evaluate_QAgent(episodes, max_steps)
        random_agent_rewards = self.run_random_agent(episodes, max_steps)

        print(f"Q-Agent Total Rewards over {episodes} episodes: {q_agent_rewards}")
        print(f"Random Agent Total Rewards over {episodes} episodes: {random_agent_rewards}")
    
    
# Train a tabular Q-learning agent on the three-state MDP, replay one greedy
# episode with its transitions printed, then compare the greedy policy's total
# reward against a uniformly random policy.
agent = QAgent(mdp=mdp1_3)
agent.train(episodes=1000)
agent.run(episodes=1)
agent.compare_agents(episodes=1000)

S0 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S0 | Reward: -1
S0 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S0 | Reward: -1
S0 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S0 | Reward: -1
S0 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S0 | Reward: -1
S0 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S1 | Reward: 0
S1 -> a0 -> S0 | Reward: 5
S0 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S0 | Reward: -1
S0 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S2 | Reward: 0
S2 -> a1 -> S0 | Reward: -1
Q-Agent Total Rewards over 1000 episodes: 9934
Random Agent Total Rewards over 1000 episodes: 4780
