In [1]:
import gym
from gym import spaces
import pandas as pd
import seaborn as sns
import numpy as np
from numpy.random import default_rng

from collections import defaultdict

import matplotlib.pyplot as plt
from matplotlib import style
style.use('ggplot')
%matplotlib inline

In [2]:
class AssetReplacementEnv(gym.Env):
    """Custom Environment that follows gym interface"""
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(AssetReplacementEnv, self).__init__()

        self.n_actions = 3
        self.action_space = spaces.Discrete(self.n_actions)

        # With spaces.Tuple you can create a multidimensional state
        # In our simple model observation and state are synonyms
        self.n_states = 6
        self.observation_space = spaces.Discrete(self.n_states)

        self.n_maint = 6
        self.maint_space = spaces.Discrete(self.n_maint)
        
        self.service = 10
        self.cost = 75
        self.rng = default_rng()
   


    def step(self, action):
        """
        State transition of the model.

        """
        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))

 
    
        # calculate the reward and the state transition
        if action == 2:                                    
            reward = self.profit(0) - self.cost         # replace
            self.state = 1 
            self.maint = 0
            
        elif action == 1:
            reward = self.profit(self.state) - self.service
            self.state += 1
            self.maint += 1 
               
                
        else: 
            reward = self.profit(self.state) 
            self.state += 1    

        return self.state, self.maint, reward



    
    def profit(self, state):
        return 50 - 2.5*state - 2.5*state**2
        

    def reset(self):
        # set initial state (age of machine) to 0
        self.state = 0
        self.maint = 0            
        return self.state, self.maint


    # We will not implement render and close function
    def render(self, mode='human'):
        pass
    def close (self):
        pass

In [3]:
class QLearningAgent():
    def __init__(self, agent_info):
        """Initialize Agent

        Args: 
            agent_info (dict): Parameters used to initialize agent.
            {
                n_actions (int): Number of actions.
                epsilon (float): Exploration parameter.
                step_size (float): Learning rate alpha.
                discount (float): Discount factor gamma.
            }
        """
        self.n_actions = agent_info["n_actions"]
        self.epsilon = agent_info["epsilon"]
        self.step_size = agent_info["step_size"]
        self.discount = agent_info["discount"]

        self.rng = default_rng()

        # Create an array for action-value estimates and initialize it to zero.
        self.q = defaultdict(lambda: np.zeros(self.n_actions))      


    def step(self, reward, state, maint):
        """A step taken by the agent

        Args:
            next_state (int):  next state from the environment
            next_maint (int):  next state of maintenance failure from environment 
        Returns:
            next_action (int): action the agent takes in next_state
        """

        # choose action using epsilon greedy policy
        action = self.select_action(state, maint)

            
        target = reward + self.discount*np.max(self.q[state, maint])   
        td_error = target - self.q[self.previous_state,self.previous_maint][self.previous_action]
        self.q[self.previous_state,self.previous_maint][self.previous_action] += self.step_size* td_error


        # save current state and action
        self.previous_state = state
        self.previous_maint = maint
        self.previous_action = action

        return action

        
    def select_action(self, state, maint):
        """Select action using epsilon greedy policy

        Args:
            state (int): current state
            maint (int): current times of maitenance failure
        Returns:
            action (int): action the agent takes
        """
        
        # performe epsilon greedy policy improvement
        # remember to replace asset if age is 5
        if state== 5:
            return 2
        
        if  self.rng.random() < self.epsilon:
            action = self.rng.choice(env.n_actions)
        else:
            action = self.argmax(self.q[state, maint])
        return action


    def argmax(self, q_values):
        """Return the index of maximum value with ties broken randomly.

        Args:
            q_values (numpy.ndarray): A shape-(n_actions,) array of estimated
                action_values.

        Returns:
            index (int): Index of the maximal value.
        """

        ties = np.flatnonzero(np.isclose(q_values, max(q_values)))
        index = self.rng.choice(ties)

        return index


    def start(self, state, maint):
        """Selects action in inital state

        Args:
            state (int): initial state
            maint (int): initial maint
        Returns:
            action (int): initial action
        """

        action = 0

        self.previous_maint= maint
        self.previous_state = state
        self.previous_action = action

        return action


In [None]:

env = AssetReplacementEnv()

epsilon = 0.3
step_size = 0.2
discount = 0.9


agent_info = {"n_actions": env.n_actions,
              "epsilon": epsilon,
              "step_size": step_size,
              "discount": discount}


agent = QLearningAgent(agent_info)

n_episodes = 10000
timesteps = 20

for i in range(n_episodes):
    state, maint = env.reset()
    action = agent.start(state, maint)

    for t in range(timesteps):
        agent.epsilon = 1/timesteps+ 0.001
        mu = (state - maint)/5
        state, maint, reward, = env.step(action)
        if action == 2:
            pass
        elif action ==1:
            if env.rng.random() < mu:
                reward = 0 - env.service
            else:
                pass
        else:
            if env.rng.random() < mu:
                reward = 0
            else:
                pass     
        action = agent.step(reward, state, maint)

V = defaultdict(float)
policy = defaultdict(int)
for state_maint, values in sorted(agent.q.items()):
    value = np.max(values)
    V[state_maint] = value
    action = np.argmax(values)
    policy[state_maint] = action



#convert dictionary to array so that we can compare the norm of different value function
V_QL = np.zeros((env.n_states, env.n_maint))
for k, v in V.items():
    V_QL[k[0],k[1]] = v
norm = np.linalg.norm(V_QL-V_DP)
print('Norm of difference of value function at episodes'+ str(n_episodes)+' simulation:' + str(norm))