In [None]:
import tensorflow as tf
# List every device TensorFlow can see (CPU and accelerators) for the reader.
physical_devices = tf.config.list_physical_devices()
print(physical_devices)
# Turn on on-demand memory growth for each visible GPU instead of assuming the
# GPU sits at index 1 of the mixed device list; on a CPU-only host the loop is
# simply empty rather than raising IndexError.
# NOTE: TensorFlow requires this to happen before the GPUs are initialised, so
# run this cell first on a fresh kernel.
for gpu in tf.config.list_physical_devices("GPU"):
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
import numpy as np
import math
import random
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input
from collections import deque
import gym
import os
from tensorflow.keras.optimizers import Adam
from copy import copy

In [None]:
# Create the environment, then read the leading dimension of the observation
# and action spaces; these two integers size the networks and replay buffer.
env = gym.make("BipedalWalker-v3")
input_size, output_size = (
    space.shape[0] for space in (env.observation_space, env.action_space)
)
input_size, output_size

In [None]:
def _record_on_step(t):
    """Video trigger: true whenever the step counter ``t`` is a multiple of 100."""
    return t % 100 == 0


# Wrap the environment so rollouts are written to the "videos" directory.
# NOTE(review): re-running this cell wraps the already-wrapped env again.
env = gym.wrappers.RecordVideo(env, "videos", step_trigger=_record_on_step)

In [None]:
# --- Target networks / returns ---
tau = 0.005      # soft (Polyak) target-update coefficient
gamma = 0.99     # discount factor

# --- Optimiser step sizes ---
actor_lr = 1e-3
critic_lr = 1e-3

# --- Replay buffer ---
memory_size = 1_000_000   # capacity in transitions
minibatch_size = 128      # transitions drawn per learning step

# --- Training schedule ---
num_episodes = 2_000      # maximum number of training episodes
num_steps = 800           # maximum number of steps within one episode

# --- Exploration ---
noise_std = 0.2           # stddev of the Gaussian noise added to actions
start_after = 1000        # act uniformly at random before this many global steps
update_after = 1000       # begin learning once this many global steps have passed


In [None]:
import datetime
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# rm -rf ./logs/train_PPO/
train_log_dir = "logs/train_DDPG/" + current_time
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
%load_ext tensorboard
%tensorboard --logdir {train_log_dir}

In [None]:
class ReplayMemory:
    """Fixed-capacity ring buffer of transitions backed by preallocated float32 arrays.

    Once ``size`` transitions have been stored, new transitions overwrite the
    oldest ones.

    Attributes:
        pointer: index of the slot the next transition will be written to.
        size: number of valid transitions currently stored (at most ``max_size``).
        max_size: capacity of the buffer.
    """

    def __init__(self, input_size, output_size, size):
        """Allocate storage.

        Args:
            input_size: length of a state vector.
            output_size: length of an action vector.
            size: maximum number of transitions kept.
        """
        self.states = np.zeros(shape = (size, input_size), dtype=np.float32)
        self.next_states = np.zeros(shape = (size, input_size), dtype=np.float32)
        self.actions = np.zeros(shape = (size, output_size), dtype=np.float32)
        self.rewards = np.zeros(shape = size, dtype=np.float32)
        self.dones = np.zeros(shape = size, dtype=np.float32)
        self.pointer=0
        self.size=0
        self.max_size=size

    def remember(self, state, action, reward, next_state, done):
        """Store one transition at the write pointer and advance it (wrapping around)."""
        self.states[self.pointer] = state
        self.next_states[self.pointer] = next_state
        self.actions[self.pointer] = action
        self.rewards[self.pointer] =  reward
        self.dones[self.pointer] = done  # bool is stored as 0.0 / 1.0
        self.pointer = (self.pointer + 1)%self.max_size
        self.size = min(self.size+1, self.max_size)

    def sample(self, minibatch_size = 32):
        """Draw ``minibatch_size`` stored transitions uniformly at random, with replacement.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of arrays
            whose first dimension is ``minibatch_size``.

        Raises:
            ValueError: if nothing has been stored yet.
        """
        if self.size == 0:
            # np.random.randint(0, 0) would fail with an opaque "low >= high".
            raise ValueError("Cannot sample from an empty ReplayMemory")
        idxs = np.random.randint(0, self.size, size=minibatch_size)

        return (
            self.states[idxs],
            self.actions[idxs],
            self.rewards[idxs],
            self.next_states[idxs],
            self.dones[idxs],
        )
                

