In [8]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import copy

# Constants for the Pendulum environment
G = 10.0  # Gravity
L = 1.0   # Length of pendulum
M = 1.0   # Mass
DT = 0.05  # Time step
MAX_TORQUE = 2.0  # Actions are clipped to [-MAX_TORQUE, MAX_TORQUE]
MAX_SPEED = 8.0   # Angular velocity is clipped to [-MAX_SPEED, MAX_SPEED]

class PendulumEnv:
    """Pendulum swing-up environment with a Gymnasium-style reset/step API.

    The internal state is ``[theta, theta_dot]``; ``theta == 0`` is the
    upright pose, which is where the cost is smallest. Observations are
    ``[cos(theta), sin(theta), theta_dot]`` as float32.

    Args:
        render: When True, a pygame window is opened and ``render()`` draws
            the pendulum; when False, pygame is never imported.
    """

    def __init__(self, render=False):
        self.state = None
        self.render_flag = render
        self.screen = None
        self.clock = None
        if self.render_flag:
            # Imported lazily so headless training does not need pygame.
            import pygame
            pygame.init()
            self.screen = pygame.display.set_mode((500, 500))
            pygame.display.set_caption("Pendulum Simulation")
            self.clock = pygame.time.Clock()
        self.reset()

    def reset(self):
        """Draw a random start state and return ``(observation, info)``.

        theta is uniform in [-pi, pi] and theta_dot is uniform in
        [-MAX_SPEED, MAX_SPEED]. ``info`` is always an empty dict.
        """
        high = np.array([np.pi, 1])
        self.state = np.random.uniform(-high, high)
        self.state[1] *= MAX_SPEED  # theta_dot
        return self._get_obs(), {}

    def _get_obs(self):
        """Return the float32 observation ``[cos(theta), sin(theta), theta_dot]``."""
        theta, theta_dot = self.state
        return np.array([np.cos(theta), np.sin(theta), theta_dot], dtype=np.float32)

    def step(self, action):
        """Advance the simulation by one time step of length DT.

        Args:
            action: Torque as a scalar, a list, or an array of any shape;
                only the first element is used and it is clipped to
                [-MAX_TORQUE, MAX_TORQUE].

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` and ``truncated`` are always False; episode
            length is left to the caller.
        """
        # Collapse the action to a single float first. Indexing only one axis
        # would leave a (1, 1)-shaped action as an array and silently turn the
        # state (and every later observation) into 2-D arrays.
        torque = float(
            np.clip(
                np.asarray(action, dtype=np.float64).reshape(-1)[0],
                -MAX_TORQUE,
                MAX_TORQUE,
            )
        )
        theta, theta_dot = self.state

        new_theta_dot = theta_dot + (3 * G / (2 * L) * np.sin(theta) + 3. / (M * L ** 2) * torque) * DT
        new_theta_dot = np.clip(new_theta_dot, -MAX_SPEED, MAX_SPEED)
        new_theta = theta + new_theta_dot * DT

        self.state = np.array([new_theta, new_theta_dot])

        # Normalize theta to [-pi, pi)
        self.state[0] = self._angle_normalize(self.state[0])

        # Quadratic cost on angle, angular velocity and torque, evaluated on
        # the state reached after this step.
        cost = (
            self._angle_normalize(self.state[0]) ** 2
            + 0.1 * self.state[1] ** 2
            + 0.001 * (torque ** 2)
        )
        reward = -cost

        # No termination inside the env; the training loop enforces the step limit.
        terminated = False
        truncated = False

        return self._get_obs(), reward, terminated, truncated, {}

    def _angle_normalize(self, x):
        """Wrap an angle in radians into the interval [-pi, pi)."""
        return ((x + np.pi) % (2 * np.pi)) - np.pi

    def render(self):
        """Draw the current state; a no-op unless rendering is enabled and open."""
        if not self.render_flag or self.screen is None:
            return
        import pygame
        # Service the event queue, otherwise the OS may flag the window as
        # unresponsive while the simulation is running.
        pygame.event.pump()
        self.screen.fill((255, 255, 255))
        theta = self.state[0]
        # Draw pendulum
        origin = (250, 250)
        # Screen y grows downwards, so the cosine term is subtracted to draw
        # theta == 0 (the upright, minimum-cost pose) pointing up.
        end = (
            int(round(250 + 200 * np.sin(theta))),
            int(round(250 - 200 * np.cos(theta))),
        )
        pygame.draw.line(self.screen, (0, 0, 0), origin, end, 8)
        pygame.draw.circle(self.screen, (0, 0, 255), end, 20)
        pygame.display.flip()
        self.clock.tick(60)  # 60 FPS

    def close(self):
        """Shut pygame down if a window was opened; safe to call repeatedly."""
        if self.render_flag and self.screen is not None:
            import pygame
            pygame.quit()
            self.screen = None
            self.clock = None

