In [10]:
import reward_machines
import gymnasium 
from reward_machines.envs.grids.grid_world import *
from reward_machines.reward_machines.rm_environment import RewardMachineWrapper

In [11]:
#Make RMEnv (debugging in environment_test.ipynb)
#rm_files lists the reward machine definition file(s) handed to the environment;
#only rm2.txt is used here
rm_files = ["./reward_machines/envs/grids/reward_machines/rm2.txt"]
test_env = ObstacleRMEnv(rm_files)

#Make RMEnv wrapper that generates counterfactual experience for each step
#(add_crm=True; the cells below read it back from info["crm-experience"])
test_crm_env = RewardMachineWrapper(test_env, add_crm=True, gamma=1.0)

In [12]:
#Resetting the environment places the agent back at its starting location (0,0)
#and puts the RM back in its initial state.
#reset() is the last expression of the cell, so its return value
#(an observation array and an info dict) is what gets displayed below.
test_crm_env.reset()

(array([4, 6, 2, 2, 0, 0, 1, 4, 1, 0]),
 {'distance_B': 10.0, 'distance_C': 4.0, 'distance_hazard': 5.0})

In [13]:
#Locations given by (col, row)
#(0,0) index is the upper left corner
#step() on the wrapped env returns (rm_obs, rm_rew, done, info)

#Agent actions that can be taken are:  
#0: right (+1 col)
#1: down  (+1 row)
#2: left  (-1 col)
#3: up    (-1 row)

#e.g.
#test_env.step(0) #takes a step right

#Taking a step out of bounds will not change the location
#e.g.
#test_env.step(3) #will not change location (going up sends out of bounds)

#Map is 8x8 with obstacles at: 
#(3,2), (3,3), (4,2), (4,1)

#TODO: allow passing in a map file that is processed into the grid env


In [14]:
# #Debugging snippet (uncomment to use): shows which states and transitions exist in the RM:
# rm = test_crm_env.reward_machines[0]
# rm_states = rm.U
# for state in rm_states:
#     print(rm.delta_u[state]) #prints all transition possibilities (trigger propositions and resulting state) 

In [15]:
#Test what experience is generated after initialization (Success!)
#Reset environment
test_crm_env.reset()
#Take a step up (action 3) from the start cell; in the recorded output the
#observation is unchanged, i.e. the move is not allowed
rm_obs, rm_rew, done, info = test_crm_env.step(3)

#Here we expect experience to be generated for both states in the RM,
#with different rewards (based on their differing reward functions).
#Each entry in the recorded output reads as (obs, action, reward, next_obs, done).
print(info["crm-experience"])

[(array([4, 6, 2, 2, 0, 0, 1, 4, 1, 0]), 3, -30.0, array([4, 6, 2, 2, 0, 0, 1, 4, 1, 0]), False), (array([4, 6, 2, 2, 0, 0, 1, 4, 0, 1]), 3, -1, array([4, 6, 2, 2, 0, 0, 1, 4, 0, 1]), False)]


In [16]:
#Test what experience is generated after reaching first target (Success!)

#Reset env and walk to the cell just before the first target:
#five steps right (action 0) followed by six steps down (action 1)
test_crm_env.reset()
for approach_action in [0] * 5 + [1] * 6:
    test_crm_env.step(approach_action)

#Last step (left) reaches the first target. Experience is expected for both RM states,
#and both rewards are -1 (the state 1 -> state 2 transition has a reward of -1,
#and state 2 has -1 as its constant reward)
rm_obs, rm_rew, done, info = test_crm_env.step(2)
print(info["crm-experience"])
print(len(info["crm-experience"]))

#One step after reaching the first target, no experience is expected for the first state
#(it is no longer reachable given the sequence of actions performed thus far)
rm_obs, rm_rew, done, info = test_crm_env.step(2)
print(info["crm-experience"])
print(len(info["crm-experience"]))


[(array([4, 6, 2, 2, 5, 6, 1, 4, 1, 0]), 2, -1, array([4, 6, 2, 2, 4, 6, 1, 4, 0, 1]), False), (array([4, 6, 2, 2, 5, 6, 1, 4, 0, 1]), 2, -1, array([4, 6, 2, 2, 4, 6, 1, 4, 0, 1]), False)]
2
[(array([4, 6, 2, 2, 4, 6, 1, 4, 0, 1]), 2, -1, array([4, 6, 2, 2, 3, 6, 1, 4, 0, 1]), False)]
1


In [17]:
#Test traveling to hazard (BUG - was in rm2.txt)

#Reset environment and travel next to the hazard:
#one step right (action 0), then three steps down (action 1)
test_crm_env.reset()
for approach_action in (0, 1, 1, 1):
    test_crm_env.step(approach_action)

#Final step down moves the agent onto the hazard.
#The reward is bound to rm_rew, matching the naming used by the earlier test cells.
rm_obs, rm_rew, done, info = test_crm_env.step(1)

#Here we expect to learn over both RM states that transition to the hazard with this
#action in this location. To inspect entries one at a time, iterate over
#info["crm-experience"] and unpack each as (rm_obs, action, rm_rew, rm_next_obs, done).
print(info["crm-experience"])

[(array([4, 6, 2, 2, 1, 3, 1, 4, 1, 0]), 1, -100, array([4, 6, 2, 2, 1, 4, 1, 4, 0, 0]), True), (array([4, 6, 2, 2, 1, 3, 1, 4, 0, 1]), 1, -100, array([4, 6, 2, 2, 1, 4, 1, 4, 0, 0]), True)]


In [18]:
#Test traveling to target 2 before target 1 (Success!)

#Reset environment
test_crm_env.reset()
#Travel toward the second target: two steps right (action 0), one step down (action 1)
for approach_action in (0, 0, 1):
    test_crm_env.step(approach_action)
#Final step down reaches the second target.
#The reward is bound to rm_rew, matching the naming used by the earlier test cells.
rm_obs, rm_rew, done, info = test_crm_env.step(1)

#While the first RM state will terminate negatively, we should learn that the
#second RM state terminates positively
print(info["crm-experience"])

[(array([4, 6, 2, 2, 2, 1, 1, 4, 1, 0]), 1, -100, array([4, 6, 2, 2, 2, 2, 1, 4, 0, 0]), True), (array([4, 6, 2, 2, 2, 1, 1, 4, 0, 1]), 1, 100, array([4, 6, 2, 2, 2, 2, 1, 4, 0, 0]), True)]
