In [None]:
import gym
import numpy as np
import random
import math
from gym import spaces, Env
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# -----------------------
# 1. Define the custom environment
# -----------------------
class CustomEnv(Env):
    """
    A simple custom continuous environment.

    - State: [position, velocity], stored as a float32 array of shape (2,)
      * observation_space declares both components in [-10, 10].
        NOTE: the dynamics below do not clip the state, so a policy that
        keeps accelerating can drive it outside those declared bounds.
    - Action: [acceleration]
      * clipped to the action_space bounds [-1, 1] before it is applied
    - Dynamics:
      position_{t+1} = position_t + velocity_t
      velocity_{t+1} = velocity_t + acceleration
    - Reward:
      * Negative squared error from the target position (5.0)
      * Minus 0.1 * acceleration^2 (to encourage smooth control)
    - Episode ends when:
      * Time step reaches max_steps or position is within 0.1 of the target.
    """
    def __init__(self):
        super().__init__()
        self.target = 5.0
        self.max_steps = 200

        # Bounds are created as float32 up front so that Box does not have to
        # down-cast float64 arrays (some gym versions warn about that cast).
        self.action_space = spaces.Box(
            low=np.array([-1.0], dtype=np.float32),
            high=np.array([1.0], dtype=np.float32),
            dtype=np.float32,
        )

        # Observation space: [position, velocity]
        self.observation_space = spaces.Box(
            low=np.array([-10.0, -10.0], dtype=np.float32),
            high=np.array([10.0, 10.0], dtype=np.float32),
            dtype=np.float32,
        )

        # state stays None until reset() is called.
        self.state = None
        self.step_count = 0

    def reset(self):
        """Start a new episode at position 0 with velocity 0 and return the state."""
        self.state = np.array([0.0, 0.0], dtype=np.float32)
        self.step_count = 0
        return self.state

    def step(self, action):
        """
        Advance the environment by one time step.

        Args:
            action: acceleration, given as a scalar or as any array-like whose
                first element is the acceleration. It is clipped to the
                action_space bounds before use.

        Returns:
            (state, reward, done, info): the new float32 state array, the
            reward as a Python float, the termination flag and an empty dict.

        Raises:
            RuntimeError: if called before reset().
            ValueError: if action contains no elements.
        """
        if self.state is None:
            raise RuntimeError("CustomEnv.step() called before reset().")

        # Flatten so that scalars, lists and [1, 1]-shaped arrays are all accepted.
        action = np.asarray(action).reshape(-1)
        if action.size == 0:
            raise ValueError("action must contain at least one element.")
        action = np.clip(action, self.action_space.low, self.action_space.high)
        acceleration = action[0]

        position, velocity = self.state
        # Simple dynamics: the position update uses the velocity *before* it
        # is changed by the current acceleration.
        position_new = position + velocity
        velocity_new = velocity + acceleration

        self.state = np.array([position_new, velocity_new], dtype=np.float32)
        self.step_count += 1

        # Reward is negative squared error to the target plus a penalty on the control.
        error = self.target - position_new
        reward = float(-(error ** 2) - 0.1 * (acceleration ** 2))

        # Check termination: either reached close to the target or exceeded max steps.
        done = bool(abs(error) < 0.1 or self.step_count >= self.max_steps)
        return self.state, reward, done, {}

    def render(self, mode='human'):
        # For simplicity, we do not implement rendering here.
        pass

# -----------------------
# 2. Define the Actor-Critic Networks
# -----------------------
class Actor(nn.Module):
    """
    Gaussian policy network for a continuous action.

    The mean of the action distribution is computed from the state by a small
    MLP; the standard deviation comes from a single learned log_std parameter
    and is therefore the same for every state.

    Input: state of shape [batch_size, state_dim]
    Output: tuple (mean, std)
      * mean of shape [batch_size, action_dim]
      * std of shape [action_dim]
    """
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        # Two hidden layers followed by a linear head that produces the mean.
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.mean = nn.Linear(hidden_dim, action_dim)
        # log_std is a free parameter initialised to zero, i.e. std starts at 1.
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def forward(self, state):
        # state: [batch_size, state_dim] -> hidden: [batch_size, hidden_dim]
        hidden = F.relu(self.fc2(F.relu(self.fc1(state))))
        # Exponentiating log_std guarantees a strictly positive std.
        return self.mean(hidden), self.log_std.exp()

