Logger

In [None]:
import numpy as np

class Logger:
    """
    In-memory store for scalar training metrics.

    Every metric name maps to a plain Python list that grows by one entry
    per ``log`` call, so the series can be plotted after training.
    """

    def __init__(self):
        # metric name -> list of logged values, in insertion order
        self.data = {}

    def log(self, key, value):
        """Record ``value`` under ``key``, creating the series on first use."""
        self.data.setdefault(key, []).append(value)

    def get(self, key):
        """Return the series logged under ``key`` (a new empty list if unknown)."""
        try:
            return self.data[key]
        except KeyError:
            return []

    def keys(self):
        """Return the names of all metrics logged so far as a list."""
        return [*self.data]

    def summary(self, last_n=10):
        """Print, for each non-empty metric, the mean of its last ``last_n`` values."""
        print("=== Logger Summary ===")
        for k, v in self.data.items():
            if len(v) == 0:
                # Nothing to average for an empty series.
                continue
            print(f"{k}: mean(last {last_n}) = {np.mean(v[-last_n:]):.5f}")


In [None]:
# Smoke test for Logger: record a few random samples per metric, then
# print the running summary.
logger = Logger()

# (metric name, scale applied to a uniform [0, 1) sample)
demo_metrics = (("loss_actor", 1.0), ("entropy", 0.5))

for i in range(5):
    for metric_name, scale in demo_metrics:
        logger.log(metric_name, np.random.random() * scale)

logger.summary()


Actor Network

In [None]:
import torch
import torch.nn as nn
from torch.distributions import Normal


class Actor(nn.Module):
    """
    Stochastic actor network for SAC (Gaussian policy with tanh squash).

    The network maps a state to the mean of a diagonal Gaussian; the log
    standard deviation is a learnable, state-independent parameter vector.
    Samples are squashed with ``tanh`` and affinely rescaled to per-dimension
    action ranges, e.g.::

        min_action = [-1, -2, -0.5]
        max_action = [ 1,  2,  1.0]

    Parameters
    ----------
    state_dim : int
        Size of the state vector.
    action_dim : int
        Size of the action vector.
    min_action, max_action : float, sequence, numpy array or torch.Tensor
        Action bounds. A scalar is broadcast to every action dimension;
        otherwise the value must have shape ``(action_dim,)``.

    Raises
    ------
    ValueError
        If a bound is neither a scalar nor of shape ``(action_dim,)``.
    """

    def __init__(self, state_dim, action_dim, min_action, max_action):
        super().__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim

        # Buffers follow the module across devices (model.to(device)) and are
        # stored in the state_dict alongside the weights.
        self.register_buffer(
            "min_action", self._as_bound(min_action, action_dim, "min_action")
        )
        self.register_buffer(
            "max_action", self._as_bound(max_action, action_dim, "max_action")
        )

        # Actor network: state -> pre-squash Gaussian mean
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256), nn.ReLU(),
            nn.Linear(256, 256), nn.ReLU(),
            nn.Linear(256, action_dim),
        )

        # Learnable, state-independent log standard deviation
        self.log_std = nn.Parameter(torch.zeros(action_dim))

        # Clamp range for log_std (numerical stability)
        self.LOG_STD_MIN = -20
        self.LOG_STD_MAX = 2

    @staticmethod
    def _as_bound(value, action_dim, name):
        """Convert an action bound to an independent float32 tensor of shape (action_dim,)."""
        # as_tensor avoids the copy-construct warning torch.tensor() raises
        # when it is handed an existing tensor.
        bound = torch.as_tensor(value, dtype=torch.float32).detach()
        if bound.ndim == 0:
            # A scalar bound applies to every action dimension.
            bound = bound.expand(action_dim)
        if bound.shape != (action_dim,):
            raise ValueError(
                f"{name} must be a scalar or have shape ({action_dim},), "
                f"got shape {tuple(bound.shape)}"
            )
        # clone() decouples the buffer from the caller's array/tensor memory.
        return bound.clone()

    def _scale_action(self, y_t):
        """Map a tanh-squashed value in [-1, 1] to [min_action, max_action]."""
        return self.min_action + (y_t + 1) * 0.5 * (self.max_action - self.min_action)

    def forward(self, state):
        """
        Return mean and std of the Gaussian policy BEFORE the tanh squash.

        ``std`` has shape ``(action_dim,)`` and broadcasts against ``mean``.
        """
        mean = self.net(state)

        # Clamp log_std for stability
        log_std = torch.clamp(self.log_std, self.LOG_STD_MIN, self.LOG_STD_MAX)
        std = torch.exp(log_std)

        return mean, std

    def sample(self, state):
        """
        Draw a reparameterized action for ``state``.

        Returns
        -------
        action : torch.Tensor
            Action after tanh squash and min/max scaling, shape ``(..., action_dim)``.
        log_prob : torch.Tensor
            Log-probability summed over the action dimension, shape ``(..., 1)``.
            It is the density of the tanh-squashed variable in [-1, 1]; the
            constant Jacobian of the affine rescaling is not included.
        """
        mean, std = self.forward(state)
        dist = Normal(mean, std)

        # Reparameterization trick keeps the sample differentiable
        x_t = dist.rsample()
        y_t = torch.tanh(x_t)

        action = self._scale_action(y_t)

        # Change-of-variables correction for tanh; 1e-6 guards log(0)
        log_prob = dist.log_prob(x_t) - torch.log(1 - y_t.pow(2) + 1e-6)
        # Sum over the last (action) axis so unbatched states work as well
        log_prob = torch.sum(log_prob, dim=-1, keepdim=True)

        return action, log_prob

    def sample_deterministic(self, state):
        """
        Return the deterministic action (mean → tanh → scaled).
        """
        mean, _ = self.forward(state)
        return self._scale_action(torch.tanh(mean))


