In [3]:
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
import random

from collections import deque
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [9]:
def true_reward(theta, theta_dt, torque):
    """Return the analytic pendulum reward for one (angle, velocity, torque) triple.

    The reward is the negated sum of three quadratic penalties:
    ``theta**2``, ``0.1 * theta_dt**2`` and ``0.001 * torque**2``.
    Only arithmetic operators are used, so any operands that support
    ``**``, ``*``, ``+`` and unary ``-`` are accepted.

    Args:
        theta: Pendulum angle.
        theta_dt: Angular velocity.
        torque: Applied torque.

    Returns:
        The (non-positive, for real inputs) reward value.
    """
    angle_penalty = theta**2
    velocity_penalty = 0.1 * theta_dt**2
    effort_penalty = 0.001 * torque**2
    return -(angle_penalty + velocity_penalty + effort_penalty)

In [10]:
class PendulumRewardDataset(Dataset):
    """Synthetic regression dataset mapping (theta, theta_dt, torque) to reward.

    Every sample is drawn independently and uniformly:
    theta in [-pi, pi], theta_dt in [-8, 8], torque in [-2, 2].
    The label is ``true_reward`` evaluated on that triple.

    Attributes:
        size: Number of samples requested at construction time.
        data: List of ``(theta, theta_dt, torque, reward)`` tuples.
    """

    def __init__(self, size=10000):
        self.size = size
        # All samples are generated eagerly, one at a time, using NumPy's
        # global random state.
        self.data = [self._draw_sample() for _ in range(size)]

    @staticmethod
    def _draw_sample():
        """Draw one random input triple and attach its analytic reward."""
        angle = np.random.uniform(-np.pi, np.pi)
        angular_velocity = np.random.uniform(-8.0, 8.0)
        applied_torque = np.random.uniform(-2.0, 2.0)
        label = true_reward(angle, angular_velocity, applied_torque)
        return (angle, angular_velocity, applied_torque, label)

    def __len__(self):
        """Return the sample count given to the constructor."""
        return self.size

    def __getitem__(self, idx):
        """Return sample ``idx`` as a float32 ``(features, target)`` pair.

        ``features`` has shape (3,) and ``target`` has shape (1,).
        """
        angle, angular_velocity, applied_torque, label = self.data[idx]
        features = torch.tensor(
            [angle, angular_velocity, applied_torque], dtype=torch.float32
        )
        target = torch.tensor([label], dtype=torch.float32)
        return features, target

In [11]:
# Materialise 10,000 synthetic reward samples and serve them in shuffled
# mini-batches of 64 for training the reward model.
dataset = PendulumRewardDataset(size=10000)
dataloader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=True,
)

