In [1]:
import numpy as np
import gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [2]:
# Specify the `render_mode` parameter to show the attempts of the agent in a pop up window.
env = gym.make("Pendulum-v1", render_mode="rgb_array")

num_states = env.observation_space.shape[0]
num_actions = env.action_space.shape[0]

upper_bound = env.action_space.high[0]
lower_bound = env.action_space.low[0]

In [3]:
# Transition as a namedtuple: a simple data structure to group together s_t, a_t, r, s_t+1
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state'))

# Cyclic buffer: holds the recent transitions + sampling a random batch of transitions for training
class ReplayBuffer(object):

    def __init__(self, capacity=10_000):
        # deque is used to automatically discard the oldest data
        self.memory = deque([], maxlen=capacity)

    # Save a new transition in the replay buffer
    def push(self, *args):
        self.memory.append(Transition(*args))
        
    # Randomly sample a batch of transitions from the replay buffer to train the agent
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    # Current size of the replay buffer
    def __len__(self):
        return len(self.memory)

In [4]:
class OUActionNoise:
    def __init__(self, mean, std_deviation, theta=0.15, dt=1e-2, x_initial=None):
        self.theta = theta
        self.mean = mean
        self.std_dev = std_deviation
        self.dt = dt
        self.x_initial = x_initial
        self.reset()

    def __call__(self):
        # Formula taken from the Ornstein-Uhlenbeck process
        x = (
            self.x_prev
            + self.theta * (self.mean - self.x_prev) * self.dt
            + self.std_dev * torch.sqrt(torch.tensor(self.dt)) * torch.randn_like(self.mean)
        )
        # Store x into x_prev
        # Makes next noise dependent on current one
        self.x_prev = x
        return x

    def reset(self):
        if self.x_initial is not None:
            self.x_prev = self.x_initial
        else:
            self.x_prev = torch.zeros_like(self.mean)

In [5]:
class Actor(nn.Module):
    def __init__(self, num_states, upper_bound):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(num_states, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)
        
        # Initialize weights between -3e-3 and 3e-3 for all layers
        self.fc1.weight.data.uniform_(-0.003, 0.003)
        self.fc2.weight.data.uniform_(-0.003, 0.003)
        self.fc3.weight.data.uniform_(-0.003, 0.003)
        
        # Initialize biases to zero for all layers
        self.fc1.bias.data.zero_()
        self.fc2.bias.data.zero_()
        self.fc3.bias.data.zero_()
        
        self.upper_bound = upper_bound

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        x = torch.tanh(self.fc3(x))
        # Scale output to match the upper bound
        x = x * self.upper_bound
        return x

class Critic(nn.Module):
    def __init__(self, num_states, num_actions):
        super(Critic, self).__init__()
        # State pathway
        self.state_fc1 = nn.Linear(num_states, 16)
        self.state_fc2 = nn.Linear(16, 32)
        
        # Action pathway
        self.action_fc1 = nn.Linear(num_actions, 32)
        
        # Combined pathway
        self.fc1 = nn.Linear(32 + 32, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)

    def forward(self, state, action):
        # State pathway
        state_out = F.relu(self.state_fc1(state))
        state_out = F.relu(self.state_fc2(state_out))
        
        # Action pathway
        action_out = F.relu(self.action_fc1(action))
        
        # Concatenate state and action pathways
        concat = torch.cat([state_out, action_out], dim=1)
        
        # Combined pathway
        out = F.relu(self.fc1(concat))
        out = F.relu(self.fc2(out))
        out = self.fc3(out)
        
        return out

In [6]:
# Replay Buffer Memory
CAPACITY = 20
BATCH_SIZE = 8
buffer_memory = ReplayBuffer(capacity=CAPACITY)

# Assuming get_actor and get_critic return PyTorch models
actor_model = Actor(num_states=num_states, upper_bound=upper_bound)
critic_model = Critic(num_states=num_states, num_actions=num_actions)

target_actor = Actor(num_states=num_states, upper_bound=upper_bound)
target_critic = Critic(num_states=num_states, num_actions=num_actions)

