# Week 1: Dynamic Programming - Solutions

In [1]:
import numpy as np
import pulp
import os
import time
import copy

In [2]:
class MDP:
    def __init__(self, filename: str, quiet=False) -> None:
        self.name = filename.split('.')[0]
        stt = time.time()
        with open(filename) as f:
            for line in f:
                ls = line.split()
                if ls[0] == 'states':
                    self.N_s = int(ls[1])
                elif ls[0] == 'actions':
                    self.N_a = int(ls[1])
                    self.probs = [[[{} for _ in range(self.N_s)] for _ in range(self.N_a)] for _ in range(self.N_s)]
                elif ls[0] == 'tran':
                    self.probs[int(ls[1])][int(ls[2])][int(ls[3])][np.float64(ls[4])] = np.float64(ls[5])
                else:
                    assert(ls[0] == 'gamma')
                    self.gamma = np.float64(ls[1])
        if not quiet:
            print(f'Time taken to generate MDP from {filename}: {time.time() - stt} s')



class PolicyIterationSolver:
    '''Hyperparameters of this implementation of Policy Iteration'''
    EVAL_THR = 1e-5
    IMP_THR = 1e-7 # Required to avoid switching between multiple optimal policies infinitely often

    def __init__(self, MDP: MDP) -> None:
        self._MDP = MDP
        self._policy = np.random.choice(self._MDP.N_a, self._MDP.N_s)
        self._value = np.zeros(self._MDP.N_s, np.float64)

    def _policy_evaluation(self) -> np.float64:
        old_policy = copy.copy(self._value)
        while True:
            mdiff = 0.0
            for s in range(self._MDP.N_s):
                prev_value = self._value[s]
                '''An in-place implementation'''
                self._value[s] = np.sum([np.sum([self._MDP.probs[s][self._policy[s]][ss][r]*(r + self._MDP.gamma*self._value[ss]) for r in self._MDP.probs[s][self._policy[s]][ss]]) for ss in range(self._MDP.N_s)])
                mdiff = np.maximum(np.float64(mdiff), np.abs(self._value[s] - prev_value))
            if mdiff < PolicyIterationSolver.EVAL_THR:
                return np.max(np.abs(self._value - old_policy)) # type: ignore
    
    def _policy_improvement(self) -> None:
        for s in range(self._MDP.N_s):
            self._policy[s] = np.argmax([np.sum([np.sum([self._MDP.probs[s][a][ss][r]*(r + self._MDP.gamma*self._value[ss]) for r in self._MDP.probs[s][a][ss]]) for ss in range(self._MDP.N_s)]) for a in range(self._MDP.N_a)])
    
    '''Implements policy iteration and returns optimal value, optimal policy'''
    def solve(self, quiet=False) -> tuple:
        stt = time.time()
        while self._policy_evaluation() >= PolicyIterationSolver.IMP_THR:
            self._policy_improvement()
        if not quiet:
            print(f'Time taken for Policy Iteration on {self._MDP.name}: {time.time() - stt} s')
        return self._value, self._policy



class ValueIterationSolver:
    '''Hyperparameters of this implementation of Value Iteration'''
    THR = 1e-5

    def __init__(self, MDP: MDP) -> None:
        self._MDP = MDP
        self._value = np.zeros(self._MDP.N_s, np.float64)

    '''Implements value iteration and returns optimal value, optimal policy'''
    def solve(self, quiet=False) -> tuple:
        stt = time.time()
        while True:
            mdiff = 0.0
            for s in range(self._MDP.N_s):
                prev_value = self._value[s]
                '''An in-place implementation'''
                self._value[s] = np.max([np.sum([np.sum([self._MDP.probs[s][a][ss][r]*(r + self._MDP.gamma*self._value[ss]) for r in self._MDP.probs[s][a][ss]]) for ss in range(self._MDP.N_s)]) for a in range(self._MDP.N_a)])
                mdiff = np.maximum(mdiff, np.abs(self._value[s] - prev_value))
                if mdiff < ValueIterationSolver.THR:
                    policy = np.array([np.argmax([np.sum([np.sum([self._MDP.probs[s][a][ss][r]*(r + self._MDP.gamma*self._value[ss]) for r in self._MDP.probs[s][a][ss]]) for ss in range(self._MDP.N_s)]) for a in range(self._MDP.N_a)]) for s in range(self._MDP.N_s)])
                    if not quiet:
                        print(f'Time taken for Value Iteration on {self._MDP.name}: {time.time() - stt} s')
                    return self._value, policy