class Critic(nn.Module):
    """
    State-value network: maps a state to a single scalar value estimate.

    Input: state of shape [batch_size, state_dim]
    Output: value of shape [batch_size, 1]
    """
    def __init__(self, state_dim, hidden_dim=128):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.value = nn.Linear(hidden_dim, 1)

    def forward(self, state):
        # Run the state through both hidden layers, each followed by a ReLU.
        hidden = state
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))  # [batch_size, hidden_dim]
        # Linear head without activation: the value estimate is unbounded.
        return self.value(hidden)  # [batch_size, 1]

# -----------------------
# 3. Training Setup for Actor-Critic (A2C)
# -----------------------
# Select the compute device; the networks and all tensors below are placed on it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

env = CustomEnv()

state_dim = env.observation_space.shape[0]     # 2 for our custom env ([position, velocity])
action_dim = env.action_space.shape[0]           # 1 for our continuous action

actor = Actor(state_dim, action_dim).to(device)
critic = Critic(state_dim).to(device)

actor_optimizer = optim.Adam(actor.parameters(), lr=1e-3)
critic_optimizer = optim.Adam(critic.parameters(), lr=1e-3)

# Hyperparameters for training
num_episodes = 500
gamma = 0.99

# To store episode rewards for plotting
episode_rewards = []

for episode in range(num_episodes):
    # env.reset() returns a numpy array of shape [state_dim]; add a batch axis.
    # as_tensor converts the array directly instead of going through a Python
    # list of arrays, which is slow and makes PyTorch emit a warning.
    state = torch.as_tensor(env.reset(), dtype=torch.float32, device=device).unsqueeze(0)

    log_probs = []
    values = []
    rewards = []
    done = False
    t = 0

    # Roll out one full episode with the current policy.
    while not done:
        t += 1

        # Get value estimate from Critic
        values.append(critic(state))  # shape: [1, 1]

        # Get actor output: mean [1, action_dim] and std [action_dim] (broadcast)
        mean, std = actor(state)

        # Create a normal distribution and sample an action
        dist = torch.distributions.Normal(mean, std)
        action = dist.sample()  # shape: [1, action_dim]
        # Sum the per-dimension log-probs so each step contributes one value of
        # shape [1, 1]; for action_dim == 1 this equals the plain log_prob.
        log_probs.append(dist.log_prob(action).sum(dim=-1, keepdim=True))

        # Execute action in environment
        # env.step() expects a numpy array, so drop the batch axis and convert.
        action_np = action.detach().cpu().numpy()[0]
        next_state_np, reward, done, _ = env.step(action_np)

        rewards.append(float(reward))

        # The state produced by the terminal step is never fed to the
        # networks (the loop exits), so no placeholder is needed for it.
        state = torch.as_tensor(next_state_np, dtype=torch.float32, device=device).unsqueeze(0)

    # Compute returns (discounted cumulative rewards), walking backwards
    # through the episode and reversing once at the end.
    R = 0.0
    returns = []
    for r in reversed(rewards):
        R = r + gamma * R
        returns.append(R)
    returns.reverse()
    returns = torch.tensor(returns, dtype=torch.float32, device=device).unsqueeze(1)  # shape: [T, 1]

    # Convert lists of per-step tensors to tensors of shape [T, 1]
    values = torch.cat(values, dim=0)
    log_probs = torch.cat(log_probs, dim=0)

    # Advantage: difference between returns and value estimates.
    # detach() keeps the actor loss from back-propagating into the critic.
    advantages = returns - values.detach()

    # Actor loss: negative of (log_prob * advantage)
    actor_loss = -(log_probs * advantages).mean()
    # Critic loss: Mean Squared Error between returns and values
    critic_loss = F.mse_loss(values, returns)

    # Backpropagation for actor
    actor_optimizer.zero_grad()
    actor_loss.backward()
    actor_optimizer.step()

    # Backpropagation for critic
    critic_optimizer.zero_grad()
    critic_loss.backward()
    critic_optimizer.step()

    episode_rewards.append(sum(rewards))
    if (episode+1) % 20 == 0:
        print(f"Episode {episode+1}: Total Reward = {episode_rewards[-1]:.2f}, Episode Length = {t}")

# Plot episode rewards to see progress
plt.plot(episode_rewards)
plt.xlabel("Episode")
plt.ylabel("Total Reward")
plt.title("Actor-Critic Training on Custom Continuous Environment")
plt.show()
