In [1]:
# import important libs and modules
import gym
import foragym
import numpy as np

from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy

In [2]:
# Initialize the gym environment.
# NOTE(review): render_mode="human" presumably makes the env print its state on
# reset/step (see the rollout output further down) — confirm against foragym.
env = gym.make(
    "foragym:foragym/ForaGym-v1",
    render_mode="human",
)

In [3]:
# Visualize the transition matrix: for every (state, action) pair, print each
# possible outcome stored in env.P together with the decoded state components.
num_states = env.nS
num_actions = env.nA

for state in range(num_states):
    for action in range(num_actions):
        for items in env.P[state][action]:
            try:
                days_left, life_point, forest_type = env.decode(state)
                # Each entry of env.P[state][action] is unpacked as
                # (probability, next encoded state, reward, terminal flag).
                p, new_state, reward, done = items
                print("="*10)
                print(f"enc_state: {state}, days_left: {days_left}, life_point: {life_point}, forest_type: {forest_type}")
                print(f"action: {env.action_dict[action]}, new_state: {new_state}, p: {round(p, 3):.3f}, reward: {reward}, is_dead: {done}")
                print("="*10)
            except Exception as exc:
                # Stay best-effort (keep dumping the remaining transitions), but
                # report what was skipped instead of hiding it. Catching Exception
                # rather than using a bare `except` lets KeyboardInterrupt stop
                # this long loop.
                print(f"skipped transition (state={state}, action={action}): {exc!r}")

enc_state: 504, days_left: 1, life_point: 1, forest_type: 0
action: wait, new_state: 0, p: 1.000, reward: -1, is_dead: True
enc_state: 504, days_left: 1, life_point: 1, forest_type: 0
action: forage, new_state: 0, p: 0.225, reward: -1, is_dead: True
enc_state: 504, days_left: 1, life_point: 1, forest_type: 0
action: forage, new_state: 0, p: 0.160, reward: -1, is_dead: True
enc_state: 504, days_left: 1, life_point: 1, forest_type: 0
action: forage, new_state: 144, p: 0.225, reward: 0, is_dead: True
enc_state: 504, days_left: 1, life_point: 1, forest_type: 0
action: forage, new_state: 144, p: 0.240, reward: 0, is_dead: True
enc_state: 504, days_left: 1, life_point: 1, forest_type: 0
action: forage, new_state: 0, p: 0.050, reward: -1, is_dead: True
enc_state: 504, days_left: 1, life_point: 1, forest_type: 0
action: forage, new_state: 0, p: 0.100, reward: -1, is_dead: True
enc_state: 505, days_left: 1, life_point: 1, forest_type: 1
action: wait, new_state: 1, p: 1.000, reward: -1, is_dead:

In [11]:
# Roll out a fixed policy for a couple of episodes, narrating every step.
num_episodes = 2

for episode in range(num_episodes):
    print(f"Episode #{episode+1}")
    print("=" * 10)

    print("Initial state:-")
    obs = env.reset()
    done = False
    print()

    while True:
        # Fixed action 1 (labelled "forage" in env.action_dict); swap in
        # env.action_space.sample() for a random policy.
        action = 1
        print(f"Action to take: {env.action_dict[action]}", end="\n\n")

        obs, reward, done, info = env.step(action)

        print(f'--Consequence: {info["consequence"]}')
        print(f"--Reward?: {reward}")
        print(f"--Episode done?: {done}", end="\n\n")

        if done:
            break

    print("-" * 10)

Episode #1
Initial state:-
--Days left: 7
--Current life: 5
--Forest Quality for the left environment: 0.50
--Forest Quality for the right environment: 0.70
--Threat Encounter probability for the left environment: 0.30
--Threat Encounter probability for the right environment: 0.10

Action to take: forage

--Days left: 6
--Current life: 3
--Forest Quality for the left environment: 0.50
--Forest Quality for the right environment: 0.70
--Threat Encounter probability for the left environment: 0.30
--Threat Encounter probability for the right environment: 0.10
--Consequence: Right environment / Forage failed / No threat encountered
--Reward?: 0
--Episode done?: False

Action to take: forage

--Days left: 5
--Current life: 4
--Forest Quality for the left environment: 0.50
--Forest Quality for the right environment: 0.70
--Threat Encounter probability for the left environment: 0.30
--Threat Encounter probability for the right environment: 0.10
--Consequence: Right environment / Forage success

In [6]:
# # evaluate algorithm
# num_episodes = 1000
# episode_reward = []

# for episode in range(num_episodes):
#     done = False
#     obs = env.reset()

#     total_reward = 0

#     while not done:
#         action = 0 #env.action_space.sample()
#         obs, reward, done, info = env.step(action)

#         total_reward += reward

#     episode_reward.append(total_reward)

# print(f"mean_reward:{np.mean(episode_reward):.2f} +/- {np.std(episode_reward):.2f}")


In [7]:
# Find the optimal policy by backward induction.
#
# For every forest type the value table V is solved over the reduced
# (days_left, life_points) grid, flattened as
#     index = days_left * env.num_life_points_left + life_points.
# States are visited in increasing days_left; the recursion reads V at the
# successor states reported by env.P, so those must already be filled in.
# NOTE(review): this assumes every transition decreases days_left — confirm in foragym.
num_states = env.num_days_left * env.num_life_points_left
num_actions = env.nA

