In [1]:
import numpy as np
from collections import deque, namedtuple
import torch 
from torch import nn
import torch.distributions as dist
import random
import gymnasium as gym
import torch.optim as optim
from tqdm import trange
from scipy.stats import multivariate_normal



In [2]:
class Agent(object):
    ''' Base agent class

        Args:
            n_actions (int): actions dimensionality

        Attributes:
            n_actions (int): where we store the dimensionality of an action
    '''
    def __init__(self, n_actions: int):
        self.n_actions = n_actions

    def forward(self, state: np.ndarray):
        ''' Performs a forward computation '''
        pass

    def backward(self):
        ''' Performs a backward pass on the network '''
        pass


class RandomAgent(Agent):
    ''' Agent taking actions uniformly at random, child of the class Agent'''
    def __init__(self, n_actions: int):
        super(RandomAgent, self).__init__(n_actions)

    def forward(self, state: np.ndarray) -> np.ndarray:
        ''' Compute a random action in [-1, 1]

            Returns:
                action (np.ndarray): array of float values containing the
                    action. The dimensionality is equal to self.n_actions from
                    the parent class Agent
        '''
        return np.clip(-1 + 2 * np.random.rand(self.n_actions), -1, 1)


In [3]:
class CriticNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8, 400),
            nn.ReLU(),
            nn.Linear(400, 200),
            nn.ReLU(),
            nn.Linear(200, 1),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

class ActorNetwork(nn.Module):
    def __init__(self, n_actions: int):
        super().__init__()
        self.flatten = nn.Flatten()
        self.relu = nn.ReLU()         
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        self.Ls = nn.Linear(8,400)
        self.mu_head1 = nn.Linear(400, 200)
        self.mu_head2 = nn.Linear(200, n_actions)
        self.sigma_head1 = nn.Linear(400, 200)
        self.sigma_head2 = nn.Linear(200, n_actions)

    def forward(self, x):
        x = self.flatten(x)
        x = self.Ls(x)
        x = self.relu(x)
        # mu head
        mu = self.mu_head1(x)
        mu = self.relu(mu)
        mu = self.mu_head2(mu)
        mu = self.tanh(mu)
        # sigma head
        sigma = self.sigma_head1(x)
        sigma = self.relu(sigma)
        sigma = self.sigma_head2(sigma)
        sigma = self.sigmoid(sigma)

        return mu, sigma
 
class PPOAgent(Agent):

    def __init__(self, n_actions: int):
        super(PPOAgent, self).__init__(n_actions)
        self.critic_net = CriticNetwork()
        self.actor_net = ActorNetwork(n_actions)

    def forward(self, state: np.ndarray) -> np.ndarray:
        mu, sigma = self.actor_net(torch.tensor(np.expand_dims(state, axis=0)))
        a1 = mu[0][0].item() + sigma[0][0].item() * np.random.normal()
        a2 = mu[0][1].item() + sigma[0][1].item() * np.random.normal()
        return np.clip(np.array([a1, a2], dtype=np.float32), -1, 1)


In [4]:
# Define Experience tuple
# Experience represents a transition in the environment, including the current state, action taken,
# received reward, next state, and whether the episode is done.
Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])

class ExperienceReplayBuffer:
    """Replay buffer for storing experiences.
    
       The experience replay buffer stores past experiences so that the agent can learn from them later.
       By sampling randomly from these experiences, the agent avoids overfitting to the most recent 
       transitions and helps stabilize training.
       - The buffer size is limited, and older experiences are discarded to make room for new ones.
       - Experiences are stored as tuples of (state, action, reward, next_state, done).
       - A batch of experiences is sampled randomly during each training step for updating the Q-values."""

    def __init__(self, maximum_length):
        self.buffer = deque(maxlen=maximum_length)  # Using deque ensures efficient removal of oldest elements

    def append(self, experience):
        """Add a new experience to the buffer"""
        self.buffer.append(experience)

    def __len__(self):
        """Return the current size of the buffer"""
        return len(self.buffer)

    def sample_batch(self, n):
        """Randomly sample a batch of experiences"""
        if n > len(self.buffer):
            raise IndexError('Sample size exceeds buffer size!')
        indices = np.random.choice(len(self.buffer), size=n, replace=False)  # Random sampling
        batch = [self.buffer[i] for i in indices]  # Create a batch from sampled indices
        return zip(*batch)  # Unzip batch into state, action, reward, next_state, and done


In [5]:
N_EPISODES = 1600  # Number of training episodes
BUFFER_SIZE = 2000  # Size of the replay buffer
CRITIC_LEARNING_RATE = 1e-3
ACTOR_LEARNING_RATE = 1e-5 # Learning rate for the optimizer
GAMMA = 0.99
M=10
EPSILON=0.2

