In [11]:
import or_gym
import gym
from gym import spaces
import numpy as np
import time
import copy
from or_gym.utils.env_config import *
from scipy.stats import *
# from or_gym.algos.rl_utils import create_env

In [3]:
# Environment identifier and the configuration dictionary passed to create_env.
env_name = 'InvManagement-v0'
env_config = dict(env=env_name, mask=True)
# env = create_env(env_name)(env_config)

def create_env(config, *args, **kwargs):
    """Build an ``InvManagementMasterEnv`` using the class defaults.

    ``config`` and any additional positional or keyword arguments are accepted
    but not used: the environment is always constructed without arguments.

    Returns:
        A newly constructed ``InvManagementMasterEnv`` instance.
    """
    default_env = InvManagementMasterEnv()
    return default_env

In [18]:
class InvManagementMasterEnv(gym.Env):
    """Multi-period, multi-stage inventory management environment.

    Configuration parameters (settable positionally, in this order, or by keyword):
        periods:    number of periods to simulate (must be positive).
        I0:         initial inventory for every stage except the last.
        p:          sales price at the first stage.
        r:          procurement cost at each stage (length = number of stages).
        k:          unfulfilled-demand cost at each stage (length = number of stages).
        h:          inventory holding cost for every stage except the last.
        c:          supply capacity for every stage except the last (must be positive).
        L:          lead time for every stage except the last (non-negative).
        backlog:    boolean flag (validated here; not otherwise used in the
                    methods shown in this class).
        dist:       demand distribution selector: 1 poisson, 2 binom, 3 randint,
                    4 geom (all from scipy.stats), 5 user-supplied demand ``user_D``.
        dist_param: keyword arguments for the selected scipy distribution.
        alpha:      discount factor in (0, 1].
        seed_int:   value passed to ``self.seed`` on reset when ``dist < 5``.
        user_D:     user-specified demand; must have ``periods`` entries when
                    ``dist == 5``.

    The number of stages is ``len(I0) + 1``.
    """

    # Order in which positional constructor arguments are mapped onto parameters.
    _POSITIONAL_KEYS = ['periods', 'I0', 'p', 'r', 'k', 'h', 'c', 'L', 'backlog',
                        'dist', 'dist_param', 'alpha', 'seed_int', 'user_D']

    @staticmethod
    def _as_array(value):
        """Return ``value`` as an array, wrapping a non-iterable scalar in a 1-element array."""
        try:
            return np.array(list(value))
        except TypeError:
            # list() raises TypeError for non-iterables (plain scalars, 0-d arrays)
            return np.array([value])

    def __init__(self, *args, **kwargs):
        """Set defaults, apply user overrides, validate inputs and build the spaces.

        Raises:
            AssertionError: if any of the input checks below fails.
            TypeError: if ``dist_param`` holds keyword names the selected scipy
                distribution does not accept.
        """
        # set default (arbitrary) values when creating environment (if no args or kwargs are given)
        self.periods = 30
        self.I0 = [100, 100, 200]
        self.p = 2
        self.r = [1.5, 1.0, 0.75, 0.5]
        self.k = [0.10, 0.075, 0.05, 0.025]
        self.h = [0.15, 0.10, 0.05]
        self.c = [100, 90, 80]
        self.L = [3, 5, 10]
        self.backlog = True
        self.dist = 1
        self.dist_param = {'mu': 20}
        self.alpha = 0.97
        self.seed_int = 0
        self.user_D = np.zeros(self.periods)

        # add environment configuration dictionary and keyword arguments
        for key, value in kwargs.items():
            setattr(self, key, value)
        # positional arguments are applied after (and therefore override) keyword arguments
        for i, value in enumerate(args):
            setattr(self, self._POSITIONAL_KEYS[i], value)
        # NOTE(review): assign_env_config is imported from or_gym.utils.env_config;
        # presumably it applies kwargs / an env_config dict again -- confirm there.
        assign_env_config(self, kwargs)

        # input parameters
        self.init_inv = self._as_array(self.I0)
        self.num_periods = self.periods
        self.unit_price = np.append(self.p, self.r[:-1])  # cost to stage 1 is price to stage 2
        self.unit_cost = np.array(self.r)
        self.demand_cost = np.array(self.k)
        self.holding_cost = np.append(self.h, 0)  # holding cost at last stage is 0
        self.supply_capacity = self._as_array(self.c)
        self.lead_time = self._as_array(self.L)
        self.discount = self.alpha
        self.user_D = np.array(list(self.user_D))
        self.num_stages = len(self.init_inv) + 1

        # dictionary with options for demand distributions
        distributions = {1: poisson,
                         2: binom,
                         3: randint,
                         4: geom,
                         5: self.user_D}

        # check inputs
        # the comparison must happen element-wise *inside* np.all
        assert np.all(self.init_inv >= 0), "The initial inventory cannot be negative"
        try:
            assert self.num_periods > 0, "The number of periods must be positive. Num Periods = {}".format(self.num_periods)
        except TypeError:
            # best effort kept from the original: a non-comparable value is printed, not rejected
            print('\n{}\n'.format(self.num_periods))
        assert np.all(self.unit_price >= 0), "The sales prices cannot be negative."
        assert np.all(self.unit_cost >= 0), "The procurement costs cannot be negative."
        assert np.all(self.demand_cost >= 0), "The unfulfilled demand costs cannot be negative."
        assert np.all(self.holding_cost >= 0), "The inventory holding costs cannot be negative."
        assert np.all(self.supply_capacity > 0), "The supply capacities must be positive."
        assert np.all(self.lead_time >= 0), "The lead times cannot be negative."
        assert (self.backlog == False) | (self.backlog == True), "The backlog parameter must be a boolean."
        assert self.num_stages >= 2, "The minimum number of stages is 2. Please try again"
        assert len(self.unit_cost) == self.num_stages, "The length of r is not equal to the number of stages."
        assert len(self.demand_cost) == self.num_stages, "The length of k is not equal to the number of stages."
        # holding_cost has a 0 appended above, hence the comparison with num_stages
        assert len(self.holding_cost) == self.num_stages, "The length of h is not equal to the number of stages - 1."
        assert len(self.supply_capacity) == self.num_stages-1, "The length of c is not equal to the number of stages - 1."
        assert len(self.lead_time) == self.num_stages-1, "The length of L is not equal to the number of stages - 1."
        assert self.dist in [1,2,3,4,5], "dist must be one of 1, 2, 3, 4, 5."
        if self.dist < 5:
            # Evaluating the cdf validates the parameters: unknown keyword names raise
            # TypeError and invalid values yield nan. The value itself may legitimately
            # be 0 (e.g. geom, or randint with low > 0), so only nan is rejected.
            cdf_at_zero = distributions[self.dist].cdf(0, **self.dist_param)
            assert not np.any(np.isnan(cdf_at_zero)), "Wrong parameters given for distribution."
        else:
            assert len(self.user_D) == self.num_periods, "The length of the user specified distribution is not equal to the number of periods."
        assert (self.alpha>0) & (self.alpha<=1), "alpha must be in the range (0,1]."

        # select distribution
        self.demand_dist = distributions[self.dist]

        # initialize
        self.reset()

        # action space (reorder quantities for each stage; list)
        # An action is defined for every stage (except last one)
        self.action_space = gym.spaces.Box(
            low=np.zeros(self.num_stages-1), high=self.supply_capacity)
        # observation space (Inventory position at each echelon, which is any integer value)
        # np.inf is used because the np.Inf alias was removed in NumPy 2.0
        self.observation_space = gym.spaces.Box(
            low=-np.ones(self.num_stages-1)*np.inf,
            high=self.supply_capacity*self.num_periods)

    def reset(self):
        """Reset all simulation records and the clock, then return the initial state.

        Returns:
            The state array computed by ``_update_state`` for period 0.
        """
        periods = self.num_periods
        m = self.num_stages
        I0 = self.init_inv

        # simulation result lists
        self.I=np.zeros([periods + 1, m - 1]) # inventory at the beginning of each period (last stage not included since inventory is infinite)
        self.T=np.zeros([periods + 1, m - 1]) # pipeline inventory at the beginning of each period (no pipeline inventory for last stage)
        self.R=np.zeros([periods, m - 1]) # replenishment order (last stage places no replenishment orders)
        self.D=np.zeros(periods) # demand at retailer
        self.S=np.zeros([periods, m]) # units sold
        self.B=np.zeros([periods, m]) # backlog (includes top most production site in supply chain)
        self.LS=np.zeros([periods, m]) # lost sales
        self.P=np.zeros(periods) # profit

        # initialization
        self.period = 0 # initialize time
        self.I[0,:]=np.array(I0) # initial inventory
        self.T[0,:]=np.zeros(m-1) # initial pipeline inventory

        # set random generation seed (unless using user demands)
        if self.dist < 5:
            self.seed(self.seed_int)

        # set state
        self._update_state()

        return self.state

    def _update_state(self):
        """Store the cumulative inventory position for the current period in ``self.state``.

        The per-stage position is on-hand plus pipeline inventory; from period 1
        onwards the previous period's backlog (all stages but the last) is
        subtracted. The state is the cumulative sum of these values across stages.
        """
        n = self.period
        if n>=1:
            IP = np.cumsum(self.I[n,:] + self.T[n,:] - self.B[n-1,:-1])
        else:
            IP = np.cumsum(self.I[n,:] + self.T[n,:])
        self.state = IP

In [19]:
# Instantiate the environment; create_env ignores env_config and uses the class defaults.
env = create_env(env_config)

In [3]:
# Number of simulation periods (copied from `periods` in __init__).
env.num_periods

30

In [6]:
# Supply capacity array built from `c` in __init__.
env.supply_capacity

array([100,  90,  80])

In [8]:
# Number of stages: one more than the number of initial-inventory entries.
env.num_stages

4

In [9]:
# Observation shape: one entry per stage except the last (num_stages - 1).
env.observation_space.shape

(3,)