In [1]:
import sys
sys.path.append("../../")

import matplotlib.pyplot as plt

import numpy as np
from environments.Passive_T_Maze_Flag.env.env_passive_t_maze_flag import TMazeClassicPassive

In [2]:
# Reference (default) setup, kept for comparison and currently disabled:
#   corridor_length = 88, episode_timeout = corridor_length + 2 (T = L + 2),
#   desired_reward = 1, penalty = -0.1

# Short-corridor setup that is actually active; the timeout follows the same
# T = L + 2 relation as the default configuration.
corridor_length = 13
episode_timeout = corridor_length + 2
desired_reward = 1
penalty = -0.0714

In [3]:
# Build the T-maze environment from the configuration values set earlier in
# the notebook; the keyword arguments are collected first so they can be
# inspected or reused.
env_kwargs = dict(
    episode_length=episode_timeout,
    corridor_length=corridor_length,
    goal_reward=desired_reward,
    penalty=penalty,
)
env = TMazeClassicPassive(**env_kwargs)

In [4]:
# Report the action and observation spaces exposed by the environment.
space_report = f'Action space: {env.action_space}\nObservation_space: {env.observation_space}'
print(space_report)

Action space: Discrete(4)
Observation_space: MultiDiscrete([3 3 2 3], start=[-1 -1  0 -1])


In [5]:
import gymnasium

# Stand-alone MultiDiscrete space with the same nvec/start as the one printed
# for the environment, sampled once to see what an observation looks like.
demo_obs_space = gymnasium.spaces.MultiDiscrete(nvec=[3, 3, 2, 3], start=[-1, -1, 0, -1])
demo_obs_space.sample()

array([ 1, -1,  1,  1])

In [6]:
# For contrast: a continuous Box space with bounds [-1, 1], shape (3,), float32.
demo_box_space = gymnasium.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)
demo_box_space.sample()

array([-0.14605996,  0.78547776,  0.4864355 ], dtype=float32)

In [7]:
# Membership check: is the integer 0 an element of the environment's action space?
env.action_space.contains(0)

True

In [8]:
# Draw one random element from the environment's observation space.
env.observation_space.sample()


array([-1,  0,  0,  1])

In [9]:
from torch import nn
import torch
# Scratch demo of `nn.Embedding` and its `padding_idx` argument.
#
# Plain variant without padding, left disabled:
# embedding = nn.Embedding(10, 3)  # an Embedding module containing 10 tensors of size 3
# token_ids = torch.tensor([[1, 2, 4, 5], [4, 3, 2, 9]])  # a batch of 2 samples of 4 indices each
# embedding(token_ids)

# Example with padding_idx: index 0 is the designated padding entry.
embedding = nn.Embedding(10, 3, padding_idx=0)
# Named `token_ids` rather than `input` so the builtin `input()` is not
# shadowed for the rest of the kernel session.
token_ids = torch.tensor([[0, 2, 0, 5]], dtype=torch.long)
padded_lookup = embedding(token_ids)  # kept in a variable so it can be inspected

# Example of changing the `pad` vector: overwrite the padding row in place.
padding_idx = 0
embedding = nn.Embedding(3, 3, padding_idx=padding_idx)
with torch.no_grad():  # write into the parameter without recording it in autograd
    embedding.weight[padding_idx] = torch.ones(3)
# Last expression of the cell: shows the weight matrix with the edited row.
embedding.weight

Parameter containing:
tensor([[ 1.0000,  1.0000,  1.0000],
        [-1.1867,  0.4191, -0.0621],
        [ 0.4200,  0.8275, -0.4135]], requires_grad=True)

In [10]:
# Keep one random observation in `sample_obs` so it can be fed to an embedding
# layer further down; the bare name on the last line displays it.
sample_obs = env.observation_space.sample()
sample_obs

array([ 1,  1,  1, -1])

In [11]:
# `nvec` of the MultiDiscrete observation space: one entry per observation component.
env.observation_space.nvec

array([3, 3, 2, 3])

In [12]:
# `start` of the MultiDiscrete observation space: the per-component offset that
# is subtracted from observations before they are used as embedding indices.
env.observation_space.start

array([-1, -1,  0, -1])