In [6]:
env = gym.make('LunarLanderContinuous-v3')
m = len(env.action_space.high)
agent = PPOAgent(m)
buffer = ExperienceReplayBuffer(maximum_length=BUFFER_SIZE)
actor_optimizer = optim.Adam(agent.actor_net.parameters(), lr=ACTOR_LEARNING_RATE)
critic_optimizer = optim.Adam(agent.critic_net.parameters(), lr=CRITIC_LEARNING_RATE)


In [7]:
def select_action(state, epsilon):
    """Epsilon-greedy action selection
    # We balance exploration and exploitation using epsilon-greedy.
    # Exploration: Choose a random action.
    # Exploitation: Choose the action with the highest Q-value (the optimal action)."""
    if random.random() < epsilon:
        return env.action_space.sample()  # Explore by selecting a random action
    else:
        state_tensor = torch.tensor([state], dtype=torch.float32)  # Convert state to tensor
        return agent.net(state_tensor).argmax().item()  # Exploit by selecting the action with max Q-value

# state = env.observation_space.sample()
# print("Type of sampled state:", type(state))
# print("Sampled state shape:", np.shape(state))

#s = torch.tensor(np.expand_dims(state, axis=0))

In [16]:
EPISODES = trange(N_EPISODES, desc='Episode: ', leave=True)

for i in EPISODES:

    buffer = ExperienceReplayBuffer(maximum_length=BUFFER_SIZE)

     # Reset enviroment data
    done, truncated = False, False
    state = env.reset()[0]
    total_episode_reward = 0.
    t = 0
    rewards = []

    while not (done or truncated):
        action = agent.forward(state)
        next_state, reward, terminal, truncated, _ = env.step(action)
        done = terminal or truncated 
        total_episode_reward += reward
        buffer.append(Experience(state, action, reward, next_state, done))
        state = next_state
        t+= 1
    
    G = []
    A = []
    pi_old_logpdf = []

    running = 0


    for exp in reversed(buffer.buffer):
        running = GAMMA * running + exp.reward
        G.append(running)
        A.append(running - agent.critic_net(torch.tensor(np.expand_dims(exp.state, axis=0))).item())

        mu, sigma = agent.actor_net(torch.tensor(np.expand_dims(exp.state, axis=0)))
        mu, sigma = mu[0].detach(), sigma[0].detach()

        #pi_old_logpdf.append(torch.distributions.MultivariateNormal(mu, torch.diag(sigma**2)).log_prob(torch.tensor(exp.action)))
        
        #pi_old_logpdf.append((multivariate_normal(mu, np.diag(sigma**2))).logpdf(exp.action))

        
    
    G.reverse()
    A.reverse()
    G = torch.tensor(G)
    A = torch.tensor(A)
    
    # s = torch.tensor([[exp.state] for exp in buffer.buffer]) 
    # a = [exp.action for exp in buffer.buffer]
    states = torch.as_tensor(
    np.stack([exp.state for exp in buffer.buffer]), 
    dtype=torch.float32, 
    )
    actions = torch.as_tensor(
        np.stack([exp.action for exp in buffer.buffer]), 
        dtype=torch.float32, 
    )


    with torch.no_grad():  # old policy should not require gradients
        mu_old, sigma_old = agent.actor_net(states)   # shapes [T, act_dim]

        dist_old = torch.distributions.MultivariateNormal(
            loc=mu_old,
            covariance_matrix=torch.diag_embed(sigma_old ** 2)  # [T, act_dim, act_dim]
        )

        pi_old_logpdf = dist_old.log_prob(actions)    # shape [T]

    actor_loss = 0
    critic_loss = 0
    for m in range(M):
        # critic
        values = agent.critic_net(states).squeeze(-1)
        critic_loss = torch.mean((values - G) ** 2)

        # actor
        mu, sigma = agent.actor_net(states)
        dist = torch.distributions.MultivariateNormal(
            mu, torch.diag_embed(sigma ** 2)
        )
        log_probs = dist.log_prob(actions)

        r = (log_probs - pi_old_logpdf).exp()
        clipped_r = torch.clamp(r, 1.0 - EPSILON, 1.0 + EPSILON)

        surr1 = r * A
        surr2 = clipped_r * A
        actor_loss = -torch.mean(torch.min(surr1, surr2))

        critic_optimizer.zero_grad()
        actor_optimizer.zero_grad()

        critic_loss.backward()
        actor_loss.backward()

        critic_optimizer.step()
        actor_optimizer.step()
        

    if i % 400 == 0:
        eval_r = []
        for i in range(50):
            state = env.reset()[0]
            done, truncated = False, False

            total_episode_reward = 0.

            while not (done or truncated):
                action = agent.forward(state)
                next_state, reward, done, truncated, _ = env.step(action)
                total_episode_reward += reward
                state = next_state

            eval_r.append(total_episode_reward)

        print("reward: ", np.mean(eval_r))
        