# Extra policy column / action label used when no single action is strictly best.
# (Equals 2 for the two-action environment.)
indifference_action = num_actions
env.action_dict[indifference_action] = "Indifference"

# Action values are sums of probability-weighted floats, so ties are detected
# with an absolute tolerance instead of exact equality.
TIE_ATOL = 1e-12

# Flattened indices of the "life_points == 0" states. They do not depend on the
# forest type, so they are derived from env.decode once instead of once per forest.
dead_state_indices = np.array(
    sorted(
        {
            days_left * env.num_life_points_left
            for days_left, life_points, _ in map(env.decode, range(env.nS))
            if life_points == 0
        }
    ),
    dtype=int,
)

V_list = []
policy_list = []

for forest_type in range(env.num_forest):
    V = np.zeros(num_states)
    V[dead_state_indices] = -1

    # One-hot policy per state; the last column marks indifference.
    optimal_policy = np.zeros([num_states, num_actions + 1])

    for days_left in range(1, env.num_days_left):
        for life_points_left in range(1, env.num_life_points_left):
            curr_state = (days_left * env.num_life_points_left) + life_points_left
            enc_state = env.encode(days_left, life_points_left, forest_type)
            actions_s = np.zeros(num_actions)

            for action in range(num_actions):
                for transition_prob, next_enc_state, reward, done in env.P[enc_state][action]:
                    if done:
                        # Terminal transition: only the immediate reward counts.
                        outcome_value = reward
                    else:
                        next_days_left, next_life_points_left, _ = env.decode(next_enc_state)
                        next_state = (next_days_left * env.num_life_points_left) + next_life_points_left
                        outcome_value = reward + V[next_state]
                    actions_s[action] += transition_prob * outcome_value

            best_value = actions_s.max()
            V[curr_state] = best_value

            # Count the actions that achieve the best value within tolerance.
            num_best = np.count_nonzero(
                np.isclose(actions_s, best_value, rtol=0.0, atol=TIE_ATOL)
            )
            if num_best == 1:
                optimal_policy[curr_state, np.argmax(actions_s)] = 1.0
            else:
                optimal_policy[curr_state, indifference_action] = 1.0

    V_list.append(V)
    policy_list.append(optimal_policy)

# Report the chosen action and value for every non-terminal grid state and forest.
# V / optimal_policy are rebound on each iteration, so after this loop they refer
# to the tables of the last forest type.
for days_left in range(1, env.num_days_left):
    for life_points_left in range(1, env.num_life_points_left):
        for forest_type in range(env.num_forest):
            curr_state = (days_left * env.num_life_points_left) + life_points_left
            (forest_quality_left, threat_encounter_left), (forest_quality_right, threat_encounter_right) = env.forest[forest_type]
            V = V_list[forest_type]
            optimal_policy = policy_list[forest_type]
            print(f"days_left: {days_left}, life_points: {life_points_left}, FQ_left: {forest_quality_left:.3f}, FQ_right: {forest_quality_right:.3f}, threat_left: {threat_encounter_left:.3f}, threat_right: {threat_encounter_right:.3f}:- action: {env.action_dict[np.argmax(optimal_policy[curr_state])]}, value: {V[curr_state]:.3f}")

days_left: 1, life_points: 1, FQ_left: 0.500, FQ_right: 0.600, threat_left: 0.100, threat_right: 0.200:- action: forage, value: -0.535
days_left: 1, life_points: 1, FQ_left: 0.500, FQ_right: 0.600, threat_left: 0.100, threat_right: 0.300:- action: forage, value: -0.565
days_left: 1, life_points: 1, FQ_left: 0.500, FQ_right: 0.600, threat_left: 0.100, threat_right: 0.400:- action: forage, value: -0.595
days_left: 1, life_points: 1, FQ_left: 0.500, FQ_right: 0.600, threat_left: 0.200, threat_right: 0.100:- action: forage, value: -0.530
days_left: 1, life_points: 1, FQ_left: 0.500, FQ_right: 0.600, threat_left: 0.200, threat_right: 0.300:- action: forage, value: -0.590
days_left: 1, life_points: 1, FQ_left: 0.500, FQ_right: 0.600, threat_left: 0.200, threat_right: 0.400:- action: forage, value: -0.620
days_left: 1, life_points: 1, FQ_left: 0.500, FQ_right: 0.600, threat_left: 0.300, threat_right: 0.100:- action: forage, value: -0.555
days_left: 1, life_points: 1, FQ_left: 0.500, FQ_right:

In [9]:
# Display the value table currently bound to V. The policy cell's final print
# loop rebinds V to V_list[forest_type], so this is the table for the last
# forest type, indexed by days_left * env.num_life_points_left + life_points.
V

array([-1.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        , -1.        , -0.51      ,  0.        ,  0.        ,
        0.        ,  0.        , -1.        , -0.51      , -0.51      ,
        0.        ,  0.        ,  0.        , -1.        , -0.7599    ,
       -0.51      , -0.4316    ,  0.        ,  0.        , -1.        ,
       -0.7599    , -0.721484  , -0.471584  , -0.347565  ,  0.        ,
       -1.        , -0.86352716, -0.74107616, -0.64189085, -0.38140244,
       -0.32797284, -1.        , -0.87312732, -0.82452652, -0.67505154,
       -0.58151338, -0.38140244, -1.        , -0.91401799, -0.84077526,
       -0.77464193, -0.624406  , -0.58151338])