In [1]:
import numpy as np
from collections import defaultdict
import random

### Initializing Probabilities

In [2]:
ZONE_NAMES = ['A1', 'A2', 'A3', 'A4']

In [3]:
ACTIONS = ["WAIT", "DELIVER"]

In [4]:
PROB_FINDING_PACKAGE_IN_ZONE = {
    'A1':0.25, 
    'A2':0.3, 
    'A3':0.35, 
    'A4':0.1
}

In [5]:
# P(s, 'Deliver', s')
TRANSITION_PROBS = np.array([
    [0.75, 0.075, 0.1, 0.075],
    [0.15, 0.7, 0.06, 0.09],
    [0.14, 0.105, 0.65, 0.105],
    [0.05, 0.025, 0.025, 0.9]
])

In [6]:
SOURCE_DESC_WITH_PACKAGE_FEE = np.array([
    [0, 12, 8, 6],
    [10, 0, 7, 9],
    [14, 9, 0, 5],
    [15, 7, 4, 0]
])

TRAVELLING_COST = np.array([
    [0, 1.5, 2.0, 2.5],
    [1.0, 0, 1.8, 2.2],
    [1.8, 1.5, 0, 1.2],
    [2.0, 1.8, 1.0, 0]
])

REWARD = SOURCE_DESC_WITH_PACKAGE_FEE - TRAVELLING_COST

In [7]:
class Zone:
    def __init__(self, idx, name, package_prob):
        self.idx = idx
        self.name = name
        self.package_prob = package_prob
        self.policy = None
        self.V = 0

class Action:

    def __init__(self, idx, name):
        self.idx = idx
        self.name = name


class MDP:

    def __init__(self, discount_factor=0.95):

        self.discount_factor = discount_factor
        
        self.states =[Zone(idx, name, PROB_FINDING_PACKAGE_IN_ZONE[name]) for idx, name in enumerate(ZONE_NAMES)]

        self.actions = {name: Action(idx, name) for idx, name in enumerate(ACTIONS)}

        # should have probs for all (s, a, s')
        self.transition_probs = dict()

        # For Delivery action --> using the given prob tables
        for cur_state in self.states:
            for next_state in self.states:
                self.transition_probs[cur_state.idx, self.actions["DELIVER"].idx, next_state.idx] \
                    = TRANSITION_PROBS[cur_state.idx, next_state.idx]

        # For Waiting action
        for cur_state in self.states:
            for next_state in self.states:
                self.transition_probs[cur_state.idx, self.actions["WAIT"].idx, next_state.idx] \
                = 1 if(cur_state.idx == next_state.idx) else 0

        # reward
        self.reward = dict()
        for cur_state in self.states:
            for next_state in self.states:
                self.reward[cur_state.idx, self.actions["DELIVER"].idx, next_state.idx] \
                    = REWARD[cur_state.idx, next_state.idx]


    def random_state_policy_init(self):
        for state in self.states:
            state.policy = self.actions[random.choice(list(self.actions.keys()))]

    def print_policy(self):
        for i in self.states:
            print(i.name,":",  i.policy.name, i.V)
        
        print()


    def solve(self, theta = 1e-6):
        print("Random policy initiated!")
        self.random_state_policy_init()

        self.print_policy()

        stable = False
        itr = 1
        while(not stable):

            self.__policy_evaluation(theta)
            stable = self.__policy_improvement()

            print(f"Iteration : {itr}")
            self.print_policy()
            itr += 1


    def __get_Q(self, state, action):
        Q = 0
        for next_state in self.states:
            reward = 0
            if(next_state.name!=state.name):
                query_tuple = (state.idx, action.idx, next_state.idx)
                reward  = self.reward.get(query_tuple, 0)

            Q += self.transition_probs[(state.idx, action.idx, next_state.idx)]*\
                (reward + self.discount_factor*next_state.V)
            
        return Q

    def __policy_evaluation(self, theta):
        delta=np.inf
        while(delta>theta):
            delta = 0
            for state in self.states:
                v = state.V
                state_policy = state.policy

                max_Q = self.__get_Q(state, state_policy)
                state.V = max_Q

                delta = max(delta, abs(v - max_Q))

            if(delta<theta):
                break

    def __policy_improvement(self):

        policy_stable = True

        for state in self.states:
            old_action = state.policy
            Q_policy = state.V

            for action_name, action in self.actions.items():
                
                temp_Q = self.__get_Q(state, action)
                if(temp_Q > Q_policy):
                    state.policy = action
                    policy_stable = False

        return policy_stable


In [8]:
x = MDP()
x.solve()

Random policy initiated!
A1 : DELIVER 0
A2 : DELIVER 0
A3 : WAIT 0
A4 : WAIT 0

Iteration : 1
A1 : DELIVER 8.295930803665913
A2 : DELIVER 10.316924009025398
A3 : DELIVER 0.0
A4 : DELIVER 0.0

Iteration : 2
A1 : DELIVER 32.45983360215776
A2 : DELIVER 33.60653031538624
A3 : DELIVER 34.85303878502098
A4 : DELIVER 27.743145198148945

Iteration : 3
A1 : DELIVER 32.459834506453774
A2 : DELIVER 33.606531202089435
A3 : DELIVER 34.853039660032586
A4 : DELIVER 27.7431460914493

Iteration : 4
A1 : DELIVER 32.45983536071604
A2 : DELIVER 33.60653203973228
A3 : DELIVER 34.85304048663072
A4 : DELIVER 27.743146935324283

Iteration : 5
A1 : DELIVER 32.45983616771288
A2 : DELIVER 33.60653283102923
A3 : DELIVER 34.85304126749404
A4 : DELIVER 27.74314773250855

Iteration : 6
A1 : DELIVER 32.459836930059424
A2 : DELIVER 33.60653357854454
A3 : DELIVER 34.853042005153014
A4 : DELIVER 27.74314848558545

Iteration : 7
A1 : DELIVER 32.459837650226135
A2 : DELIVER 33.606534284700615
A3 : DELIVER 34.8530427019981