In [13]:
# One shared lookup table for all observation components: sized from the
# largest `nvec` entry plus one, with 8-dimensional vectors.
num_embeddings = max(env.observation_space.nvec) + 1
emb = nn.Embedding(num_embeddings, 8)

In [14]:
# Shift the sampled observation by the space's `start` offsets, then look up
# one embedding vector per observation component.
shifted_sample = sample_obs - env.observation_space.start
emb(torch.IntTensor(shifted_sample))

tensor([[ 1.1828e+00, -1.6288e+00,  6.5457e-01, -7.5468e-01,  4.3747e-04,
          5.7303e-01,  1.8187e+00, -3.8963e-01],
        [ 1.1828e+00, -1.6288e+00,  6.5457e-01, -7.5468e-01,  4.3747e-04,
          5.7303e-01,  1.8187e+00, -3.8963e-01],
        [-3.7157e-01, -3.6561e-01,  4.4892e-01,  1.7652e-01,  7.8725e-01,
          3.6055e-01, -7.4463e-01, -6.9592e-02],
        [-3.6550e-01,  1.4621e+00,  6.3402e-01, -4.1693e-02,  1.4245e+00,
         -1.5372e-02, -9.3236e-01, -7.6748e-01]], grad_fn=<EmbeddingBackward0>)

In [15]:
# Sizes for the observation-embedding stack built in the next cell.
obs_dim = 4              # number of observation components fed to the stack
vocab_sizes = 4          # rows in the shared embedding table
embed_per_obs_dim = 8    # embedding width per observation component
outer_embed_size = 64    # width of the final linear projection

In [16]:
# Observation encoder: per-component embedding lookup, flatten the last two
# dimensions into one vector, then project to the outer embedding size.
# Layers are created in the same order as they appear in the pipeline.
component_embed = nn.Embedding(vocab_sizes, embed_per_obs_dim)
merge_components = nn.Flatten(start_dim=-2)
project_out = nn.Linear(embed_per_obs_dim * obs_dim, outer_embed_size)

embedding = nn.Sequential(component_embed, merge_components, project_out)

In [17]:
# NOTE(review): the condition is the constant 0 (falsy), so this expression
# always evaluates to None and `start` is never read; looks like a leftover
# experiment that can probably be deleted.
env.observation_space.start if 0 else None

In [18]:
# Push one hand-written observation through the full embedding stack.
manual_obs = np.array([1, -1, 0, 1])
manual_indices = manual_obs - env.observation_space.start  # apply the space's `start` offsets
embedding(torch.IntTensor(manual_indices))

tensor([-0.0994, -0.1877,  0.2686,  0.8510,  0.1187,  0.2928, -0.2609,  0.7188,
         0.2015, -0.3910, -0.2819, -0.2357, -1.0178, -0.4905,  0.5011, -0.4083,
        -0.2027, -0.8734, -0.1038, -0.1175, -0.7441,  0.4236,  0.4068, -1.0264,
        -0.5201,  0.1866, -0.2006,  0.2609, -0.1636, -0.2310, -0.0293, -0.0423,
        -0.7181, -0.5914,  0.0048,  0.1936,  0.5824, -0.1123,  0.2089,  0.1284,
         0.0791, -0.4059, -0.6135, -0.4111, -0.0912,  0.1447, -0.2116, -0.9837,
        -0.4801,  0.0952,  0.6121,  0.6372, -0.4187, -0.1972,  0.3429, -0.1803,
        -0.6344, -0.4401,  0.2661, -0.8525,  0.5618, -0.9396, -0.3906,  0.6107],
       grad_fn=<ViewBackward0>)

In [19]:
# Broadcasting check: a length-3 offset vector is subtracted from each row of
# a 2x3 integer batch.
int_batch = torch.IntTensor([[1, 4, 4], [1, 4, 4]])
int_offsets = torch.IntTensor([1, 3, 4])
int_batch - int_offsets

tensor([[0, 1, 0],
        [0, 1, 0]], dtype=torch.int32)

In [20]:
# Check of `.int()` on a tensor built with IntTensor: the displayed result is
# still int32 with the same values.
torch.IntTensor([1, 3, 4]).int()

tensor([1, 3, 4], dtype=torch.int32)

In [21]:
# Display the environment's action space.
env.action_space

Discrete(4)

