In [1]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random

In [18]:
#inheriting Env class from gym package, and overriding its methods
class ShowerEnv(Env):
    
    def __init__(self):
        #actions are 0, 1, 2
        self.action_space=Discrete(3)
        #states are 5 discrete values 0, 1, 2,...
        self.observation_space=Discrete(5)
        #we have 60 seconds to adjust water's temprature
        self.shower_length=60
        #initial stat is random value from the states space
        self.state=(self.observation_space.sample() + random.randint(-1,1))%5
        #creating P dictionary to make it model based environment
        self.P={}
        #for each state we manage each possible action
        for state in range(self.observation_space.n):
            #this dict holds the results of applying an action to this state, holding all possible results
            actions_states={}
            #each possible action 
            for action in range(self.action_space.n):
                #each action can result in one of the 3 possible states with an equal probability
                new_state=[]
                next_state=(state +(action - 1))%5
                if next_state==2:
                    reward=1
                else:
                    reward=0
                new_state.append([0.33333333, next_state, reward, False])
                #actions_states[action]=new_state
                for s in range(0,2):
                    #choose a random state
                    next_state=(state+random.randint(-1,1))%5
                    #print(next_state)
                    #if the state is 2(warm water), then it's the goal state
                    if next_state==2:
                        reward=1
                    else:
                        reward=0
                    new_state.append([0.33333333, next_state, reward, False])
                actions_states[action]=new_state
            self.P[state]=actions_states
            
            
    #step function represents the interaction between the agent and the environment
    def step(self, action):
        #get the results of applying this action to the current state
        states_list=self.P[self.state][action]
        #choose random state from the states that can result in applying action to the current state
        num_state=random.randint(0,2)
        next_state=states_list[num_state][1]
        reward=states_list[num_state][2]
        #each step takes one second
        self.shower_length -=1
        #episode is over(game is over), when the time is over
        if self.shower_length==0:
            done=True
        else:
            done=False
        
        #set placeholder for info
        info={}
        
        return next_state, reward, done, info
    
    def render(self):
        pass
    
    #we call this function at the begining of each episode
    def reset(self):
        self.state= (self.observation_space.sample() + random.randint(0,1))%5
        self.shower_length=60
        return self.state

In [19]:
env = ShowerEnv()
env.observation_space.sample()

1

In [20]:
env.action_space.sample()

2

In [21]:
for state in range(env.observation_space.n):
    for action in range(env.action_space.n): 
        print ('env.P[',state,'][',action,']',env.P[state][action]) 