# Prefer the GPU when CUDA is usable; otherwise everything runs on the CPU.
device = (
    torch.device("cuda")
    if torch.cuda.is_available()
    else torch.device("cpu")
)

class Actor(nn.Module):
    """Deterministic policy: maps a batch of states to bounded actions.

    Two 256-unit ReLU layers feed a tanh output that is scaled by
    ``max_action``, so every action component lies in
    [-max_action, max_action].
    """

    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        hidden_units = 256
        # The attribute names l1/l2/l3 are the state_dict keys of saved checkpoints.
        self.l1 = nn.Linear(state_dim, hidden_units)
        self.l2 = nn.Linear(hidden_units, hidden_units)
        self.l3 = nn.Linear(hidden_units, action_dim)
        self.max_action = max_action

    def forward(self, state):
        """Return the action for each row of ``state``."""
        hidden = torch.relu(self.l1(state))
        hidden = torch.relu(self.l2(hidden))
        squashed = torch.tanh(self.l3(hidden))
        return self.max_action * squashed

class Critic(nn.Module):
    """Pair of independent Q-networks evaluated on the same (state, action) input.

    Each network is a 256-256-1 MLP with ReLU activations. Layers l1-l3
    form Q1 and layers l4-l6 form Q2.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        joint_dim = state_dim + action_dim
        # Q1 architecture
        self.l1 = nn.Linear(joint_dim, 256)
        self.l2 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, 1)
        # Q2 architecture
        self.l4 = nn.Linear(joint_dim, 256)
        self.l5 = nn.Linear(256, 256)
        self.l6 = nn.Linear(256, 1)

    @staticmethod
    def _run_head(inputs, first, second, last):
        """Apply one three-layer Q-network to the concatenated input."""
        hidden = F.relu(first(inputs))
        hidden = F.relu(second(hidden))
        return last(hidden)

    def forward(self, state, action):
        """Return ``(q1, q2)``, the two value estimates for each row."""
        joint = torch.cat((state, action), dim=1)
        q1 = self._run_head(joint, self.l1, self.l2, self.l3)
        q2 = self._run_head(joint, self.l4, self.l5, self.l6)
        return q1, q2

    def Q1(self, state, action):
        """Return only the first network's value estimate."""
        joint = torch.cat((state, action), dim=1)
        return self._run_head(joint, self.l1, self.l2, self.l3)

