In [10]:
import gymnasium as gym
import numpy as np
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import time
from distutils.util import strtobool
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from stable_baselines3.common.buffers import ReplayBuffer
from torch.utils.tensorboard import SummaryWriter

In [11]:
# Gymnasium environment id; it is handed to make_env when the vector env is built.
ENV_NAME = 'InvertedPendulum-v4'
# Disabled direct single-env construction; environments are created through make_env instead.
#env = gym.make(ENV_NAME,render_mode = "human")
def make_env(env_id, seed, idx, capture_video, run_name):
    """Return a zero-argument factory that builds one wrapped environment.

    Args:
        env_id: Environment id forwarded to ``gym.make``.
        seed: Seed applied to the created environment's action space.
        idx: Position of this environment in the vector; only index 0 may record video.
        capture_video: If true and ``idx == 0``, render to ``rgb_array`` and wrap
            the environment in ``RecordVideo``; otherwise render in ``human`` mode.
        run_name: Name of the ``videos/`` sub-directory used for recordings.

    Returns:
        A callable taking no arguments that creates the environment, wraps it in
        ``RecordEpisodeStatistics``, seeds its action space and returns it.
    """

    def thunk():
        record_this_env = capture_video and idx == 0
        if not record_this_env:
            base_env = gym.make(env_id, render_mode="human")
        else:
            # Video capture needs frames as arrays, hence the different render mode.
            base_env = gym.wrappers.RecordVideo(
                gym.make(env_id, render_mode="rgb_array"),
                f"videos/{run_name}",
            )
        wrapped_env = gym.wrappers.RecordEpisodeStatistics(base_env)
        wrapped_env.action_space.seed(seed)
        return wrapped_env

    return thunk

In [12]:
# ALGO LOGIC: initialize agent here:
class QNetwork(nn.Module):
    """Three-layer MLP mapping an (observation, action) pair to a single value.

    The input width is the product of the single-env observation shape plus the
    product of the single-env action shape, both read from ``env``.
    """

    def __init__(self, env):
        """Create the layers, sized from ``env``'s single observation/action spaces."""
        super().__init__()
        obs_dim = np.array(env.single_observation_space.shape).prod()
        act_dim = np.prod(env.single_action_space.shape)
        self.fc1 = nn.Linear(obs_dim + act_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)

    def forward(self, x, a):
        """Concatenate ``x`` and ``a`` along dim 1 and return the network output."""
        joint_input = torch.cat([x, a], 1)
        hidden = F.relu(self.fc1(joint_input))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