In [None]:
# Smoke test for Actor: build a small policy and sample one action.
state_dim, action_dim = 5, 3
min_action = [0, 0, 0.5]
max_action = [1, 2, 1]

# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create the actor and move it to the selected device
actor = Actor(state_dim, action_dim, min_action, max_action)
actor.to(device)

# Dummy state for testing (batch size 1)
state = torch.randn(1, state_dim).to(device)
action, log_prob = actor.sample(state)

print("Sampled action:\n", action)
print("Log probability:\n", log_prob)

----------------------------------------------------------------------------------------------------------------------------------------------------------------------

Critic Network

In [None]:
import torch
import torch.nn as nn


class Critic(nn.Module):
    """
    Twin Q-network for SAC: two independent MLPs, Q1 and Q2.

    Inputs:
        state:  [batch, state_dim]
        action: [batch, action_dim]
    Output:
        q1, q2: [batch, 1]
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()

        in_features = state_dim + action_dim

        # Two structurally identical heads, built in Q1-then-Q2 order.
        self.q1_net = self._make_q_head(in_features)
        self.q2_net = self._make_q_head(in_features)

        # Re-initialize every Linear layer of both heads.
        self.apply(self._init_weights)

    @staticmethod
    def _make_q_head(in_features, hidden=256):
        """Build one Q head: in_features -> hidden -> hidden -> 1 with ReLU in between."""
        layers = [
            nn.Linear(in_features, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
        ]
        return nn.Sequential(*layers)

    def _init_weights(self, m):
        """
        Xavier-uniform weights and zero biases for Linear layers; other
        module types are left untouched.
        """
        if not isinstance(m, nn.Linear):
            return
        nn.init.xavier_uniform_(m.weight)
        nn.init.zeros_(m.bias)

    def forward(self, state, action):
        """
        Evaluate both Q heads on the concatenated (state, action) input.
        """
        # Concatenate along the feature axis (dim 1)
        sa = torch.cat((state, action), dim=1)
        return self.q1_net(sa), self.q2_net(sa)


In [None]:
# Smoke test for Critic: evaluate both Q heads on one random pair.
state_dim, action_dim = 5, 3

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

critic = Critic(state_dim, action_dim)
critic.to(device)

# Dummy state & action (batch size 1)
state = torch.randn(1, state_dim).to(device)
action = torch.randn(1, action_dim).to(device)

q1, q2 = critic(state, action)
print("Q1:\n", q1)
print("Q2:\n", q2)


----------------------------------------------------------------------------------------------------------------------------------------------------------------------

Replay Buffer Memory

In [None]:
import numpy as np
import torch

class Vanilla_ReplayBuffer:
    """
    Fixed-capacity circular replay buffer for off-policy RL (SAC, TD3, DDPG).

    Transitions live in pre-allocated float32 NumPy arrays; they are turned
    into torch tensors on ``device`` only when a mini-batch is sampled.
    Once ``capacity`` transitions are stored, new ones overwrite the oldest.
    """

    def __init__(self, state_dim, action_dim, capacity=100000, device='cpu'):
        self.capacity = capacity
        self.device = device

        self.ptr = 0      # index of the next slot to write
        self.size = 0     # number of valid transitions stored

        def allocate(width):
            # One row per transition, allocated once up front
            return np.zeros((capacity, width), dtype=np.float32)

        self.state = allocate(state_dim)
        self.action = allocate(action_dim)
        self.reward = allocate(1)
        self.next_state = allocate(state_dim)
        self.done = allocate(1)

    def push(self, s, a, r, s2, d):
        """
        Store one transition (s, a, r, s2, done) at the write pointer.
        """
        slot = self.ptr

        writes = (
            (self.state, s),
            (self.action, a),
            (self.reward, r),
            (self.next_state, s2),
            (self.done, d),
        )
        for storage, value in writes:
            storage[slot] = value

        # Advance the pointer, wrapping around at capacity
        self.ptr = (slot + 1) % self.capacity
        self.size = min(self.capacity, self.size + 1)

    def sample(self, batch_size):
        """
        Draw ``batch_size`` transitions uniformly at random (with replacement).

        Returns a tuple ``(state, action, reward, next_state, done)`` of
        float32 tensors on ``self.device``.
        """
        idx = np.random.randint(0, self.size, size=batch_size)

        columns = (self.state, self.action, self.reward, self.next_state, self.done)
        return tuple(
            torch.tensor(column[idx], dtype=torch.float32, device=self.device)
            for column in columns
        )

    def __len__(self):
        return self.size


In [None]:
# Smoke test for the circular behaviour of Vanilla_ReplayBuffer.
state_dim, action_dim = 4, 2
buffer = Vanilla_ReplayBuffer(state_dim, action_dim, capacity=10, device='cpu')

# Push dummy data; 12 pushes exceed the capacity of 10 on purpose so the
# wrap-around path is exercised.
for i in range(12):
    s = np.random.randn(state_dim)
    a = np.random.randn(action_dim)
    r = np.random.randn(1)
    s2 = np.random.randn(state_dim)
    d = np.random.randint(0,2)
    buffer.push(s, a, r, s2, d)
    print(f"Pushed transition {i+1}, buffer size: {len(buffer)}")

# Sample one mini-batch and show each field
batch_size = 5
s_batch, a_batch, r_batch, s2_batch, d_batch = buffer.sample(batch_size)

sampled_fields = (
    ("\nSampled states:\n", s_batch),
    ("Sampled actions:\n", a_batch),
    ("Sampled rewards:\n", r_batch),
    ("Sampled next_states:\n", s2_batch),
    ("Sampled dones:\n", d_batch),
)
for heading, tensor in sampled_fields:
    print(heading, tensor)

In [None]:
# Check that len(buffer) tracks the number of stored transitions.
state_dim, action_dim = 4, 2
buffer = Vanilla_ReplayBuffer(state_dim, action_dim, capacity=5)

# Push 3 transitions, reporting the size after each one
for i in range(3):
    transition = (
        np.random.randn(state_dim),
        np.random.randn(action_dim),
        np.random.randn(1),
        np.random.randn(state_dim),
        0,
    )
    buffer.push(*transition)
    print(f"After push {i+1}:")
    print("len(buffer):", len(buffer))
    print("buffer.__len__():", buffer.__len__())
    print("---")


SAC Agent 

In [None]:
import torch.nn.functional as F
import torch
import torch.nn as nn
from pathlib import Path

class SACAgent:
    """
    Soft Actor-Critic (SAC) Agent
    ------------------------------------------------------------
    A minimal SAC implementation with a fixed entropy coefficient.

    Key Features
    ------------
    - Stochastic Gaussian Actor (Tanh-squashed)
    - Twin Q-Critic network (Q1, Q2) to reduce overestimation
    - Target Critic for stable Bellman backups
    - Fixed entropy coefficient α (no automatic entropy tuning)
    - Works with a standard Replay Buffer
    - Optional logging for training curves

    Parameters
    ----------
    state_dim : int
        Dimensionality of the state vector.
    action_dim : int
        Dimensionality of the action vector.
    min_action, max_action : array-like of length action_dim
        Per-dimension action bounds applied after the Tanh squash.
    lr : float
        Learning rate for both actor and critic optimizers.
    gamma : float
        Discount factor used in the target Q-value computation.
    tau : float
        Soft update coefficient for the target critic.
    alpha : float
        Entropy regularization coefficient (higher = more exploration).
    replay_capacity : int
        Maximum size of the replay buffer.
    device : str
        'cpu' or 'cuda'. Falls back to 'cpu' when CUDA is unavailable.
    logger_status : bool
        Whether to enable metric logging during training.

    Example
    -------
    >>> agent = SACAgent(state_dim=3, action_dim=1,
    ...                  min_action=[-1.0], max_action=[1.0],
    ...                  logger_status=True)
    >>> state, _ = env.reset()
    >>> action = agent.select_action(state)
    >>> agent.replay_buffer.push(state, action, reward, next_state, done)
    >>> agent.update(batch_size=64)
    """

    def __init__(self, state_dim, action_dim, min_action, max_action, 
                 lr=3e-4, gamma=0.99, tau=0.005, alpha=0.2,
                 replay_capacity=100000,
                 device='cuda',
                 logger_status=False):
        
        # Device selection with CUDA fallback
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')

        # ------------------------------------------------------------
        # Initialize Actor, Critic, and Target Critic Networks
        # Target critic starts as a direct copy of the critic.
        # ------------------------------------------------------------
        self.actor = Actor(state_dim, action_dim, min_action, max_action).to(self.device)
        self.critic = Critic(state_dim, action_dim).to(self.device)
        self.target_critic = Critic(state_dim, action_dim).to(self.device)
        self.target_critic.load_state_dict(self.critic.state_dict())

        # ------------------------------------------------------------
        # Optimizers for Actor and Critic
        # ------------------------------------------------------------
        self.actor_opt = torch.optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_opt = torch.optim.Adam(self.critic.parameters(), lr=lr)

        # ------------------------------------------------------------
        # Replay Buffer
        # ------------------------------------------------------------
        self.replay_buffer = Vanilla_ReplayBuffer(
            state_dim, action_dim,
            capacity=replay_capacity,
            device=self.device
        )

        # ------------------------------------------------------------
        # Hyperparameters
        # ------------------------------------------------------------
        self.gamma = gamma      # discount factor
        self.tau = tau          # target network soft update rate
        self.alpha = alpha      # entropy weight (encourages exploration)

        # Logger for training curves
        self.logger = Logger()
        self.logger_status = logger_status

    # ======================================================================
    # Action Selection
    # ======================================================================
    def select_action(self, state, deterministic=False):
        """
        Select an action given a single (unbatched) environment state.

        Parameters
        ----------
        state : array-like
            One state vector; a batch axis is added internally.
        deterministic : bool
            - True:  use the mean action (good for evaluation)
            - False: sample stochastically (recommended for training)

        Returns
        -------
        action : np.ndarray
            Scaled action in the range [min_action, max_action].
        """
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)

        # Acting never needs gradients, so skip building the autograd graph.
        with torch.no_grad():
            if deterministic:
                # mean → tanh → scale, shared with the Actor implementation
                action = self.actor.sample_deterministic(state)
            else:
                action, _ = self.actor.sample(state)

        # Drop the batch axis added above.
        return action.cpu().detach().numpy()[0]

    # ======================================================================
    # Training Step
    # ======================================================================
    def update(self, batch_size=64):
        """
        Perform one SAC update step:
        - Sample a batch from the replay buffer
        - Update the critic using the soft Bellman backup
        - Update the actor by maximizing Q - α log π (Q plus α-weighted entropy)
        - Soft-update the target critic

        Does nothing (returns None) while the replay buffer holds fewer than
        ``batch_size`` transitions.
        """
        if len(self.replay_buffer) < batch_size:
            return  # Not enough samples yet

        # ------------------------------------------------------------
        # Sample a random mini-batch from the Replay Buffer
        # ------------------------------------------------------------
        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
        # No-op when the buffer already returns tensors on self.device.
        state, action, reward, next_state, done = [
            x.to(self.device) for x in (state, action, reward, next_state, done)
        ]

        # ------------------------------------------------------------
        # Compute the Target Q-value (Bootstrapped)
        # ------------------------------------------------------------
        with torch.no_grad():
            next_action, next_log_prob = self.actor.sample(next_state)
            q1_t, q2_t = self.target_critic(next_state, next_action)

            # Twin-critic trick: use min(Q1, Q2) to reduce positive bias
            min_q_t = torch.min(q1_t, q2_t)

            # SAC target: r + γ * (1 - done) * (minQ - α logπ)
            target_q = reward + (1 - done) * self.gamma * (min_q_t - self.alpha * next_log_prob)

        # ------------------------------------------------------------
        # Critic Update (Minimize MSE between Q and target)
        # ------------------------------------------------------------
        q1, q2 = self.critic(state, action)

        critic_loss = F.mse_loss(q1, target_q) + F.mse_loss(q2, target_q)

        self.critic_opt.zero_grad()
        critic_loss.backward()
        self.critic_opt.step()

        # ------------------------------------------------------------
        # Actor Update (Maximize expected Q - α logπ)
        # ------------------------------------------------------------
        a_pi, log_pi = self.actor.sample(state)
        q1_pi, q2_pi = self.critic(state, a_pi)
        q_pi = torch.min(q1_pi, q2_pi)

        # Actor minimizes (α logπ - Q) which equals maximizing Q - α logπ
        actor_loss = (self.alpha * log_pi - q_pi).mean()

        # Only actor_opt steps here; gradients this backward pass leaves on
        # the critic are cleared by critic_opt.zero_grad() on the next call.
        self.actor_opt.zero_grad()
        actor_loss.backward()
        self.actor_opt.step()

        # ------------------------------------------------------------
        # Soft Update for Target Critic
        # θ_target ← τ θ + (1 - τ) θ_target
        # ------------------------------------------------------------
        for p, tp in zip(self.critic.parameters(), self.target_critic.parameters()):
            tp.data.copy_(self.tau * p.data + (1 - self.tau) * tp.data)

        # ------------------------------------------------------------
        # Logging (losses, entropy estimate, Q values, etc.)
        # ------------------------------------------------------------
        if self.logger_status:
            self.logger.log("loss_actor", actor_loss.item())
            self.logger.log("loss_critic", critic_loss.item())
            self.logger.log("q1_mean", q1.mean().item())
            self.logger.log("q2_mean", q2.mean().item())
            self.logger.log("entropy", -log_pi.mean().item())
            self.logger.log("alpha", self.alpha)
            self.logger.log("tau", self.tau)
    
    # ======================================================================
    # Save / Load Model (state_dict based, pathlib + auto .pt)
    # ======================================================================
    def save_model(self, path="sac_model.pt"):
        """
        Save Actor, Critic, Target Critic weights and hyperparameters
        into a single file. Only state_dicts and plain Python values are
        stored. Optimizer state and the replay buffer are not saved.

        Parameters
        ----------
        path : str or Path
            File path for saving the model. A ".pt" suffix is enforced and
            missing parent directories are created.
            Example: "checkpoints/sac_model" or "checkpoints/sac_model.pt"
        """
        path = Path(path)
        if path.suffix != ".pt":
            path = path.with_suffix(".pt")  # Add .pt if missing

        # Ensure parent directories exist
        path.parent.mkdir(parents=True, exist_ok=True)

        data = {
            "actor": self.actor.state_dict(),
            "critic": self.critic.state_dict(),
            "target_critic": self.target_critic.state_dict(),
            "hyperparams": {
                "gamma": self.gamma,
                "tau": self.tau,
                "alpha": self.alpha,
                "min_action": self.actor.min_action.cpu().tolist(),
                "max_action": self.actor.max_action.cpu().tolist(),
                "state_dim": self.actor.state_dim,
                "action_dim": self.actor.action_dim
            }
        }
        torch.save(data, path)
        print(f"[SACAgent] Model saved to '{path}'")

    def load_model(self, path="sac_model.pt"):
        """
        Load Actor, Critic, Target Critic weights and hyperparameters from a
        file written by ``save_model``.

        The actor is rebuilt from the stored dimensions and bounds, and the
        actor optimizer is re-created for the new parameters (keeping the
        current learning rate) so that training can continue after loading.
        Optimizer moments are not part of the checkpoint and start fresh.

        Parameters
        ----------
        path : str or Path
            Checkpoint path; a ".pt" suffix is enforced.

        Raises
        ------
        FileNotFoundError
            If the checkpoint file does not exist.
        """
        path = Path(path)
        if path.suffix != ".pt":
            path = path.with_suffix(".pt")

        if not path.exists():
            raise FileNotFoundError(f"[SACAgent] File not found: '{path}'")

        data = torch.load(path, map_location=self.device)

        # ------------------------------------------------------------------
        # Helper: normalize stored bounds (list or tensor) to a plain list.
        # Actor.__init__ builds its own tensors, and handing it a tensor
        # would trigger PyTorch's copy-construct warning.
        # ------------------------------------------------------------------
        def to_float_list(x):
            return torch.as_tensor(x, dtype=torch.float32).detach().cpu().tolist()

        # ------------------------------------------------------------------
        # 1) Load hyperparameters
        # ------------------------------------------------------------------
        hyper = data["hyperparams"]
        self.gamma = hyper["gamma"]
        self.tau = hyper["tau"]
        self.alpha = hyper["alpha"]

        min_action = to_float_list(hyper["min_action"])
        max_action = to_float_list(hyper["max_action"])

        state_dim  = hyper["state_dim"]
        action_dim = hyper["action_dim"]

        # ------------------------------------------------------------------
        # 2) Re-create Actor (must match original architecture)
        # ------------------------------------------------------------------
        # Remember the learning rate before the old optimizer is replaced.
        actor_lr = self.actor_opt.param_groups[0]["lr"]

        self.actor = Actor(
            state_dim=state_dim,
            action_dim=action_dim,
            min_action=min_action,
            max_action=max_action
        ).to(self.device)

        # ------------------------------------------------------------------
        # 3) Load network weights
        # ------------------------------------------------------------------
        # strict=False tolerates checkpoints with differing keys; report any
        # mismatch instead of ignoring it silently.
        load_result = self.actor.load_state_dict(data["actor"], strict=False)
        if load_result.missing_keys or load_result.unexpected_keys:
            print(
                "[SACAgent] Warning: actor checkpoint key mismatch "
                f"(missing={load_result.missing_keys}, "
                f"unexpected={load_result.unexpected_keys})"
            )
        self.critic.load_state_dict(data["critic"])
        self.target_critic.load_state_dict(data["target_critic"])

        # ------------------------------------------------------------------
        # 4) Rebind the actor optimizer to the new actor's parameters.
        # The previous optimizer still points at the discarded actor, so
        # without this step update() would never train the loaded policy.
        # ------------------------------------------------------------------
        self.actor_opt = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)

        print(f"[SACAgent] Model loaded from '{path}'")

Training loop

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt

# Create the environment (rendered in a window)
env = gym.make("Pendulum-v1", render_mode="human")

# Read the problem dimensions from an initial observation and the action space
state, _ = env.reset()
state_dim = state.shape[0]

action_space = env.action_space
action_dim = action_space.shape[0]
min_action, max_action = action_space.low, action_space.high

print("State dim:", state_dim)
print("Action dim:", action_dim)
print("Action range:", min_action, max_action)


In [None]:

# Create the agent
agent = SACAgent(
    state_dim=state_dim,
    action_dim=action_dim,
    min_action=min_action,
    max_action=max_action,
    lr=3e-4,
    gamma=0.99,
    tau=0.005,
    alpha=0.4,
    logger_status= True
)

# ===== Training Parameters =====
episodes = 500
max_steps = 200
batch_size = 512

rewards_history = []

# ===== Training Loop =====
for ep in range(episodes):

    state, _ = env.reset()
    episode_reward = 0

    for step in range(max_steps):
        # 1. Select an action (stochastic, for exploration)
        action = agent.select_action(state)

        # 2. Step the environment
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        # 3. Store the transition in the replay buffer.
        # Only a true termination may cut bootstrapping in the Bellman
        # target. A time-limit truncation ends the episode, but next_state
        # still has value, so it is stored as not-done.
        agent.replay_buffer.push(
            state, action, reward, next_state, float(terminated)
        )

        # 4. Update the networks
        agent.update(batch_size)

        state = next_state
        episode_reward += reward

        # The episode itself ends on termination or truncation.
        if done:
            break

    rewards_history.append(episode_reward)
    print(f"Episode {ep+1}/{episodes} Reward: {episode_reward:.2f}")
agent.save_model(path=rf"D:\Project_end\New_world\my_project\notebooks\Test_Pendulum-v1.pt")

In [None]:
# Sanity check: transitions stored vs. number of logged update steps.
q1_series = agent.logger.get("q1_mean")
q2_series = agent.logger.get("q2_mean")

print("Replay buffer size:", len(agent.replay_buffer))
print("q1|q2 size: ", len(q1_series), len(q2_series))

In [None]:
# Total reward collected in each training episode.
plt.figure(figsize=(10, 4))
plt.plot(rewards_history)
plt.grid(True)
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.title("Episode Reward")
plt.show()


In [None]:
# ------ Figure 1: both losses on shared axes ------
plt.figure(figsize=(10, 4))
for metric_key, curve_label in (("loss_actor", "Actor Loss"), ("loss_critic", "Critic Loss")):
    plt.plot(agent.logger.get(metric_key), label=curve_label, alpha=0.5)
plt.title("Loss Curve")
plt.xlabel("Training Step")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.show()

# ------ Figure 2: one panel per loss (left: actor, right: critic) ------
plt.figure(figsize=(12, 4))

# (subplot position, logger key, label/title, extra plot kwargs)
loss_panels = (
    (1, "loss_actor", "Actor Loss", {}),
    (2, "loss_critic", "Critic Loss", {"color": "orange"}),
)
for position, metric_key, curve_label, extra_style in loss_panels:
    plt.subplot(1, 2, position)
    plt.plot(agent.logger.get(metric_key), label=curve_label, alpha=0.7, **extra_style)
    plt.title(curve_label)
    plt.xlabel("Training Step")
    plt.ylabel("Loss")
    plt.grid(True)
    plt.legend()

plt.tight_layout()
plt.show()


In [None]:
# ------ Figure 1: both Q estimates on shared axes ------
plt.figure(figsize=(10,4))
plt.plot(agent.logger.get("q1_mean"), label="Q1 Mean",alpha=0.3)
plt.plot(agent.logger.get("q2_mean"), label="Q2 Mean",alpha=0.3)
plt.title("Q-value Estimate")
plt.xlabel("Training Step")
plt.ylabel("Q-value")
plt.legend()
plt.grid(True)
plt.show()

# ------ Figure 2: one panel per critic head ------
# Titles and y-labels name the Q-value series that is actually plotted.
plt.figure(figsize=(12,4))

# ------ Left : Q1 mean ------
plt.subplot(1, 2, 1)
plt.plot(agent.logger.get("q1_mean"), label="Q1 Mean",alpha=0.3)
plt.title("Q1 Mean")
plt.xlabel("Training Step")
plt.ylabel("Q-value")
plt.grid(True)
plt.legend()

# ------ Right : Q2 mean ------
plt.subplot(1, 2, 2)
plt.plot(agent.logger.get("q2_mean"), label="Q2 Mean",alpha=0.3)
plt.title("Q2 Mean")
plt.xlabel("Training Step")
plt.ylabel("Q-value")
plt.grid(True)
plt.legend()

plt.tight_layout()
plt.show()


In [None]:
# Entropy estimate logged at every update step (negative mean log-prob).
entropy_series = agent.logger.get("entropy")

plt.figure(figsize=(10, 4))
plt.plot(entropy_series, label="Entropy")
plt.grid(True)
plt.xlabel("Training Step")
plt.ylabel("Entropy")
plt.title("Policy Entropy")
plt.show()


Test Agent

In [None]:
def test_agent(env, agent, episodes=5, max_steps=200):
    returns = []
    trajectories = []   # เก็บ state/action/reward เพื่อนำไป plot

    for ep in range(episodes):
        state, _ = env.reset()
        episode_reward = 0
        states = []
        actions = []
        rewards = []

        for step in range(max_steps):
            # ใช้ deterministic เพื่อดู performance จริง
            action = agent.select_action(state, deterministic=True)

            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            # เก็บข้อมูลสำหรับ plot
            states.append(state)
            actions.append(action)
            rewards.append(reward)

            episode_reward += reward
            state = next_state

            if done:
                break

        returns.append(episode_reward)
        trajectories.append({
            "states": np.array(states),
            "actions": np.array(actions),
            "rewards": np.array(rewards),
        })

        print(f"[TEST] Episode {ep+1}/{episodes} Reward = {episode_reward:.2f}")

    return returns, trajectories


In [None]:
# Reload the trained checkpoint and run deterministic evaluation episodes.
agent.load_model(path=rf"D:\Project_end\New_world\my_project\notebooks\Test_Pendulum-v1.pt")
test_returns, test_traj = test_agent(env, agent, episodes=5, max_steps=200)

# Zero-based index of the test episode to plot; the title is derived from
# it so the caption always names the episode actually shown.
episode_idx = 2

plt.figure(figsize=(12,4))
plt.plot(test_traj[episode_idx]["rewards"], label="Reward per step")
plt.title(f"Reward curve (Test Episode {episode_idx + 1})")
plt.xlabel("Step")
plt.ylabel("Reward")
plt.grid(True)
plt.legend()
plt.show()


In [None]:
# Zero-based index of the test episode to plot; the title is derived from
# it so the caption always names the episode actually shown.
episode_idx = 1

plt.figure(figsize=(12,4))
plt.plot(test_traj[episode_idx]["actions"], label="Action", alpha=0.7)
plt.title(f"Action output (Test Episode {episode_idx + 1})")
plt.xlabel("Step")
plt.ylabel("Action Value")
plt.grid(True)
plt.legend()
plt.show()


In [None]:
# Zero-based index of the test episode to plot; the title is derived from
# it so the caption always names the episode actually shown.
episode_idx = 2

plt.figure(figsize=(12,4))
plt.plot(test_traj[episode_idx]["states"])
plt.title(f"State trajectory (Test Episode {episode_idx + 1})")
plt.xlabel("Step")
plt.ylabel("State Value")
plt.grid(True)
plt.show()


In [None]:
# Total return of each evaluation episode, one marker per episode.
plt.figure(figsize=(6, 4))
plt.plot(test_returns, marker="o")
plt.grid(True)
plt.xlabel("Test Episode")
plt.ylabel("Total Return")
plt.title("Total test return per episode")
plt.show()


In [None]:
import gymnasium as gym
from stable_baselines3 import SAC

# Baseline: train Stable-Baselines3 SAC on MountainCarContinuous.
env = gym.make("MountainCarContinuous-v0")

# Smaller networks: intended to train faster and find a solution more easily.
policy_kwargs = dict(
    net_arch=dict(
        pi=[64, 64],         # actor network
        qf=[64, 64]          # critic network
    ),
    # Author's intent: keep the initial Gaussian wide enough early on.
    # NOTE(review): in SB3 this setting appears to matter only with gSDE
    # (use_sde=True) — confirm it has an effect here.
    log_std_init=-1.0,
)

sac_hyperparams = dict(
    learning_rate=3e-4,          # neither high nor low (author cites MountainCar studies)
    buffer_size=200000,          # large buffer so the agent keeps long exploration history
    learning_starts=5000,        # collect enough experience before learning begins

    batch_size=64,               # small batch, chosen for stability
    tau=0.02,                    # faster target update
    gamma=0.98,                  # MountainCar has a short horizon

    train_freq=1,
    gradient_steps=1,

    # A numeric ent_coef disables automatic entropy tuning and fixes the
    # entropy coefficient (author's rule of thumb: 0.05–0.15).
    ent_coef=0.1,

    policy_kwargs=policy_kwargs,

    target_update_interval=1,
    verbose=1,
)

model = SAC("MlpPolicy", env, **sac_hyperparams)

model.learn(total_timesteps=300_000)
model.save("sac_mountaincar_success")


In [None]:
import numpy as np

# Replay one deterministic episode of the saved SB3 model in a render window.
env = gym.make("MountainCarContinuous-v0", render_mode="human")
model = SAC.load("sac_mountaincar_success")

try:
    obs, _ = env.reset()
    done = False

    while not done:
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
finally:
    # Release the render window even if the rollout fails part-way.
    env.close()
