In [3]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
from time import sleep
import torch.nn.functional as F
from matplotlib import pyplot as plt
import copy
from easydict import EasyDict as edict
import keyboard

device = torch.device( "cuda" if torch.cuda.is_available() else "cpu")
print(device)

ModuleNotFoundError: No module named 'keyboard'

In [9]:
class Actor(nn.Module):
    def __init__(self, env):
        super().__init__()
        self.actor_Net= nn.Sequential(nn.Conv2d(3, 5, 5, stride=1, bias=True), #(92x92x5)
                                     nn.ReLU(),
                                     nn.MaxPool2d(2),  #(46x46x5)
                                     nn.Conv2d(5, 10, 5, stride=1, bias=True), #(in_ch, out_ch, kernel_size) #(42x42x10)
                                     nn.ReLU(),
                                     nn.MaxPool2d(3), #(kernel_size) #(14,14,10)
                                     nn.Flatten(start_dim=0), #by def only flatens dim staring from 1
                                     nn.Linear(1960, 500),
                                     nn.ReLU(),
                                     nn.Linear(500, 3)) #(input, output)

    def forward(self, obs):
        #Data prep
        obs= np.transpose(obs, (2,0,1))    # h*w*#ch -> #ch*h*w
        obs= torch.from_numpy(obs).to(device).float() #uint8 -> float, numpy -> torch, to(gpu) 

        #Data Normalize
        
        #Forward pass
        obs= self.actor_Net(obs)
        return obs

In [10]:
class Critic(nn.Module):
    def __init__(self, env):
        super().__init__()
        self.m_critic_Net= nn.Sequential(nn.Conv2d(3, 5, 5, stride=1, bias=True), 
                                         nn.ReLU(),
                                         nn.MaxPool2d(2),
                                         nn.Conv2d(5, 10, 5, stride=1, bias=True),
                                         nn.ReLU(),
                                         nn.MaxPool2d(3), 
                                         nn.Flatten(start_dim=0),
                                         nn.Linear(1960, 500),
                                         nn.ReLU(),
                                         nn.Linear(500, 1))

        self.t_critic_net = copy.deepcopy(self.m_critic_Net)

    def m_critic_forward(self, obs):  #Forward pass of main critic network
        #Data prep
        obs= np.transpose(obs, (2,0,1))    # h*w*#ch -> #ch*h*w
        obs= torch.from_numpy(obs).to(device).float() #uint8 -> float, numpy -> torch, to(gpu) 

        #Data Normalize

        #Forward pass
        obs= self.m_critic_Net(obs)        
        return obs 

    def t_critic_forward(self, obs):  #Forward pass of target critic network
        #Data prep
        obs= np.transpose(obs, (2,0,1))    # h*w*#ch -> #ch*h*w
        obs= torch.from_numpy(obs).to(device).float() #uint8 -> float, numpy -> torch, to(gpu) 

        #Data Normalize

        #Forward pass
        obs= self.t_critic_Net(obs)        
        return obs 

    def sync_net(self):
        self.t_critic_net.load_state_dict(self.m_critic_net.state_dict())

In [11]:
class ReplayBuffer():
    def __init__(self, buffer_size, obs_space, act_space, device, n_env):
        self.buffer_size = buffer_size
        self.n_env= n_env
        self.device= device
        
        self.obs = np.empty((0, obs_space))
        self.next_obs = np.empty((0, obs_space))
        self.act = np.empty(0)
        self.rew = np.empty(0)
        self.dones = np.empty(0, dtype= bool)

    def to_numpy(self, data):
        if isinstance(data, torch.Tensor):
            return data.cpu().numpy()
        return data 

    def add(self, obs, next_obs, act, rew, dones):
        #Convert to Numpy and send to CPU
        obs = self.to_numpy(obs)
        next_obs = self.to_numpy(next_obs)
        act = self.to_numpy(act)
        rew = self.to_numpy(rew)
        done = self.to_numpy(dones)
        
        if len(self.obs)>= self.buffer_size:
            self.obs= self.obs[self.n_env:]
            self.next_obs= self.next_obs[self.n_env:]
            self.act= self.act[self.n_env:]
            self.rew= self.rew[self.n_env:]
            self.dones= self.dones[self.n_env:]
        
        self.obs= np.append(self.obs, obs, axis=0)
        self.next_obs= np.append(self.next_obs, next_obs, axis=0)
        self.act= np.append(self.act, act, axis=0)
        self.rew= np.append(self.rew, rew, axis=0)
        self.dones= np.append(self.dones, dones, axis=0)
        
    def show(self):        
        print("Observations: ", self.obs)
        print("Next observations: ", self.next_obs)
        print("Actions: ", self.act)
        print("Rewards: ", self.rew)
        print("Dones: ", self.dones)

    def sample(self, batch_size):
        index= np.random.choice(len(self.obs), batch_size, replace = False)
        #print("Index: ", index)
        data= edict
        data.obs = torch.from_numpy(self.obs[index]).to(self.device)
        data.next_obs= torch.from_numpy(self.next_obs[index]).to(self.device)
        data.act= torch.from_numpy(self.act[index]).to(torch.int64).to(self.device)
        data.rew= torch.from_numpy(self.rew[index]).to(torch.int64).to(self.device)
        data.dones= torch.from_numpy(self.dones[index]).to(self.device)
        return data
        

In [16]:
env= gym.make_vec("CarRacing-v3", render_mode="rgb_array",num_envs=4,  lap_complete_percent= 0.95, domain_randomize=False, vectorization_mode="async")
actor= Actor(env).to(device)
rb= ReplayBuffer(1000, )

obs, _= env.reset(options={"randomize": False})

# for i in range(4):
#     plt.subplot(1,4, i+1)
#     plt.imshow(obs[i,:,:,:], interpolation="nearest")
#     plt.show()
print(obs.shape)
ret=0
done=False

for i in range(4):
   
    act= env.action_space.sample()
    next_obs, reward, trunc, info, done = env.step(act)
    obs=next_obs
    #env.render()
env.close()

TypeError: ReplayBuffer.__init__() missing 5 required positional arguments: 'buffer_size', 'obs_space', 'act_space', 'device', and 'n_env'