## <center>CSE 546: Reinforcement Learning</center>
### <center>Prof. Alina Vereshchaka</center>
#### <center>Spring 2025</center>

Welcome to the Assignment 3, Part 1: Introduction to Actor-Critic Methods! It includes the implementation of simple actor and critic networks and best practices used in modern Actor-Critic algorithms.

## Section 0: Setup and Imports

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gymnasium as gym
import matplotlib.pyplot as plt
from collections import deque

# Set seed for reproducibility
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x79ffed0c1af0>

In [50]:
import ale_py
import traceback
import warnings
from gymnasium.spaces import Box, Discrete, MultiDiscrete

try:
    from gymnasium.wrappers import AtariPreprocessing
except ImportError:
    class AtariPreprocessing:
        def __init__(self, *args, **kwargs): raise NotImplementedError("AtariPreprocessing Import Failed")

try:
    # from gymnasium.wrappers import FrameStack
    # from gym.wrappers import FrameStack
    from gymnasium.wrappers import FrameStackObservation

except ImportError:
    class FrameStack:
         def __init__(self, *args, **kwargs): raise NotImplementedError("Dummy FrameStack: Import Failed")


warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning, message=".*WARN: Box observation space.*")

In [49]:
from gymnasium.wrappers import FrameStackObservation


In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

Using device: cpu


