In [1]:
import gymnasium as gym
import numpy as np

### 1. check this env

In [2]:
env= gym.make('Pendulum-v1', g=9.81,render_mode="human")#env
observation, info = env.reset(seed=42)
for _ in range(100):
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        observation, info = env.reset()
env.close()


### 2.noise function(copy from [openai baselines](https://github.com/openai/baselines/blob/master/baselines/ddpg/noise.py))

In [3]:
class OrnsteinUhlenbeckActionNoise():
    def __init__(self, mu, sigma=0.15, theta=.15, dt=1e-2, x0=None):
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.dt = dt
        self.x0 = x0
        self.reset()

    def __call__(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x
        return x

    def reset(self):
        self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)

In [4]:
a= OrnsteinUhlenbeckActionNoise(mu=np.zeros(4))
a()

array([ 0.01968621,  0.02394039,  0.01288836, -0.01493496])

### 3.repaly buffer(st, at, rt, st+1)

In [5]:
class ReplayBuffer():
    def __init__(self, size, st_shape,simpe_size):
        self.size= size
        self.simpe_size=simpe_size
        self.count = 0
        self.states = np.zeros((size,*st_shape))
        self.ations = np.zeros((size,1))
        self.rewards = np.zeros((size,1))
        self.states_next = np.zeros((size,*st_shape))
        self.dones = np.zeros((size,1),dtype=np.bool_)
         
    def add(self,st,at,rt,st_1,done):
        self.states[self.count]=st
        self.ations[self.count]=at
        self.rewards[self.count]=rt
        self.states_next[self.count]=st_1
        self.dones[self.count]= done
        self.count=(self.count+1) % self.size
    def simple_buffer(self):
        index = np.random.choice(self.size,self.simpe_size)
        return self.states[index],self.ations[index],self.rewards[index],self.states_next[index],self.dones[index]
        

test,test,test

In [6]:
env= gym.make('Pendulum-v1', g=9.81,render_mode="human")#env
observation, info = env.reset(seed=42)

a_buffer=ReplayBuffer(100,env.observation_space.shape,2)
for _ in range(100):
    action = env.action_space.sample()
    st=observation
    observation, reward, terminated, truncated, info = env.step(action)
    st_1 =observation
    a_buffer.add(st,action,reward,st_1,terminated)
    if terminated or truncated:
        observation, info = env.reset()
env.close()

In [7]:
a_buffer.simple_buffer()

(array([[-0.99419463, -0.10759667,  4.58249998],
        [-0.44225168, -0.896891  ,  3.18491173]]),
 array([[1.75331879],
        [0.189982  ]]),
 array([[-11.30687016],
        [ -5.13085385]]),
 array([[-0.94069541, -0.33925238,  4.76633358],
        [-0.32445133, -0.94590241,  2.55352139]]),
 array([[False],
        [False]]))

### 4. Critic Network

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
class Critic_net(nn.Module):
    def __init__(self, state_size,action_size):
        super().__init__()
        self.fc1_s = nn.Linear(*state_size,400)
        self.b1_s=nn.BatchNorm1d(400)
        f1=1/np.sqrt(self.fc1_s.weight.size()[0])
        self.fc1_s.weight.data.uniform_(-f1, f1)
        self.fc1_s.bias.data.uniform_(-f1, f1)

        self.fc2_s = nn.Linear(400,300)
        self.b2_s=nn.BatchNorm1d(300)
        f2=1/np.sqrt(self.fc2_s.weight.size()[0])
        self.fc2_s.weight.data.uniform_(-f2, f2)
        self.fc2_s.bias.data.uniform_(-f2, f2)


        self.fc2_a = nn.Linear(action_size,300)
        self.b2_a=nn.BatchNorm1d(300)
        f2_a=1/np.sqrt(self.fc2_a.weight.size()[0])
        self.fc2_a.weight.data.uniform_(-f2_a, f2_a)
        self.fc2_a.bias.data.uniform_(-f2_a, f2_a)

        self.out_layer = nn.Linear(300,1)
        f3 = 0.003
        self.out_layer.weight.data.uniform_(-f3, f3)
        self.out_layer.bias.data.uniform_(-f3, f3)

        #self.optimizer = optim.Adam(self.parameter(),lr=0.001)
        
    def forward(self,state,action):
        
        state = self.fc1_s(state)
        state = self.b1_s(state)
        state = F.relu(state)
        state = self.fc2_s(state)
        state = self.b2_s(state)
        

        action = self.fc2_a(action)
        action = self.b2_a(action)
        
        state_action = F.relu(torch.add(state,action))
        state_action =  F.relu(self.out_layer(state_action))
        
        return state_action