In [22]:
# Roll out a single episode with a constant action, logging every transition
# and collecting the observations in `history`.
obs = env.reset()  # * {y, hint, flag, noise}
history = [obs]

action = 0  # constant policy; swap in env.action_space.sample() for random actions
done = False
while True:
    obs, reward, done, info = env.step(action)
    print(f'Action :{action}, obs: {obs}, reward: {reward}, terminated: {done}, info: {info}')
    history.append(obs)
    if done:
        break

env.close()

Action :0, obs: [ 0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  1. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  1. -1.], reward: -0.0714, terminated: False, in

In [None]:
# * DEFAULT CONFIGURATION
# Reward settings first, then the maze geometry; the timeout is tied to the
# corridor length through T = L + 2.
desired_reward = 1
penalty = -0.1

corridor_length = 88
episode_timeout = corridor_length + 2

______


In [31]:
# OPTIMAL POLICY
# Scripted rollout: action 0 on every step except step == corridor_length,
# where the action is chosen from the cue read off the initial observation.
# NOTE(review): this cell reuses whichever `env` and `corridor_length` are
# currently live in the kernel (the constructor call below is disabled);
# confirm both still describe the same maze before trusting the result.
# env = TMazeClassicPassive(episode_length=episode_timeout,
#                           corridor_length=corridor_length,
#                           goal_reward=desired_reward)

obs = env.reset(seed=41)  # * {y, hint, flag, noise}
x0 = env.x
cue = obs[1]
print(f'Init obs:{obs}')

R = 0
step = 0
done = False
# obss = []  # enable together with the env.render() lines to record frames
while not done:
    # obss.append(env.render())
    if step == corridor_length:
        a = 1 if cue == 1 else 3
    else:
        a = 0
    obs, r, done, info = env.step(a)
    R += r
    step += 1
    x = env.x

    print(f'Action :{a}, obs: {obs}, reward: {r}, terminated: {done}, info: {info}')

# obss.append(env.render())
env.close()

# print(step,'\t', obs, '\t\t', R)

Init obs:[ 0. -1.  0. -1.]
Action :0, obs: [0. 0. 0. 1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 0.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 0.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 0.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 0.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: -0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  1. -1.], reward: -0.0, terminated: False, info: {}
Acti

In [66]:
# OPTIMAL POLICY
# Same scripted rollout as above, but on a freshly constructed environment.
# `penalty` is not passed here, so the constructor's own default applies.
env = TMazeClassicPassive(
    episode_length=episode_timeout,
    corridor_length=corridor_length,
    goal_reward=desired_reward,
)

obs = env.reset(seed=41)  # * {y, hint, flag, noise}
x0 = env.x
cue = obs[1]
print(f'Init obs:{obs}')

# The cue never changes after reset, so the junction action is fixed up front.
junction_action = 1 if cue == 1 else 3

R = 0
step = 0
done = False
# obss = []  # enable together with the env.render() lines to record frames
while True:
    # obss.append(env.render())
    a = junction_action if step == corridor_length else 0
    obs, r, done, info = env.step(a)
    R += r
    step += 1
    x = env.x

    print(f'Action :{a}, obs: {obs}, reward: {r}, terminated: {done}, info: {info}')

    if done:
        break

# obss.append(env.render())
env.close()

# print(step,'\t', obs, '\t\t', R)

Init obs:[ 0. -1.  0. -1.]
Action :0, obs: [0. 0. 0. 1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 0.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 0.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 0.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 0.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [0. 0. 0. 1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [ 0.  0.  0. -1.], reward: 0.0, terminated: False, info: {}
Action :0, obs: [

In [8]:
# Disabled helper: turns the frames collected in `obss` into an inline video clip.
# NOTE(review): `obss` is only filled when the commented-out `env.render()` lines
# in the rollout cells are re-enabled, and this cell imports `moviepy`.
# from moviepy.editor import ImageSequenceClip
# import numpy as np

# desired_resolution = (945, 540)
# original_aspect_ratio = 112 / 64
# width = int(desired_resolution[0] * original_aspect_ratio)
# height = desired_resolution[1]

# observations = [np.squeeze(o) for o in obss]

# # Create ImageSequenceClip
# clip = ImageSequenceClip(observations, fps=24)
# clip = clip.resize(width=width, height=height)

# # Display the modified clip
# clip.ipython_display(maxduration=120)