class Actor(nn.Module):
    """Deterministic policy MLP whose tanh output is rescaled to the action bounds.

    ``action_scale`` and ``action_bias`` are registered as buffers, so they are
    part of the module's ``state_dict`` and move with ``.to(device)``.
    """

    def __init__(self, env):
        """Create the layers and the rescaling buffers from ``env``'s spaces."""
        super().__init__()
        obs_dim = np.array(env.single_observation_space.shape).prod()
        act_dim = np.prod(env.single_action_space.shape)
        self.fc1 = nn.Linear(obs_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc_mu = nn.Linear(256, act_dim)

        # Action rescaling: half-range and midpoint of the action bounds.
        # NOTE(review): the bounds come from env.action_space, not
        # env.single_action_space, so the buffers take that space's shape.
        upper = env.action_space.high
        lower = env.action_space.low
        self.register_buffer("action_scale", torch.tensor((upper - lower) / 2.0, dtype=torch.float32))
        self.register_buffer("action_bias", torch.tensor((upper + lower) / 2.0, dtype=torch.float32))

    def forward(self, x):
        """Return the action for observation batch ``x``, scaled into the bounds."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        squashed = torch.tanh(self.fc_mu(hidden))
        return squashed * self.action_scale + self.action_bias


In [13]:
if __name__ == "__main__":

    # Run configuration. Only given_seed, total_timesteps and run_name are read
    # by the cells visible here; the remaining values are kept because other
    # cells may still reference them.
    given_seed = 42
    buffer_size = int(1e6)
    batch_size = 256
    total_timesteps = 2000
    learning_starts = 25e3
    exploration_noise = 0.1
    policy_frequency = 2
    tau = 0.005
    gamma = 0.99
    learning_rate = 3e-4

    exp_name = 'carpole_test'
    run_name = 'test'

    # Seed every RNG source the script touches.
    random.seed(given_seed)
    np.random.seed(given_seed)
    torch.manual_seed(given_seed)
    torch.backends.cudnn.deterministic = True
    writer = SummaryWriter(f"runs/{run_name}")

    # if GPU is to be used
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(f"Using {device}")

    envs = gym.vector.SyncVectorEnv([make_env(ENV_NAME, given_seed, 0, False, run_name)])
    assert isinstance(envs.single_action_space, gym.spaces.Box), "only continuous action space is supported"

    actor = Actor(envs).to(device)
    qf1 = QNetwork(envs).to(device)

    # NOTE(review): machine-specific absolute path; consider deriving it from
    # run_name/exp_name so the notebook runs on other machines.
    model_path = "/home/naveed/Documents/RL/naveed_codes/runs/test/carpole_test.cleanrl_model"
    # map_location makes the load work when the checkpoint was saved on a GPU
    # but this session fell back to CPU (or vice versa).
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    checkpoint = torch.load(model_path, map_location=device)
    # The checkpoint is indexed positionally: [0] is the actor state, [1] the critic state.
    actor.load_state_dict(checkpoint[0])
    qf1.load_state_dict(checkpoint[1])

    
 

Using cuda


In [14]:
    # Switch both loaded networks to evaluation mode before the rollout.
    # qf1.eval() is the cell's last expression, so its repr is what gets displayed.
    actor.eval()
    qf1.eval()

QNetwork(
  (fc1): Linear(in_features=5, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=256, bias=True)
  (fc3): Linear(in_features=256, out_features=1, bias=True)
)

In [15]:

    # Roll out the loaded policy for total_timesteps steps, printing each
    # transition together with the critic's negated value ("cost to go").
    obs, _ = envs.reset(seed=given_seed)

    # try/finally guarantees the environment (and its render window) is closed
    # even if the loop raises or is interrupted.
    try:
        for global_step in range(total_timesteps):
            with torch.no_grad():
                # Build the observation tensor once and share it between actor and critic.
                obs_tensor = torch.Tensor(obs).to(device)
                actions = actor(obs_tensor)
                cost_to_go = -qf1(obs_tensor, actions)
                actions = actions.cpu().numpy().clip(envs.single_action_space.low, envs.single_action_space.high)
            next_obs, rewards, terminations, truncations, infos = envs.step(actions)

            print("observation:", next_obs, " action:", actions, ' CTG=', cost_to_go)

            if np.any(terminations):
                # Continue from the observation of the seeded reset. Previously this
                # value was overwritten by next_obs, so the policy acted on an
                # observation that no longer matched the environment state.
                obs, _ = envs.reset(seed=given_seed)
            else:
                obs = next_obs
    finally:
        envs.close()

observation: [[0.04957449 3.17068378 2.0852719  0.36467081]]  action: [[7.923197]]  CTG= tensor([[377.7521]], device='cuda:0')
observation: [[ 0.18197324  3.17234815  4.54865202 -0.1265781 ]]  action: [[9.757704]]  CTG= tensor([[221.7832]], device='cuda:0')
observation: [[ 0.29860909  3.01704166  1.30360746 -7.48394338]]  action: [[-9.789768]]  CTG= tensor([[12.5328]], device='cuda:0')
observation: [[  0.29574431   2.60861964  -1.34632877 -12.47255689]]  action: [[-9.010969]]  CTG= tensor([[-200.2743]], device='cuda:0')
observation: [[  0.23292221   2.12059411  -1.76216392 -11.77216632]]  action: [[-3.3520422]]  CTG= tensor([[-471.6268]], device='cuda:0')
observation: [[ 0.17084653  1.68834417 -1.37002498 -9.92177123]]  action: [[-0.37427765]]  CTG= tensor([[-582.3936]], device='cuda:0')
observation: [[ 0.15393051  1.32107615  0.50258285 -8.72301917]]  action: [[5.9974074]]  CTG= tensor([[-711.4035]], device='cuda:0')
observation: [[ 0.18735346  0.98892711  1.15494558 -7.97755875]]  ac