env.P[ 0 ][ 0 ] [[0.33333333, 4, 0, False], [0.33333333, 1, 0, False], [0.33333333, 4, 0, False]]
env.P[ 0 ][ 1 ] [[0.33333333, 0, 0, False], [0.33333333, 1, 0, False], [0.33333333, 1, 0, False]]
env.P[ 0 ][ 2 ] [[0.33333333, 1, 0, False], [0.33333333, 1, 0, False], [0.33333333, 4, 0, False]]
env.P[ 1 ][ 0 ] [[0.33333333, 0, 0, False], [0.33333333, 2, 1, False], [0.33333333, 1, 0, False]]
env.P[ 1 ][ 1 ] [[0.33333333, 1, 0, False], [0.33333333, 2, 1, False], [0.33333333, 2, 1, False]]
env.P[ 1 ][ 2 ] [[0.33333333, 2, 1, False], [0.33333333, 1, 0, False], [0.33333333, 0, 0, False]]
env.P[ 2 ][ 0 ] [[0.33333333, 1, 0, False], [0.33333333, 2, 1, False], [0.33333333, 3, 0, False]]
env.P[ 2 ][ 1 ] [[0.33333333, 2, 1, False], [0.33333333, 3, 0, False], [0.33333333, 3, 0, False]]
env.P[ 2 ][ 2 ] [[0.33333333, 3, 0, False], [0.33333333, 2, 1, False], [0.33333333, 3, 0, False]]
env.P[ 3 ][ 0 ] [[0.33333333, 2, 1, False], [0.33333333, 4, 0, False], [0.33333333, 4, 0, False]]
env.P[ 3 ][ 1 ] [[0.

In [11]:
type(env.P)

dict

In [12]:
def value_iteration (env, gamma = 1.0) : 
    value_table = np. zeros(env. observation_space.n) 
    
    no_of_iterations = 100000 
    threshold = 1e-20 
    for i in range (no_of_iterations) : 
        updated_value_table = np. copy(value_table) 
        #for each state in the space observation we're going to find the best possible value
        for state in range(env. observation_space. n) : 
            Q_value = [] 
            #to find the best value we have to discover all possible actions and their corresponding values
            for action in range(env. action_space. n) : 
                next_states_rewards = [] 
                #we'll use bellman optimality equation to find the optimal value table
                #we do that by calculating the expected value of the current state considering all possible next states coming 
                #after taking the action
                #for example, taking action a0 in state s0 can lead 0.2 to state s1, and 0.8 to state s2
                #for that we loop over all possible rewards that can be gained using action a0 to find its value in state s0
                #by that we're calculating state_action value (Q table)
                for next_sr in env.P[state][action] : 
                    trans_prob, next_state, reward_prob,_ = next_sr 
                    next_states_rewards.append((trans_prob * (reward_prob + gamma * updated_value_table [next_state]))) 
                Q_value.append (np.sum (next_states_rewards) ) 
                # Pick up the maximum Q value and update it as value of a state 
                value_table[state] = max(Q_value) 
        #fabs() function is used to compute the absolute values element-wise
        #if the difference between the old and updated value table is smaller or equal to the threshold,we reached convergence
        if np.sum(np.fabs(updated_value_table - value_table)) <= threshold : 
            print('Value-iteration converged at iteration# %d. ' %(i+1)) 
            break 
    return value_table 

In [13]:
#extracting a policy from the optimal value table
#we may have more than one optimal policy
def extract_policy (value_table, gamma = 1.0) : 
    #initialize the policy as empty(zeroes)
    policy = np.zeros(env. observation_space. n) 
    #for each state we're gonna compute the Q table that corresponds to it and all possible actions using the optimal value table
    #that we calculated in a previous function
    for state in range(env. observation_space. n):
        Q_table = np. zeros(env. action_space. n) 
        #for each action calculate the q value using Bellman update equation
        for action in range(env. action_space. n): 
            for next_sr in env.P[state][action]: 
                trans_prob, next_state, reward_prob,_ = next_sr 
                Q_table[action] += (trans_prob * (reward_prob + gamma * value_table[next_state])) 
        #now choose the action that has the maximum value using np. argmax which returns the indices of the maximum values
        policy[state] = np. argmax(Q_table) 
    return policy

In [14]:
#we use gamma=1 because the observation space is small and the future rewards are quite important as the immediate ones
x=value_iteration(env, gamma = 1.0) 
#always for episodic problems, the value of a terminal state is always zero,For the sake of consistent maths notation,
#you can consider a terminal state to be "absorbing", i.e. any transition out of it results in zero reward and returning 
#to the same terminal state.
print(x)

[99944.26616251 99942.76616262 99950.01616225 99948.76616229
 99947.26616233]


In [15]:
optimal_policy = extract_policy(x, gamma=1.0) 

In [16]:
optimal_policy

array([1., 1., 0., 0., 0.])

In [17]:
#the outputs of this function demonstrates how many steps it took the agent before ending the game (the game can end when it:
#reaches the goal, or falls in a hole)
def get_score(env, optimal_policy, episodes=1000):
  rewards=0
  for episode in range(episodes):
    #observation is the initial state, s0
    observation = env.reset()
    
    while True:
      action = optimal_policy[observation]
      #When object interacts with environment with an action, then step() function returns observation which generally represents environment 
      #next state, reward a float of reward in previous action, done when it’s time to reset the environment or goal achieved 
      #and info a dict for debugging, it can be used for learning if it contains raw probabilities of environment’s last state.
      observation, reward, done, _ = env.step(action)
      
      rewards+=reward
      if done == 1:
        #print('You are in the goal after {} steps'.format(steps))
        #print('you managed to keep water temprature perfect for ', rewards,' second')
        break
  print('you kept water warm for ',rewards/60)

In [18]:
episodes=1000
get_score(env, optimal_policy, episodes)

you kept water warm for  322.46666666666664