Episode:   0%|          | 1/1600 [00:03<1:25:12,  3.20s/it]

reward:  -157.43439725390957


Episode:  25%|██▌       | 401/1600 [01:51<34:35,  1.73s/it]

reward:  -85.49579295673426


Episode:  50%|█████     | 801/1600 [04:15<22:16,  1.67s/it]

reward:  123.75140524942174


Episode:  75%|███████▌  | 1201/1600 [05:36<04:52,  1.37it/s]

reward:  248.13719644076943


Episode: 100%|██████████| 1600/1600 [06:28<00:00,  4.12it/s]


In [None]:
# env.action_space.sample()
# state = env.observation_space.sample()
# print("Type of sampled state:", type(state))
# print("Sampled state shape:", np.shape(state))

# s = torch.tensor(np.expand_dims(state, axis=0))

# mu[0][0].item()
# mu, sigma = agent.actor_net(s)
# agent.forward(state)

In [None]:
a = Experience(0.1, 0.2, 0.3, 0.4, 0.5)
b = Experience(0.1, 0.2, 0.3, 0.4, 0.7)

for i, exp in zip(reversed(range(len([a, b]))), reversed([a, b])):
    print(exp)
    print(i)

Experience(state=0.1, action=0.2, reward=0.3, next_state=0.4, done=0.7)
1
Experience(state=0.1, action=0.2, reward=0.3, next_state=0.4, done=0.5)
0


In [17]:
# Save the trained actor (agent) network
torch.save(agent.actor_net, 'neural-network-3-actor.pth')

# Save the trained critic network
torch.save(agent.critic_net, 'neural-network-3-critic.pth')


In [None]:
# Pull out the weights from the actor and critic networks

# For torch nn.Module, use .state_dict() or .parameters()
actor_weights = agent.actor_net.state_dict()
critic_weights = agent.critic_net.state_dict()

print("Actor network weights:")
for name, param in actor_weights.items():
    print(name, param.shape)

print("\nCritic network weights:")
for name, param in critic_weights.items():
    print(name, param.shape)


In [None]:
l

[0.1, 0.2, 0.3]

In [None]:
env.action_space.sample()
state = env.observation_space.sample()
print("Type of sampled state:", type(state))
print("Sampled state shape:", np.shape(state))

s = torch.tensor(np.expand_dims(state, axis=0))

#ActorNetworkmu[0][0].item()
mu, sigma = agent.actor_net(s)
agent.forward(state)

Type of sampled state: <class 'numpy.ndarray'>
Sampled state shape: (8,)


In [None]:
logpdf.item()

-2.7282395362854004

array([-99.09631038,  99.60542318])

In [None]:
multivariate_normal(np.array([100, -100]), np.diag([1,1]))

<scipy.stats._multivariate.multivariate_normal_frozen at 0x15d129290>

In [None]:
multivariate_normal.logpdf([1, 1])

array([-1.41893853, -1.41893853])

In [None]:
class ActorNetworkVar(nn.Module):
    def __init__(self, n_actions: int):
        super().__init__()
        self.flatten = nn.Flatten()
        self.relu = nn.ReLU()         
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        self.Ls = nn.Linear(8,400)
        self.mu_head1 = nn.Linear(400, 200)
        self.mu_head2 = nn.Linear(200, n_actions)
        self.sigma_head1 = nn.Linear(400, 200)
        self.sigma_head2 = nn.Linear(200, n_actions)

    def forward(self, x):
        
        if x.ndim == 1:
            x = x.unsqueeze(0)

        x = self.flatten(x)
        x = self.Ls(x)
        x = self.relu(x)
        # mu head
        mu = self.mu_head1(x)
        mu = self.relu(mu)
        mu = self.mu_head2(mu)
        mu = self.tanh(mu)
        # sigma head
        sigma = self.sigma_head1(x)
        sigma = self.relu(sigma)
        sigma = self.sigma_head2(sigma)
        sigma = self.sigmoid(sigma)

        return mu, sigma**2
 

In [19]:
actor_net_var = ActorNetworkVar()

TypeError: ActorNetworkVar.__init__() missing 1 required positional argument: 'n_actions'