In [None]:
class Agent:
    """Deterministic policy-gradient agent with twin critics and delayed actor updates.

    The agent keeps an actor, two critics, and a target copy of each. Critics
    are trained on the minimum of the two target critics' estimates with
    clipped noise added to the target action; the actor and the target
    networks are updated once every ``actor_update_itr`` critic updates.

    Configuration is read from module-level globals: ``input_size``,
    ``output_size``, ``memory_size``, ``minibatch_size``, ``actor_lr``,
    ``critic_lr``, ``noise_std``, ``gamma`` and ``tau``.
    """

    def __init__(self):
        self.actor = self.build_actor()
        self.critic_1 = self.build_critic()
        self.critic_2 = self.build_critic()

        # Target networks must own their variables. A shallow copy() of a
        # Keras model can share the underlying layers with the online network,
        # which would turn the soft updates into no-ops. Build fresh networks
        # and copy the weights across instead.
        self.target_actor = self.build_actor()
        self.target_critic_1 = self.build_critic()
        self.target_critic_2 = self.build_critic()
        self.target_actor.set_weights(self.actor.get_weights())
        self.target_critic_1.set_weights(self.critic_1.get_weights())
        self.target_critic_2.set_weights(self.critic_2.get_weights())

        # `learning_rate` is the supported argument name; `lr` is deprecated.
        self.actor_optimizer = Adam(learning_rate=actor_lr)
        self.critic_optimizer = Adam(learning_rate=critic_lr)

        self.memory = ReplayMemory(input_size, output_size, memory_size)
        self.learn_count = 0          # critic updates since the last actor update
        self.noise_std = noise_std    # stddev of exploration noise used by act()
        self.actor_update_itr = 2     # update actor/targets every N critic updates

    def build_actor(self):
        """Return a state -> action network (400-300 ReLU MLP, tanh output)."""
        # Small uniform init on the output layer keeps initial actions near 0.
        init = tf.random_uniform_initializer(minval=-3e-3, maxval=3e-3)
        inputs = Input(shape = (input_size,))
        x = Dense(400, activation = 'relu')(inputs)
        x = Dense(300, activation = 'relu')(x)
        x = Dense(output_size, activation = "tanh", kernel_initializer=init)(x)
        model = Model(inputs = inputs, outputs = x)
        return model

    def build_critic(self):
        """Return a (state, action) -> scalar Q-value network (400-300 ReLU MLP)."""
        init = tf.random_uniform_initializer(minval=-3e-3, maxval=3e-3)

        input1 = Input(shape = (input_size,))
        input2 = Input(shape = (output_size,))
        # Use the Keras layer rather than a raw tf op on symbolic inputs.
        merged = tf.keras.layers.Concatenate(axis=1)([input1, input2])
        x = Dense(400, activation="relu")(merged)
        x = Dense(300, activation="relu")(x)
        x = Dense(1, activation = 'linear', kernel_initializer=init)(x)
        model = Model(inputs = [input1, input2], outputs = x)
        return model

    def remember(self,state,action,reward,next_state,done):
        """Store one transition in the replay memory."""
        # ReplayMemory exposes remember(), not append().
        self.memory.remember(state, action, reward, next_state, done)

    @tf.function
    def act(self, state, test = False):
        """Return the actor's action for a batch of states, clipped to [-1, 1].

        Args:
            state: batch of states (leading batch dimension required).
            test: when False, zero-mean Gaussian noise with stddev
                ``self.noise_std`` is added for exploration.
        """
        act_value = self.actor(state)
        if not test:
            # NOTE(review): self.noise_std is a Python attribute read inside a
            # tf.function, so later changes to it may not reach an already
            # traced graph - confirm before annealing the noise.
            act_value += tf.random.normal(shape = tf.shape(act_value), mean=0.0, stddev = self.noise_std)
        return tf.clip_by_value(act_value, -1, 1)

    @tf.function
    def critic_learn(self, states, actions, rewards, next_states, dones):
        """Run one gradient step on both critics for a sampled minibatch.

        ``rewards`` and ``dones`` are expanded to column vectors here so they
        broadcast against the critics' (batch, 1) outputs.
        """
        rewards= tf.expand_dims(rewards, axis=1)
        dones = tf.expand_dims(dones, axis=1)

        # Target action with clipped smoothing noise.
        next_actions = self.target_actor(next_states)
        smoothing = tf.clip_by_value(
            tf.random.normal(shape = tf.shape(next_actions), mean = 0.0, stddev = 0.2), -0.5, 0.5)
        next_actions = tf.clip_by_value(next_actions + smoothing, -1, 1)

        # Bootstrap from the smaller of the two target estimates.
        next_q_1 = self.target_critic_1([next_states, next_actions])
        next_q_2 = self.target_critic_2([next_states, next_actions])
        next_values = tf.math.minimum(next_q_1, next_q_2)

        # (1 - dones) removes the bootstrap term on terminal transitions.
        target_q_values = rewards + gamma * (1-dones) * next_values

        # Each critic's gradient of the summed loss depends only on its own
        # term, so both can be updated with a single apply_gradients call.
        # This advances the shared optimizer once per step and avoids a
        # persistent tape.
        critic_weights = self.critic_1.trainable_weights + self.critic_2.trainable_weights
        with tf.GradientTape() as tape:
            pred_q_1 = self.critic_1([states, actions])
            pred_q_2 = self.critic_2([states, actions])

            critic_loss = tf.reduce_mean((target_q_values - pred_q_1)**2) + tf.reduce_mean((target_q_values - pred_q_2)**2)

        critic_grads = tape.gradient(critic_loss, critic_weights)
        self.critic_optimizer.apply_gradients(zip(critic_grads, critic_weights))

    @tf.function
    def actor_learn(self, states, actions, rewards, next_states, dones):
        """Run one gradient step on the actor, maximising critic_1's value of its actions.

        Only ``states`` is used; the other arguments are accepted so the call
        mirrors ``critic_learn``.
        """
        with tf.GradientTape() as tape:
            actions_pred = self.actor(states)
            actor_loss = self.critic_1([states, actions_pred])
            actor_loss = -tf.reduce_mean(actor_loss)
        actor_grads = tape.gradient(actor_loss, self.actor.trainable_weights)
        self.actor_optimizer.apply_gradients(zip(actor_grads, self.actor.trainable_weights))

    def learn(self):
        """Sample a minibatch and update the critics; every
        ``actor_update_itr``-th call also updates the actor and target networks.

        Does nothing until the memory holds at least ``minibatch_size`` transitions.
        """
        # Use this instance's memory, not the notebook-level `agent` global.
        if self.memory.size<minibatch_size:
            return
        states, actions, rewards, next_states, dones = self.memory.sample(minibatch_size)
        self.critic_learn(states, actions, rewards, next_states, dones)

        self.learn_count += 1

        if self.learn_count%self.actor_update_itr != 0:
            return

        self.learn_count = 0

        self.actor_learn(states, actions, rewards, next_states, dones)
        self.update_target_weights()

    @staticmethod
    def _soft_update(target_model, online_model):
        """Blend ``online_model`` into ``target_model`` in place: tau*online + (1-tau)*target."""
        blended = [
            online * tau + target * (1-tau)
            for online, target in zip(online_model.get_weights(), target_model.get_weights())
        ]
        target_model.set_weights(blended)

    def update_target_weights(self):
        """Soft-update all three target networks towards their online counterparts."""
        self._soft_update(self.target_actor, self.actor)
        self._soft_update(self.target_critic_1, self.critic_1)
        self._soft_update(self.target_critic_2, self.critic_2)

    def load(self, name):
        """Load weights saved by ``save(name)`` into the online and target networks."""
        self.actor.load_weights(name + "actor.hdf5")
        self.critic_1.load_weights(name + "critic_1.hdf5")
        self.critic_2.load_weights(name + "critic_2.hdf5")

        self.target_actor.load_weights(name + "actor.hdf5")
        self.target_critic_1.load_weights(name + "critic_1.hdf5")
        self.target_critic_2.load_weights(name + "critic_2.hdf5")

    def save(self, name):
        """Save the online networks' weights to files prefixed with ``name``."""
        self.actor.save_weights(name + "actor.hdf5")
        self.critic_1.save_weights(name + "critic_1.hdf5")
        self.critic_2.save_weights(name + "critic_2.hdf5")