In [10]:
class ManualOneHotObservation(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        if not isinstance(env.observation_space, Discrete):
            raise ValueError("ManualOneHotObservation requires Discrete observation space.")
        self.n = env.observation_space.n
        self.observation_space = Box(0.0, 1.0, (self.n,), dtype=np.float32)

    def observation(self, obs):
        one_hot = np.zeros(self.n, dtype=np.float32)
        one_hot[obs] = 1.0
        return one_hot

## Section 1: Actor-Critic Network Architectures and Loss Computation

In this section, you will explore two common architectural designs for Actor-Critic methods and implement their corresponding loss functions using dummy tensors. These architectures are:
- A. Completely separate actor and critic networks
- B. A shared network with two output heads

Both designs are widely used in practice. Shared networks are often more efficient and generalize better, while separate networks offer more control and flexibility.

---


### Task 1a – Separate Actor and Critic Networks with Loss Function

Define a class `SeparateActorCritic`. Your goal is to:
- Create two completely independent neural networks: one for the actor and one for the critic.
- The actor should output a probability distribution over discrete actions (use `nn.Softmax`).
- The critic should output a single scalar value.

 Use `nn.ReLU()` as your activation function. Include at least one hidden layer of reasonable width (e.g. 64 or 128 units).

```python
# TODO: Define SeparateActorCritic class
```

 Next, simulate training using dummy tensors:
1. Generate dummy tensors for log-probabilities, returns, estimated values, and entropies.
2. Compute the actor loss using the advantage (return - value).
3. Compute the critic loss as mean squared error between values and returns.
4. Use a single optimizer for both the Actor and the Critic. In this case, combine the actor and critic losses into a total loss and perform backpropagation.
5. Use a separate optimizers for both the Actor and the Critic. In this case, keep the actor and critic losses separate and perform backpropagation.

```python
# TODO: Simulate loss computation and backpropagation
```

🔗 Helpful references:
- PyTorch Softmax: https://pytorch.org/docs/stable/generated/torch.nn.Softmax.html
- PyTorch MSE Loss: https://pytorch.org/docs/stable/generated/torch.nn.functional.mse_loss.html

---

In [25]:
# TODO: Define a class SeparateActorCritic with separate networks for actor and critic

# BEGIN_YOUR_CODE
class SeparateActorCritic(nn.Module):
    def __init__(self, observation_dim, action_dim, hidden_dim=64):
        super(SeparateActorCritic, self).__init__()
        self.actor = nn.Sequential(
            nn.Linear(observation_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        ).to(device)
        self.critic = nn.Sequential(
            nn.Linear(observation_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        ).to(device)

    def forward(self, state):
        state = state.float().to(device)
        action_logits = self.actor(state)
        value = self.critic(state)
        return action_logits, value

## Simulating Loss adn BackProp
N, obs_dim, act_dim = 10, 8, 4
model = SeparateActorCritic(obs_dim, act_dim)

dummy_states  = torch.randn(N, obs_dim)
dummy_returns = torch.randn(N, 1)

probs, values = model(dummy_states)
print(f"Return Values From Model:\n Critic: {values} \n Actor: {probs}")
dist        = torch.distributions.Categorical(probs)
actions     = dist.sample()
log_probs   = dist.log_prob(actions)

advantage = (dummy_returns - values).detach()

actor_loss  = -(log_probs * advantage.squeeze(-1)).mean()

critic_loss = F.mse_loss(values, dummy_returns)

opt_actor  = optim.Adam(model.actor.parameters(),  lr=1e-3)
opt_critic = optim.Adam(model.critic.parameters(), lr=1e-3)

opt_actor.zero_grad()
opt_critic.zero_grad()

actor_loss.backward()
opt_actor.step()

critic_loss.backward()
opt_critic.step()

print(f"Calculated Actor Loss: {actor_loss.item()} And Critic Loss={critic_loss.item()}")

# END_YOUR_CODE

Return Values From Model:
 Critic: tensor([[ 0.0032],
        [ 0.0522],
        [-0.0357],
        [ 0.0978],
        [-0.0136],
        [ 0.1160],
        [ 0.0752],
        [ 0.0782],
        [-0.0077],
        [ 0.1205]], grad_fn=<AddmmBackward0>) 
 Actor: tensor([[0.2906, 0.2723, 0.2130, 0.2241],
        [0.2811, 0.2716, 0.2244, 0.2229],
        [0.2922, 0.2914, 0.2207, 0.1957],
        [0.2713, 0.2909, 0.2226, 0.2152],
        [0.2652, 0.3112, 0.2142, 0.2094],
        [0.2924, 0.2619, 0.2123, 0.2334],
        [0.2729, 0.2987, 0.2224, 0.2061],
        [0.2781, 0.2907, 0.2019, 0.2293],
        [0.2953, 0.2820, 0.2160, 0.2068],
        [0.2874, 0.2806, 0.2235, 0.2085]], grad_fn=<SoftmaxBackward0>)
Calculated Actor Loss: -0.39679527282714844 And Critic Loss=0.8613286018371582


### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

So here we are using a seprate netwroks for both actor and critic. In our opinion this setup gives us the advantage of being more simple and stable.
Moreover, because both critic and the actor have a different objective to full-fill they might benefit more from having different gradients then allowing for shared learning.

### Task 1b – Shared Network with Actor and Critic Heads + Loss Function

Now define a class `SharedActorCritic`:
- Build a shared base network (e.g., linear layer + ReLU)
- Create two heads: one for actor (output action probabilities) and one for critic (output state value)

```python
# TODO: Define SharedActorCritic class
```

Then:
1. Pass a dummy input tensor through the model to obtain action probabilities and value.
2. Simulate dummy rewards and compute advantage.
3. Compute the actor and critic losses, combine them, and backpropagate.

```python
# TODO: Simulate shared network loss computation and backpropagation
```

 Use `nn.Softmax` for actor output and `nn.Linear` for scalar critic output.

🔗 More reading:
- Policy Gradient Methods: https://spinningup.openai.com/en/latest/algorithms/vpg.html
- Actor-Critic Overview: https://www.tensorflow.org/agents/tutorials/6_reinforce_tutorial
- PyTorch Categorical Distribution: https://pytorch.org/docs/stable/distributions.html#categorical

---

In [28]:
# BEGIN_YOUR_CODE
class SharedActorCritic(nn.Module):
    def __init__(self, observation_dim, action_dim, hidden_dim=64):
        super(SharedActorCritic, self).__init__()
        self.shared = nn.Sequential(
            nn.Linear(observation_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU()
        ).to(device)
        # self.actor_head = nn.Linear(hidden_dim, action_dim).to(device)
        self.actor_head = nn.Sequential(
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1),
        )
        self.critic_head = nn.Linear(hidden_dim, 1).to(device)

    def forward(self, state):
        state = state.float().to(device)
        shared_features = self.shared(state)
        action_logits = self.actor_head(shared_features)
        value = self.critic_head(shared_features)
        return action_logits, value

obs_dim, act_dim = 8, 4
model     = SharedActorCritic(obs_dim, act_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

N = 10
states  = torch.randn(N, obs_dim, device=device)
returns = torch.randn(N, 1,    device=device)
probs, values = model(states)
print(f"Return Values From Model:\n Critic: {values} \n Actor: {probs}")

dist      = torch.distributions.Categorical(probs)
actions   = dist.sample()
log_probs = dist.log_prob(actions)

adv = (returns - values).detach()

actor_loss  = -(log_probs * adv.squeeze(-1)).mean()
critic_loss = F.mse_loss(values, returns)

optimizer.zero_grad()
(actor_loss + critic_loss).backward()
optimizer.step()

print(f"Calculated Actor Loss: {actor_loss.item()} And Critic Loss={critic_loss.item()}")
# END_YOUR_CODE

Return Values From Model:
 Critic: tensor([[-0.0331],
        [-0.0798],
        [-0.0686],
        [-0.1056],
        [ 0.0533],
        [ 0.1209],
        [ 0.0556],
        [ 0.0528],
        [-0.0608],
        [ 0.0732]], grad_fn=<AddmmBackward0>) 
 Actor: tensor([[0.2581, 0.2051, 0.2917, 0.2450],
        [0.2634, 0.2092, 0.2887, 0.2387],
        [0.2741, 0.2066, 0.2863, 0.2329],
        [0.2517, 0.2140, 0.3234, 0.2110],
        [0.2854, 0.2175, 0.2871, 0.2100],
        [0.2761, 0.2280, 0.2852, 0.2107],
        [0.2636, 0.2246, 0.2891, 0.2226],
        [0.2697, 0.2272, 0.2737, 0.2294],
        [0.2799, 0.2036, 0.2926, 0.2239],
        [0.2518, 0.2314, 0.3108, 0.2060]], grad_fn=<SoftmaxBackward0>)
Calculated Actor Loss: -0.288370817899704 And Critic Loss=1.382912516593933


### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

Here, we are using a combined base network for actor and critic. The motivation behind this setup is that it might be better to learn shared features for both actor and the crtic. This is prefered as a shared setup is also less computationaly expensive and can also take less time to train.
Moreover, in RL in some environments it might be that the featurs for understandign state's value are also correlated with calculating best actions.

## Section 2: Auto-Adaptive Network Setup for Environments

You will now create a function that builds a shared actor-critic network that adapts to any Gymnasium environment. This function should inspect the environment and build input/output layers accordingly.

### Task 2: Auto-generate Input and Output Layers
Write a function `create_shared_network(env)` that constructs a neural network using the following rules:
- The input layer should match the environment's observation space.
- The output layer for the **actor** should depend on the action space:
  - For discrete actions: output probabilities using `nn.Softmax`.
  - For continuous actions: output mean and log std for a Gaussian distribution.
- The **critic** always outputs a single scalar value.

```python
# TODO: Define function `create_shared_network(env)`
```

#### Environments to Support:
Test your function with the following environments:
1. `CliffWalking-v0` (Use one-hot encoding for discrete integer observations.)
2. `LunarLander-v3` (Standard Box space for observations and discrete actions.)
3. `PongNoFrameskip-v4` (Use gym wrappers for Atari image preprocessing.)
4. `HalfCheetah-v5` (Continuous observation and continuous action.)

```python
# TODO: Loop through environments and test `create_shared_network`
```

Hint: Use `gym.spaces` utilities to determine observation/action types dynamically.

🔗 Observation/Action Space Docs:
- https://gymnasium.farama.org/api/spaces/

---

In [53]:
# BEGIN_YOUR_CODE
# Task 2 auto generating actor critic networks based on environment spaces
def create_shared_network(env):
    obs_space = env.observation_space
    action_space = env.action_space
    print("Obs", obs_space, len(obs_space.shape))
    class DynamicActorCritic(nn.Module):
        def __init__(self, obs_space, action_space):
            super(DynamicActorCritic, self).__init__()
            self.obs_space = obs_space
            self.action_space = action_space
            self._build_model()

        def _build_model(self):
            obs_shape = self.obs_space.shape
            # handling different observation space types
            if isinstance(self.obs_space, Discrete):
                # using embeddings for discrete observations
                self.embedding = nn.Embedding(self.obs_space.n, 16)
                self.feature_extractor = nn.Sequential(nn.Linear(16, 64), nn.ReLU())
                feature_size = 64
            elif isinstance(self.obs_space, Box):
                if len(obs_shape) == 1:
                    # for vector observations (like lunarlander)
                    obs_dim = obs_shape[0]
                    hidden_dim_mlp = 64
                    self.feature_extractor = nn.Sequential(
                        nn.Linear(obs_dim, hidden_dim_mlp), nn.ReLU(),
                        nn.Linear(hidden_dim_mlp, hidden_dim_mlp), nn.ReLU()
                    )
                    feature_size = hidden_dim_mlp
                elif len(obs_shape) == 2:
                    # for 2d observations (like preprocessed atari frames)
                    in_channels = 1
                    H, W = obs_shape
                    self.cnn_base = nn.Sequential(
                        nn.Conv2d(in_channels, 32, 8, 4), nn.ReLU(),
                        nn.Conv2d(32, 64, 4, 2), nn.ReLU(),
                        nn.Conv2d(64, 64, 3, 1), nn.ReLU(), nn.Flatten()
                    )
                    with torch.no_grad():
                        dummy_input = torch.zeros(1, in_channels, H, W).to(device)
                        cnn_output_size = self.cnn_base(dummy_input).shape[1]
                    fc_hidden_dim = 512
                    self.feature_extractor = nn.Sequential(
                        self.cnn_base, nn.Linear(cnn_output_size, fc_hidden_dim), nn.ReLU()
                    )
                    feature_size = fc_hidden_dim
                elif len(obs_shape) == 3:
                    # for 3d observations (like stacked frames)
                    in_channels = obs_shape[0]
                    self.cnn_base = nn.Sequential(
                        nn.Conv2d(in_channels, 32, 8, 4), nn.ReLU(),
                        nn.Conv2d(32, 64, 4, 2), nn.ReLU(),
                        nn.Conv2d(64, 64, 3, 1), nn.ReLU(), nn.Flatten()
                    )
                    with torch.no_grad():
                        dummy_input = torch.zeros(1, *obs_shape).to(device)
                        cnn_output_size = self.cnn_base(dummy_input).shape[1]
                    fc_hidden_dim = 512
                    self.feature_extractor = nn.Sequential(
                        self.cnn_base, nn.Linear(cnn_output_size, fc_hidden_dim), nn.ReLU()
                    )
                    feature_size = fc_hidden_dim
                else: raise NotImplementedError(f"Unsupported Box obs shape: {obs_shape}")
            else: raise NotImplementedError(f"Unsupported observation space: {type(self.obs_space)}")

            # shared layers that both actor and critic will use
            shared_layer_out_dim = 64
            self.shared_layer = nn.Sequential(
                nn.Linear(feature_size, shared_layer_out_dim), nn.ReLU()
            )
            final_feature_size = shared_layer_out_dim

            # handling different action space types
            if isinstance(self.action_space, Discrete):
                # for discrete actions (like cliffwalking, pong)
                # self.actor_head = nn.Linear(final_feature_size, )
                self.actor_head = nn.Sequential(
                    nn.Linear(final_feature_size, self.action_space.n),
                    nn.Softmax(dim=-1)
                )
                self._action_adapter = self._discrete_action_adapter
            elif isinstance(self.action_space, Box):
                # for continuous actions (like halfcheetah)
                action_dim = self.action_space.shape[0]
                self.actor_mean = nn.Linear(final_feature_size, action_dim)
                self.actor_logstd = nn.Parameter(torch.zeros(action_dim))
                self._action_adapter = self._continuous_action_adapter
            elif isinstance(self.action_space, MultiDiscrete):
                # for multi discrete actions
                self.nvec = self.action_space.nvec
                total_action_dim = int(np.sum(self.nvec))
                self.actor_head = nn.Linear(final_feature_size, total_action_dim)
                self._action_adapter = self._multidiscrete_action_adapter
            else: raise NotImplementedError(f"Unsupported action space: {type(self.action_space)}")

            # value function head is always a single output
            self.critic_head = nn.Linear(final_feature_size, 1)

        # action adapters for different action space types
        def _discrete_action_adapter(self, features): return self.actor_head(features)

        def _continuous_action_adapter(self, features):
            mean = self.actor_mean(features)
            logstd = self.actor_logstd.expand_as(mean)
            return (mean, logstd)

        def _multidiscrete_action_adapter(self, features):
            logits_concat = self.actor_head(features)
            chunks = torch.split(logits_concat, self.nvec.tolist(), dim=1)
            probs = [F.softmax(chunk, dim=-1) for chunk in chunks]
            return probs

        def forward(self, x):
            x = x.float().to(device)
            obs_shape = self.obs_space.shape
            # handling preprocessing based on observation type
            if isinstance(self.obs_space, Discrete):
                x = self.embedding(x.long().squeeze(-1).to(device))
            elif isinstance(self.obs_space, Box) and len(obs_shape) == 2:
                 x = x.unsqueeze(1)

            # forward pass through shared layers
            features = self.feature_extractor(x)
            shared_features = self.shared_layer(features)

            # getting action and value outputs
            action_outputs = self._action_adapter(shared_features)
            value = self.critic_head(shared_features)
            return action_outputs, value

    model = DynamicActorCritic(obs_space, action_space).to(device)
    return model




# Testing ALL ENVIRONEMNTS:-

print("\n Running Sequential Test Execution ")

# testing the implementation on different environments
environments_to_run = [
    "CliffWalking-v0",
    "LunarLander-v3",
    "PongNoFrameskip-v4",
    "HalfCheetah-v5"
]

for env_name in environments_to_run:
    print(f"\n Testing Environment: {env_name} ")
    env = None
    model = None
    skip_tests = False

    try:
        print(f"\n Setting up: {env_name} ")
        if env_name == "CliffWalking-v0":
            # for discrete state space, applying one hot encoding
            base_env = gym.make(env_name, render_mode=None)
            env = ManualOneHotObservation(base_env)
        elif env_name == "PongNoFrameskip-v4":
            base_env = gym.make(env_name, obs_type="grayscale", render_mode=None)
            env = AtariPreprocessing(base_env, screen_size=84, grayscale_obs=True, scale_obs=True, terminal_on_life_loss=False)
            env = FrameStackObservation(env, 4)
        elif env_name == "HalfCheetah-v5":
            # mujoco environment with continuous action space
            env = gym.make(env_name, render_mode=None)
        elif env_name == "LunarLander-v3":
             env = gym.make(env_name, render_mode=None)
        else:
            print(f"Warning: Environment '{env_name}' not explicitly handled.")
            skip_tests = True

        if env is None: raise ValueError("Environment creation failed.")

    except gym.error.DependencyNotInstalled as e:
        print(f"SKIPPING {env_name}: Missing Dependencies. {e}")
        skip_tests = True
    except NotImplementedError as e:
         print(f"SKIPPING {env_name}: A required wrapper failed ({e})")
         skip_tests = True
    except Exception as e:
         print(f"ERROR during {env_name} setup: {e}")
         print(traceback.format_exc())
         skip_tests = True

    if not skip_tests and env is not None:
        try:
            print(f"\nRunning tests for: {env_name}")
            # creating a network based on environment specs
            model = create_shared_network(env)
            obs, _ = env.reset(seed=SEED)
            obs_np = np.array(obs)
            # testing forward pass through the network
            print("\nTesting Forward Pass ")
            obs_tensor = torch.tensor(obs_np, dtype=torch.float32).unsqueeze(0).to(device)
            model.eval()
            with torch.no_grad(): action_output, value_output = model(obs_tensor)
            model.train()
            print("Forward pass successful.")

            optimizer = optim.Adam(model.parameters(), lr=1e-4)
            action_output_grad, value_output_grad = model(obs_tensor)
            loss_actor = torch.tensor(0.0, device=device)
            loss_critic = torch.tensor(0.0, device=device)
            dummy_return = torch.randn(1, 1, device=device)
            loss_critic = F.mse_loss(value_output_grad, dummy_return)

            # handling different action space types for loss calculation
            if isinstance(env.action_space, Discrete):
                logits = torch.log(action_output_grad + 1e-8)
                loss_actor = F.cross_entropy(logits, torch.randint(0, env.action_space.n, (1,), device=device))
            elif isinstance(env.action_space, Box):
                mean, logstd = action_output_grad
                std = torch.exp(logstd).clamp(min=1e-6)
                dist = torch.distributions.Normal(mean, std)
                log_prob = dist.log_prob(torch.randn_like(mean)).sum(axis=-1, keepdim=True)
                loss_actor = -log_prob.mean()
            elif isinstance(env.action_space, MultiDiscrete):
                 loss_actor_total = 0
                 for i, logits in enumerate(action_output_grad):
                     logits = torch.log(logits + 1e-8)
                     loss_actor_total += F.cross_entropy(logits, torch.randint(0, env.action_space.nvec[i], (1,), device=device))
                 loss_actor = loss_actor_total

            total_loss = loss_actor + loss_critic
            print(f"Calculated Dummy Loss For Env ({env_name}): {total_loss.item():.4f}")
            optimizer.zero_grad()
            total_loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
            optimizer.step()
            print("==== DONE WITH BACKPROP ===")
        except Exception as e:
            print(f"\n!!! Error during testing execution for {env_name}: {e}")
            print(traceback.format_exc())
        finally:
            if env is not None: env.close()

print("\n All Environment Tests Completed ")


""

# END_YOUR_CODE


 Running Sequential Test Execution 

 Testing Environment: CliffWalking-v0 

 Setting up: CliffWalking-v0 

Running tests for: CliffWalking-v0
Obs Box(0.0, 1.0, (48,), float32) 1

Testing Forward Pass 
Forward pass successful.
Calculated Dummy Loss For Env (CliffWalking-v0): 1.8187
==== DONE WITH BACKPROP ===

 Testing Environment: LunarLander-v3 

 Setting up: LunarLander-v3 

Running tests for: LunarLander-v3
Obs Box([ -2.5        -2.5       -10.        -10.         -6.2831855 -10.
  -0.         -0.       ], [ 2.5        2.5       10.        10.         6.2831855 10.
  1.         1.       ], (8,), float32) 1

Testing Forward Pass 
Forward pass successful.
Calculated Dummy Loss For Env (LunarLander-v3): 2.9827
==== DONE WITH BACKPROP ===

 Testing Environment: PongNoFrameskip-v4 

 Setting up: PongNoFrameskip-v4 

Running tests for: PongNoFrameskip-v4
Obs Box(0.0, 1.0, (4, 84, 84), float32) 3

Testing Forward Pass 
Forward pass successful.
Calculated Dummy Loss For Env (PongNoFramesk

''

### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

### Task 3: Write Observation Normalization Function
Create a function `normalize_observation(obs, env)` that:
- Checks if the observation space is `Box` and has `low` and `high` attributes.
- If so, normalize the input observation.
- Otherwise, return the observation unchanged.

```python
# TODO: Define `normalize_observation(obs, env)`
```

Test this function with observations from:
- `LunarLander-v3`
- `PongNoFrameskip-v4`

Note: Atari observations are image arrays. Normalize pixel values to [0, 1]. For LunarLander-v3, the different elements in the observation vector have different ranges. Normalize them to [0, 1] using the `low` and `high` attributes of the observation space.


---

In [7]:
# BEGIN_YOUR_CODE
def normalize_observation(obs: np.ndarray, env: gym.Env) -> np.ndarray:
    obs_space = env.observation_space
    if not isinstance(obs_space, Box): return obs.astype(np.float32)

    low = obs_space.low
    high = obs_space.high

    if not (np.all(np.isfinite(low)) and np.all(np.isfinite(high))):
        return obs.astype(np.float32)

    if np.any((high - low) <= 1e-8):
        return obs.astype(np.float32)

    denominator = high - low
    normalized_obs = 2.0 * (obs.astype(np.float32) - low) / denominator - 1.0
    return np.clip(normalized_obs, -1.0, 1.0).astype(np.float32)
# END_YOUR_CODE

### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

## Section 4: Gradient Clipping

To prevent exploding gradients, it's common practice to clip gradients before optimizer updates.

### Task 4: Clip Gradients for Actor-Critic Networks
Use dummy tensors and apply gradient clipping with the following PyTorch method:
```python
# During training, after loss.backward():
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
```

Reuse the loss computation from Task 1a or 1b. After computing the gradients, apply gradient clipping.
Print the gradient norm before and after clipping to verify it’s applied.

🔗 PyTorch Docs: https://pytorch.org/docs/stable/generated/torch.nn.utils.clip_grad_norm_.html


---

In [8]:
# BEGIN_YOUR_CODE
def apply_gradient_clipping(model: nn.Module, optimizer: optim.Optimizer, loss: torch.Tensor, max_norm: float = 1.0):
    if not isinstance(loss, torch.Tensor) or not loss.requires_grad:
        print(f" Skipping gradient clipping: invalid loss.")
        return

    optimizer.zero_grad()
    loss.backward()

    # printing gradient norm before clipping
    total_norm_before = torch.nn.utils.clip_grad_norm_(
        model.parameters(), max_norm=max_norm, norm_type=2.0, error_if_nonfinite=False
    ).item()
    print(f"Gradient norm before clipping: {total_norm_before:.4f}")

    # calculating gradient norm after clipping as a validation check
    total_norm_after = 0.0
    for p in model.parameters():
        if p.grad is not None:
            if torch.isfinite(p.grad).all():
                param_norm = p.grad.data.norm(2.0)
                total_norm_after += param_norm.item() ** 2
            else:
                 print(f" Warning: Non-finite gradients detected after clipping attempt.")
    total_norm_after = total_norm_after ** 0.5
    print(f"Gradient norm after clipping (manual check): {total_norm_after:.4f}")

    optimizer.step()

# END_YOUR_CODE

### Discuss the motivation behind each setup and when it may be preferred in practice.

YOUR ANSWER:

If you are working in a team, provide a contribution summary.
| Team Member | Step# | Contribution (%) |
|---|---|---|
|   | Task 1 |   |
|   | Task 2 |   |
|   | Task 3 |   |
|   | Task 4 |   |
|   | **Total** |   |


In [15]:
print("\n Running Sequential Test Execution ")

# testing the implementation on different environments
environments_to_run = [
    "CliffWalking-v0",
    "LunarLander-v3",
    "PongNoFrameskip-v4",
    "HalfCheetah-v4"
]

for env_name in environments_to_run:
    print(f"\n Testing Environment: {env_name} ")
    env = None
    model = None
    skip_tests = False

    try:
        print(f"\n Setting up: {env_name} ")
        if env_name == "CliffWalking-v0":
            # for discrete state space, applying one hot encoding
            base_env = gym.make(env_name, render_mode=None)
            env = ManualOneHotObservation(base_env)
        elif env_name == "PongNoFrameskip-v4":
            is_framestack_dummy = False
            try: _ = FrameStack(None, 0)
            except NotImplementedError: is_framestack_dummy = True
            except Exception: pass
            # using ataripreprocessing wrapper to handle the image observations
            base_env = gym.make(env_name, obs_type="grayscale", render_mode=None)
            env = AtariPreprocessing(base_env, screen_size=84, grayscale_obs=True, scale_obs=True, terminal_on_life_loss=False)
            if not is_framestack_dummy:
                 print("INFO: Skipping FrameStack application (unavailable/dummy).")
            else:
                 print("INFO: Skipping FrameStack application (unavailable/dummy).")
        elif env_name == "HalfCheetah-v4":
            # mujoco environment with continuous action space
            env = gym.make(env_name, render_mode=None)
        elif env_name == "LunarLander-v3":
             env = gym.make(env_name, render_mode=None)
        else:
            print(f"Warning: Environment '{env_name}' not explicitly handled.")
            skip_tests = True

        if env is None: raise ValueError("Environment creation failed.")

    except gym.error.DependencyNotInstalled as e:
        print(f"SKIPPING {env_name}: Missing Dependencies. {e}")
        skip_tests = True
    except NotImplementedError as e:
         print(f"SKIPPING {env_name}: A required wrapper failed ({e})")
         skip_tests = True
    except Exception as e:
         print(f"ERROR during {env_name} setup: {e}")
         print(traceback.format_exc())
         skip_tests = True

    if not skip_tests and env is not None:
        try:
            print(f"\nRunning tests for: {env_name}")
            # creating a network based on environment specs
            model = create_shared_network(env)

            # testing observation normalization
            print("\nTesting Normalization ")
            obs, _ = env.reset(seed=SEED)
            obs_np = np.array(obs)
            normalized_obs = normalize_observation(obs_np, env)
            print(f"Obs Range: Original=[{np.min(obs_np):.2f},{np.max(obs_np):.2f}] -> Normalized=[{np.min(normalized_obs):.2f},{np.max(normalized_obs):.2f}]")

            # testing forward pass through the network
            print("\nTesting Forward Pass ")
            obs_tensor = torch.tensor(normalized_obs, dtype=torch.float32).unsqueeze(0).to(device)
            model.eval()
            with torch.no_grad(): action_output, value_output = model(obs_tensor)
            model.train()
            print("Forward pass successful.")

            # testing gradient clipping
            print("\nTesting Gradient Clipping ")
            optimizer = optim.Adam(model.parameters(), lr=1e-4)
            action_output_grad, value_output_grad = model(obs_tensor)
            loss_actor = torch.tensor(0.0, device=device)
            loss_critic = torch.tensor(0.0, device=device)
            dummy_return = torch.randn(1, 1, device=device)
            loss_critic = F.mse_loss(value_output_grad, dummy_return)

            # handling different action space types for loss calculation
            if isinstance(env.action_space, Discrete):
                loss_actor = F.cross_entropy(action_output_grad, torch.randint(0, env.action_space.n, (1,), device=device))
            elif isinstance(env.action_space, Box):
                mean, logstd = action_output_grad
                std = torch.exp(logstd).clamp(min=1e-6)
                dist = torch.distributions.Normal(mean, std)
                log_prob = dist.log_prob(torch.randn_like(mean)).sum(axis=-1, keepdim=True)
                loss_actor = -log_prob.mean()
            elif isinstance(env.action_space, MultiDiscrete):
                 loss_actor_total = 0
                 for i, logits in enumerate(action_output_grad):
                     loss_actor_total += F.cross_entropy(logits, torch.randint(0, env.action_space.nvec[i], (1,), device=device))
                 loss_actor = loss_actor_total

            total_loss = loss_actor + loss_critic
            print(f"Calculated Dummy Loss: {total_loss.item():.4f}")
            apply_gradient_clipping(model, optimizer, total_loss, max_norm=0.5)
            print("Gradient clipping applied.")

        except Exception as e:
            print(f"\n!!! Error during testing execution for {env_name}: {e}")
            print(traceback.format_exc())
        finally:
            if env is not None: env.close()

print("\n All Environment Tests Completed ")


 Running Sequential Test Execution 

 Testing Environment: CliffWalking-v0 

 Setting up: CliffWalking-v0 

Running tests for: CliffWalking-v0

Testing Normalization 
Obs Range: Original=[0.00,1.00] -> Normalized=[-1.00,1.00]

Testing Forward Pass 
Forward pass successful.

Testing Gradient Clipping 
Calculated Dummy Loss: 1.3616
Gradient norm before clipping: 1.2765
Gradient norm after clipping (manual check): 0.5000
Gradient clipping applied.

 Testing Environment: LunarLander-v3 

 Setting up: LunarLander-v3 

Running tests for: LunarLander-v3

Testing Normalization 
Obs Range: Original=[-0.05,1.42] -> Normalized=[-1.00,0.57]

Testing Forward Pass 
Forward pass successful.

Testing Gradient Clipping 
Calculated Dummy Loss: 1.8198
Gradient norm before clipping: 1.9791
Gradient norm after clipping (manual check): 0.5000
Gradient clipping applied.

 Testing Environment: PongNoFrameskip-v4 

 Setting up: PongNoFrameskip-v4 
INFO: Skipping FrameStack application (unavailable/dummy).

Ru

In [11]:
!pip install swig

Collecting swig
  Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.9 MB[0m [31m28.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m26.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.3.0


In [13]:
!pip install gymnasium[box2d]

Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: box2d-py
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone
  Created wheel for box2d-py: filename=box2d_py-2.3.5-cp311-cp311-linux_x86_64.whl size=2379494 sha256=5bef9a76f1459e20859e57ad3605966aff58c6d81aac3204246de9e1c1be412f
  Stored in directory: /root/.cache/pip/wheels/ab/f1/0c/d56f4a2bdd12bae0a0693ec33f2f0daadb5eb9753c78fa5308
Successfully built box2d-py
Installing collected packages: box2d-py
Successfully installed box2d-py-2.3.5


In [14]:
!pip install gymnasium[mujoco]

Collecting mujoco>=2.1.5 (from gymnasium[mujoco])
  Downloading mujoco-3.3.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
Collecting glfw (from mujoco>=2.1.5->gymnasium[mujoco])
  Downloading glfw-2.9.0-py2.py27.py3.py30.py31.py32.py33.py34.py35.py36.py37.py38.p39.p310.p311.p312.p313-none-manylinux_2_28_x86_64.whl.metadata (5.4 kB)
Downloading mujoco-3.3.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.6/6.6 MB[0m [31m33.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading glfw-2.9.0-py2.py27.py3.py30.py31.py32.py33.py34.py35.py36.py37.py38.p39.p310.p311.p312.p313-none-manylinux_2_28_x86_64.whl (243 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m243.5/243.5 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packag