In [None]:
import numpy as np
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv, VecVideoRecorder
import gymnasium as gym
import random

import torch
import torch.nn as nn
import torch.nn.functional as F

from Models.DecisionTransformer import DecisionTransformers

import pytorch_lightning as pl

In [None]:
# Environment configuration: which Gymnasium task to run and how to render it.
env_id = 'CarRacing-v2'
render_mode = "human"


def _make_env():
    """Create a single environment instance from the settings above."""
    return gym.make(env_id, render_mode=render_mode)


# DummyVecEnv takes a list of factory callables; only one environment is used here.
env = DummyVecEnv([_make_env])

In [None]:
# Restore the DecisionTransformers model from the saved checkpoint file.
checkpoint_path = "checkpoints/DecisionTransformers-Overfitting.ckpt"
model = DecisionTransformers.load_from_checkpoint(checkpoint_path)

# Put the network in evaluation mode (turns off dropout and other train-only behaviour).
model.eval()

In [None]:
from matplotlib import pyplot as plt

In [None]:
def _pad_time(tensor, seq_len):
    """Zero-pad dimension 1 (time) of a batched tensor up to ``seq_len`` entries."""
    missing = seq_len - tensor.shape[1]
    if missing <= 0:
        return tensor
    # F.pad consumes (before, after) pairs starting from the LAST dimension, so
    # emit a (0, 0) pair for every trailing feature dim and then the time pair.
    # The batch dimension is left out and therefore untouched.
    pad_spec = [0, 0] * (tensor.dim() - 2) + [0, missing]
    return F.pad(tensor, pad_spec)


def padInput(rewards, observations, actions, seq_len=32):
    """Turn an episode history into batched float32 tensors padded to ``seq_len`` steps.

    Each input holds T timesteps. A leading batch dimension of size 1 is added and
    the time dimension is right-padded with zeros until it has ``seq_len`` entries.
    Histories that already have ``seq_len`` or more steps are returned unpadded
    (they are NOT truncated).

    Args:
        rewards: sequence of T scalars, one per timestep.
        observations: array-like of shape (T, H, W, C) (channels-last frames).
        actions: sequence of T actions with 3 components each.
        seq_len: target length of the time dimension.

    Returns:
        dict with keys
            'rewards':      tensor of shape (1, max(T, seq_len), 1)
            'observations': tensor of shape (1, max(T, seq_len), C, H, W)
            'actions':      tensor of shape (1, max(T, seq_len), 3)
    """
    # Go through np.asarray first: it accepts lists of numpy scalars/arrays and
    # avoids the slow list-of-ndarrays path in torch.tensor. torch.tensor copies,
    # so the returned tensors never alias the caller's data.
    rewards = torch.tensor(np.asarray(rewards, dtype=np.float32)).view(-1, 1).unsqueeze(0)
    observations = (
        torch.tensor(np.asarray(observations, dtype=np.float32))
        .permute(0, 3, 1, 2)  # (T, H, W, C) -> (T, C, H, W)
        .unsqueeze(0)
    )
    actions = torch.tensor(np.asarray(actions, dtype=np.float32)).view(-1, 3).unsqueeze(0)

    # The pad amount must be measured on the time dimension (dim 1); after
    # unsqueeze(0), len(tensor) would only report the batch size of 1.
    return {
        'rewards': _pad_time(rewards, seq_len),
        'observations': _pad_time(observations, seq_len),
        'actions': _pad_time(actions, seq_len)
    }

In [None]:
def checkAction(action):
    """Clamp a 3-component action in place to its valid range and return it.

    Components and bounds:
        action[0] steering: [-1, 1]
        action[1] gas:      [0, 1]
        action[2] brake:    [0, 1]

    Args:
        action: mutable indexable of three numbers (e.g. list or numpy array).

    Returns:
        The same ``action`` object, with out-of-range components replaced by the
        nearest bound. In-range components are left untouched.
    """
    bounds = (
        (-1, 1),  # steering
        (0, 1),   # gas
        (0, 1),   # brake
    )
    for index, (low, high) in enumerate(bounds):
        if action[index] > high:
            action[index] = high
        elif action[index] < low:
            # Clamp to the LOWER bound; steering below -1 must become -1, not +1.
            action[index] = low
    return action

In [None]:
# Roll out one episode, letting the model pick every action from a sliding
# window of the most recent SEQ_LEN timesteps.
SEQ_LEN = 32         # window length; same value as padInput's default seq_len
TARGET_RETURN = 800  # initial value of the running reward target fed to the model

# Reset the environment to get the initial observation (batched: one env).
observation = env.reset()
done = False  # becomes True once the (single) environment reports episode end
# Running target: starts at TARGET_RETURN and is reduced by each received reward.
rewards = [TARGET_RETURN]
# The history needs one action per timestep, so the first slot is a random sample.
actions = [env.action_space.sample()]
observations = observation
step = 0  # index of the newest timestep inside the window

try:
    while not done:
        env.render()

        # Build the padded model input from the current history window.
        model_input = padInput(rewards, observations, actions, seq_len=SEQ_LEN)

        # Pure inference: skip autograd bookkeeping for every forward pass.
        with torch.no_grad():
            action = model(model_input)

        # Take the prediction at the newest timestep and clamp it to valid ranges.
        # np.array makes a copy, so checkAction's in-place edit cannot touch the
        # model output; .cpu() keeps the conversion valid for a model on GPU.
        next_action = checkAction(np.array(action[0][step].detach().cpu()))

        # Execute the action; the vectorized env returns one entry per environment.
        observation, reward, dones, info = env.step([next_action])
        done = bool(dones[0])
        env.render()

        actions.append(next_action)
        observations = np.concatenate((observations, observation))
        rewards.append(rewards[-1] - reward[0])
        step += 1

        # Keep at most SEQ_LEN timesteps: drop the oldest entry of each history.
        if len(rewards) > SEQ_LEN:
            rewards = rewards[1:]
            observations = observations[1:, :, :, :]
            actions = actions[1:]
            step = SEQ_LEN - 1
finally:
    # Always release the environment (and its render window), even on error.
    env.close()

In [None]:
# Report the dimensions of the frame history kept from the rollout.
print(observations.shape)

# Display every retained frame in turn. Values are divided by 255 before plotting
# (presumably 8-bit pixel data, so this maps them into the 0..1 range imshow
# expects for float images -- confirm against the environment's observation space).
for observation in observations:
    frame = observation / 255.
    plt.imshow(frame, interpolation='nearest')
    plt.show()