In [None]:
# Build the agent. Agent.__init__ reads the hyperparameter globals and the
# input_size / output_size values, so those cells must have been run first.
agent = Agent()

In [None]:
#Training
# Per episode: act (uniformly at random for the first `start_after` global
# steps, then with the noisy policy), store the transition, and learn once
# `update_after` global steps have passed. After each episode the score and
# length are logged to TensorBoard, and the weights are checkpointed whenever
# the 100-episode running average improves. Every 25 episodes two noise-free
# evaluation episodes are run.
best_score = env.reward_range[0]
score_history = deque(maxlen=100)
global_step = 0
for ep in range(0, num_episodes):
    state = env.reset()
    done = False
    score = 0
    episode_length = 0
    for step in range(num_steps):
        if global_step<start_after:
            action = env.action_space.sample()
        else:
            action =  agent.act(np.expand_dims(state, axis = 0))
            action = np.squeeze(action)

        next_state, reward, done, _ =  env.step(action)
        score += reward
        global_step += 1
        # `step` is a zero-based index; the number of steps taken is one more.
        episode_length = step + 1

        agent.memory.remember(state, action, reward, next_state, done)
        state = next_state
        if global_step>update_after:
            agent.learn()

        if done:
            break
    with train_summary_writer.as_default():
        tf.summary.scalar("Charts/score", score, global_step)
        tf.summary.scalar("Charts/episode_length", episode_length, global_step)
        tf.summary.scalar("Charts/exploration", noise_std, global_step)

    score_history.append(score)
    avg_score = np.mean(score_history)
    print(f"Episode: {ep}, Len: {episode_length}, Score: {score}, Avg Score: {avg_score}, Global_Step: {global_step}")
    if avg_score > best_score:
        # Track the quantity that was compared (the running average), not the
        # latest single-episode score; otherwise one lucky or unlucky episode
        # moves the checkpoint threshold.
        best_score = avg_score
        agent.save("DDPG_")
    if ep%25==0:
        print(f"Testing")
        for _ in range(2):
            state=env.reset()
            done = False
            test_score=0
            test_step=0
            while not done:
                action = agent.act(np.expand_dims(state, axis =0), test=True)
                next_state, reward, done, _ = env.step(np.squeeze(action))
                test_score+= reward
                test_step+=1
                state=next_state
            print(f"Episode ended with length {test_step} and a score of {test_score}")


In [None]:
#Load Pretrained Agent
# Prefer an externally supplied "Latest_" checkpoint when one is present;
# otherwise fall back to the "DDPG_" checkpoint written by the training cell,
# so this cell also works on a fresh Restart & Run All.
checkpoint_prefix = "Latest_" if os.path.exists("Latest_actor.hdf5") else "DDPG_"
agent.load(checkpoint_prefix)

In [None]:
#Testing
# Roll out a single episode without exploration noise, rendering each frame
# and printing the action and reward at every step.
done = False
state = env.reset()
score=0
step = 0
try:
    while not done:
        env.render()
        action = agent.act(np.expand_dims(state, axis=0), test=True)
        next_state, reward, done, _ = env.step(np.squeeze(action))
        score+= reward
        state = next_state
        print(action, reward)
        step += 1
    print(f"Episode ended in {step} steps with score of {score}")
finally:
    # Always close the environment, even if the rollout raises or is
    # interrupted part-way through.
    env.close()