class TD3(object):
    """TD3 agent: one actor, a twin critic, and slowly tracking target copies.

    Args:
        state_dim: Size of the observation vector.
        action_dim: Size of the action vector.
        max_action: Bound on the magnitude of each action component.
        discount: Discount factor used in the bootstrapped target.
        tau: Interpolation weight of the soft target-network update.
        policy_noise: Standard deviation of the noise added to target actions.
        noise_clip: Bound on the magnitude of that noise.
        policy_freq: The actor and the targets are updated once every
            ``policy_freq`` critic updates.
    """

    def __init__(self, state_dim, action_dim, max_action, discount=0.99, tau=0.005, policy_noise=0.2, noise_clip=0.5, policy_freq=2):
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = copy.deepcopy(self.actor)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=3e-4)

        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = copy.deepcopy(self.critic)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=3e-4)

        self.max_action = max_action
        self.discount = discount
        self.tau = tau
        self.policy_noise = policy_noise
        self.noise_clip = noise_clip
        self.policy_freq = policy_freq

        # Number of train() calls so far; drives the delayed policy update.
        self.total_it = 0

    def select_action(self, state):
        """Return the actor's action for one state as a flat numpy array."""
        state_t = torch.as_tensor(
            np.asarray(state), dtype=torch.float32, device=device
        ).reshape(1, -1)
        # Acting needs no gradients; skipping autograd avoids building a
        # graph on every environment step.
        with torch.no_grad():
            action = self.actor(state_t)
        return action.cpu().numpy().flatten()

    def _soft_update(self, online, target):
        """Move ``target``'s parameters a fraction ``tau`` towards ``online``'s."""
        with torch.no_grad():
            for param, target_param in zip(online.parameters(), target.parameters()):
                target_param.copy_(self.tau * param + (1 - self.tau) * target_param)

    def train(self, replay_buffer, batch_size=256):
        """Run one critic update and, every ``policy_freq`` calls, one actor update.

        Args:
            replay_buffer: Object whose ``sample(batch_size)`` returns the
                tensors ``(state, action, next_state, reward, not_done)``.
            batch_size: Number of transitions drawn for this update.
        """
        self.total_it += 1

        # Sample replay buffer
        state, action, next_state, reward, not_done = replay_buffer.sample(batch_size)

        with torch.no_grad():
            # Select action according to the target policy and add clipped noise
            noise = (
                torch.randn_like(action) * self.policy_noise
            ).clamp(-self.noise_clip, self.noise_clip)

            next_action = (
                self.actor_target(next_state) + noise
            ).clamp(-self.max_action, self.max_action)

            # Compute the target Q value from the smaller of the two target critics
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)
            target_Q = torch.min(target_Q1, target_Q2)
            # not_done == 0 removes the bootstrap term for terminal transitions
            target_Q = reward + not_done * self.discount * target_Q

        # Get current Q estimates
        current_Q1, current_Q2 = self.critic(state, action)

        # Compute critic loss
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)

        # Optimize the critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Delayed policy updates
        if self.total_it % self.policy_freq == 0:

            # Compute actor loss: maximise Q1 of the actor's own actions
            actor_loss = -self.critic.Q1(state, self.actor(state)).mean()

            # Optimize the actor
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # Update the frozen target models
            self._soft_update(self.critic, self.critic_target)
            self._soft_update(self.actor, self.actor_target)