# Making the weights equal initially
target_actor.load_state_dict(actor_model.state_dict())
target_critic.load_state_dict(critic_model.state_dict())

# Optimizers
critic_lr = 0.002
actor_lr = 0.001

critic_optimizer = optim.Adam(critic_model.parameters(), lr=critic_lr)
actor_optimizer = optim.Adam(actor_model.parameters(), lr=actor_lr)

# Discount Factor
GAMMA = 0.99

std_dev = 0.2
ou_noise = OUActionNoise(mean=torch.zeros(1), std_deviation=float(std_dev) * torch.ones(1))

TAU = 0.005

def train():
    
    if len(buffer_memory)<BATCH_SIZE:
        return
    
    # Ensure models and optimizers are set to training mode
    target_actor.train()
    target_critic.train()
    critic_model.train()
    actor_model.train()
    
    # Sample a batch of transitions from the buffer memory
    transitions = buffer_memory.sample(BATCH_SIZE)
    
    # Transpose the sampled batch to separate its components (state, action, reward, next_state) 
    batch = Transition(*zip(*transitions))
    
    # Concatenate state, action, reward, and next_state batches into separate tensors
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    next_state_batch = torch.cat(batch.next_state)
    
    # Update Critic Model
    critic_optimizer.zero_grad()  # Reset gradients accumulation
    with torch.no_grad():
        target_actions = target_actor(next_state_batch)
        y = reward_batch + GAMMA * target_critic(next_state_batch, target_actions)
    critic_value = critic_model(state_batch, action_batch.unsqueeze(1))
    critic_loss = torch.mean((y - critic_value) ** 2)
    critic_loss.backward()  # Compute gradients
    critic_optimizer.step()  # Apply gradients

    # Update Actor Model
    actor_optimizer.zero_grad()  # Reset gradients accumulation
    actions = actor_model(state_batch)
    critic_value = critic_model(state_batch, actions)
    actor_loss = -torch.mean(critic_value)
    actor_loss.backward()  # Compute gradients
    actor_optimizer.step()  # Apply gradients

In [7]:
def update_target(target, original, tau):
    target_state_dict = target.state_dict()
    original_state_dict = original.state_dict()

    for key in target_state_dict:
        target_state_dict[key] = tau * original_state_dict[key] + (1 - tau) * target_state_dict[key]

    target.load_state_dict(target_state_dict)

In [8]:
def policy(state, actor_model, noise_object, lower_bound, upper_bound):
    
    # Get action from the actor model
    sampled_actions = actor_model(state).squeeze()

    # Add noise to the action
    noise = noise_object()
    sampled_actions = sampled_actions + torch.tensor(noise, dtype=torch.float)

    # Ensure action is within bounds
    legal_action = torch.clamp(sampled_actions, lower_bound, upper_bound)

    return legal_action.detach().cpu().numpy()

In [None]:
num_time_steps = 200
num_episodes = 100

# To store reward history of each episode
ep_reward_list = []
# To store average reward history of last few episodes
avg_reward_list = []

# Pre-allocations
reward_per_time_step = np.zeros([num_episodes, num_time_steps])
average_reward_per_episode = np.zeros([num_episodes])
x_coordinate_per_time_step = np.zeros([num_episodes, num_time_steps])
y_coordinate_per_time_step = np.zeros([num_episodes, num_time_steps])
angular_velocity_per_time_step = np.zeros([num_episodes, num_time_steps])
action_per_time_step = np.zeros([num_episodes, num_time_steps])