In [12]:
class RewardModel(nn.Module):
    """Small fully connected regressor from a 3-feature input to a scalar.

    Architecture: Linear(3, 64) -> ReLU -> Linear(64, 64) -> ReLU -> Linear(64, 1).
    The layers live in ``self.net`` (an ``nn.Sequential``), so parameter names
    are ``net.0.*``, ``net.2.*`` and ``net.4.*``.
    """

    def __init__(self):
        super().__init__()
        hidden_width = 64
        layers = [
            nn.Linear(3, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, 1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the network to ``x`` (last dimension 3) and return the output (last dimension 1)."""
        prediction = self.net(x)
        return prediction

In [13]:
def train_reward_model(model, dataloader, epochs=10, lr=1e-3, device='cpu'):
    """Fit ``model`` to the batches in ``dataloader`` with Adam and MSE loss.

    After each epoch the mean of the per-batch losses is printed. The model
    is moved to ``device`` and updated in place; nothing is returned.

    Args:
        model: Module whose output is compared against the batch targets.
        dataloader: Iterable of ``(inputs, targets)`` tensor batches that
            also supports ``len()`` (number of batches).
        epochs: Number of full passes over ``dataloader``.
        lr: Adam learning rate.
        device: Device that the model and every batch are moved to.
    """
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        epoch_loss_total = 0.0

        for features, targets in dataloader:
            features = features.to(device)
            targets = targets.to(device)

            # Clear stale gradients, then run one forward/backward/update step.
            optimizer.zero_grad()
            loss = criterion(model(features), targets)
            loss.backward()
            optimizer.step()

            epoch_loss_total += loss.item()

        # Unweighted mean over batches (a smaller final batch counts equally).
        avg_loss = epoch_loss_total / len(dataloader)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

In [14]:
# Create a fresh reward network and fit it to the synthetic dataset
# (10 epochs, Adam with learning rate 1e-3, default CPU device).
model = RewardModel()
train_reward_model(
    model,
    dataloader,
    epochs=10,
    lr=1e-3,
)

Epoch 1, Loss: 10.9741
Epoch 2, Loss: 1.9405
Epoch 3, Loss: 0.3938
Epoch 4, Loss: 0.1880
Epoch 5, Loss: 0.1291
Epoch 6, Loss: 0.0797
Epoch 7, Loss: 0.0514
Epoch 8, Loss: 0.0363
Epoch 9, Loss: 0.0268
Epoch 10, Loss: 0.0212


In [17]:
def test_model_prediction(model, theta, theta_dt, torque):
    """Print the model's predicted reward next to the analytic reward.

    The model is switched to eval mode (and left there). The single-row input
    is created on the same device as the model's parameters, so the check
    also works after the model has been moved off the CPU by training.

    Args:
        model: Trained module mapping a (1, 3) float tensor to a single value.
        theta: Pendulum angle.
        theta_dt: Angular velocity.
        torque: Applied torque.
    """
    model.eval()

    # Build the input where the model lives; a parameter-less model falls
    # back to the CPU.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")
    x = torch.tensor(
        [[theta, theta_dt, torque]], dtype=torch.float32, device=target_device
    )

    # Model prediction (no autograd bookkeeping needed for inference)
    with torch.no_grad():
        predicted_reward = model(x).item()

    # Ground truth
    actual_reward = true_reward(theta, theta_dt, torque)

    print(f"Input: theta={theta:.3f}, theta_dt={theta_dt:.3f}, torque={torque:.3f}")
    print(f"Predicted Reward: {predicted_reward:.4f}")
    print(f"True Reward:      {actual_reward:.4f}")
    print(f"Error:            {abs(predicted_reward - actual_reward):.4f}")

In [18]:
# Spot-check the trained reward model on a single hand-picked input.
test_model_prediction(
    model,
    theta=1.0,
    theta_dt=0.5,
    torque=-1.5,
)

Input: theta=1.000, theta_dt=0.500, torque=-1.500
Predicted Reward: -0.8990
True Reward:      -1.0272
Error:            0.1283


In [1]:
# Hyperparameters for DDPG

# NOTE(review): none of these constants is referenced by the cells visible in
# this section; the per-constant meanings below follow the usual DDPG
# conventions and should be confirmed against the code that consumes them.

# Return discounting and target-network soft-update rate.
GAMMA = 0.99
TAU = 0.005

# Optimiser learning rates for the actor and the critic.
ACTOR_LR = 1e-3
CRITIC_LR = 1e-3

# Training-loop limits.
MAX_EPISODES = 200
MAX_STEPS = 200

# Replay buffer capacity and mini-batch size.
BUFFER_SIZE = 100_000
BATCH_SIZE = 64

In [4]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Reward model
def cost_fn(state, action, model=None):
    """Return the one-step cost (lower is better) of taking ``action`` in ``state``.

    Args:
        state: Sequence ``[cos(theta), sin(theta), theta_dot]``; the angle is
            recovered with ``arctan2`` and therefore lies in [-pi, pi].
        action: Sequence whose first element is the applied torque.
        model: Optional learned reward model taking a (1, 3) float32 tensor
            ``[theta, theta_dot, torque]`` and returning a single reward.
            When omitted, the analytic quadratic cost is used. The model is
            called as-is, so put it in eval mode beforehand if it contains
            mode-dependent layers.

    Returns:
        The analytic cost ``theta**2 + 0.1*theta_dot**2 + 0.001*torque**2``,
        or the negated model prediction when ``model`` is given.
    """
    theta = np.arctan2(state[1], state[0])
    theta_dot = state[2]
    torque = action[0]

    if model is None:
        return theta**2 + 0.1 * theta_dot**2 + 0.001 * (torque**2)

    # Build the input on the model's device; a parameter-less model falls
    # back to the CPU.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")
    features = torch.tensor(
        [[float(theta), float(theta_dot), float(torque)]],
        dtype=torch.float32,
        device=target_device,
    )
    with torch.no_grad():
        predicted_reward = model(features).item()

    # The model predicts a reward (higher is better) while callers minimise
    # this function, so flip the sign to obtain a cost.
    return -predicted_reward

# World model
def true_dynamics(state, action, dt=0.05, g=10.0, m=1.0, l=1.0,
                  max_speed=8.0, max_torque=2.0):
    """Advance the pendulum by one Euler step and return the next observation.

    Args:
        state: Sequence ``[cos(theta), sin(theta), theta_dot]``.
        action: Torque, either a scalar or a sequence whose first element is
            the torque. It is clipped to ``[-max_torque, max_torque]``.
        dt: Integration time step.
        g: Gravitational acceleration.
        m: Pendulum mass.
        l: Pendulum length.
        max_speed: Bound applied to the absolute angular velocity.
        max_torque: Bound applied to the absolute torque.

    Returns:
        ``np.ndarray`` of shape (3,): ``[cos(theta'), sin(theta'), theta_dot']``.
    """
    th = np.arctan2(state[1], state[0])  # angle θ in [-π, π]
    thdot = state[2]                     # angular velocity

    # atleast_1d lets a bare scalar torque through; sequences are unchanged.
    u = np.clip(np.atleast_1d(action), -max_torque, max_torque)[0]

    # Angular acceleration 3g/(2l)·sin(θ) + 3/(m·l²)·u, integrated over dt.
    # sin(θ) is the exact simplification of -sin(θ + π) and avoids the extra
    # rounding from adding π first.
    newthdot = thdot + (3.0 * g / (2.0 * l) * np.sin(th) + 3.0 / (m * l ** 2) * u) * dt
    newthdot = np.clip(newthdot, -max_speed, max_speed)

    # The angle is advanced with the already-updated (and clipped) velocity.
    newth = th + newthdot * dt

    return np.array([np.cos(newth), np.sin(newth), newthdot])

In [8]:
# Random-shooting MPC on Pendulum-v1: at every step, sample `num_samples`
# random torque sequences of length `horizon`, roll each one out through
# `true_dynamics`, score it with `cost_fn`, and execute only the first action
# of the cheapest sequence.
#
# The environment is created without a render_mode, so env.render() would only
# emit a logger warning; the call has therefore been dropped. To watch the
# rollout, pass render_mode="human" to gym.make instead.
env = gym.make("Pendulum-v1")
try:
    state, _ = env.reset()
    horizon = 15
    num_samples = 100
    action_dim = env.action_space.shape[0]
    action_low = env.action_space.low
    action_high = env.action_space.high

    for t in range(200):
        # Sample random action sequences
        action_sequences = np.random.uniform(
            low=action_low, high=action_high, size=(num_samples, horizon, action_dim)
        )

        # Accumulate the simulated cost of every candidate sequence.
        costs = []
        for seq in action_sequences:
            sim_state = np.copy(state)
            total_cost = 0
            for a in seq:
                total_cost += cost_fn(sim_state, a)
                sim_state = true_dynamics(sim_state, a)
            costs.append(total_cost)

        # Receding horizon: apply only the first action of the best sequence.
        best_action = action_sequences[np.argmin(costs)][0]
        state, _, terminated, truncated, _ = env.step(best_action)
        if terminated or truncated:
            break
finally:
    # Release the environment even if the loop is interrupted or raises.
    env.close()

  gym.logger.warn(


KeyboardInterrupt: 