class ReplayBuffer(object):
    """Fixed-capacity ring buffer of transitions with uniform random sampling.

    Args:
        state_dim: Size of a state vector.
        action_dim: Size of an action vector.
        max_size: Capacity; once full, the oldest transitions are overwritten.

    Raises:
        ValueError: If ``max_size`` is not positive.
    """

    def __init__(self, state_dim, action_dim, max_size=int(1e6)):
        if max_size <= 0:
            raise ValueError("max_size must be a positive integer")
        self.max_size = max_size
        self.ptr = 0   # Index of the slot the next transition is written to
        self.size = 0  # Number of valid transitions currently stored

        # Samples are always handed out as float32 tensors, so storing float32
        # yields the same values while halving the memory of float64 storage.
        self.state = np.zeros((max_size, state_dim), dtype=np.float32)
        self.action = np.zeros((max_size, action_dim), dtype=np.float32)
        self.next_state = np.zeros((max_size, state_dim), dtype=np.float32)
        self.reward = np.zeros((max_size, 1), dtype=np.float32)
        self.not_done = np.zeros((max_size, 1), dtype=np.float32)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def add(self, state, action, next_state, reward, done):
        """Store one transition, overwriting the oldest one when full."""
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        # Stored inverted so it can multiply the bootstrap term directly.
        self.not_done[self.ptr] = 1. - done

        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions uniformly, with replacement.

        Returns:
            Tuple ``(state, action, next_state, reward, not_done)`` of
            float32 tensors on ``self.device``, each with ``batch_size`` rows.

        Raises:
            ValueError: If no transition has been added yet.
        """
        if self.size == 0:
            raise ValueError("Cannot sample from an empty replay buffer")

        ind = np.random.randint(0, self.size, size=batch_size)

        fields = (self.state, self.action, self.next_state, self.reward, self.not_done)
        return tuple(
            torch.as_tensor(field[ind], dtype=torch.float32, device=self.device)
            for field in fields
        )

# Runs policy for X episodes and returns average reward
def eval_policy(policy, env, eval_episodes=10, max_steps=500):
    """Roll out ``policy`` without exploration noise and report the mean return.

    Args:
        policy: Object exposing ``select_action(state)``.
        env: Environment exposing ``reset()`` and ``step(action)``.
        eval_episodes: Number of episodes to average over.
        max_steps: Upper bound on the steps taken in each episode.

    Returns:
        The mean of the per-episode summed rewards; it is also printed.
    """
    reward_total = 0.
    for _episode in range(eval_episodes):
        observation, _info = env.reset()
        episode_return = 0.
        for _step in range(max_steps):
            chosen_action = policy.select_action(np.array(observation))
            observation, step_reward, terminated, truncated, _info = env.step(chosen_action)
            episode_return += step_reward
            episode_finished = terminated or truncated
            if episode_finished:
                break
        reward_total += episode_return
    avg_reward = reward_total / eval_episodes
    print(f"Evaluation over {eval_episodes} episodes: {avg_reward:.3f}")
    return avg_reward

# Main script
# Seed the RNGs behind environment resets, exploration noise, replay sampling
# and network initialisation so a fresh run reproduces the same results.
SEED = 0
np.random.seed(SEED)
torch.manual_seed(SEED)

env = PendulumEnv(render=False)  # No rendering during training
# Evaluation gets its own environment: rolling out evaluation episodes on the
# training env would leave the loop below holding an observation that no
# longer matches the env's internal state.
eval_env = PendulumEnv(render=False)

state_dim = 3  # [cos(theta), sin(theta), theta_dot]
action_dim = 1
max_action = MAX_TORQUE

policy = TD3(state_dim, action_dim, max_action)

replay_buffer = ReplayBuffer(state_dim, action_dim)

# Parameters
start_timesteps = 10000  # Steps taken with uniformly random actions
max_episodes = 75  # Total episodes
max_steps = 500  # Timesteps per episode
eval_freq = 10  # Evaluate every 10 episodes
expl_noise = 0.15  # Std of Gaussian exploration noise, as a fraction of max_action
batch_size = 256
updates_per_episode = 200  # Gradient updates run at the end of every episode

evaluations = []

state, _ = env.reset()
episode_reward = 0
episode_timesteps = 0
episode_num = 0
total_timesteps = 0

while episode_num < max_episodes:
    episode_timesteps += 1
    total_timesteps += 1

    # Select action randomly or according to policy
    if total_timesteps < start_timesteps:
        action = np.random.uniform(-max_action, max_action, size=action_dim)
    else:
        action = (
            policy.select_action(np.array(state))
            + np.random.normal(0, max_action * expl_noise, size=action_dim)
        ).clip(-max_action, max_action)

    # Perform action
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated or episode_timesteps >= max_steps
    episode_reward += reward

    # Store data in replay buffer. Only a genuine terminal state may switch
    # off bootstrapping; running into the step limit is a time-out, so the
    # critic target must still include the value of next_state.
    replay_buffer.add(state, action, next_state, reward, float(terminated))

    state = next_state

    if done:
        print(f"Episode Num: {episode_num + 1} Episode T: {episode_timesteps} Reward: {episode_reward:.3f}")
        # Train the agent on the data collected so far
        for _ in range(updates_per_episode):
            policy.train(replay_buffer, batch_size)

        state, _ = env.reset()
        episode_reward = 0
        episode_timesteps = 0
        episode_num += 1

        # Evaluate episode
        if episode_num % eval_freq == 0:
            print("---------------------------------------")
            evaluations.append(eval_policy(policy, eval_env, max_steps=max_steps))
            print("---------------------------------------")

# Save the trained actor model
torch.save(policy.actor.state_dict(), "td3_pendulum_actor.pth")

eval_env.close()
env.close()

Episode Num: 1 Episode T: 500 Reward: -3025.774
Episode Num: 2 Episode T: 500 Reward: -2240.064
Episode Num: 2 Episode T: 500 Reward: -2240.064
Episode Num: 3 Episode T: 500 Reward: -2720.860
Episode Num: 3 Episode T: 500 Reward: -2720.860
Episode Num: 4 Episode T: 500 Reward: -2926.188
Episode Num: 4 Episode T: 500 Reward: -2926.188
Episode Num: 5 Episode T: 500 Reward: -2559.347
Episode Num: 5 Episode T: 500 Reward: -2559.347
Episode Num: 6 Episode T: 500 Reward: -2833.149
Episode Num: 6 Episode T: 500 Reward: -2833.149
Episode Num: 7 Episode T: 500 Reward: -2532.866
Episode Num: 7 Episode T: 500 Reward: -2532.866
Episode Num: 8 Episode T: 500 Reward: -2529.303
Episode Num: 8 Episode T: 500 Reward: -2529.303
Episode Num: 9 Episode T: 500 Reward: -2295.750
Episode Num: 9 Episode T: 500 Reward: -2295.750
Episode Num: 10 Episode T: 500 Reward: -2133.608
Episode Num: 10 Episode T: 500 Reward: -2133.608
---------------------------------------
---------------------------------------
Evalua

In [9]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import gymnasium as gym
import time

# Use CUDA when it is available, otherwise fall back to the CPU.
device = (
    torch.device("cuda")
    if torch.cuda.is_available()
    else torch.device("cpu")
)

class Actor(nn.Module):
    """Policy network whose layer names match the saved actor checkpoint.

    Two 256-unit ReLU layers are followed by a tanh output scaled by
    ``max_action``, so actions lie in [-max_action, max_action].
    """

    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        width = 256
        # l1/l2/l3 are the keys load_state_dict() looks up in the checkpoint.
        self.l1 = nn.Linear(state_dim, width)
        self.l2 = nn.Linear(width, width)
        self.l3 = nn.Linear(width, action_dim)
        self.max_action = max_action

    def forward(self, state):
        """Return the bounded action for each row of ``state``."""
        activations = torch.relu(self.l1(state))
        activations = torch.relu(self.l2(activations))
        pre_scale = torch.tanh(self.l3(activations))
        return self.max_action * pre_scale

class TD3:
    """Inference-only agent: holds just the actor needed to replay a saved policy."""

    def __init__(self, state_dim, action_dim, max_action):
        self.actor = Actor(state_dim, action_dim, max_action).to(device)

    def select_action(self, state):
        """Return the actor's action for one state as a flat numpy array."""
        state_t = torch.as_tensor(
            np.asarray(state), dtype=torch.float32, device=device
        ).reshape(1, -1)
        # Pure inference: skip autograd instead of building a graph per step.
        with torch.no_grad():
            action = self.actor(state_t)
        return action.cpu().numpy().flatten()

# Main script
env = gym.make("Pendulum-v1", render_mode="human")
try:
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    max_action = float(env.action_space.high[0])

    policy = TD3(state_dim, action_dim, max_action)
    # map_location lets a checkpoint saved from a GPU run load on a CPU-only machine.
    policy.actor.load_state_dict(
        torch.load("td3_pendulum_actor.pth", map_location=device)
    )
    policy.actor.eval()

    state, _ = env.reset()
    episode_reward = 0
    for _ in range(500):
        action = policy.select_action(np.array(state))
        # With render_mode="human" the environment draws each frame itself
        # during step(), so no explicit env.render() call is made here.
        state, reward, terminated, truncated, _ = env.step(action)
        episode_reward += reward
        time.sleep(0.01)
        if terminated or truncated:
            break

    print(f"Episode Reward: {episode_reward:.2f}")
finally:
    # Always release the render window, even if loading or stepping fails.
    env.close()

Episode Reward: -113.57