class LinearProgrammingSolver:
    def __init__(self, MDP: MDP) -> None:
        self._MDP = MDP
        self._model = pulp.LpProblem("MDP", pulp.LpMinimize)
        self._vals = [pulp.LpVariable(f'{s}') for s in range(self._MDP.N_s)]
    
    '''Implements Linear Programming and returns the optimal value and optimal policy'''
    def solve(self, quiet=False):
        stt = time.time()
        self._model += sum(self._vals) # Objective function
        for a in range(self._MDP.N_a):
            for s in range(self._MDP.N_s):
                coeffs = np.zeros(self._MDP.N_s, np.float64)
                coeffs[s] = 1
                const = 0.0
                for ss in range(self._MDP.N_s):
                    for r in self._MDP.probs[s][a][ss]:
                        coeffs[ss] -= self._MDP.probs[s][a][ss][r]*self._MDP.gamma
                        const += r*self._MDP.probs[s][a][ss][r]
                self._model += sum([coeffs[s]*self._vals[s] for s in range(self._MDP.N_s)]) >= const # Constraints
        self._model.solve(pulp.PULP_CBC_CMD(msg=False))
        assert(self._model.status == 1)
        values = np.zeros(self._MDP.N_s, np.float64)
        for v in self._model.variables():
            values[int(v.name)] = v.varValue
        policy = np.array([np.argmax([np.sum([np.sum([self._MDP.probs[s][a][ss][r]*(r + self._MDP.gamma*values[ss]) for r in self._MDP.probs[s][a][ss]]) for ss in range(self._MDP.N_s)]) for a in range(self._MDP.N_a)]) for s in range(self._MDP.N_s)])
        if not quiet:
            print(f'Time taken for solving the Linear Program of {self._MDP.name}: {time.time() - stt} s')
        return values, policy

In [3]:
def print_to_file(filename: str, solution: tuple) -> None:
    assert(len(solution) == 2 and len(solution[0]) == len(solution[1]))
    with open(filename, 'w') as f:
        for i in range(len(solution[0])):
            f.write(f'{round(solution[0][i], 6)} {solution[1][i]}\n')

In [4]:
for mdpfile in os.listdir('MDPs/MDPs/'):
    mdp = MDP(f'MDPs/MDPs/{mdpfile}')
    print_to_file(f'MDPs/PI_solutions/sol-{mdpfile}', PolicyIterationSolver(mdp).solve())
    print_to_file(f'MDPs/VI_solutions/sol-{mdpfile}', ValueIterationSolver(mdp).solve())
    print_to_file(f'MDPs/LP_solutions/sol-{mdpfile}', LinearProgrammingSolver(mdp).solve())

Time taken to generate MDP from MDPs/MDPs/mdp-2-2.txt: 0.0007128715515136719 s
Time taken for Policy Iteration on MDPs/MDPs/mdp-2-2: 0.014780759811401367 s
Time taken for Value Iteration on MDPs/MDPs/mdp-2-2: 0.012240409851074219 s
Time taken for solving the Linear Program of MDPs/MDPs/mdp-2-2: 0.003809690475463867 s
Time taken to generate MDP from MDPs/MDPs/mdp-40-50.txt: 0.23939967155456543 s
Time taken for Policy Iteration on MDPs/MDPs/mdp-40-50: 2.2574880123138428 s
Time taken for Value Iteration on MDPs/MDPs/mdp-40-50: 4.316442012786865 s
Time taken for solving the Linear Program of MDPs/MDPs/mdp-40-50: 1.6045176982879639 s
Time taken to generate MDP from MDPs/MDPs/mdp-3-5.txt: 0.00027942657470703125 s
Time taken for Policy Iteration on MDPs/MDPs/mdp-3-5: 0.0009758472442626953 s
Time taken for Value Iteration on MDPs/MDPs/mdp-3-5: 0.0011119842529296875 s
Time taken for solving the Linear Program of MDPs/MDPs/mdp-3-5: 0.003379344940185547 s
Time taken to generate MDP from MDPs/MDPs