In [19]:
a=Critic_net((4,),1)
b=torch.randn((10,4))
c=torch.ones((10,1))
a(b,c)

tensor([[0.0285],
        [0.0133],
        [0.0018],
        [0.0263],
        [0.0141],
        [0.0091],
        [0.0142],
        [0.0478],
        [0.0000],
        [0.0000]], grad_fn=<ReluBackward0>)

### 5. Actor net

In [10]:
class Actor_net(nn.Module):
    def __init__(self, state_size):
        super().__init__()
        self.fc1_s = nn.Linear(*state_size,400)
        self.b1_s=nn.BatchNorm1d(400)
        f1=1/np.sqrt(self.fc1_s.weight.size()[0])
        self.fc1_s.weight.data.uniform_(-f1, f1)
        self.fc1_s.bias.data.uniform_(-f1, f1)

        self.fc2_s = nn.Linear(400,300)
        self.b2_s=nn.BatchNorm1d(300)
        f2=1/np.sqrt(self.fc2_s.weight.size()[0])
        self.fc2_s.weight.data.uniform_(-f2, f2)
        self.fc2_s.bias.data.uniform_(-f2, f2)

        self.out_layer = nn.Linear(300,1)
        f3 = 0.0003
        self.out_layer.weight.data.uniform_(-f3, f3)
        self.out_layer.bias.data.uniform_(-f3, f3)

        
    def forward(self,state):
        state = self.fc1_s(state)
        state = self.b1_s(state)
        state = F.relu(state)
        state = self.fc2_s(state)
        state = self.b2_s(state)
        state = F.relu(state)
        state = self.out_layer(state)
        state = nn.Tanh(state)
        return state


### 6.Algorithm

In [11]:
#Randomly initialize critic network Q(s, a|θQ) and actor µ(s|θµ) with weights θ Q and θµ
critic = Critic_net(env.observation_space.shape,1)
optimizer_c = torch.optim.Adam(critic.parameters(), lr=0.001)
actor =  Actor_net(env.observation_space.shape)
optimizer_a = torch.optim.Adam(actor.parameters(), lr=0.0001)
#Initialize target network
target_critic = Critic_net(env.observation_space.shape,1)
target_actor =  Actor_net(env.observation_space.shape)
#Initialize replay buffer R
buffer=ReplayBuffer(1000,env.observation_space.shape,32)#??size,simple_size
#for episode = 1, M do
epoch =10
for i in range(epoch):
    #Initialize a random process N for action exploration
    noise = OrnsteinUhlenbeckActionNoise(mu=np.zeros(*env.observation_space.shape))
    #Receive initial observation state s1
    env= gym.make('Pendulum-v1', g=9.81,render_mode="human")#env
    observation, info = env.reset(seed=42)
    for j in range(1000):
        st = observation
        actor.eval()
        acion = actor(state)
        action += torch.tensor(noise(),dtype=torch.float)
        actor.train()
        observation, reward, terminated, truncated, info = env.step(action)
        st_1 =observation
        buffer.add(st,action,reward,st_1,terminated)
        if i>=1:
            #Sample a random minibatch of N transitions (si, ai, ri, si+1) from R
            sts,actions,rewards,st_1s,dones=buffer.simple_buffer()
            sts = torch.tensor(sts,dytpe=torch.float)
            actions = torch.tensor(actions,dytpe=torch.float)
            rewards = torch.tensor(rewards,dytpe=torch.float)
            st_1s = torch.tensor(st_1s,dytpe=torch.float)
            dones = torch.tensor(dones)
            #get the loss of critic
            target_actions_next =  target_actor(st_1s)
            gamma=0.99
            target_q = rewards+ gamma*target_critic(st_1s,target_actions_next)*(1-dones)
            
            q_vlaues = critic(sts,actions)
            critic_loss = F.mse_loss(target_q,q_vlaues)
            optimizer_c.zero_grad()
            critic_loss.backward()
            optimizer_c.step()

            #get the loss of actor
            actor_loss = -critic(sts,actor(sts))
            actor_loss = torch.mean(actor_loss)
            optimizer_a.zero_grad()
            actor_loss.backward()
            optimizer_a.step()
            
            #update target
            tau = 0.001
            for p_,p in zip(target_critic.parameters(),critic.parameters()):
                new_val = tau*p+(1-tau)*p_
                p_.copy_(new_val)
            for p1_,p1 in zip(target_actor.parameters(),actor.parameters()):
                new_val = tau*p1+(1-tau)*p1_
                p1_.copy_(new_val)
            

        