# Takes about 4 min to train
for ep in range(num_episodes):
    state, _ = env.reset()
    episodic_reward = 0
    # Convert the state to a PyTorch tensor and add a batch dimension
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    time_step = 0
    
    while True:
        # Store the state values
        x_coordinate_per_time_step[ep, time_step] = state[0][0].numpy()
        y_coordinate_per_time_step[ep, time_step] = state[0][1].numpy()
        angular_velocity_per_time_step[ep, time_step] = state[0][2].numpy()
        
        # Assuming policy is a function that takes a state and an OUActionNoise instance and returns an action
        action = policy(state_tensor, actor_model, ou_noise, lower_bound, upper_bound)

        # Perform the action in the environment
        observation, reward, done, truncated, _ = env.step(action)
        
        reward_tensor = torch.tensor([reward], dtype=torch.float32)
        action_tensor = torch.tensor(action, dtype=torch.float32)
        next_state_tensor = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)

        # Record the transition in the replay buffer
        # Assuming buffer has a method 'push' that takes a transition tuple
        buffer_memory.push(state_tensor, action_tensor, reward_tensor, next_state_tensor)

        episodic_reward += reward
        
        reward_per_time_step[ep, time_step] = reward

        train()

        update_target(target_actor, actor_model, TAU)
        update_target(target_critic, critic_model, TAU)

        # End this episode when `done` or `truncated` is True
        if done or truncated:
            break

        state_tensor = next_state_tensor
        time_step += 1

    average_reward_per_episode[ep] = np.mean(reward_per_time_step[ep, :])
    
    ep_reward_list.append(episodic_reward)
    
    # Mean of last 40 episodes
    avg_reward = np.mean(ep_reward_list[-40:])
    print(f"Episode * {ep} * Avg Reward is ==> {avg_reward}")
    avg_reward_list.append(avg_reward)

# Plotting graph
# Episodes versus Avg. Rewards
plt.plot(avg_reward_list)
plt.xlabel("Episode")
plt.ylabel("Avg. Episodic Reward")
plt.show()

np.save('reward_per_time_step.npy', reward_per_time_step)
np.save('average_reward_per_episode.npy', average_reward_per_episode)
np.save('x_coordinate_per_time_step.npy', x_coordinate_per_time_step)
np.save('y_coordinate_per_time_step.npy', y_coordinate_per_time_step)
np.save('angular_velocity_per_time_step.npy', angular_velocity_per_time_step)
np.save('action_per_time_step.npy', action_per_time_step)
np.save('num_time_steps.npy', num_time_steps)
np.save('num_episodes.npy', num_episodes)

In [10]:
buffer = ReplayBuffer(capacity=20)

In [37]:
# Assuming env is your environment instance and buffer is an instance of ReplayBuffer
state, _ = env.reset()
episodic_reward = 0

# Convert the state to a PyTorch tensor and add a batch dimension
state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

# Assuming policy is a function that takes a state and an OUActionNoise instance and returns an action
action = policy(state_tensor, actor_model, ou_noise, lower_bound, upper_bound)

# Perform the action in the environment
observation, reward, done, truncated, _ = env.step(action)

# Record the transition in the replay buffer
# Assuming buffer has a method 'push' that takes a transition tuple
buffer_memory.push(state_tensor, torch.tensor(action), torch.tensor([reward], dtype=torch.float32), torch.tensor(observation, dtype=torch.float32).unsqueeze(0))
len(buffer_memory)

  sampled_actions = sampled_actions + torch.tensor(noise, dtype=torch.float)


20

In [59]:
transitions = buffer_memory.sample(8)
batch = Transition(*zip(*transitions))
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
next_state_batch = torch.cat(batch.next_state)
q_values = actor_model(state_batch)
with torch.no_grad():
    target_actions = target_actor(next_state_batch)
    y = reward_batch + GAMMA * target_critic(next_state_batch, target_actions)

critic_value = critic_model(state_batch, action_batch.unsqueeze(1))
actions = actor_model(state_batch)


In [131]:
torch_prev_state = torch.tensor(prev_state, dtype=torch.float32).unsqueeze(0)
action = policy(torch_prev_state, actor_model, ou_noise, lower_bound, upper_bound)
observation, reward, done, truncated, _ = env.step(action)
        
reward_tensor = torch.tensor([reward], dtype=torch.float32)
action_tensor = torch.tensor(action, dtype=torch.float32)
next_state_tensor = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)

# Record the transition in the replay buffer
# Assuming buffer has a method 'push' that takes a transition tuple
buffer.push(torch_prev_state, action_tensor, reward_tensor, next_state_tensor)

  sampled_actions = sampled_actions + torch.tensor(noise, dtype=torch.float)
