In [2]:
import numpy
import json
import utils
from utils import device
import hashlib
import torch
import numpy as np
from copy import deepcopy
import pickle
import matplotlib.pyplot as plt
import cv2
from PIL import Image

frame_count = 0

def add_count_to_frame(frame):
    global frame_count

    # Increment the frame count
    frame_count += 1

    # Add count number to the top left corner
    count_text = str(frame_count)
    position = (10, 20)
    color = (255, 0, 0)
    thickness = 2
    font_scale = 0.8
    cv2.putText(frame, count_text, position, cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness, cv2.LINE_AA)

    # Convert the frame to PIL Image
    image = Image.fromarray(frame)

    # Convert the image to a quantized version with 256 colors
    quantized_image = image.quantize(colors=256)

    # Convert the quantized image back to 'RGB' mode
    quantized_image_rgb = quantized_image.convert('RGB')

    # Convert the RGB image back to a NumPy array
    quantized_frame = np.array(quantized_image_rgb)

    # cv2.imshow("quantized_frame", quantized_frame)
    # cv2.waitKey(0)

    # No need to convert to BGR as it's already in RGB format
    return quantized_frame

def hash_state(state):
    state_string = pickle.dumps(state.grid)
    return hashlib.sha256(state_string).hexdigest()




def hash_grid(env):
    # Retrieve the grid
    grid = env.grid.encode()

    # Retrieve the agent's position and direction
    agent_pos = env.agent_pos
    agent_dir = env.agent_dir

    # Flatten the grid and convert to string
    grid_string = ''.join(str(cell) for row in grid for cell in row)

    # Add the agent's position and direction to the string
    state_string = f'{grid_string},{agent_pos},{agent_dir}'
    
    print(agent_pos, agent_dir)
    # Hash the string
    return hashlib.sha256(state_string.encode('utf-8')).hexdigest()

def depth_first_search(env, agent, depth):
    if depth == 0:
        return [hash_state(env)]
    
    next_states = get_next_states(env, agent)

    # for each of the next states, get their next states and append them to all_states
    all_states = []

    for new_env in next_states:
        all_states.extend(depth_first_search(new_env, agent, depth=depth-1))
    
    return all_states

def get_next_states(env, agent):
    """
    Returns all possible next states given current state
    """
    # find all possible actions
    action_space = env.action_space.n

    # initialize next_states list
    saved_env = pickle.dumps(env)
    next_envs = []
    # get next state for each action
    for action in range(action_space):
        # we load the original state
        old_env = pickle.loads(saved_env) 
        obs_new, _, _, _, _ = old_env.step(action) # The new env gets created and svaed
        next_envs.append(old_env)

        if gif:
            frame = old_env.get_frame()
            frame = add_count_to_frame(frame)
            frames.append(frame)

    
    return set(next_envs)


frames = []
# Replace command line arguments with hard-coded values.
env_name = "MiniGrid-DoorKey-5x5-v0"
model_name = "DoorKeya2c"
seed = 0
shift = 0
argmax = False
pause = 0.1
gif = True
episodes = 1
memory = False
text = False


# Set seed for all randomness sources
utils.seed(seed)

# Set device
print(f"Device: {device}\n")

# Load environment
env = utils.make_env(env_name, seed)
for _ in range(shift):
    env.reset()
print("Environment loaded\n")

# Load agent
model_dir = utils.get_model_dir(model_name)
agent = utils.Agent(env.observation_space, env.action_space, model_dir,
                    argmax=argmax, use_memory=memory, use_text=text)
print("Agent loaded\n")

# Run the agent
if gif:
    from array2gif import write_gif
    frames = []

for episode in range(episodes):
    obs, _ = env.reset()
    cycle = 0

    while True:
        action = agent.get_action(obs)
        obs, reward, terminated, truncated, _ = env.step(action)

        next_states = depth_first_search(env, agent, depth=4)
        if gif:
            
            print(f"Saving gif {str(cycle) + 'decisions.gif'} of {len(frames)} length \n", end="")
            pil_images = [Image.fromarray(frame) for frame in frames]
            pil_images[0].quantize(colors=256).save(str(cycle) + 'decisions.gif', save_all=True, append_images=[pil_img.quantize(colors=256) for pil_img in pil_images[1:]], optimize=False, duration=30, loop=0)
            # write_gif(numpy.array(frames), str(cycle) +"decisions.gif", fps=2/pause)
            print("Done.")
            frames =[]
            cycle+=1
            frame_count = 0
        

        done = terminated | truncated
        agent.analyze_feedback(reward, done)

        if done:
            break




# if gif:
#     print("Saving gif... ", end="")
#     write_gif(numpy.array(frames), gif+".gif", fps=1/pause)
#     print("Done.")


Device: cuda

Environment loaded

Agent loaded

Saving gif 0decisions.gif of 2800 length 
Done.
Saving gif 1decisions.gif of 2800 length 
Done.
Saving gif 2decisions.gif of 2800 length 
Done.
Saving gif 3decisions.gif of 2800 length 
Done.
Saving gif 4decisions.gif of 2800 length 
Done.
Saving gif 5decisions.gif of 2800 length 
Done.
Saving gif 6decisions.gif of 2800 length 
Done.
Saving gif 7decisions.gif of 2800 length 
Done.


In [6]:

def copy_environment(env):
    # Create a new environment with the same grid size and type
    new_env = utils.make_env(env_name, seed, render_mode="human")
    
    # Copy the grid state
    new_env.grid = env.grid.encode()
    
    # Retrieve the agent's position and direction
    agent_pos = env.agent_pos
    agent_dir = env.agent_dir
    
    # Set the agent's position and direction in the new environment
    new_env.agent_pos = agent_pos
    new_env.agent_dir = agent_dir
    
    return new_env

In [9]:
env = utils.make_env(env_name, seed, render_mode="human")
for _ in range(shift):
    env.reset()

# env2.grid = env.grid

# image_data = env.get_frame()
# plt.imshow(image_data)
# plt.show()
        
model_dir = utils.get_model_dir(model)
agent = utils.Agent(env.observation_space, env.action_space, model_dir,
                    argmax=argmax, use_memory=memory, use_text=text)
print("Agent loaded\n")

# Run the agent

if gif:
    from array2gif import write_gif

    frames = []

# Create a window to view the environment
env.render()

for episode in range(episodes):
    obs, _ = env.reset()

    while True:
        env.render()
        if gif:
            frames.append(numpy.moveaxis(env.get_frame(), 2, 0))

        action = agent.get_action(obs)
        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated | truncated
        agent.analyze_feedback(reward, done)

        if done:
            break

NameError: name 'model' is not defined

In [None]:
env_copy2 = env.grid


In [None]:
dir(env)

In [None]:
env_copy2.grid

In [None]:
env_copy.grid

In [None]:
action = agent.get_action(obs)
obs, reward, terminated, truncated, _ = env.step(action)

# next_states now contains a dictionary of possible states at t+1, t+2, etc.        
next_states =  depth_first_search(env, obs, agent, depth=3)
print(len(next_states))

done = terminated | truncated
agent.analyze_feedback(reward, done)



In [None]:

for episode in range(episodes):
    obs, _ = env.reset()

#     while True:



