In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
%pip install gymnasium[mujoco]

Collecting mujoco>=2.3.3 (from gymnasium[mujoco])
  Downloading mujoco-3.2.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
Collecting glfw (from mujoco>=2.3.3->gymnasium[mujoco])
  Downloading glfw-2.8.0-py2.py27.py3.py30.py31.py32.py33.py34.py35.py36.py37.py38.p39.p310.p311.p312.p313-none-manylinux_2_28_x86_64.whl.metadata (5.4 kB)
Collecting pyopengl (from mujoco>=2.3.3->gymnasium[mujoco])
  Downloading PyOpenGL-3.1.7-py3-none-any.whl.metadata (3.2 kB)
Downloading mujoco-3.2.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.4/6.4 MB[0m [31m69.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[?25hDownloading glfw-2.8.0-py2.py27.py3.py30.py31.py32.py33.py34.py35.py36.py37.py38.p39.p310.p311.p312.p313-none-manylinux_2_28_x86_64.whl (243 kB)
[2K   [90m

In [2]:
import numpy as np
import gymnasium as gym
import mujoco
import torch
import torch.nn as nn
import torch.optim as optim
# Initialize the Walker2d environment
env = gym.make("Hopper-v4")



In [6]:
def evaluate_agent(env, policy_net, num_episodes=200, render=False):
    """
    Evaluates the trained policy network on the environment.

    Args:
    - env: The Gym environment.
    - policy_net: The trained policy network.
    - num_episodes: Number of episodes to evaluate.
    - render: Whether to render the environment during evaluation.

    Returns:
    - Average reward over the evaluated episodes.
    """
    device = next(policy_net.parameters()).device
    total_rewards = []

    for episode in range(num_episodes):
        reset_result = env.reset()
        if isinstance(reset_result, tuple):
            state = reset_result[0]
        else:
            state = reset_result

        if not isinstance(state, np.ndarray):
            state = np.array(state, dtype=np.float32)

        episode_reward = 0
        terminated, truncated = False, False

        while not (terminated or truncated):
            if render:
                env.render()

            # Move the state tensor to the same device as the policy network
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)

            with torch.no_grad():
                policy_output = policy_net(state_tensor)

                if isinstance(policy_output, tuple):
                    # Assuming (mean, std) for continuous action spaces
                    action_mean, action_std = policy_output
                    action = torch.normal(action_mean, action_std).cpu().numpy()
                else:
                    # Assuming logits for discrete action spaces
                    action_prob = torch.softmax(policy_output, dim=-1)
                    action = torch.argmax(action_prob, dim=-1).cpu().numpy()

            # Squeeze the action to ensure proper shape
            action = action.squeeze()

            # Take a step in the environment
            next_step_result = env.step(action)
            if isinstance(next_step_result, tuple):
                next_state, reward, terminated, truncated, _ = next_step_result
            else:
                next_state, reward, terminated, truncated = next_step_result[:4]

            episode_reward += reward
            state = next_state

        total_rewards.append(episode_reward)
        print(f"Episode {episode + 1}: Reward = {episode_reward}")

    average_reward = sum(total_rewards) / num_episodes
    print(f"Average Reward over {num_episodes} Episodes: {average_reward}")
    return average_reward


In [7]:
def apply_perturbation(attack_method, state, params, policy_model=None, sarsa_model=None):
    """
    Apply perturbation to the state based on the attack method.

    Args:
        attack_method (str): The type of attack ('robust_sarsa', 'mad', 'random').
        state (torch.Tensor): The current state tensor.
        params (object): Parameters for the attack (e.g., epsilon, steps, etc.).
        policy_model (nn.Module): The policy model (for MAD and Sarsa+MAD).
        sarsa_model (nn.Module): The Sarsa model (for Robust Sarsa).

    Returns:
        torch.Tensor: The perturbed state.
    """
    eps = params.get("epsilon", 0.1)
    steps = params.get("steps", 10)
    step_eps = eps / steps
    clamp_min = state - eps
    clamp_max = state + eps

    if attack_method == "robust_sarsa":
        assert sarsa_model is not None, "Sarsa model is required for Robust Sarsa attack."
        perturbed_state = state.clone().detach().requires_grad_()
        for _ in range(steps):
            actions = policy_model(perturbed_state)[0]  # Assuming policy returns action logits
            value = sarsa_model(torch.cat((state, actions), dim=1)).mean(dim=1)
            value.backward()
            update = perturbed_state.grad.sign() * step_eps
            perturbed_state.data = torch.min(torch.max(perturbed_state - update, clamp_min), clamp_max)
            sarsa_model.zero_grad()
        return perturbed_state.detach()

    elif attack_method == "mad":
        assert policy_model is not None, "Policy model is required for MAD attack."
        original_action = policy_model(state)[0].detach()
        perturbed_state = state.clone().detach().requires_grad_()
        for _ in range(steps):
            new_action = policy_model(perturbed_state)[0]
            action_diff = ((new_action - original_action) ** 2).sum(dim=1)
            action_diff.backward()
            update = perturbed_state.grad.sign() * step_eps
            perturbed_state.data = torch.min(torch.max(perturbed_state + update, clamp_min), clamp_max)
            policy_model.zero_grad()
        return perturbed_state.detach()

    elif attack_method == "random":
        noise = torch.empty_like(state).uniform_(-eps, eps)
        return (state + noise).detach()

    else:
        raise ValueError(f"Unknown attack method: {attack_method}")

In [10]:
attack_params = {
    "epsilon": 0.1,  # Maximum perturbation magnitude
    "steps": 5,      # Number of iterative steps
}

In [10]:
import torch
import numpy as np


def evaluate_agent_with_mad_attack(env, policy_net, epsilon, num_episodes=200, attack_steps=10, step_epsilon=0.01, beta=1.0):
    """
    Evaluate the agent under a MAD (Maximizing Action Discrepancy) attack for continuous action spaces.

    Args:
        env (gym.Env): The environment.
        policy_net (torch.nn.Module): The trained policy network.
        epsilon (float): Maximum perturbation magnitude.
        num_episodes (int): Number of episodes for evaluation.
        attack_steps (int): Number of gradient steps for the attack.
        step_epsilon (float): Step size for each gradient step.
        beta (float): Inverse temperature parameter for SGLD noise.

    Returns:
        float: Average reward over the episodes under MAD attack.
    """
    device = next(policy_net.parameters()).device
    total_reward = 0

    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]  # Unpack if state is returned as (observation, info)
        state = torch.tensor(state, dtype=torch.float32).to(device)

        episode_reward = 0
        done = False

        while not done:
            # Start with the original state
            perturbed_state = state.clone().detach().requires_grad_(True)

            for _ in range(attack_steps):
                # Compute the policy outputs for original and perturbed states
                original_mean, original_std = policy_net(state.unsqueeze(0))
                perturbed_mean, perturbed_std = policy_net(perturbed_state.unsqueeze(0))

                # Compute KL divergence between original and perturbed distributions
                loss = -torch.distributions.kl.kl_divergence(
                    torch.distributions.Normal(original_mean, original_std),
                    torch.distributions.Normal(perturbed_mean, perturbed_std)
                ).mean()
                loss.backward()

                # Compute gradient and add noise for SGLD
                grad = perturbed_state.grad
                noise = torch.randn_like(perturbed_state) * torch.sqrt(torch.tensor(2 / (beta * step_epsilon), device=perturbed_state.device))
                perturbation = step_epsilon * grad + noise

                # Update the perturbed state
                perturbed_state = (perturbed_state + perturbation).detach().requires_grad_(True)

                # Clamp the perturbed state to within the epsilon-ball
                perturbed_state = torch.max(
                    torch.min(perturbed_state, state + epsilon), state - epsilon
                ).detach().requires_grad_(True)

            # Use the perturbed state to select the action
            with torch.no_grad():
                perturbed_mean, perturbed_std = policy_net(perturbed_state.unsqueeze(0))
                action_dist = torch.distributions.Normal(perturbed_mean, perturbed_std)
                action = action_dist.sample().squeeze().cpu().numpy()  # Match expected shape (e.g., (3,) for continuous action)

            # Step the environment
            next_step_result = env.step(action)
            if isinstance(next_step_result, tuple):
                next_state, reward, done, _, _ = next_step_result
            else:
                next_state, reward, done = next_step_result[:3]

            # Accumulate the reward
            episode_reward += reward

            # Update the state
            state = torch.tensor(next_state, dtype=torch.float32).to(device)

        total_reward += episode_reward
        print(f"Episode {episode + 1}/{num_episodes}: Reward = {episode_reward}")

    average_reward = total_reward / num_episodes
    print(f"Average Reward under MAD attack: {average_reward}")
    return average_reward


In [11]:
import random

def robust_sarsa_attack(env, policy_net, epsilon_schedule, num_steps=10000, lambda_rs=0.1, batch_size=64, gamma=0.99):
    """
    Train a robust value function for a policy under attack using Robust Sarsa.

    Args:
        env (gym.Env): The environment.
        policy_net (torch.nn.Module): The trained policy network.
        epsilon_schedule (list): Schedule for perturbation magnitudes.
        num_steps (int): Number of training steps.
        lambda_rs (float): Regularization parameter for the robust objective.
        batch_size (int): Number of transitions sampled per update.
        gamma (float): Discount factor.

    Returns:
        torch.nn.Module: The robust Q-value network.
    """
    device = next(policy_net.parameters()).device

    # Detect action space type
    if isinstance(env.action_space, gym.spaces.Discrete):
        action_dim = env.action_space.n  # Discrete action space
    elif isinstance(env.action_space, gym.spaces.Box):
        action_dim = env.action_space.shape[0]  # Continuous action space
    else:
        raise ValueError("Unsupported action space type. Only Discrete and Box spaces are supported.")

    # Initialize Q-function (robust critic) as a neural network
    q_net = torch.nn.Sequential(
        torch.nn.Linear(env.observation_space.shape[0] + action_dim, 128),
        torch.nn.ReLU(),
        torch.nn.Linear(128, 1)  # Single Q-value output
    ).to(device)

    optimizer = torch.optim.Adam(q_net.parameters(), lr=1e-3)

    # Replay buffer
    replay_buffer = []

    def collect_trajectory():
        """Collect one trajectory and add to the replay buffer."""
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]  # Unpack if necessary
        state = torch.tensor(state, dtype=torch.float32).to(device)
    
        done = False
        while not done:
            with torch.no_grad():
                if isinstance(env.action_space, gym.spaces.Discrete):
                    # For discrete actions, pass only the state to Q-network
                    q_values = torch.cat([q_net(torch.cat([state, torch.eye(action_dim)[a].to(device)], dim=0))
                                          for a in range(action_dim)])
                    action = torch.argmax(q_values).item()
                else:
                    # For continuous actions, extract mean from policy network
                    policy_output = policy_net(state.unsqueeze(0))
                    if isinstance(policy_output, tuple):
                        mean, _ = policy_output  # Extract mean and ignore std
                    else:
                        mean = policy_output  # If single output, it's the mean
                    action = mean.squeeze().cpu().numpy()  # Convert to NumPy
    
            # Step the environment
            next_state, reward, done, truncated, _ = env.step(action)
            done = done or truncated  # Combine termination conditions
            next_state = torch.tensor(next_state, dtype=torch.float32).to(device)
    
            replay_buffer.append((state, action, reward, next_state, done))
    
            if len(replay_buffer) > 10000:
                replay_buffer.pop(0)
    
            state = next_state

    for step in range(num_steps):
        # Collect new trajectories periodically
        if len(replay_buffer) < batch_size or step % 10 == 0:
            collect_trajectory()

        # Ensure the buffer has enough samples for a batch
        if len(replay_buffer) < batch_size:
            continue  # Skip training step until buffer has enough data

        # Sample batch
        batch = random.sample(replay_buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.stack(states).to(device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
        next_states = torch.stack(next_states).to(device)
        dones = torch.tensor(dones, dtype=torch.float32).to(device)

        # Prepare inputs for Q-network
        if isinstance(env.action_space, gym.spaces.Discrete):
            actions = torch.tensor(actions, dtype=torch.int64).to(device)  # Discrete actions
            state_action_pairs = torch.cat([states, torch.eye(action_dim).to(device)[actions]], dim=1)
            next_state_action_pairs = torch.cat([next_states, torch.eye(action_dim).to(device)], dim=1)
        else:
            actions = torch.tensor(actions, dtype=torch.float32).to(device)  # Continuous actions
            state_action_pairs = torch.cat([states, actions], dim=1)
            next_state_action_pairs = torch.cat([next_states, actions], dim=1)

        # Temporal Difference Loss
        q_values = q_net(state_action_pairs).squeeze()
        q_values_next = q_net(next_state_action_pairs).squeeze()
        td_loss = (rewards + gamma * (1 - dones) * q_values_next - q_values).pow(2).mean()

        # Robustness Loss
        epsilon = epsilon_schedule[min(step, len(epsilon_schedule) - 1)]
        robust_loss = 0
        for i in range(batch_size):
            perturbation = (torch.rand_like(states[i]) * 2 - 1) * epsilon
            perturbed_state = states[i] + perturbation
            perturbed_state_action = torch.cat([perturbed_state, actions[i]], dim=0)
            robust_loss += (q_net(perturbed_state_action.unsqueeze(0)) - q_values[i]).pow(2).mean()
        robust_loss /= batch_size

        # Total Loss
        total_loss = td_loss + lambda_rs * robust_loss

        # Optimize
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        # Print progress
        if step % 100 == 0:
            print(f"Step {step}/{num_steps}, TD Loss: {td_loss.item():.4f}, Robust Loss: {robust_loss.item():.4f}")

    return q_net


In [12]:
def evaluate_agent_with_robust_sarsa_attack(env, policy_net, robust_q_net, epsilon, step_size, num_episodes=100, attack_steps=10):
    """
    Evaluate the agent under a Robust Sarsa Critic-based attack.

    Args:
        env (gym.Env): The environment.
        policy_net (torch.nn.Module): The trained policy network.
        robust_q_net (torch.nn.Module): The robust Q-value network trained with Robust Sarsa.
        epsilon (float): Maximum perturbation magnitude for the attack.
        step_size (float): Step size for the gradient update.
        num_episodes (int): Number of episodes for evaluation.
        attack_steps (int): Number of attack steps (K in the pseudocode).

    Returns:
        float: Average reward over the episodes under Robust Sarsa Critic-based attack.
    """
    device = next(policy_net.parameters()).device
    total_reward = 0

    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]  # Unpack if state is returned as (observation, info)
        state = torch.tensor(state, dtype=torch.float32).to(device)

        episode_reward = 0
        done = False

        while not done:
            # Initialize the perturbed state
            perturbed_state = state.clone().detach().requires_grad_(True)

            # Perform the attack as per Algorithm 2
            for _ in range(attack_steps):
                # Forward pass through the policy to get the action
                with torch.no_grad():
                    if isinstance(env.action_space, gym.spaces.Discrete):
                        action_probs = policy_net(perturbed_state.unsqueeze(0))
                        action = torch.argmax(action_probs, dim=-1)
                    else:
                        policy_output = policy_net(perturbed_state.unsqueeze(0))
                        if isinstance(policy_output, tuple):
                            mean, _ = policy_output  # Extract mean and ignore std
                        else:
                            mean = policy_output
                        action = mean.squeeze()

                # Compute Q(s, a) for the critic
                state_action = torch.cat([perturbed_state, action.float().to(device)]) if isinstance(env.action_space, gym.spaces.Box) else \
                               torch.cat([perturbed_state, torch.eye(env.action_space.n)[action].to(device)], dim=0)
                q_value = robust_q_net(state_action.unsqueeze(0))

                # Backpropagate the gradient
                q_value.backward()
                grad = perturbed_state.grad

                # Update the perturbed state based on the gradient and step size
                perturbed_state = perturbed_state - step_size * grad.sign()
                perturbed_state = torch.max(
                    torch.min(perturbed_state, state + epsilon), state - epsilon
                ).detach().requires_grad_(True)  # Clamp to the epsilon-ball

            # Use the adversarially perturbed state to select the final action
            with torch.no_grad():
                if isinstance(env.action_space, gym.spaces.Discrete):
                    action_probs = policy_net(perturbed_state.unsqueeze(0))
                    action = torch.argmax(action_probs, dim=-1).item()
                else:
                    policy_output = policy_net(perturbed_state.unsqueeze(0))
                    if isinstance(policy_output, tuple):
                        mean, _ = policy_output
                    else:
                        mean = policy_output
                    action = mean.squeeze().cpu().numpy()

            # Step the environment
            next_step_result = env.step(action)
            if isinstance(next_step_result, tuple):
                next_state, reward, done, _, _ = next_step_result
            else:
                next_state, reward, done = next_step_result[:3]

            # Accumulate the reward
            episode_reward += reward

            # Update the state
            state = torch.tensor(next_state, dtype=torch.float32).to(device)

        total_reward += episode_reward
        print(f"Episode {episode + 1}/{num_episodes}: Reward = {episode_reward}")

    average_reward = total_reward / num_episodes
    print(f"Average Reward under Robust Sarsa Critic-based attack: {average_reward}")
    return average_reward


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gymnasium as gym

# Import sophisticated networks
class ValueDenseNet(nn.Module):
    def __init__(self, state_dim, hidden_sizes=(128, 128), activation=nn.Tanh):
        super(ValueDenseNet, self).__init__()
        self.activation = activation()
        self.layers = nn.ModuleList()

        input_size = state_dim
        for hidden_size in hidden_sizes:
            layer = nn.Linear(input_size, hidden_size)
            nn.init.orthogonal_(layer.weight, gain=nn.init.calculate_gain("relu"))
            self.layers.append(layer)
            input_size = hidden_size

        self.output_layer = nn.Linear(input_size, 1)
        nn.init.orthogonal_(self.output_layer.weight, gain=1.0)

    def forward(self, state):
        x = state
        for layer in self.layers:
            x = self.activation(layer(x))
        return self.output_layer(x)


class CtsPolicy(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_sizes=(128, 128), activation=nn.Tanh):
        super(CtsPolicy, self).__init__()
        self.activation = activation()
        self.layers = nn.ModuleList()

        input_size = state_dim
        for hidden_size in hidden_sizes:
            layer = nn.Linear(input_size, hidden_size)
            nn.init.orthogonal_(layer.weight, gain=nn.init.calculate_gain("tanh"))
            self.layers.append(layer)
            input_size = hidden_size

        self.mean_layer = nn.Linear(input_size, action_dim)
        nn.init.orthogonal_(self.mean_layer.weight, gain=0.01)
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def forward(self, state):
        x = state
        for layer in self.layers:
            x = self.activation(layer(x))
        mean = torch.tanh(self.mean_layer(x))
        std = torch.exp(self.log_std)
        return mean, std




# class SAPPOAgent:
#     def __init__(self, state_dim, action_dim, discrete=True, lr=3e-4, gamma=0.99, lam=0.95, eps_clip=0.2, k_epochs=4, sgld_steps=10, sgld_lr=0.01):
#         self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#         # Actor and critic networks
#         self.policy_net = CtsPolicy(state_dim, action_dim, hidden_sizes=(128, 128), activation=nn.Tanh).to(self.device)
#         self.value_net = ValueDenseNet(state_dim, hidden_sizes=(128, 128), activation=nn.Tanh).to(self.device)

#         self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
#         self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)

#         self.gamma = gamma
#         self.lam = lam
#         self.eps_clip = eps_clip
#         self.k_epochs = k_epochs

#         self.sgld_steps = sgld_steps
#         self.sgld_lr = sgld_lr

#     def select_action(self, state):
#         state = torch.tensor(state, dtype=torch.float32).to(self.device).unsqueeze(0)
#         with torch.no_grad():
#             mean, std = self.policy_net(state)
#             dist = torch.distributions.Normal(mean, std)
#             action = dist.sample()
#             return action.cpu().numpy().squeeze(), dist.log_prob(action).sum()

#     def compute_gae(self, rewards, values, dones):
#         advantages = []
#         advantage = 0
#         for t in reversed(range(len(rewards))):
#             delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
#             advantage = delta + self.gamma * self.lam * (1 - dones[t]) * advantage
#             advantages.insert(0, advantage)
#         return torch.tensor(advantages, device=self.device, dtype=torch.float32)

#     def sgld_step(self, state, epsilon):
#         """Perform Stochastic Gradient Langevin Dynamics (SGLD) to generate perturbed states."""
#         perturbed_state = state.clone().detach().to(self.device).requires_grad_(True)
    
#         for _ in range(self.sgld_steps):
#             if perturbed_state.grad is not None:
#                 perturbed_state.grad.zero_()
    
#             # Compute KL divergence between original and perturbed policies
#             with torch.no_grad():
#                 original_logits = self.policy_net(state)
#             perturbed_logits = self.policy_net(perturbed_state)
    
#             if self.policy_net.discrete:
#                 original_policy = dist.Categorical(original_logits)
#                 perturbed_policy = dist.Categorical(perturbed_logits)
#             else:
#                 original_mean, original_std = original_logits
#                 perturbed_mean, perturbed_std = perturbed_logits
#                 original_policy = dist.Normal(original_mean, original_std)
#                 perturbed_policy = dist.Normal(perturbed_mean, perturbed_std)
    
#             kl_div = dist.kl.kl_divergence(original_policy, perturbed_policy).mean()
    
#             # Backpropagate KL divergence
#             kl_div.backward()
    
#             # Update perturbed state using gradient and noise
#             perturbed_state = perturbed_state + epsilon * perturbed_state.grad + torch.randn_like(perturbed_state) * epsilon
#             perturbed_state = perturbed_state.detach().clone().requires_grad_(True)
    
#         return perturbed_state.detach()

#     def compute_kl_regularization(self, states, actions):
#         """Compute the KL divergence regularization across all states."""
#         if len(states) == 0:
#             return torch.tensor(0.0, device=self.device)
    
#         kl_div_total = 0
#         for state in states:
#             perturbed_state = self.sgld_step(state, self.sgld_lr)
    
#             with torch.no_grad():
#                 original_logits = self.policy_net(state)
#             perturbed_logits = self.policy_net(perturbed_state)
    
#             if self.policy_net.discrete:
#                 original_policy = dist.Categorical(original_logits)
#                 perturbed_policy = dist.Categorical(perturbed_logits)
#             else:
#                 original_mean, original_std = original_logits
#                 perturbed_mean, perturbed_std = perturbed_logits
#                 original_policy = dist.Normal(original_mean, original_std)
#                 perturbed_policy = dist.Normal(perturbed_mean, perturbed_std)
    
#             kl_div = dist.kl.kl_divergence(original_policy, perturbed_policy).mean()
#             kl_div_total += kl_div
    
#         return kl_div_total / len(states)
#     def train(self, env, max_episodes=1000, rollout_steps=2048, batch_size=64):
#         for episode in range(max_episodes):
#             states, actions, rewards, dones, log_probs, values = [], [], [], [], [], []
    
#             # Reset the environment
#             state, _ = env.reset()
#             state = torch.tensor(state, dtype=torch.float32).to(self.device)
    
#             # Rollout phase: Collect trajectories
#             for _ in range(rollout_steps):
#                 value = self.value_net(state).squeeze(0).detach()  # Detach the value tensor
#                 action, log_prob = self.select_action(state.cpu().numpy())
    
#                 next_state, reward, done, truncated, _ = env.step(action)
    
#                 # Append data to lists
#                 states.append(state.clone().detach())
#                 actions.append(action)
#                 rewards.append(reward)
#                 dones.append(done or truncated)
#                 log_probs.append(log_prob.clone().detach())
#                 values.append(value)
    
#                 # Update state
#                 state = torch.tensor(next_state, dtype=torch.float32).to(self.device)
#                 if done or truncated:
#                     state, _ = env.reset()
#                     state = torch.tensor(state, dtype=torch.float32).to(self.device)
    
#             # Add a final value estimate
#             values.append(torch.tensor([0], device=self.device).detach())
    
#             # Compute advantages and returns
#             advantages = self.compute_gae(rewards, values, dones)
#             returns = advantages + torch.tensor(values[:-1], device=self.device)

#             # Convert lists to tensors
#             states = torch.stack(states).to(self.device)
#             actions = torch.tensor(
#                 np.array(actions),
#                 dtype=torch.float32  # Always float32 for continuous actions
#             ).to(self.device)
#             log_probs = torch.stack(log_probs).to(self.device)
    
#             # Optimization phase
#             for _ in range(self.k_epochs):
#                 kl_reg = self.compute_kl_regularization(states, actions)
    
#                 for i in range(0, rollout_steps, batch_size):
#                     batch_states = states[i:i + batch_size]
#                     batch_actions = actions[i:i + batch_size]
#                     batch_log_probs = log_probs[i:i + batch_size]
#                     batch_advantages = advantages[i:i + batch_size]
#                     batch_returns = returns[i:i + batch_size]

#                     mean, std = self.policy_net(batch_states)
#                     dist = torch.distributions.Normal(mean, std)
#                     new_log_probs = dist.log_prob(batch_actions).sum(dim=-1)
    
#                     ratio = torch.exp(new_log_probs - batch_log_probs)
#                     surr1 = ratio * batch_advantages
#                     surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * batch_advantages
#                     policy_loss = -torch.min(surr1, surr2).mean()
    
#                     value_preds = self.value_net(batch_states).squeeze(-1)
#                     value_loss = nn.MSELoss()(value_preds, batch_returns)
    
#                     # Detach kl_reg to prevent graph accumulation
#                     kl_reg = kl_reg.detach()

#                     total_loss = policy_loss + 0.5 * value_loss + 0.01 * kl_reg
    
#                     self.policy_optimizer.zero_grad()
#                     self.value_optimizer.zero_grad()
#                     total_loss.backward(retain_graph=False)  # No need to retain the graph here
#                     self.policy_optimizer.step()
#                     self.value_optimizer.step()
    
#             print(f"Episode {episode + 1}: Policy Loss = {policy_loss.item()}, Value Loss = {value_loss.item()}, KL Reg = {kl_reg.item()}")
    
        

    
     
          


In [14]:
class SAPPOAgent:
    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99, lam=0.95, eps_clip=0.2, k_epochs=4, sgld_steps=10, sgld_lr=0.01):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Actor and critic networks
        self.policy_net = CtsPolicy(state_dim, action_dim).to(self.device)
        self.value_net = ValueDenseNet(state_dim).to(self.device)

        self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)

        self.gamma = gamma
        self.lam = lam
        self.eps_clip = eps_clip
        self.k_epochs = k_epochs

        self.sgld_steps = sgld_steps
        self.sgld_lr = sgld_lr

    def select_action(self, state):
        state = torch.tensor(state, dtype=torch.float32).to(self.device)
        mean, std = self.policy_net(state)
        dist = torch.distributions.Normal(mean, std)
        action = dist.sample()
        return action.cpu().numpy(), dist.log_prob(action).sum()

    def compute_gae(self, rewards, values, dones):
        advantages = []
        advantage = 0
        for t in reversed(range(len(rewards))):
            delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
            advantage = delta + self.gamma * self.lam * (1 - dones[t]) * advantage
            advantages.insert(0, advantage)
        return torch.tensor(advantages, device=self.device, dtype=torch.float32)

    def sgld_step(self, state, epsilon):
        """Perform Stochastic Gradient Langevin Dynamics (SGLD) to generate perturbed states."""
        perturbed_state = state.clone().detach().to(self.device).requires_grad_(True)

        for _ in range(self.sgld_steps):
            if perturbed_state.grad is not None:
                perturbed_state.grad.zero_()

            # Compute KL divergence between original and perturbed policies
            with torch.no_grad():
                original_mean, original_std = self.policy_net(state)
            perturbed_mean, perturbed_std = self.policy_net(perturbed_state)

            original_policy = dist.Normal(original_mean, original_std)
            perturbed_policy = dist.Normal(perturbed_mean, perturbed_std)

            kl_div = dist.kl.kl_divergence(original_policy, perturbed_policy).mean()

            # Backpropagate KL divergence
            kl_div.backward()

            # Update perturbed state using gradient and noise
            perturbed_state = perturbed_state + epsilon * perturbed_state.grad + torch.randn_like(perturbed_state) * epsilon
            perturbed_state = perturbed_state.detach().clone().requires_grad_(True)

        return perturbed_state.detach()

    def compute_kl_regularization(self, states, actions):
        """Compute the KL divergence regularization across all states."""
        if len(states) == 0:
            return torch.tensor(0.0, device=self.device)

        kl_div_total = 0
        for state in states:
            perturbed_state = self.sgld_step(state, self.sgld_lr)

            with torch.no_grad():
                original_mean, original_std = self.policy_net(state)
            perturbed_mean, perturbed_std = self.policy_net(perturbed_state)

            original_policy = dist.Normal(original_mean, original_std)
            perturbed_policy = dist.Normal(perturbed_mean, perturbed_std)

            kl_div = dist.kl.kl_divergence(original_policy, perturbed_policy).mean()
            kl_div_total += kl_div

        return kl_div_total / len(states)

    def train(self, env, max_episodes=1000, rollout_steps=2048, batch_size=64):
        for episode in range(max_episodes):
            states, actions, rewards, dones, log_probs, values = [], [], [], [], [], []

            # Reset the environment
            state, _ = env.reset()
            state = torch.tensor(state, dtype=torch.float32).to(self.device)

            # Rollout phase: Collect trajectories
            for _ in range(rollout_steps):
                value = self.value_net(state).squeeze(0).detach()
                action, log_prob = self.select_action(state.cpu().numpy())

                next_state, reward, done, truncated, _ = env.step(action)

                states.append(state.clone().detach())
                actions.append(action)
                rewards.append(reward)
                dones.append(done or truncated)
                log_probs.append(log_prob.clone().detach())
                values.append(value)

                state = torch.tensor(next_state, dtype=torch.float32).to(self.device)
                if done or truncated:
                    state, _ = env.reset()
                    state = torch.tensor(state, dtype=torch.float32).to(self.device)

            # Add a final value estimate
            values.append(torch.tensor([0], device=self.device).detach())

            # Compute advantages and returns
            advantages = self.compute_gae(rewards, values, dones)
            returns = advantages + torch.tensor(values[:-1], device=self.device)

            # Convert lists to tensors
            states = torch.stack(states).to(self.device)
            actions = torch.tensor(np.array(actions), dtype=torch.float32).to(self.device)
            log_probs = torch.stack(log_probs).to(self.device)

            # Optimization phase
            for _ in range(self.k_epochs):
                kl_reg = self.compute_kl_regularization(states, actions)

                for i in range(0, rollout_steps, batch_size):
                    batch_states = states[i:i + batch_size]
                    batch_actions = actions[i:i + batch_size]
                    batch_log_probs = log_probs[i:i + batch_size]
                    batch_advantages = advantages[i:i + batch_size]
                    batch_returns = returns[i:i + batch_size]

                    mean, std = self.policy_net(batch_states)
                    dist = torch.distributions.Normal(mean, std)
                    new_log_probs = dist.log_prob(batch_actions).sum(dim=-1)

                    ratio = torch.exp(new_log_probs - batch_log_probs)
                    surr1 = ratio * batch_advantages
                    surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * batch_advantages
                    policy_loss = -torch.min(surr1, surr2).mean()

                    value_preds = self.value_net(batch_states).squeeze(-1)
                    value_loss = nn.MSELoss()(value_preds, batch_returns)

                    total_loss = policy_loss + 0.5 * value_loss + 0.01 * kl_reg.detach()

                    self.policy_optimizer.zero_grad()
                    self.value_optimizer.zero_grad()
                    total_loss.backward()
                    self.policy_optimizer.step()
                    self.value_optimizer.step()

            print(f"Episode {episode + 1}: Policy Loss = {policy_loss.item()}, Value Loss = {value_loss.item()}, KL Reg = {kl_reg.item()}")


In [16]:
if __name__ == "__main__":
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
    discrete = False  # Set to True if action space is discrete
    
    RobustAgent = SAPPOAgent(state_dim, action_dim, discrete)
    RobustAgent.train(env, max_episodes=20)


Episode 1: Policy Loss = -3.2936959266662598, Value Loss = 16.149124145507812, KL Reg = 5.190177176928046e-09
Episode 2: Policy Loss = -8.31801986694336, Value Loss = 89.69292449951172, KL Reg = 5.093167665393139e-09
Episode 3: Policy Loss = -7.369718551635742, Value Loss = 75.51033020019531, KL Reg = 4.908844442041982e-09
Episode 4: Policy Loss = -4.391140937805176, Value Loss = 30.305814743041992, KL Reg = 5.617031728633037e-09
Episode 5: Policy Loss = -6.125728607177734, Value Loss = 61.661521911621094, KL Reg = 5.258087742987527e-09
Episode 6: Policy Loss = -3.5035226345062256, Value Loss = 21.29940414428711, KL Reg = 5.403604230735937e-09
Episode 7: Policy Loss = -5.351285457611084, Value Loss = 35.21036911010742, KL Reg = 5.995378860035316e-09
Episode 8: Policy Loss = -8.764406204223633, Value Loss = 108.15007781982422, KL Reg = 5.733447494549182e-09
Episode 9: Policy Loss = -3.085789442062378, Value Loss = 20.388507843017578, KL Reg = 6.451332801304943e-09
Episode 10: Policy Los

In [17]:
# Initialize the environment
env = gym.make("Hopper-v4")

# Evaluate the agent using the trained policy network
average_reward = evaluate_agent(env, RobustAgent.policy_net, num_episodes=200)


Episode 1: Reward = 8.305492123964871
Episode 2: Reward = 25.747382310612124
Episode 3: Reward = 3.2708648496659745
Episode 4: Reward = 6.6823732225626244
Episode 5: Reward = 10.078487054843045
Episode 6: Reward = 9.429944475378923
Episode 7: Reward = 9.619057926258394
Episode 8: Reward = 9.45160716691598
Episode 9: Reward = 14.935010016757237
Episode 10: Reward = 6.438583452859067
Episode 11: Reward = 6.774514376270599
Episode 12: Reward = 8.481237161450133
Episode 13: Reward = 12.686266890047545
Episode 14: Reward = 6.1580337376665515
Episode 15: Reward = 7.811738139567256
Episode 16: Reward = 6.082923122193883
Episode 17: Reward = 21.28172084748687
Episode 18: Reward = 8.16732417631659
Episode 19: Reward = 5.917934371751272
Episode 20: Reward = 9.265311478868544
Episode 21: Reward = 6.061407251017224
Episode 22: Reward = 19.45414598822266
Episode 23: Reward = 9.029031308873432
Episode 24: Reward = 18.00142902981201
Episode 25: Reward = 15.822545588110108
Episode 26: Reward = 36.5255

In [18]:
import torch
import numpy as np
import gymnasium as gym

def random_perturbation(state, epsilon):
    """
    Apply random perturbation to the state.
    Args:
        state: The original state.
        epsilon: The maximum magnitude of random noise.
    Returns:
        Perturbed state.
    """
    noise = np.random.uniform(-epsilon, epsilon, size=state.shape)
    perturbed_state = state + noise
    return perturbed_state

def evaluate_agent_with_random_attack(env, policy_net, epsilon=5, num_episodes=200):
    """
    Evaluate the agent with random perturbation applied to states during testing.
    Args:
        env: The environment to test the agent.
        policy_net: The trained policy network.
        epsilon: Maximum magnitude of random noise for perturbation.
        num_episodes: Number of episodes for evaluation.
    Returns:
        Average reward over the episodes.
    """
    # Ensure policy network is on the same device as input tensors
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    policy_net.to(device)
    policy_net.eval()  # Set the network to evaluation mode

    total_reward = 0

    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):  # Handle Gymnasium's (observation, info) format
            state = state[0]
        episode_reward = 0
        done = False

        while not done:
            # Apply random perturbation to the state
            perturbed_state = random_perturbation(state, epsilon)

            # Convert perturbed state to tensor and send to the same device as the policy network
            state_tensor = torch.tensor(perturbed_state, dtype=torch.float32, device=device).unsqueeze(0)

            # Get action from the policy network
            with torch.no_grad():
                policy_output = policy_net(state_tensor)
                # If the policy network outputs a tuple, extract the action vector
                if isinstance(policy_output, tuple):
                    action = policy_output[0].cpu().numpy().squeeze()
                else:
                    action = policy_output.cpu().numpy().squeeze()

            # Take the action in the environment
            next_step_result = env.step(action)
            if isinstance(next_step_result, tuple):
                next_state, reward, done, _, _ = next_step_result
            else:
                next_state, reward, done = next_step_result[:3]

            episode_reward += reward
            state = next_state

        total_reward += episode_reward
        print(f"Episode {episode + 1}: Reward = {episode_reward}")

    average_reward = total_reward / num_episodes
    print(f"Average Reward over {num_episodes} episodes: {average_reward}")
    return average_reward

# Example usage
env = gym.make("Hopper-v4")
policy_net = RobustAgent.policy_net  # Use your trained policy network here

epsilon = 0.1  # Maximum perturbation magnitude
evaluate_agent_with_random_attack(env, policy_net, epsilon)


Episode 1: Reward = 39.2224931317058
Episode 2: Reward = 41.22929859486118
Episode 3: Reward = 40.45784215218881
Episode 4: Reward = 40.23283605668382
Episode 5: Reward = 40.39897994694226
Episode 6: Reward = 40.221428874264504
Episode 7: Reward = 41.01772176944938
Episode 8: Reward = 40.27922336631392
Episode 9: Reward = 39.061015672546574
Episode 10: Reward = 40.70719901907581
Episode 11: Reward = 40.31623703239629
Episode 12: Reward = 38.260970694485444
Episode 13: Reward = 39.33027439897924
Episode 14: Reward = 40.41245361055472
Episode 15: Reward = 40.08277081093781
Episode 16: Reward = 41.2159869693659
Episode 17: Reward = 41.54063980094815
Episode 18: Reward = 39.195923863531505
Episode 19: Reward = 39.36841631877313
Episode 20: Reward = 40.094481008761214
Episode 21: Reward = 40.09466668347692
Episode 22: Reward = 40.35980564809633
Episode 23: Reward = 40.15648743358332
Episode 24: Reward = 40.58885837654398
Episode 25: Reward = 40.14466295435432
Episode 26: Reward = 40.0830280

40.009037119857226

In [19]:
# Example usage
env = gym.make("Hopper-v4")

# Initialize the trained policy network
policy_net = RobustAgent.policy_net  # Use your trained policy network here

# Parameters for MAD attack
epsilon = 0.1  # Maximum perturbation magnitude
attack_steps = 10  # Number of gradient steps
step_epsilon = 0.01  # Step size for each gradient step

# Evaluate the policy under MAD attack
average_reward = evaluate_agent_with_mad_attack(env, policy_net, epsilon, num_episodes=200, attack_steps=attack_steps, step_epsilon=step_epsilon)
print(f"Final Average Reward under MAD Attack: {average_reward}")


Episode 1/200: Reward = 10.434742914791503
Episode 2/200: Reward = 5.417426003477402
Episode 3/200: Reward = 14.668772095687922
Episode 4/200: Reward = 38.2123422633607
Episode 5/200: Reward = 14.282820308923435
Episode 6/200: Reward = 8.953468696901771
Episode 7/200: Reward = 17.629842888418775
Episode 8/200: Reward = 8.24715327926889
Episode 9/200: Reward = 15.3304959363805
Episode 10/200: Reward = 133.57492896370198
Episode 11/200: Reward = 8.08644098517564
Episode 12/200: Reward = 25.81871571647448
Episode 13/200: Reward = 7.176300306696672
Episode 14/200: Reward = 13.486829103478758
Episode 15/200: Reward = 10.950812106214816
Episode 16/200: Reward = 24.268289808574032
Episode 17/200: Reward = 11.207309992832563
Episode 18/200: Reward = 4.0628568264083995
Episode 19/200: Reward = 10.721401542100223
Episode 20/200: Reward = 9.15268438293499
Episode 21/200: Reward = 15.409369871994203
Episode 22/200: Reward = 12.352357959653983
Episode 23/200: Reward = 8.117591254145553
Episode 24/2

In [20]:
# Example usage
env = gym.make("Hopper-v4")

# Initialize the trained policy network
policy_net = RobustAgent.policy_net  # Use your trained policy network here

# Parameters for MAD attack
epsilon = 0.1  # Maximum perturbation magnitude
attack_steps = 10  # Number of gradient steps
step_epsilon = 0.01  # Step size for each gradient step
epsilon_schedule = [0.01 * i for i in range(1, 101)]
# Evaluate the policy under MAD attack

robust_q_net=robust_sarsa_attack(
    env=env,
    policy_net=policy_net,
    epsilon_schedule=epsilon_schedule,
    num_steps=5000,        # Number of training steps
    lambda_rs=0.1,         # Regularization parameter for robust loss
    batch_size=64,         # Batch size for training
    gamma=0.99             # Discount factor
)


  actions = torch.tensor(actions, dtype=torch.float32).to(device)  # Continuous actions


Step 100/5000, TD Loss: 0.6278, Robust Loss: 0.1636
Step 200/5000, TD Loss: 0.5392, Robust Loss: 0.3412
Step 300/5000, TD Loss: 0.5079, Robust Loss: 0.3753
Step 400/5000, TD Loss: 0.4827, Robust Loss: 0.7698
Step 500/5000, TD Loss: 0.4616, Robust Loss: 0.5638
Step 600/5000, TD Loss: 0.4881, Robust Loss: 0.9662
Step 700/5000, TD Loss: 0.4720, Robust Loss: 0.7391
Step 800/5000, TD Loss: 0.4840, Robust Loss: 0.8571
Step 900/5000, TD Loss: 0.4464, Robust Loss: 0.8866
Step 1000/5000, TD Loss: 0.4585, Robust Loss: 1.0489
Step 1100/5000, TD Loss: 0.4712, Robust Loss: 0.8254
Step 1200/5000, TD Loss: 0.4780, Robust Loss: 0.9241
Step 1300/5000, TD Loss: 0.4499, Robust Loss: 0.9922
Step 1400/5000, TD Loss: 0.4310, Robust Loss: 0.7281
Step 1500/5000, TD Loss: 0.4719, Robust Loss: 0.9722
Step 1600/5000, TD Loss: 0.4361, Robust Loss: 0.9781
Step 1700/5000, TD Loss: 0.4505, Robust Loss: 0.8806
Step 1800/5000, TD Loss: 0.4228, Robust Loss: 1.3964
Step 1900/5000, TD Loss: 0.4204, Robust Loss: 0.7944
St

In [31]:
average_reward = average_reward = evaluate_agent_with_robust_sarsa_attack(
    env=env,
    policy_net=policy_net,
    robust_q_net=robust_q_net,
    epsilon=0.05,
    num_episodes=200,
    step_size=0.01
)
print(f"Final Average Reward under Robust Sarsa Attack: {average_reward}")


Episode 1/200: Reward = 39.48946673843793
Episode 2/200: Reward = 38.88514034896076
Episode 3/200: Reward = 39.399382327984625
Episode 4/200: Reward = 41.69809705750213
Episode 5/200: Reward = 40.36334403271557
Episode 6/200: Reward = 39.28390451162531
Episode 7/200: Reward = 39.000457952806386
Episode 8/200: Reward = 39.27576847154834
Episode 9/200: Reward = 39.41900023785964
Episode 10/200: Reward = 39.097197104467675
Episode 11/200: Reward = 39.22610448110138
Episode 12/200: Reward = 40.34034905099732
Episode 13/200: Reward = 39.37402439952458
Episode 14/200: Reward = 38.909393465164904
Episode 15/200: Reward = 40.27047623765004
Episode 16/200: Reward = 39.40471991610045
Episode 17/200: Reward = 40.133139698614194
Episode 18/200: Reward = 40.07949520524676
Episode 19/200: Reward = 40.53360367318834
Episode 20/200: Reward = 41.43635253200792
Episode 21/200: Reward = 39.19371684746211
Episode 22/200: Reward = 40.39199855596881
Episode 23/200: Reward = 41.40508177300148
Episode 24/200:

In [32]:
def evaluate_agent_with_state_value_attack(env, policy_net, value_net, epsilon, num_episodes=200, attack_steps=10, step_epsilon=0.01):
    """
    Evaluate the agent under a State Value Attack using a value network.

    Args:
        env (gym.Env): The environment.
        policy_net (torch.nn.Module): The trained policy network.
        value_net (torch.nn.Module): The trained value network.
        epsilon (float): Maximum perturbation magnitude.
        num_episodes (int): Number of episodes for evaluation.
        attack_steps (int): Number of gradient steps for the attack.
        step_epsilon (float): Step size for each gradient step.

    Returns:
        float: Average reward over the episodes under the state value attack.
    """
    device = next(policy_net.parameters()).device
    total_reward = 0

    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]  # Unpack if state is returned as (observation, info)
        state = torch.tensor(state, dtype=torch.float32).to(device)

        episode_reward = 0
        done = False

        while not done:
            # Start with the original state
            perturbed_state = state.clone().detach().requires_grad_(True)

            for _ in range(attack_steps):
                # Compute value for the perturbed state
                value = value_net(perturbed_state.unsqueeze(0))

                # Minimize or maximize the value
                loss = -value.mean()  # Gradient ascent to maximize adversarial effect
                loss.backward()

                # Apply gradient-based perturbation
                grad = perturbed_state.grad
                perturbation = step_epsilon * grad.sign()
                perturbed_state = (perturbed_state + perturbation).detach().requires_grad_(True)

                # Clamp the perturbed state to within the epsilon-ball
                perturbed_state = torch.max(
                    torch.min(perturbed_state, state + epsilon), state - epsilon
                ).detach().requires_grad_(True)

            # Use the perturbed state to select the action
            with torch.no_grad():
                action_output = policy_net(perturbed_state.unsqueeze(0))
                if isinstance(action_output, tuple):
                    action = action_output[0]  # Extract mean for continuous actions
                else:
                    action = action_output

                action = action.squeeze().cpu().numpy()  # Ensure the action is in NumPy format

            # Step the environment
            next_step_result = env.step(action)
            if isinstance(next_step_result, tuple):
                next_state, reward, done, _, _ = next_step_result
            else:
                next_state, reward, done = next_step_result[:3]

            # Accumulate the reward
            episode_reward += reward

            # Update the state
            state = torch.tensor(next_state, dtype=torch.float32).to(device)

        total_reward += episode_reward
        print(f"Episode {episode + 1}/{num_episodes}: Reward = {episode_reward}")

    average_reward = total_reward / num_episodes
    print(f"Average Reward under State Value Attack: {average_reward}")
    return average_reward


In [33]:
def evaluate_agent_with_target_policy_attack(env, policy_net, target_action, epsilon, num_episodes=200, attack_steps=10, step_epsilon=0.01):
    """
    Evaluate the agent under a Target Policy Misclassification attack.

    Args:
        env (gym.Env): The environment.
        policy_net (torch.nn.Module): The trained policy network.
        target_action (torch.Tensor): The target action to force the policy to output.
        epsilon (float): Maximum perturbation magnitude.
        num_episodes (int): Number of episodes for evaluation.
        attack_steps (int): Number of gradient steps for the attack.
        step_epsilon (float): Step size for each gradient step.

    Returns:
        float: Average reward over the episodes under Target Policy Misclassification attack.
    """
    device = next(policy_net.parameters()).device
    total_reward = 0

    for episode in range(num_episodes):
        state = env.reset()
        if isinstance(state, tuple):
            state = state[0]  # Unpack if state is returned as (observation, info)
        state = torch.tensor(state, dtype=torch.float32).to(device)

        episode_reward = 0
        done = False

        while not done:
            perturbed_state = state.clone().detach().requires_grad_(True)

            for _ in range(attack_steps):
                # Get policy output for the perturbed state
                policy_output = policy_net(perturbed_state.unsqueeze(0))
                if isinstance(env.action_space, gym.spaces.Discrete):
                    logits = policy_output  # For discrete actions
                    loss = torch.nn.functional.cross_entropy(logits, target_action)  # Cross-entropy loss
                elif isinstance(env.action_space, gym.spaces.Box):
                    if isinstance(policy_output, tuple):
                        mean, _ = policy_output  # Mean and std
                    else:
                        mean = policy_output
                    loss = torch.nn.functional.mse_loss(mean, target_action)  # MSE loss for continuous actions
                else:
                    raise ValueError("Unsupported action space type.")

                # Backpropagate to compute gradients
                loss.backward()

                # Apply gradient-based perturbation
                grad = perturbed_state.grad
                perturbation = step_epsilon * grad.sign()
                perturbed_state = (perturbed_state + perturbation).detach().requires_grad_(True)

                # Clamp the perturbed state
                perturbed_state = torch.max(
                    torch.min(perturbed_state, state + epsilon), state - epsilon
                ).detach().requires_grad_(True)

            # Use the perturbed state to select the action
            with torch.no_grad():
                policy_output = policy_net(perturbed_state.unsqueeze(0))
                if isinstance(env.action_space, gym.spaces.Discrete):
                    action = torch.argmax(policy_output, dim=1).item()  # Discrete action
                else:
                    if isinstance(policy_output, tuple):
                        mean, _ = policy_output
                    else:
                        mean = policy_output
                    action = mean.squeeze().cpu().numpy()  # Continuous action

            # Step the environment
            next_step_result = env.step(action)
            if isinstance(next_step_result, tuple):
                next_state, reward, done, _, _ = next_step_result
            else:
                next_state, reward, done = next_step_result[:3]

            episode_reward += reward
            state = torch.tensor(next_state, dtype=torch.float32).to(device)

        total_reward += episode_reward
        print(f"Episode {episode + 1}/{num_episodes}: Reward = {episode_reward}")

    average_reward = total_reward / num_episodes
    print(f"Average Reward under Target Policy Misclassification attack: {average_reward}")
    return average_reward


In [34]:
# Assuming `policy_net` and `q_net` are already defined and trained
# Example environment
import gymnasium as gym
env = gym.make("Hopper-v4")

# Parameters
epsilon = 0.1  # Maximum perturbation magnitude
num_episodes = 200  # Number of episodes to evaluate
attack_steps = 10  # Number of attack gradient steps
step_epsilon = 0.01  # Step size for each gradient step

# Call the attack evaluation function
average_reward_sav = evaluate_agent_with_state_value_attack(
    env=env,
    policy_net=RobustAgent.policy_net,  # Trained policy network
    value_net=RobustAgent.value_net,  # Trained Q-value network (critic)
    epsilon=epsilon,
    num_episodes=num_episodes,
    attack_steps=attack_steps,
    step_epsilon=step_epsilon
)

print(f"Average Reward under State Action Value Attack: {average_reward_sav}")


Episode 1/200: Reward = 39.53260809583266
Episode 2/200: Reward = 41.21169296067531
Episode 3/200: Reward = 40.26438461444689
Episode 4/200: Reward = 40.11466603377153
Episode 5/200: Reward = 39.23989540268539
Episode 6/200: Reward = 39.158202056593446
Episode 7/200: Reward = 40.14646258301174
Episode 8/200: Reward = 38.85618688325834
Episode 9/200: Reward = 40.05670682291415
Episode 10/200: Reward = 39.25603490676412
Episode 11/200: Reward = 39.60938887267172
Episode 12/200: Reward = 39.141153634082784
Episode 13/200: Reward = 39.170966047815554
Episode 14/200: Reward = 40.67548902909676
Episode 15/200: Reward = 40.33815091997627
Episode 16/200: Reward = 40.51220212706108
Episode 17/200: Reward = 40.306312231794095
Episode 18/200: Reward = 38.579173226762954
Episode 19/200: Reward = 39.525388071306544
Episode 20/200: Reward = 38.29488015910395
Episode 21/200: Reward = 39.27354281913439
Episode 22/200: Reward = 39.38107192474693
Episode 23/200: Reward = 40.19868080589899
Episode 24/200

In [38]:
# Assuming `policy_net` is already defined and trained
# Example environment
env = gym.make("Hopper-v4")

# Parameters
epsilon = 0.1  # Maximum perturbation magnitude
num_episodes = 200  # Number of episodes to evaluate
attack_steps = 10  # Number of attack gradient steps
step_epsilon = 0.01  # Step size for each gradient step

# Define the target action for the misclassification attack
# For discrete actions: target_action is the action index
if isinstance(env.action_space, gym.spaces.Discrete):
    target_action = torch.tensor([1], dtype=torch.long).to(next(RobustAgent.policy_net.parameters()).device)
# For continuous actions: target_action is a vector of desired action values
elif isinstance(env.action_space, gym.spaces.Box):
    target_action = torch.tensor([0.5] * env.action_space.shape[0], dtype=torch.float32).to(next(RobustAgent.policy_net.parameters()).device)
else:
    raise ValueError("Unsupported action space type.")

# Call the attack evaluation function
average_reward_tpm = evaluate_agent_with_target_policy_attack(
    env=env,
    policy_net=RobustAgent.policy_net,  # Trained policy network
    target_action=target_action,
    epsilon=epsilon,
    num_episodes=num_episodes,
    attack_steps=attack_steps,
    step_epsilon=step_epsilon
)

print(f"Average Reward under Target Policy Misclassification Attack: {average_reward_tpm}")


  loss = torch.nn.functional.mse_loss(mean, target_action)  # MSE loss for continuous actions


Episode 1/200: Reward = 38.3616147138168
Episode 2/200: Reward = 40.49922303707294
Episode 3/200: Reward = 39.46538939845266
Episode 4/200: Reward = 37.40209181156585
Episode 5/200: Reward = 38.493583997793
Episode 6/200: Reward = 38.21740920038773
Episode 7/200: Reward = 38.25061637062804
Episode 8/200: Reward = 40.06234226616439
Episode 9/200: Reward = 40.64480758303203
Episode 10/200: Reward = 38.355623418441304
Episode 11/200: Reward = 38.59940858455558
Episode 12/200: Reward = 39.290440324454025
Episode 13/200: Reward = 38.44555439176992
Episode 14/200: Reward = 39.26627947626379
Episode 15/200: Reward = 40.657967966286265
Episode 16/200: Reward = 38.63376392131351
Episode 17/200: Reward = 39.395417330771615
Episode 18/200: Reward = 38.31453385631184
Episode 19/200: Reward = 38.592123299886154
Episode 20/200: Reward = 39.42447717630096
Episode 21/200: Reward = 38.19811996250014
Episode 22/200: Reward = 40.32086506642841
Episode 23/200: Reward = 38.57550752530815
Episode 24/200: Re