# **Homework 3: Function-based RL**
#### **Created by 65340500058 Anuwit Intet**

## **Learning Objectives:**

- Understand how function approximation works and how to implement it.

- Understand how policy-based RL works and how to implement it.

- Understand how advanced RL algorithms balance exploration and exploitation.

- Be able to differentiate RL algorithms based on stochastic or deterministic policies, as well as value-based, policy-based, or Actor-Critic approaches.

- Gain insight into different reinforcement learning algorithms, including Linear Q-Learning, Deep Q-Network (DQN), the REINFORCE algorithm, and the Actor-Critic algorithm. Analyze their strengths and weaknesses.


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.distributions as distributions
import random
import matplotlib.pyplot as plt
import json
import numpy as np

## <font color="pink">**Part 1: Understanding the Algorithm**</font>

In this homework, you have to implement 4 different function approximation-based RL algorithms:

- Linear Q-Learning
 
- Deep Q-Network (DQN)

- REINFORCE algorithm

- One algorithm chosen from the following Actor-Critic methods:

    - Deep Deterministic Policy Gradient (DDPG)

    - Advantage Actor-Critic (A2C)

    - Proximal Policy Optimization (PPO)
    
    - Soft Actor-Critic (SAC)

For each algorithm, describe whether it follows a value-based, policy-based, or Actor-Critic approach, specify the type of policy it learns (stochastic or deterministic), identify the type of observation space and action space (discrete or continuous), and explain how each advanced RL method balances exploration and exploitation.

- it follows a value-based, policy-based, or Actor-Critic approach

- the type of policy it learns (stochastic or deterministic)

- the type of observation space and action space (discrete or continuous)

- how each advanced RL method balances exploration and exploitation

### <font color="yellow">**Linear Q-Learning**</font>

- **About Linear Q-Learning**

  - Linear Q-Learning is a value-based approach. It learns an action-value function $Q(s, a)$, and the policy is derived from it by selecting the action with the maximum Q-value.

  - The learned policy is deterministic: the greedy policy takes $\arg\max_a Q(s, a)$ rather than sampling from a learned action distribution. The ε-greedy rule adds randomness only during training, for exploration.

  - Linear Q-Learning can be applied to a continuous observation space (because it takes feature vectors as input), but the action space must be discrete because computing $\max_a Q(s, a)$ requires evaluating every action.

  - To balance exploration and exploitation, Linear Q-Learning uses an ε-greedy policy: a random action with probability ε and the greedy action with probability 1−ε.

- **In Linear Q-Learning, the Q-function is estimated by**

  $$
  Q(s,a) = \phi(s,a)^T w
  $$

  where:

  - $\phi(s,a)$ is the feature vector of the state-action pair  
  - $w$ is the weight vector

- **The weights are updated by**

  $$
  w \leftarrow w + \alpha \cdot \delta \cdot \phi(s, a)
  $$

  where: 
  - $\delta = r + \gamma \max_{a'} Q(s', a') - Q(s, a)$ is the TD error
  - $\alpha$ is the learning rate


##### **Example of Linear Q-Learning in CartPole**

- **State**: Vector of size 4:  
  $$
  s = [x, \dot{x}, \theta, \dot{\theta}]
  $$
- **Action space**: 2 Actions (discrete):

  - `0` = push cart to left

  - `1` = push cart to right

---

We will approximate $Q(s, a)$ with a linear function like this:

$$
Q(s, a) = w_a^T s
$$

where:
- $w_0$, $w_1$ are weight vectors for action 0 and 1 respectively
- or combined into a matrix $W \in \mathbb{R}^{2 \times 4}$

---

- $s = [0.0, 0.5, 0.05, -0.2]$
- $W = \begin{bmatrix} 0.1 & 0.0 & 0.0 & 0.0 \\ 0.0 & 0.1 & 0.0 & 0.0 \end{bmatrix}$
- Choose action $a = 1$
- Receive reward $r = 1$
- next state: $s' = [0.01, 0.55, 0.045, -0.18]$
- $\alpha = 0.1$, $\gamma = 0.99$


**1. Calculate $Q(s, a)$**

$$
Q(s, a=1) = w_1^T s = 0.0*0.0 + 0.1*0.5 + 0.0*0.05 + 0.0*(-0.2) = 0.05
$$

**2. Calculate $\max_{a'} Q(s', a')$**

$$
Q(s', 0) = w_0^T s' = 0.1*0.01 = 0.001 \\
Q(s', 1) = w_1^T s' = 0.1*0.55 = 0.055 \\
\Rightarrow \max_{a'} Q(s', a') = 0.055
$$

**3. Calculate TD Error**

$$
\delta = r + \gamma \cdot \max_{a'} Q(s', a') - Q(s, a) \\
= 1 + 0.99 \cdot 0.055 - 0.05 = 1.00445
$$

**4. Update weight**

$w_1$ only:

$$
w_1 \leftarrow w_1 + \alpha \cdot \delta \cdot s \\
= [0.0, 0.1, 0.0, 0.0] + 0.1 \cdot 1.00445 \cdot [0.0, 0.5, 0.05, -0.2] \\
= [0.0, 0.1502, 0.005, -0.0201]
$$
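
The four steps above can be reproduced numerically. The following is a minimal sketch using NumPy with the values from the worked example; the variable names (`W_new`, `td_error`, and so on) are chosen here for illustration and do not come from the homework code.

```python
import numpy as np

# Values copied from the worked example above
s = np.array([0.0, 0.5, 0.05, -0.2])            # current state
s_next = np.array([0.01, 0.55, 0.045, -0.18])   # next state
W = np.array([[0.1, 0.0, 0.0, 0.0],             # w_0 (push left)
              [0.0, 0.1, 0.0, 0.0]])            # w_1 (push right)
a, r, alpha, gamma = 1, 1.0, 0.1, 0.99

q_sa = W[a] @ s                       # 1. Q(s, a) = w_a^T s
max_next_q = np.max(W @ s_next)       # 2. max over a' of Q(s', a')
td_error = r + gamma * max_next_q - q_sa   # 3. TD error

W_new = W.copy()
W_new[a] += alpha * td_error * s      # 4. only the taken action's row changes

print(round(float(q_sa), 5), round(float(max_next_q), 5), round(float(td_error), 5))
print(np.round(W_new[a], 4))
```

Because each action has its own weight row, the update leaves $w_0$ untouched; only the row of the action that was actually taken moves toward the TD target.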

### <font color="yellow">**Deep Q-Network (DQN)**</font>

- **About Deep Q-Network**

    - Deep Q-Network shares the properties of Linear Q-Learning. The two differ in the complexity of the function approximator: a single linear layer versus a deep neural network with more than one layer.

    - Deep Q-Network is a value-based approach. It learns an action-value function $Q(s, a)$, and the policy is derived from it by selecting the action with the maximum Q-value.

    - The learned policy is deterministic: the greedy policy takes $\arg\max_a Q(s, a)$ rather than sampling from a learned action distribution. The ε-greedy rule adds randomness only during training, for exploration.

    - Deep Q-Network can be applied to a continuous observation space (because it takes feature vectors as input), but the action space must be discrete because computing $\max_a Q(s, a)$ requires evaluating every action.

    - To balance exploration and exploitation, Deep Q-Network uses an ε-greedy policy: a random action with probability ε and the greedy action with probability 1−ε.

- A Q-table cannot cover a large or continuous state space, and a linear model cannot represent strongly nonlinear value functions. DQN addresses this by using a deep neural network as the approximate function $Q(s,a;\theta)$ in place of the Q-table.

    Where:

    - θ is the neural network parameter

    - input = state s

    - output = Q value for every action

- DQN training consists of 2 main techniques:

    - Experience Replay
        - Store the experience $(s, a, r, s', done)$ in a buffer
        - Then randomly select a mini-batch to train it

    - Target Network
        - Use a separate network, called the target network, to calculate the target: $y = r + \gamma \max_{a'} Q_{\text{target}}(s', a')$
        - Train only the main network $Q$ at every step, and synchronize the target network with it only periodically
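
One common way to keep the target network slowly tracking the main network is a soft (Polyak) update. The sketch below is an assumption about how such a synchronization could look, not the homework's implementation; `soft_update` and the tiny `nn.Linear` stand-in networks are hypothetical names used only for illustration.

```python
import torch
import torch.nn as nn

def soft_update(policy_net: nn.Module, target_net: nn.Module, tau: float) -> None:
    """Blend policy weights into the target: theta_target <- tau*theta + (1 - tau)*theta_target."""
    with torch.no_grad():
        for p_target, p in zip(target_net.parameters(), policy_net.parameters()):
            p_target.mul_(1.0 - tau).add_(tau * p)

# Tiny stand-in networks (4 observations, 2 actions), starting identical
policy_net = nn.Linear(4, 2)
target_net = nn.Linear(4, 2)
target_net.load_state_dict(policy_net.state_dict())

# Pretend a training step shifted every policy weight by +1
with torch.no_grad():
    policy_net.weight.add_(1.0)

before = target_net.weight.clone()
soft_update(policy_net, target_net, tau=0.005)
# The target moved only 0.5% of the way toward the policy network
```

A hard update (copying all weights every N steps with `load_state_dict`) is the other usual choice; both keep the Bellman target stable while the main network changes.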

##### **Example of DQN in CartPole**

**1. Choose action with $\epsilon$-greedy**

- Suppose $\epsilon = 0.1$ → chance of a random action = 10%

- Suppose the draw falls in the greedy 90% → use the Q-network to predict:
$$
Q(s, a=0) = 0.4,\quad Q(s, a=1) = 0.6
$$

- Choose **action = 1** (right) because its Q-value is the highest
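
The ε-greedy choice in this step can be sketched in plain Python. The helper name `epsilon_greedy` and the optional `rng` argument are assumptions made for this illustration; the Q-values are the ones from the example.

```python
import random

def epsilon_greedy(q_values, epsilon, rng=random):
    """Return a random action index with probability epsilon, else the argmax."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))          # explore
    return max(range(len(q_values)), key=lambda a: q_values[a])  # exploit

q_values = [0.4, 0.6]

# With epsilon = 0 the choice is always greedy
greedy_action = epsilon_greedy(q_values, epsilon=0.0)

# With epsilon = 0.1, action 1 is picked in about 0.9 + 0.1 * 0.5 = 95% of draws
rng = random.Random(0)
picks = [epsilon_greedy(q_values, epsilon=0.1, rng=rng) for _ in range(10_000)]
share_of_action_1 = sum(picks) / len(picks)
```

Note that a random draw can still land on the greedy action, which is why the greedy action's overall share is higher than 1−ε.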

**2. Send action to environment**

- Got:

  - reward = 1

  - next_state = [0.06, 0.025, -0.015, 0.035]

  - done = False (Not yet failed)

**3. Store transition**

- Store $(s, a=1, r=1, s', done=False)$ in the replay buffer.

**4. Assume that the buffer is sufficient → Start training 1 round**

- Suppose the replay buffer already holds enough transitions and we randomly draw a mini-batch of 2 samples as follows:

| Index | State (s)                  | Action (a) | Reward (r) | Next State (s′)             | Done  |
|-------|----------------------------|------------|------------|------------------------------|--------|
| 0     | [0.05, 0.02, -0.01, 0.03]  | 1          | 1.0        | [0.06, 0.025, -0.015, 0.035] | False  |
| 1     | [-0.01, -0.03, 0.02, -0.02] | 0         | 1.0        | [0.00, -0.02, 0.01, -0.01]   | True   |




**5. Calculate Q(s, a) from policy network**

Suppose the policy network gives:

| Index | Q(s, a=0) | Q(s, a=1) |
|-------|-----------|-----------|
| 0 | 0.5 | 0.65 |
| 1 | 0.6 | 0.55 |

Get Q of the selected action:

- Example 0: action = 1 → Q = 0.65

- Example 1: action = 0 → Q = 0.6

**Q(s,a) = [0.65, 0.6]**

**6. Calculate Target Q(s′, a′) from target network**

Suppose Target Network gives:

| Index | Q(s′, a=0) | Q(s′, a=1) | Done |
|-------|------------|------------|--------|
| 0 | 0.4 | 0.6 | False |
| 1 | -- | -- | True |

→ max Q(s′) only for not done:

- Index 0: max = 0.6 

- Index 1: terminal → max = 0

**max_next_q_values = [0.6, 0.0]**

**7. Calculate Target Q-value (Bellman target)**

Use the formula:
$$
y = r + \gamma \cdot (1 - \text{done}) \cdot \max Q(s', a')
$$

Let $\gamma = 0.99$:

- Index 0:  $y = 1.0 + 0.99 \cdot 0.6 = 1.594$

- Index 1:  $y = 1.0 + 0 = 1.0$

**Target Q-values = [1.594, 1.0]**

**8. Calculate Loss**

Use the formula:
$$
\text{Loss} = \frac{1}{2} \sum_i (Q(s_i, a_i) - y_i)^2
$$

- $Q(s_i, a_i)$ is the Q-value from the policy network

- $y_i$ is the target value computed with the target network

Substitute the value:

$$
\text{Loss} = \frac{1}{2} \left[ (0.65 - 1.594)^2 + (0.6 - 1.0)^2 \right] \\
= \frac{1}{2} \left[ 0.891 + 0.16 \right] = \frac{1.051}{2} = \mathbf{0.5255}
$$

**Summary Table**

| Index | Q(s, a) | Target y | Loss per item |
|-------|---------|----------|----------------|
| 0     | 0.65    | 1.594    | 0.891          |
| 1     | 0.6     | 1.0      | 0.160          |
|       |         |          | Total = 1.051 |
|       |         |          | Avg = 0.5255  |

**Final Loss = 0.5255**

**9. Do Gradient Descent**

- Call `loss.backward()` to compute gradient

- Call `optimizer.step()` to update policy network weights

**This is where Q-network learns from the latest experience that is randomly selected from the replay buffer**
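
Steps 5 to 9 can be checked with a few lines of PyTorch. This is a minimal sketch that plugs in the assumed network outputs from the tables above as plain tensors (no real network is involved), so `q_all` and `next_q_all` are stand-ins for the policy and target network outputs.

```python
import torch

gamma = 0.99
# Stand-in for policy network outputs Q(s, ·) on the two sampled states
q_all = torch.tensor([[0.50, 0.65], [0.60, 0.55]], requires_grad=True)
actions = torch.tensor([1, 0])
rewards = torch.tensor([1.0, 1.0])
dones = torch.tensor([False, True])
# Stand-in for target network outputs Q(s', ·); the terminal row is masked out
next_q_all = torch.tensor([[0.4, 0.6], [0.0, 0.0]])

# Step 5: pick Q(s, a) of the action that was actually taken
q_sa = q_all.gather(1, actions.unsqueeze(1)).squeeze(1)

# Steps 6-7: Bellman target, with no bootstrap on terminal transitions
with torch.no_grad():
    max_next_q = next_q_all.max(dim=1).values
    targets = rewards + gamma * (~dones).float() * max_next_q

# Step 8: half the summed squared error (equals the mean for a batch of 2)
loss = 0.5 * ((q_sa - targets) ** 2).sum()

# Step 9: gradients flow only into the entries of the taken actions
loss.backward()
print(targets.tolist(), round(loss.item(), 4))
```

The computed loss should land close to the hand-computed value; a small difference in the last digit comes from rounding the squared errors in the table.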


### <font color="yellow">**MC-REINFORCE**</font>

- **About MC-REINFORCE Algorithm**

    - MC-REINFORCE (Monte Carlo REINFORCE) is the basic policy gradient algorithm. It learns the policy directly instead of estimating a Q-function as Q-learning does.

    - MC-REINFORCE is policy-based: it does not learn a Q-value or V-value, only the policy $\pi_\theta$ (a pure policy gradient method).

    - The policy is stochastic because actions are sampled from $\pi_\theta(a|s)$ (e.g. a softmax/categorical distribution) rather than chosen by argmax. For example, with $\pi_\theta(a=0|s) = 0.4$ and $\pi_\theta(a=1|s) = 0.6$, the agent samples according to these probabilities, whereas argmax would always return action 1.

    - The observation space can be continuous, and the action space can be discrete or continuous depending on the policy model: a softmax over the network's linear output for discrete actions, or a Gaussian parameterized by the linear output for continuous actions.

    - Exploration comes directly from the stochastic policy, since every action is sampled. As the policy learns that an action yields a high return, the probability of that action increases, which gradually shifts the agent toward exploitation.

- **MC-REINFORCE uses the Monte Carlo policy gradient formula**
    $$
    \nabla_\theta J(\theta) = \mathbb{E}_{\tau \sim \pi_\theta} \left[ \sum_{t=0}^T \nabla_\theta \log \pi_\theta(a_t \mid s_t) \cdot G_t \right]
    $$


    Where:

    - $\pi_\theta(a_t | s_t)$ is the policy (e.g. softmax over linear output or neural net)

    - $G_t$ is the total return from timestep $t$ to the end of the episode

    - We keep the entire trajectory until the end (Monte Carlo) and then update

Overall steps:

- Sample a trajectory $(s_0, a_0, r_1, s_1, a_1, r_2, ..., s_T)$ by following the policy $\pi_\theta$

- Compute return $G_t$ from timestep $t$ to end of episode

- Compute gradient: $\nabla_\theta \log \pi_\theta(a_t|s_t) \cdot G_t$

- Adjust policy weights with `gradient ascent`

##### **Example of MC-REINFORCE in CartPole**

Use a softmax policy network: $\pi_\theta(a \mid s) = \mathrm{softmax}(z_\theta(s))$, where $z_\theta(s)$ is the network's output for state $s$

- Current state: $s_0 = [0.1, 0.0, 0.05, -0.02]$

**1. Sample a trajectory from the policy**

Suppose the agent samples this trajectory:

| t | State $s_t$ | Action $a_t$ | Reward $r_{t+1}$ | $\pi_\theta(a_t \mid s_t)$ |
|---|-------------|---------------|-------------------|-----------------------------|
| 0 | $s_0$       | 1             | 1                 | 0.6                         |
| 1 | $s_1$       | 0             | 1                 | 0.4                         |
| 2 | $s_2$       | 1             | 1                 | 0.7                         |
| 3 | $s_3$       | 1             | 1                 | 0.8                         |

Episode ends at timestep 4 → get reward = 1 at every timestep

**2. Calculate Return $G_t$**

Let $\gamma = 0.99$

$$
\begin{aligned}
G_3 &= r_4 = 1 \\
G_2 &= r_3 + \gamma G_3 = 1 + 0.99 \cdot 1 = 1.99 \\
G_1 &= r_2 + \gamma G_2 = 1 + 0.99 \cdot 1.99 = 2.9701 \\
G_0 &= r_1 + \gamma G_1 = 1 + 0.99 \cdot 2.9701 = 3.9404
\end{aligned}
$$

| Time $t$ | $G_t$  |
|----------|--------|
| 0        | 3.9404 |
| 1        | 2.9701 |
| 2        | 1.9900 |
| 3        | 1.0000 |
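
The returns above follow the recursion $G_t = r_{t+1} + \gamma G_{t+1}$, which is easiest to evaluate by scanning the episode backwards. A minimal sketch (the function name `discounted_returns` is chosen here for illustration):

```python
def discounted_returns(rewards, gamma):
    """Compute G_t = r_{t+1} + gamma * G_{t+1} by scanning the episode backwards."""
    returns = []
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
        returns.append(g)
    returns.reverse()          # restore chronological order: G_0, G_1, ...
    return returns

# Four steps with reward 1 each, as in the sampled trajectory
returns = discounted_returns([1.0, 1.0, 1.0, 1.0], gamma=0.99)
print([round(g, 4) for g in returns])
```

The backward scan needs a single pass over the episode, instead of re-summing the remaining rewards for every timestep.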

**3. Calculate the gradient from each step**

We will use:
$$
\nabla_\theta J(\theta) \approx \sum_{t=0}^T \nabla_\theta \log \pi_\theta(a_t \mid s_t) \cdot G_t
$$

- Step 0 example:

    - Suppose the policy network predicts softmax:

    $$
    \pi(a = 0 \mid s_0) = 0.4,\quad \pi(a = 1 \mid s_0) = 0.6
    $$

    - Choose $a_0 = 1$

    - We get:

    $$
    \nabla_\theta \log \pi_\theta(1 \mid s_0) = \nabla_\theta \log(0.6)
    $$

    - Multiply by return:

    $$
    \nabla_\theta J \leftarrow \nabla_\theta \log(0.6) \cdot 3.9404
    $$

    - Written as a loss term: $\log(0.6) \approx -0.5108$, so $-(-0.5108) \cdot 3.9404 = 2.013$

- Step 1:

    - $\log \pi(0 \mid s_1) = \log(0.4) \approx -0.9163$

    - $-(-0.9163) \cdot 2.9701 = 2.722$

- Step 2:

    - $\log \pi(1 \mid s_2) = \log(0.7) \approx -0.3567$

    - $-(-0.3567) \cdot 1.9900 = 0.709$

- Step 3:

    - $\log \pi(1 \mid s_3) = \log(0.8) \approx -0.2231$

    - $-(-0.2231) \cdot 1.0000 = 0.223$

- Total Loss = 2.013 + 2.722 + 0.709 + 0.223 = 5.667

**Summary**

| t   | $\log \pi(a_t \mid s_t)$ | $G_t$   | $-\log \pi \cdot G_t$ |
|-----|---------------------------|---------|-------------------------|
| 0   | -0.5108                   | 3.9404  | 2.013                   |
| 1   | -0.9163                   | 2.9701  | 2.722                   |
| 2   | -0.3567                   | 1.9900  | 0.709                   |
| 3   | -0.2231                   | 1.0000  | 0.223                   |
|     |                           |         | **Total: 5.667**        |

**4. Do Gradient Ascent**

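- Call `loss.backward()` to compute the gradient of the loss $-\sum_t \log \pi_\theta(a_t \mid s_t) \cdot G_t$

- Call `optimizer.step()` to update the policy network weights; minimizing this negated objective is equivalent to gradient ascent on $J(\theta)$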
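
The loss from the summary table can be reproduced with `torch.distributions.Categorical`. In this sketch the `logits` tensor is a stand-in for the policy network output, constructed (as an assumption for illustration) so that the probabilities of the taken actions match the table.

```python
import torch
from torch.distributions import Categorical

# Per-step action probabilities [pi(a=0|s_t), pi(a=1|s_t)] consistent with the table
probs = torch.tensor([[0.4, 0.6], [0.4, 0.6], [0.3, 0.7], [0.2, 0.8]])
logits = probs.log().requires_grad_(True)   # stand-in for the policy network output
actions = torch.tensor([1, 0, 1, 1])
returns = torch.tensor([3.9404, 2.9701, 1.99, 1.0])

# log pi(a_t | s_t) for the actions that were actually sampled
log_probs = Categorical(logits=logits).log_prob(actions)

# Negated objective: minimizing it performs gradient ascent on J(theta)
loss = -(log_probs * returns).sum()
loss.backward()
print(round(loss.item(), 3))
```

After `backward()`, the gradient on the logit of each taken action is negative, so a descent step raises that logit and makes the action more likely, in proportion to its return.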


### <font color="yellow">**Advantage Actor-Critic (A2C)**</font>

- **About Advantage Actor Critic**

    - The name says it all: **Actor** → is the policy $\pi_\theta(a \mid s)$ that chooses the action, and **Critic** → is the value function $V_w(s)$ that evaluates how good the state is, using what's called the **Advantage Function** to indicate how good the action is compared to the average: $A(s, a) = Q(s, a) - V(s) \approx r + \gamma V(s') - V(s)$.

    - A2C is an Actor-Critic approach because it learns both a value function (critic) and a policy (actor).

    - The policy is stochastic because actions are sampled from $\pi_\theta(a|s)$ (e.g. a softmax/categorical distribution) rather than chosen by argmax. For example, with $\pi_\theta(a=0|s) = 0.4$ and $\pi_\theta(a=1|s) = 0.6$, the agent samples according to these probabilities, whereas argmax would always return action 1.

    - The observation space can be continuous, and the action space can be discrete or continuous depending on the policy model: a softmax over the network's linear output for discrete actions, or a Gaussian parameterized by the linear output for continuous actions.

    - Exploration comes directly from the stochastic policy, since every action is sampled. As the policy learns that an action yields a high return, the probability of that action increases, which gradually shifts the agent toward exploitation.

- **A2C Brief Process**

    - Actor uses the policy network to sample an action: $a_t \sim \pi_\theta(a_t \mid s_t)$

    - Critic uses value network to predict: $V(s_t)$

    - Play in environment → get $r_t$, $s_{t+1}$ → calculate Advantage

    - Update **Actor** with gradient: $\nabla_\theta J(\theta) = \nabla_\theta \log \pi_\theta(a_t \mid s_t) \cdot A(s_t, a_t)$

    - Update **Critic** with MSE loss: $L_{\text{critic}} = \left( r_t + \gamma V(s_{t+1}) - V(s_t) \right)^2$
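
The per-step quantities in this process can be sketched as a small pure-Python helper. The function name `a2c_step_losses` and the input values are illustrative assumptions; in a real PyTorch implementation the advantage would typically be detached before it multiplies the log-probability, so that the actor loss does not update the critic.

```python
import math

def a2c_step_losses(prob_action, value_s, value_next, reward, gamma, done=False):
    """One-step TD advantage plus the matching actor and critic losses."""
    bootstrap = 0.0 if done else gamma * value_next   # no bootstrap after a terminal state
    advantage = reward + bootstrap - value_s          # A(s, a) ≈ r + gamma*V(s') - V(s)
    actor_loss = -math.log(prob_action) * advantage   # policy gradient term
    critic_loss = advantage ** 2                      # squared TD error
    return advantage, actor_loss, critic_loss

# Illustrative values: pi(a|s) = 0.6, V(s) = 1.5, V(s') = 1.8, r = 1
advantage, actor_loss, critic_loss = a2c_step_losses(
    prob_action=0.6, value_s=1.5, value_next=1.8, reward=1.0, gamma=0.99
)
print(round(advantage, 3), round(actor_loss, 3), round(critic_loss, 3))
```

When the advantage is negative, the actor loss for that step is negative as well, and the gradient step lowers the probability of the action that was taken.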

##### **Example of Advantage Actor Critic in CartPole**

**0. Create trajectory**

**Example Episode (T = 3)**

| t | $s_t$                           | $a_t$ | $r_t$ | $s_{t+1}$                        |
|---|---------------------------------|-------|-------|----------------------------------|
| 0 | [0.05, 0.01, 0.03, 0.0]         | 1     | 1.0   | [0.06, 0.015, 0.035, 0.01]       |
| 1 | [0.06, 0.015, 0.035, 0.01]      | 1     | 1.0   | [0.07, 0.02, 0.04, 0.015]        |
| 2 | [0.07, 0.02, 0.04, 0.015]       | 0     | 1.0   | [0.08, 0.025, 0.05, 0.02]        |

**1. Calculate advantage, critic loss, actor loss**

**Step 0:**

- receive action, value and reward

    - $s_0 = [0.05, 0.01, 0.03, 0.0]$

    - Actor's action probabilities: $\pi(a=1 \mid s_0) = 0.6, \pi(a=0 \mid s_0) = 0.4$

    - Critic's value estimates: $V(s_0) = 1.5, V(s_1) = 1.8$

    - Receive reward: $r_1 = 1.0$

- **Calculate Advantage:**

$$
A(s_0, a_0) = r_1 + \gamma V(s_1) - V(s_0) = 1.0 + 0.99 \cdot 1.8 - 1.5 = 1.282
$$

- **Calculate Actor Loss:**

$$
L_{\text{actor}} = -\log \pi(a_0 \mid s_0) \cdot A(s_0, a_0) = -\log(0.6) \cdot 1.282 \approx 0.511 \cdot 1.282 = 0.655
$$

- **Calculate Critic Loss:**

$$
L_{\text{critic}} = \left( r_1 + \gamma V(s_1) - V(s_0) \right)^2 = (1.282)^2 \approx 1.644
$$

**Step 1:**

- $s_1 = [0.06, 0.015, 0.035, 0.01]$

- $\pi(a=1|s_1) = 0.7$, $V(s_1) = 1.8$, $V(s_2) = 1.5$

- $r_2 = 1.0$

$$
A(s_1, a_1) = 1 + 0.99 \cdot 1.5 - 1.8 = 0.685
$$

$$
L_{\text{actor}} = -\log(0.7) \cdot 0.685 \approx 0.357 \cdot 0.685 = 0.244
$$

$$
L_{\text{critic}} = (0.685)^2 = 0.469
$$

**Step 2:**

- $\pi(a=0 \mid s_2) = 0.3$, $V(s_2) = 1.5$, $V(s_3) = 0.0$ (assume episode ended)

$$
A(s_2, a_2) = 1 + 0 - 1.5 = -0.5
$$

$$
L_{\text{actor}} = -\log(0.3) \cdot (-0.5) \approx 1.204 \cdot (-0.5) = -0.602
$$

$$
L_{\text{critic}} = (-0.5)^2 = 0.25
$$

**2. average Loss (Episode Summary)**

| Step | Actor Loss | Critic Loss |
|------|------------|-------------|
| 0    | 0.655      | 1.644       |
| 1    | 0.244      | 0.469       |
| 2    | -0.602     | 0.250       |
| total | **0.297** | **2.363**   |
| avg | **0.099** | **0.788**   |

**3. update Neural Net**

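```python
# actor_optimizer / critic_optimizer: assumed torch.optim optimizers for each network
actor_optimizer.zero_grad()
avg_actor_loss.backward()
actor_optimizer.step()

critic_optimizer.zero_grad()
avg_critic_loss.backward()
critic_optimizer.step()
```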


## <font color="pink">**Part 2: Setting up Cart-Pole Agent**</font>

Similar to the previous homework, you will implement the common components shared by most function approximation-based RL algorithms in RL_base_function.py. The core components should include, but are not limited to:

### <font color="orange">**1. RL Base class**</font>

- This class should include:

    - Constructor (__init__) to initialize the following parameters:

        - Number of actions: The total number of discrete actions available to the agent.

        - Action range: The minimum and maximum values defining the range of possible actions.

        - Discretize state weight: Weighting factor applied when discretizing the state space for learning.

        - Learning rate: Determines how quickly the model updates based on new information.

        - Initial epsilon: The starting probability of taking a random action in an ε-greedy policy.

        - Epsilon decay rate: The rate at which epsilon decreases over time to favor exploitation over exploration.

        - Final epsilon: The lowest value epsilon can reach, ensuring some level of exploration remains.

        - Discount factor: A coefficient (γ) that determines the importance of future rewards in decision-making.

        - Buffer size: Maximum number of experiences the buffer can hold.

        - Batch size: Number of experiences to sample per batch.

    - Core Functions

        - scale_action(): scale the action (if it is computed from the sigmoid or softmax function) to the proper length.

        - decay_epsilon(): Decreases epsilon over time and returns the updated value.

- Additional details about these functions are provided in the class file. You may also implement additional functions for further analysis.

#### <font color="yellow">**scale_action()**</font>

In [None]:
def scale_action(self, action):
    """
    Maps a discrete action index in [0, num_of_action - 1] to a continuous value in [action_min, action_max].

    Args:
        action (int or torch.Tensor): Discrete action index in [0, num_of_action - 1].
    
    Returns:
        torch.Tensor: Scaled action tensor.
    """
    # ========= put your code here ========= #

    # Unpack the minimum and maximum values of the action range
    action_min, action_max = self.action_range

    # Scale the discrete action index (0 to num_of_action-1) to a continuous value within [action_min, action_max]
    scaled = action_min + (action / (self.num_of_action - 1)) * (action_max - action_min)

    # Check if the scaled value is already a torch.Tensor
    if isinstance(scaled, torch.Tensor):
        # If yes, detach it from any computation graph and convert to float32
        return scaled.clone().detach().to(dtype=torch.float32)
    else:
        # Otherwise, convert it into a torch.Tensor of type float32
        return torch.tensor(scaled, dtype=torch.float32)

    # ====================================== #
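
To see what this mapping produces, here is a standalone sketch of the same linear formula without the class or tensor handling. The function name `scale_discrete_action` and the choice of 5 actions are assumptions for illustration; the range [-2.5, 2.5] is the default used by the Linear Q-Learning class.

```python
def scale_discrete_action(index, num_actions, action_min, action_max):
    """Map index 0..num_actions-1 linearly onto [action_min, action_max]."""
    if num_actions < 2:
        # With a single action the formula would divide by zero
        raise ValueError("need at least two actions to span a range")
    return action_min + (index / (num_actions - 1)) * (action_max - action_min)

# Five evenly spaced values across the action range
forces = [scale_discrete_action(i, 5, -2.5, 2.5) for i in range(5)]
print(forces)
```

Index 0 maps to the lower bound and the last index to the upper bound, so using an odd number of actions includes a zero-force action when the range is symmetric.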

#### <font color="yellow">**decay_epsilon()**</font>

In [None]:
def decay_epsilon(self):
    """
    Decay epsilon value to reduce exploration over time.
    """
    # ========= put your code here ========= #
    # Decay the exploration rate (epsilon) by multiplying it by epsilon_decay,
    # but never let it drop below the minimum value (final_epsilon).
    # With this multiplicative rule, epsilon_decay is a per-call factor in (0, 1).
    self.epsilon = max(self.final_epsilon, self.epsilon * self.epsilon_decay)
    # ====================================== #
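
The effect of the decay factor is easier to judge by simulating the schedule. The sketch below applies the same multiplicative rule once per episode; the helper name `epsilon_schedule` and the factor 0.995 are illustrative assumptions, while 1e-3 is the default `epsilon_decay` that appears in the agent constructors.

```python
def epsilon_schedule(initial_epsilon, epsilon_decay, final_epsilon, episodes):
    """Apply epsilon <- max(final_epsilon, epsilon * epsilon_decay) once per episode."""
    epsilon = initial_epsilon
    history = [epsilon]
    for _ in range(episodes):
        epsilon = max(final_epsilon, epsilon * epsilon_decay)
        history.append(epsilon)
    return history

# A factor close to 1 decays gradually over hundreds of episodes
slow = epsilon_schedule(1.0, 0.995, 0.05, episodes=1000)

# A factor of 1e-3 hits the floor after a single call
fast = epsilon_schedule(1.0, 1e-3, 0.05, episodes=1000)

print(round(slow[100], 3), slow[-1], fast[1])
```

Under this rule a very small factor removes almost all exploration after the first episode, so the value passed as `epsilon_decay` should be chosen with the multiplicative form in mind.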

#### <font color="yellow">**q()**</font>

In [None]:
def q(self, obs, a=None):
    # Ensure obs has batch dimension
    if obs.dim() == 1:
        obs = obs.view(1, -1)  # make it [1, obs_dim]

    # Compute linear Q-values: Q(s) = obs @ w
    q_values = obs @ self.w  

    if a is None:
        return q_values.squeeze(0) # Return Q-values for all actions
    else:
        return q_values[0, a] # Return Q-value for specific action

### <font color="orange">**2. Replay Buffer Class**</font>



- A class used to store the state, action, reward, next state, and termination status from each timestep of an episode, to serve as a dataset for training neural networks. This class should include:

    - Constructor (__init__) to initialize the following parameters:

        - memory: FIFO buffer to store the trajectory within a certain time window.

        - batch_size: Number of data samples drawn from memory to train the neural network.

    - Core Functions

        - add(): Add state, action, reward, next state, and termination status to the FIFO buffer. The oldest data is discarded when the buffer is full.

        - sample(): Sample data from memory to use in the neural network training.

    - <font color="orange">**Note that some algorithms may not use all of the data mentioned above to train the neural network.**</font>


#### <font color="yellow">**add()**</font>

In [None]:
def add(self, state, action, reward, next_state, done):
    """
    Adds an experience to the replay buffer.

    Args:
        state (Tensor): The current state of the environment.
        action (Tensor): The action taken at this state.
        reward (Tensor): The reward received after taking the action.
        next_state (Tensor): The next state resulting from the action.
        done (bool): Whether the episode has terminated.
    """

    self.memory.append(Transition(state, action, next_state, reward, done)) # type: ignore

#### <font color="yellow">**sample()**</font>

In [None]:
device = torch.device(
        "cuda" if torch.cuda.is_available() else
        "mps" if torch.backends.mps.is_available() else
        "cpu"
    )

def sample(self):
    """
    Samples a batch of experiences from the replay buffer.

    Returns:
        - state_batch: Batch of states.
        - action_batch: Batch of actions.
        - reward_batch: Batch of rewards.
        - next_state_batch: Batch of next states.
        - done_batch: Batch of terminal state flags.
    """

    # Random Batch
    transitions = random.sample(self.memory, self.batch_size)
    batch = Transition(*zip(*transitions)) # type: ignore

    # Transform to tensor
    state_batch = torch.stack(batch.state).to(device)
    action_batch = torch.stack(batch.action).to(device)
    next_state_batch = torch.stack(batch.next_state).to(device)        
    reward_batch = torch.stack(batch.reward).to(device)
    done_batch = torch.tensor(batch.done, dtype=torch.bool).to(device)

    return state_batch, action_batch, reward_batch, next_state_batch, done_batch
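
Putting `add()` and `sample()` together, a self-contained buffer could look like the sketch below. The `Transition` field order, the class name `TinyReplayBuffer`, and the use of `deque(maxlen=...)` are assumptions made for this example, since the homework's own definitions live in the class file.

```python
import random
from collections import deque, namedtuple

import torch

# Field order is an assumption; it mirrors the order used in add() above
Transition = namedtuple("Transition", ("state", "action", "next_state", "reward", "done"))

class TinyReplayBuffer:
    """FIFO buffer: deque(maxlen=...) drops the oldest item once it is full."""

    def __init__(self, buffer_size, batch_size):
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size

    def add(self, state, action, reward, next_state, done):
        self.memory.append(Transition(state, action, next_state, reward, done))

    def sample(self):
        # Uniform sampling without replacement, then regroup by field
        batch = Transition(*zip(*random.sample(list(self.memory), self.batch_size)))
        return (torch.stack(batch.state), torch.stack(batch.action),
                torch.stack(batch.reward), torch.stack(batch.next_state),
                torch.tensor(batch.done, dtype=torch.bool))

    def __len__(self):
        return len(self.memory)

# Capacity 3: after 5 insertions only the transitions from steps 2, 3, 4 remain
buffer = TinyReplayBuffer(buffer_size=3, batch_size=2)
for step in range(5):
    buffer.add(torch.full((4,), float(step)), torch.tensor([step % 2]),
               torch.tensor([1.0]), torch.full((4,), float(step + 1)), step == 4)

states, actions, rewards, next_states, dones = buffer.sample()
```

Training code should check `len(buffer) >= batch_size` before calling `sample()`, because `random.sample` raises a `ValueError` when asked for more items than are stored.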

### <font color="orange">**3. Algorithm folder**</font>

- This folder should include:

    - Linear Q Learning class

    - Deep Q-Network class

    - REINFORCE Class

    - One class chosen from the Part 1.

- Each class should inherit from the RL Base class in RL_base_function.py and include:

    - A constructor which initializes the same variables as the class it inherits from.

    - Superclass Initialization (super().__init__()).

    - An update() function that updates the agent’s learnable parameters and advances the training step.

    - A select_action() function that selects the action according to the current policy.

    - A learn() function that trains the regression model or neural network.

#### <font color="yellow">**Linear Q-Learning class**</font>

In [None]:
def __init__(
        self,
        device = None,
        num_of_action: int = 2,
        n_observations: int = 4,
        action_range: list = [-2.5, 2.5],
        learning_rate: float = 0.01,
        initial_epsilon: float = 1.0,
        epsilon_decay: float = 1e-3,
        final_epsilon: float = 0.001,
        discount_factor: float = 0.95,
) -> None:
    """
    Initialize the CartPole Agent.

    Args:
        learning_rate (float): The learning rate for updating Q-values.
        initial_epsilon (float): The initial exploration rate.
        epsilon_decay (float): The rate at which epsilon decays over time.
        final_epsilon (float): The final exploration rate.
        discount_factor (float, optional): The discount factor for future rewards. Defaults to 0.95.
    """        

    super().__init__(
        num_of_action=num_of_action,
        action_range=action_range,
        learning_rate=learning_rate,
        initial_epsilon=initial_epsilon,
        epsilon_decay=epsilon_decay,
        final_epsilon=final_epsilon,
        discount_factor=discount_factor,
    )

    self.device = device
    self.episode_durations = []

    # Initialize the weight matrix for linear Q-value approximation.
    # Shape: [state_dim, num_actions]
    self.w = torch.zeros((n_observations, num_of_action), dtype=torch.float32, device=self.device)

In [None]:
def update(
    self,
    obs,
    action,
    reward,
    next_obs,
    terminated
):
    """
    Updates the weight vector using the Temporal Difference (TD) error 
    in Q-learning with linear function approximation.

    Args:
        obs (dict): The current state observation, containing feature representations.
        action (int): The action taken in the current state.
        reward (float): The reward received for taking the action.
        next_obs (dict): The next state observation.
        next_action (int): The action taken in the next state (used in SARSA).
        terminated (bool): Whether the episode has ended.

    """
    # ========= put your code here ========= #
    """
    Q-learning with linear function approximation.
    """
    # Convert observations to tensor and move to appropriate device
    phi_s = obs.clone().detach().to(self.device)  # current features
    phi_next = next_obs.clone().detach().to(self.device) if not terminated else torch.zeros_like(phi_s)

    # Estimate current and target Q-values
    current_q = self.q(phi_s, a=action)
    next_q = torch.max(self.q(phi_next)).item() if not terminated else 0.0

    # Calculate TD target and TD error
    td_target = reward + self.discount_factor * next_q
    td_error = td_target - current_q.item()

    # Apply gradient update to the weight vector for the taken action
    self.w[:, action] += self.lr * td_error * phi_s

    # Log TD error for analysis
    self.training_error.append(td_error)

    # ====================================== #

In [None]:
def select_action(self, state):
    """
    Select an action based on an epsilon-greedy policy.
    
    Args:
        state (Tensor): The current state of the environment.
    
    Returns:
        Tensor: The selected action.
    """
    # ========= put your code here ========= #
    state_tensor = state.to(self.device)

    # Explore: with probability epsilon, select a random action
    if torch.rand(1).item() < self.epsilon:
        return torch.randint(0, self.num_of_action, (1,)).item()
    else:
        # Exploit: choose the action with the highest Q-value
        q_values = self.q(state_tensor)
        return torch.argmax(q_values).item()
    # ====================================== #

In [None]:
def learn(self, env, max_steps):
    """
    Train the agent on a single step.

    Args:
        env: The environment in which the agent interacts.
        max_steps (int): Maximum number of steps per episode.
    """

    # ===== Initialize trajectory collection variables ===== #
    # Reset environment to get initial state (tensor)
    # Track total episode return (float)
    # Flag to indicate episode termination (boolean)
    # Step counter (int)
    # ========= put your code here ========= #

    # Reset environment and initialize counters
    obs, _ = env.reset()
    total_reward = 0.0
    done = False
    steps = 0

    # Run until episode ends or max_steps is reached
    while not done and steps < max_steps:

        # Extract state vector (shape: [4]) from observation dictionary
        obs_list = torch.tensor([obs['policy'][0, i] for i in range(4)], dtype=torch.float32).to(self.device)
        
        # Choose action based on current policy
        action = self.select_action(obs_list)

        # Scale the discrete action to match environment's action space
        scaled_action = self.scale_action(action).view(1, -1)

        # Take an action in the environment
        next_obs, reward, terminated, truncated, _  = env.step(scaled_action)

        # Extract next state vector
        next_obs_list = torch.tensor([next_obs['policy'][0, i] for i in range(4)], dtype=torch.float32).to(self.device)

        # Determine whether the episode is over
        done = terminated or truncated

        # Update weight with new experience
        self.update(obs_list, action, reward, next_obs_list, done)

        # Accumulate reward and move to next state
        obs = next_obs
        total_reward += reward
        steps += 1

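        # If episode ends, decay epsilon and return stats
        if done:
            self.decay_epsilon()
            return steps, total_reward

    # Step limit reached without termination: still decay epsilon and report stats
    self.decay_epsilon()
    return steps, total_reward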

#### <font color="yellow">**Deep Q-Network (DQN) class**</font>

In [None]:
class DQN_network(nn.Module):
    """
    Neural network model for the Deep Q-Network algorithm.
    
    Args:
        n_observations (int): Number of input features.
        hidden_size (int): Number of hidden neurons.
        n_actions (int): Number of possible actions.
        dropout (float): Dropout rate for regularization.
    """
    def __init__(self, n_observations, hidden_size, n_actions, dropout):
        super(DQN_network, self).__init__()

        # === Define layers (2 hidden layers) ===
        self.fc1 = nn.Linear(n_observations, hidden_size)  # Input → Hidden
        self.dropout = nn.Dropout(dropout)                 # Dropout for regularization
        self.fc2 = nn.Linear(hidden_size, hidden_size)     # Hidden → Hidden
        self.out = nn.Linear(hidden_size, n_actions)       # Hidden → Output (Q-values)

    def init_weights(self):
        """
        Initialize network weights using Xavier initialization for better convergence.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)  # Xavier initialization
                nn.init.zeros_(m.bias)  # Initialize bias to 0

    def forward(self, x):
        """
        Forward pass through the network.
        
        Args:
            x (Tensor): Input state tensor.
        
        Returns:
            Tensor: Q-value estimates for each action.
        """
        # ========= put your code here ========= #
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        return self.out(x) # Output Q(s, a) for all actions
        # ====================================== #

In [None]:
from collections import deque  # needed for self.state_stats below

def __init__(
        self,
        device = None,
        num_of_action: int = 11,
        action_range: list = [-12.0, 12.0],
        n_observations: int = 4,
        hidden_dim: int = 128,
        dropout: float = 0.1,
        learning_rate: float = 0.001,
        tau: float = 0.005,
        initial_epsilon: float = 1.0,
        epsilon_decay: float = 1e-3,
        final_epsilon: float = 0.05,
        discount_factor: float = 0.95,
        buffer_size: int = 10000,
        batch_size: int = 32
) -> None:
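    """
    Initialize the DQN CartPole agent.

    Args:
        device: Torch device on which the networks are placed.
        num_of_action (int): Number of discrete actions.
        action_range (list): Minimum and maximum action values.
        n_observations (int): Number of input features in the state.
        hidden_dim (int): Number of hidden units per layer.
        dropout (float): Dropout rate used in the Q-network.
        learning_rate (float): The learning rate for the optimizer.
        tau (float): Soft-update coefficient for the target network.
        initial_epsilon (float): The initial exploration rate.
        epsilon_decay (float): The rate at which epsilon decays over time.
        final_epsilon (float): The final exploration rate.
        discount_factor (float, optional): The discount factor for future rewards. Defaults to 0.95.
        buffer_size (int): Capacity of the replay buffer.
        batch_size (int): Number of transitions sampled per update.
    """

    # === Set up networks ===
    # Use the constructor arguments directly: the matching attributes are only assigned further down.
    self.policy_net = DQN_network(n_observations, hidden_dim, num_of_action, dropout).to(device)
    self.target_net = DQN_network(n_observations, hidden_dim, num_of_action, dropout).to(device)
    self.target_net.load_state_dict(self.policy_net.state_dict())

    # === Set device and hyperparameters ===
    self.device = device
    self.tau = tau
    self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=learning_rate, amsgrad=True)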
    self.episode_durations = []
    self.buffer_size = buffer_size
    self.batch_size = batch_size
    self.initial_epsilon = initial_epsilon
    self.state_stats = deque(maxlen=1000)
    self.hidden_dim = hidden_dim
    self.learning_rate = learning_rate
    self.num_of_action = num_of_action        

    super(DQN, self).__init__(
        num_of_action=num_of_action,
        action_range=action_range,
        learning_rate=learning_rate,
        initial_epsilon=initial_epsilon,
        epsilon_decay=epsilon_decay,
        final_epsilon=final_epsilon,  
        discount_factor=discount_factor,
        buffer_size=buffer_size,
        batch_size=batch_size
    )

In [None]:
def select_action(self, state):
    """
    Select an action based on an epsilon-greedy policy.
    
    Args:
        state (Tensor): The current state of the environment.
    
    Returns:
        Tensor: The selected action.
    """
    # ========= put your code here ========= #

    if torch.rand(1).item() < self.epsilon:
        # Explore: random action index in [0, num_of_action - 1], shape [1, 1]
        return torch.tensor([[random.randrange(self.num_of_action)]], device=self.device, dtype=torch.long)
    else:
        with torch.no_grad():
            q_values = self.policy_net(state)  # shape: [num_actions] or [1, num_actions]
            # Exploit: argmax over the action dimension (last dim), reshaped to [1, 1]
            action_idx = q_values.argmax(dim=-1, keepdim=True).view(1, 1)
        return action_idx

    # ====================================== #
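
The ε-greedy rule used by `select_action` can be illustrated without PyTorch. The sketch below is a simplified stand-in, not the agent's method: `epsilon_greedy` and its arguments are hypothetical names, and the Q-values are a plain Python list rather than a network output.

```python
import random

def epsilon_greedy(q_values, epsilon, rng=random):
    """Return a random action index with probability epsilon, else the greedy one."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))  # explore
    # Exploit: index of the largest Q-value (first one on ties)
    return max(range(len(q_values)), key=lambda a: q_values[a])

q = [0.1, 0.7, 0.3]

# epsilon = 0.0 never explores, so the greedy action is always returned
greedy_action = epsilon_greedy(q, epsilon=0.0)

# epsilon = 1.0 always explores, so every action index eventually shows up
rng = random.Random(0)
random_actions = {epsilon_greedy(q, 1.0, rng) for _ in range(200)}
```

As epsilon decays from its initial value toward its final value, the agent moves gradually from the second behaviour to the first.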

In [None]:
def generate_sample(self, batch_size):
    """
    Generates a batch sample from memory for training.

    Args:
        batch_size (int): Minimum number of stored transitions required before sampling.

    Returns:
        Tuple or None: None if memory holds fewer than batch_size transitions, otherwise a tuple containing:
            - non_final_mask (Tensor): A boolean mask indicating which states are non-final.
            - non_final_next_states (Tensor): The next states that are not terminal.
            - state_batch (Tensor): The batch of current states.
            - action_batch (Tensor): The batch of actions taken.
            - reward_batch (Tensor): The batch of rewards received.
    """
    # Ensure there are enough samples in memory before proceeding
    if len(self.memory) < batch_size:
        return None

    # Sample a batch from memory
    # ========= put your code here ========= #
    state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample()

    # Boolean mask that is True where the next state is NOT terminal
    non_final_mask = ~done_batch  # done = False → non_final
    non_final_next_states = next_state_batch[non_final_mask]

    return non_final_mask, non_final_next_states, state_batch, action_batch, reward_batch
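
To make the role of the non-final mask concrete, here is a minimal pure-Python sketch. It assumes a batch stored as plain lists; `split_non_final` is a hypothetical helper and not part of the agent or the replay buffer.

```python
def split_non_final(next_states, dones):
    """Return (mask, non_final_next_states) for a batch of transitions."""
    mask = [not d for d in dones]  # True where the episode continues
    non_final = [s for s, keep in zip(next_states, mask) if keep]
    return mask, non_final

next_states = [[0.1, 0.0], [0.2, 0.1], [0.3, 0.2]]
dones = [False, True, False]  # the second transition ended the episode

mask, non_final = split_non_final(next_states, dones)
```

Only the non-final next states need to be passed through the target network; terminal transitions contribute just their reward to the target.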

In [None]:
def calculate_loss(self,
                    non_final_mask, 
                    non_final_next_states, 
                    state_batch, 
                    action_batch, 
                    reward_batch):
    """
    Computes the temporal-difference loss for the Q-network.

    Args:
        non_final_mask (Tensor): Mask indicating which states are non-final.
        non_final_next_states (Tensor): The next states that are not terminal.
        state_batch (Tensor): Batch of current states.
        action_batch (Tensor): Batch of actions taken.
        reward_batch (Tensor): Batch of received rewards.
    
    Returns:
        Tensor: Computed loss.

    This function:
    - Predicts Q(s, a) using the policy network.
    - Computes max Q(s', a') from the target network for non-terminal next states.
    - Calculates the target Q-values using the Bellman equation: 
        target = r + gamma * max_a' Q_target(s', a')
    - Computes the mean squared error (MSE) loss between predicted and target Q-values.
    """
    # ========= put your code here ========= #

    # Reshape actions to [batch, 1] so gather can index along the action dimension
    action_batch = action_batch.view(-1, 1).long()

    # Compute Q(s, a) from the policy network by selecting the Q-values for the taken actions
    q_values = self.policy_net(state_batch).gather(1, action_batch).view(-1)

    # max_a' Q_target(s', a') for each sample; stays 0 for terminal next states
    next_state_values = torch.zeros(q_values.size(0), device=self.device)

    with torch.no_grad():
        # Fill in target-network values only where the next state is non-final
        next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0]

        # Compute the target Q-values using the Bellman equation:
        # target = reward + gamma * max_a' Q(s', a')
        expected_q_values = reward_batch.view(-1) + self.discount_factor * next_state_values

    # Compute the mean squared error loss between predicted Q(s, a) and target Q-values
    loss = F.mse_loss(q_values, expected_q_values)

    # Return the computed loss for backpropagation
    return loss
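
The Bellman target and the MSE loss can be checked by hand on a tiny batch. The following is a sketch in plain Python under the usual convention that the bootstrap term is dropped for terminal transitions; `td_targets` and `mse` are illustrative names, not functions from this notebook.

```python
def td_targets(rewards, next_max_q, dones, gamma):
    """target_i = r_i + gamma * max_a' Q_target(s'_i, a'), without bootstrap at terminal steps."""
    return [r + (0.0 if d else gamma * q)
            for r, q, d in zip(rewards, next_max_q, dones)]

def mse(predicted, target):
    """Mean squared error between two equally long lists."""
    return sum((p - t) ** 2 for p, t in zip(predicted, target)) / len(target)

# Two transitions: the first continues, the second ends the episode
targets = td_targets(rewards=[1.0, 1.0], next_max_q=[2.0, 5.0],
                     dones=[False, True], gamma=0.5)

# Q(s, a) predicted by the policy network for the actions actually taken
loss = mse([1.5, 1.0], targets)
```

Here the first target is 1.0 + 0.5 * 2.0 = 2.0, while the second is just the reward 1.0 because the episode ended, so the value 5.0 is ignored.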

In [None]:
def update_policy(self):
    """
    Update the policy using the calculated loss.

    Returns:
        float: Loss value after the update, or None if the buffer holds fewer than batch_size samples.
    """
    if len(self.memory) < self.batch_size:
        return
    # Generate a sample batch
    sample = self.generate_sample(self.batch_size)
    if sample is None:
        return
    non_final_mask, non_final_next_states, state_batch, action_batch, reward_batch = sample

    # Compute loss
    loss = self.calculate_loss(non_final_mask, non_final_next_states, state_batch, action_batch, reward_batch)
    l2_reg = torch.norm(self.policy_net.fc1.weight, p=2) + torch.norm(self.policy_net.fc2.weight, p=2)
    loss += 1e-4 * l2_reg

    # Perform gradient descent step
    # ========= put your code here ========= #
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    return loss.item()  # plain float, detached from the computation graph
    # ====================================== #

In [None]:
def update_target_networks(self):
    """
    Soft update of target network weights using Polyak averaging.
    """
    # Retrieve the state dictionaries (weights) of both networks
    # ========= put your code here ========= #
    policy_state_dict = self.policy_net.state_dict()
    target_state_dict = self.target_net.state_dict()
    # ====================================== #
    
    # Apply the soft update rule to each parameter in the target network
    # ========= put your code here ========= #
    for key in target_state_dict:
        target_state_dict[key] = (
            self.tau * policy_state_dict[key] +
            (1.0 - self.tau) * target_state_dict[key]
        )
    # ====================================== #
    
    # Load the updated weights into the target network
    # ========= put your code here ========= #
    self.target_net.load_state_dict(target_state_dict)
    # ====================================== #
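
The soft update rule is easy to trace on scalars. This sketch replaces the state dictionaries with plain dicts of floats; `soft_update` is a hypothetical helper that applies the same formula, target ← tau * policy + (1 - tau) * target.

```python
def soft_update(target, policy, tau):
    """Polyak averaging: move each target weight a fraction tau toward the policy weight."""
    return {k: tau * policy[k] + (1.0 - tau) * target[k] for k in target}

target = {"w": 0.0, "b": 1.0}
policy = {"w": 1.0, "b": 1.0}

# tau = 0.5 is exaggerated for readability; the agent's default is 0.005
for _ in range(3):
    target = soft_update(target, policy, tau=0.5)
```

After three updates `w` has moved 0.0 → 0.5 → 0.75 → 0.875, approaching the policy value without ever jumping to it. A small tau makes the target network change slowly, which keeps the bootstrap targets stable.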

In [None]:
def learn(self, env, max_steps):
    """
    Train the agent for a single episode.

    Args:
        env: The environment to train in.
        max_steps (int): Maximum number of steps per episode.

    Returns:
        Tuple: (timestep, total_reward)
    """

    # ===== Initialize trajectory collection variables ===== #
    # Reset environment to get initial state (tensor)
    # Track total episode return (float)
    # Flag to indicate episode termination (boolean)
    # Step counter (int)
    # ========= put your code here ========= #

    # === Episode Initialization ===
    # - Reset the environment and extract the initial observation
    # - Convert observation to tensor and set initial state
    # - Initialize reward tracker, timestep counter, and done flag
    done = False
    obs, _ = env.reset()
    total_reward = 0.0
    timestep = 0
    total_loss = 0.0
    # ====================================== #

    # Stop when the episode ends or the step budget is used up
    while not done and timestep < max_steps:
        obs_list = torch.tensor([obs['policy'][0, i] for i in range(4)], dtype=torch.float32).to(self.device)
        state = obs_list.unsqueeze(0)  # shape: [1, 4]
        # === Action Selection ===
        # - Select action index using epsilon-greedy strategy
        # - Convert index to actual action value if necessary
        action_idx = self.select_action(obs_list)
        action = self.scale_action(action_idx).view(1, -1)

        # === Environment Interaction ===
        # - Take one step in the environment
        # - Extract next state, reward, and done flags
        next_obs, reward, terminated, truncated, _ = env.step(action)
        next_obs_list = torch.tensor([next_obs['policy'][0, i] for i in range(4)], dtype=torch.float32).to(self.device)
        
        done = terminated or truncated

        next_state = next_obs_list  # shape: [4]

        reward_tensor = torch.as_tensor(reward, dtype=torch.float32, device=self.device)

        # === Store Transition ===
        # - NOTE: state is [1, 4] while next_state is [4]; keep whichever layout self.memory expects
        # - Store (s, a, r, s', done) in the replay buffer
        self.memory.add(state, action_idx, reward_tensor, next_state, done)

        # === Learning and Target Update ===
        # - Update Q-network once the buffer holds more than 1000 transitions (warm-up)
        # - Soft update the target network after every step
        if len(self.memory) > 1000:  # Perform one step of the optimization (on the policy network)
            self.update_policy()
        
        # Soft update of the target network's weights
        self.update_target_networks()

        # === Bookkeeping ===
        # - Track total reward and step counter, then move to the next state
        total_reward += reward
        timestep += 1
        obs = next_obs

    # === End-of-Episode Handling ===
    # - Reached when the episode is done or max_steps is hit
    # - Decay epsilon once per episode and return the episode statistics
    self.decay_epsilon()
    return timestep, total_reward

In [None]:
def save_w(self, path, filename):
    """
    Save weight parameters.
    """
    # ========= put your code here ========= #
    os.makedirs(path, exist_ok=True)
    file_path = os.path.join(path, filename)
    torch.save(self.policy_net.state_dict(), file_path)
    print(f"[INFO] Saved model weights to {file_path}")
    # ====================================== #

In [None]:
def load_w(self, path, filename):
    """
    Load weight parameters.
    """
    # ========= put your code here ========= #
    file_path = os.path.join(path, filename)

    if os.path.exists(file_path):
        self.policy_net.load_state_dict(torch.load(file_path, map_location=self.device))
        self.target_net.load_state_dict(self.policy_net.state_dict())  # Sync target with policy
        print(f"[INFO] Loaded model weights from {file_path}")
    else:
        raise FileNotFoundError(f"[ERROR] File not found: {file_path}")
    # ====================================== #

#### <font color="yellow">**REINFORCE class**</font>

In [None]:
class MC_REINFORCE_network(nn.Module):
    """
    Neural network for the MC_REINFORCE algorithm.
    
    Args:
        n_observations (int): Number of input features.
        hidden_size (int): Number of hidden neurons.
        n_actions (int): Number of possible actions.
        dropout (float): Dropout rate for regularization.
    """

    def __init__(self, n_observations, hidden_size, n_actions, dropout):
        super(MC_REINFORCE_network, self).__init__()
        # ========= put your code here ========= #

        # Network with one hidden layer (the dropout argument is currently unused)
        self.fc1 = nn.Linear(n_observations, hidden_size)
        self.fc3 = nn.Linear(hidden_size, n_actions)
        # ====================================== #

    def forward(self, x):
        """
        Forward pass through the network.
        
        Args:
            x (Tensor): Input tensor.
        
        Returns:
            Tensor: Output tensor representing action probabilities.
        """
        # ========= put your code here ========= #
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc3(x))
        action_probs = F.softmax(x, dim=-1) # softmax for discrete action
        action_probs = torch.clamp(action_probs, min=1e-8, max=1.0)
        return action_probs
        # ====================================== #

In [None]:
def __init__(
        self,
        device = None,
        num_of_action: int = 2,
        action_range: list = [-2.5, 2.5],
        n_observations: int = 4,
        hidden_dim: int = 64,
        dropout: float = 0.5,
        learning_rate: float = 0.01,
        discount_factor: float = 0.95,
) -> None:
    """
    Initialize the REINFORCE CartPole agent.

    Args:
        device: Torch device on which the policy network is placed.
        num_of_action (int): Number of discrete actions.
        action_range (list): Minimum and maximum action values.
        n_observations (int): Number of input features in the state.
        hidden_dim (int): Number of hidden units.
        dropout (float): Dropout rate passed to the policy network.
        learning_rate (float): The learning rate for updating the policy network.
        discount_factor (float, optional): The discount factor for future rewards. Defaults to 0.95.
    """     

    # ========= put your code here ========= #
    self.LR = learning_rate

    self.policy_net = MC_REINFORCE_network(n_observations, hidden_dim, num_of_action, dropout).to(device)
    self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=learning_rate)

    self.device = device
    self.hidden = hidden_dim
    self.steps_done = 0

    self.episode_durations = []
    
    # ====================================== #

    super(MC_REINFORCE, self).__init__( # type: ignore
        num_of_action=num_of_action,
        action_range=action_range,
        learning_rate=learning_rate,
        discount_factor=discount_factor,
    )

In [None]:
def calculate_stepwise_returns(self, rewards):
    """
    Compute stepwise returns for the trajectory.

    Args:
        rewards (list): List of rewards obtained in the episode.
    
    Returns:
        Tensor: Normalized stepwise returns.
    """
    # ========= put your code here ========= #

    # Monte Carlo Return Calculation
    R = 0
    returns = []
    for r in reversed(rewards):
        R = r + self.discount_factor * R
        returns.insert(0, R)

    # Convert to tensor and scale to unit L2 norm
    # (F.normalize divides by the vector norm; it does not subtract the mean)
    returns = torch.tensor(returns, dtype=torch.float32).to(self.device)
    returns = F.normalize(returns, dim=0)

    return returns
    # ====================================== #
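
The backward recursion for Monte Carlo returns can be verified on a three-step episode. This is a pure-Python sketch: `discounted_returns` and `l2_normalize` are illustrative names, and `l2_normalize` mirrors the scaling that `F.normalize` performs (division by the L2 norm, clamped below by a small epsilon).

```python
import math

def discounted_returns(rewards, gamma):
    """G_t = r_t + gamma * G_{t+1}, computed from the last step backwards."""
    g, out = 0.0, []
    for r in reversed(rewards):
        g = r + gamma * g
        out.append(g)
    out.reverse()
    return out

def l2_normalize(xs, eps=1e-12):
    """Scale a vector to unit L2 norm."""
    norm = math.sqrt(sum(x * x for x in xs))
    return [x / max(norm, eps) for x in xs]

returns = discounted_returns([1.0, 1.0, 1.0], gamma=0.5)
unit = l2_normalize(returns)
```

With gamma = 0.5 the returns are 1.75, 1.5 and 1.0, so earlier steps receive more credit. A common alternative to L2 scaling is standardization, which subtracts the mean and divides by the standard deviation.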

In [None]:
def generate_trajectory(self, env):
    """
    Generate a trajectory by interacting with the environment.

    Args:
        env: The environment object.
    
    Returns:
        Tuple: (timestep, episode_return, stepwise_returns, log_prob_actions, trajectory, entropies)
    """
    # ===== Initialize trajectory collection variables ===== #
    # Reset environment to get initial state (tensor)
    # Store state-action-reward history (list)
    # Store log probabilities of actions (list)
    # Store rewards at each step (list)
    # Track total episode return (float)
    # Flag to indicate episode termination (boolean)
    # Step counter (int)
    # ========= put your code here ========= #

    # Initialization
    state, _ = env.reset()

    trajectory = []
    log_prob_actions = []
    rewards = []
    entropy_list = []

    done = False
    timestep = 0
    episode_return = 0.0
    # ====================================== #
    
    # Trajectory Collection Loop
    while not done:
        
        # Get state tensor and predict action
        state_tensor = torch.tensor([state['policy'][0, i] for i in range(4)], dtype=torch.float32).to(self.device)
        action_probs = self.policy_net(state_tensor)
        dist = distributions.Categorical(action_probs)
        action = dist.sample()

        # Get log-probability and entropy of action
        log_prob = dist.log_prob(action)
        entropy = dist.entropy()
        entropy_list.append(entropy)

        # Scale action and step environment
        action = action.view(1, -1)
        action = self.scale_action(action)            
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Save log-prob, reward, and full transition
        log_prob_actions.append(log_prob)
        rewards.append(reward)
        trajectory.append((state_tensor, action.item(), reward))

        # Update state and counters
        state = next_state
        episode_return += reward
        timestep += 1
        done = terminated or truncated

    # Prepare return values
    stepwise_returns = self.calculate_stepwise_returns(rewards)
    log_prob_actions = torch.stack(log_prob_actions)
    return timestep, episode_return, stepwise_returns, log_prob_actions, trajectory, torch.stack(entropy_list)
    # ====================================== #

In [None]:
def calculate_loss(self, stepwise_returns, log_prob_actions, entropy_list):
    """
    Compute the loss for policy optimization.

    Args:
        stepwise_returns (Tensor): Stepwise returns for the trajectory.
        log_prob_actions (Tensor): Log probabilities of actions taken.
        entropy_list (Tensor): Per-step policy entropies (accepted but not used in the loss).
    
    Returns:
        Tensor: Computed loss.
    """
    # ========= put your code here ========= #
    # Policy Gradient Loss
    return -torch.sum(log_prob_actions * stepwise_returns)/len(stepwise_returns)
    # ====================================== #
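
The policy-gradient loss above is the negative mean of log-probability times return. A small numeric sketch, using plain floats in place of tensors (`reinforce_loss` is a hypothetical stand-in for the method):

```python
import math

def reinforce_loss(log_probs, returns):
    """Negative mean of log pi(a_t | s_t) * G_t over the trajectory."""
    return -sum(lp * g for lp, g in zip(log_probs, returns)) / len(returns)

# Two steps: the chosen actions had probability 0.5 and 0.25
log_probs = [math.log(0.5), math.log(0.25)]
returns = [1.0, 2.0]

loss = reinforce_loss(log_probs, returns)
```

Minimizing this loss raises the log-probability of actions in proportion to the return that followed them, which is why the returns act as per-step weights.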

In [None]:
def update_policy(self, stepwise_returns, log_prob_actions, entropy_list):
    """
    Update the policy using the calculated loss.

    Args:
        stepwise_returns (Tensor): Stepwise returns.
        log_prob_actions (Tensor): Log probabilities of actions taken.
        entropy_list (Tensor): Per-step policy entropies, forwarded to calculate_loss.
    
    Returns:
        float: Loss value after the update.
    """
    # ========= put your code here ========= #
    # Calculate loss
    loss = self.calculate_loss(stepwise_returns, log_prob_actions, entropy_list)

    # Backward pass and optimizer step
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    return loss.item()
    # ====================================== #

In [None]:
def learn(self, env):
    """
    Train the agent on a single episode.

    Args:
        env: The environment to train in.
    
    Returns:
        Tuple: (timestep, episode_return, stepwise_returns, loss, trajectory)
    """
    # ========= put your code here ========= #

    # Train policy for 1 episode
    self.policy_net.train()
    timestep, episode_return, stepwise_returns, log_prob_actions, trajectory, entropy = self.generate_trajectory(env)
    
    # Policy update
    loss = self.update_policy(stepwise_returns, log_prob_actions, entropy)
    
    # Return training info
    return timestep, episode_return, stepwise_returns, loss, trajectory
    # ====================================== #

In [None]:
def save_w(self, path, filename):
    """
    Save model weight.
    """
    # ========= put your code here ========= #
    os.makedirs(path, exist_ok=True)
    file_path = os.path.join(path, filename)
    torch.save(self.policy_net.state_dict(), file_path)
    print(f"[INFO] Saved model weights to {file_path}")
    # ====================================== #

In [None]:
def load_w(self, path, filename):
    """
    Load model weight.
    """
    # ========= put your code here ========= #
    file_path = os.path.join(path, filename)
    if os.path.exists(file_path):
        self.policy_net.load_state_dict(torch.load(file_path, map_location=self.device))
        print(f"[INFO] Loaded model weights from {file_path}")
    else:
        raise FileNotFoundError(f"[ERROR] File not found: {file_path}")
    # ====================================== #

#### <font color="yellow">**A2C class**</font>

In [None]:
class Actor(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, learning_rate=1e-4):
        """
        Actor network for policy approximation.

        Args:
            input_dim (int): Dimension of the state space.
            hidden_dim (int): Number of hidden units in layers.
            output_dim (int): Dimension of the action space.
            learning_rate (float, optional): Learning rate for optimization. Defaults to 1e-4.
        """
        super(Actor, self).__init__()

        # ========= put your code here ========= #

        # Define the network structure (one hidden layer; outputs one score per action)
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
            nn.ReLU()
        )
        # Set up the optimizer
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        # Initialize network weights
        self.init_weights()
        # ====================================== #

    def init_weights(self):
        """
        Initialize network weights using Xavier initialization for better convergence.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)  # Xavier initialization
                nn.init.zeros_(m.bias)  # Initialize bias to 0

    def forward(self, state):
        """
        Forward pass for action selection.

        Args:
            state (Tensor): Current state of the environment.

        Returns:
            Tensor: Action probabilities (softmax over the network outputs).
        """
        # ========= put your code here ========= #
        x = self.net(state)
        x = torch.softmax(x, dim=-1)
        return x
        # ====================================== #

In [None]:
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim, learning_rate=1e-4):
        """
        Critic network for state-value V(s) approximation.

        Args:
            state_dim (int): Dimension of the state space.
            action_dim (int): Dimension of the action space (not used by the layers below).
            hidden_dim (int): Number of hidden units in layers.
            learning_rate (float, optional): Learning rate for optimization. Defaults to 1e-4.
        """
        super(Critic, self).__init__()

        # ========= put your code here ========= #

        # Define the neural network (two hidden layers) for value prediction
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1) # Output is scalar V(s)
        )

        # Set up the optimizer
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        # Initialize weights 
        self.init_weights()
        # ====================================== #

    def init_weights(self):
        """
        Initialize network weights using Kaiming initialization.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')  # Kaiming initialization
                nn.init.zeros_(m.bias)  # Initialize bias to 0

    def forward(self, state, action=None):
        """
        Forward pass for state-value estimation.

        Args:
            state (Tensor): Current state of the environment.
            action (Tensor, optional): Accepted for interface compatibility; ignored.

        Returns:
            Tensor: Estimated state value V(s).
        """
        # ========= put your code here ========= #
        return self.net(state).squeeze(-1)  # output shape: [batch]
        # ====================================== #

In [None]:
def __init__(self, 
            device = None, 
            num_of_action: int = 2,
            action_range: list = [-2.5, 2.5],
            n_observations: int = 4,
            hidden_dim = 256,
            dropout = 0.05, 
            learning_rate: float = 0.01,
            tau: float = 0.005,
            discount_factor: float = 0.95,
            buffer_size: int = 256,
            batch_size: int = 1,
            entropy_coeff: float = 0.01,
            ):
    """
    Actor-Critic algorithm implementation.

    Args:
        device (str): Device to run the model on ('cpu' or 'cuda').
        num_of_action (int, optional): Number of possible actions. Defaults to 2.
        action_range (list, optional): Range of action values. Defaults to [-2.5, 2.5].
        n_observations (int, optional): Number of observations in state. Defaults to 4.
        hidden_dim (int, optional): Hidden layer dimension. Defaults to 256.
        dropout (float, optional): Dropout rate. Defaults to 0.05.
        learning_rate (float, optional): Learning rate. Defaults to 0.01.
        tau (float, optional): Soft update parameter. Defaults to 0.005.
        discount_factor (float, optional): Discount factor for future rewards. Defaults to 0.95.
        buffer_size (int, optional): Replay buffer size. Defaults to 256.
        batch_size (int, optional): Size of training batches. Defaults to 1.
        entropy_coeff (float, optional): Weight of the entropy bonus. Defaults to 0.01.
    """

    # Set device and instantiate networks
    self.device = device
    self.actor = Actor(n_observations, hidden_dim, num_of_action, learning_rate).to(device)
    self.critic = Critic(n_observations, num_of_action, hidden_dim, learning_rate).to(device)

    # Store hyperparameters
    self.discount_factor = discount_factor
    self.action_range = action_range
    self.entropy_coeff = entropy_coeff
    self.learning_rate = learning_rate
    self.hidden = hidden_dim

    # Call parent class initializer
    super(A2C_Discrete, self).__init__(
        num_of_action=num_of_action,
        action_range=action_range,
        learning_rate=learning_rate,
        discount_factor=discount_factor,
    )

In [None]:
def select_action(self, state):
    """
    Samples a discrete action from the current stochastic policy.
    
    Args:
        state (Tensor): The current state of the environment.

    Returns:
        Tuple[Tensor, Tensor]: 
            - action: The sampled action index.
            - log_prob: The log-probability of the sampled action.
    """
    # ========= put your code here ========= #
    probs = self.actor(state)                       # Get action probabilities
    probs = torch.clamp(probs, min=1e-6, max=1.0)   # Prevent extremely small probabilities
    dist = torch.distributions.Categorical(probs)   # Create Categorical distribution
    action = dist.sample()                          # Sample action from distribution
    log_prob = dist.log_prob(action).sum(dim=-1)    # Compute log-probability of chosen action
    return action, log_prob
    # ====================================== #
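
Sampling from a categorical policy and reading off the log-probability of the sampled action can be sketched in plain Python. This illustrates the idea only and is not how `torch.distributions.Categorical` is implemented; `sample_categorical` is a hypothetical helper that uses inverse-CDF sampling.

```python
import math
import random

def sample_categorical(probs, rng):
    """Draw an index i with probability probs[i] (inverse-CDF sampling)."""
    u = rng.random()
    cumulative = 0.0
    for idx, p in enumerate(probs):
        cumulative += p
        if u < cumulative:
            return idx
    return len(probs) - 1  # guard against floating-point rounding

probs = [0.2, 0.8]  # e.g. actor output for a two-action policy
rng = random.Random(0)

samples = [sample_categorical(probs, rng) for _ in range(1000)]
freq_1 = samples.count(1) / len(samples)

# Log-probability of the first sampled action, as used in the actor loss
log_prob = math.log(probs[samples[0]])
```

Because actions are sampled instead of chosen greedily, exploration comes from the policy itself: the less likely action is still tried roughly 20% of the time in this example.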

In [None]:
def calculate_loss(self, state, action, reward, next_state, done):
    """
    Computes the loss for policy optimization.

    Args:
        - states (Tensor): The batch of current states.
        - actions (Tensor): The batch of actions taken.
        - rewards (Tensor): The batch of rewards received.
        - next_states (Tensor): The batch of next states received.
        - dones (Tensor): The batch of dones received.

    Returns:
        Tensor: Computed critic & actor loss.
    """
    # ========= put your code here ========= #
    # Estimate state values with the critic
    values = self.critic(state)                 # V(s)
    with torch.no_grad():
        next_values = self.critic(next_state)   # V(s'), treated as a fixed target

    # Compute temporal difference target; drop the bootstrap term when the episode has ended
    not_done = 1.0 - torch.as_tensor(done, dtype=torch.float32, device=self.device)
    td_target = reward + self.discount_factor * next_values * not_done

    # Advantage is the difference between TD target and current value
    advantage = td_target - values

    # Actor Loss: negative log-prob * advantage
    probs = self.actor(state)  # the actor outputs probabilities (softmax), not logits
    dist = torch.distributions.Categorical(probs)
    log_probs = dist.log_prob(action)
    actor_loss = -(log_probs * advantage.detach()).mean()

    # Critic Loss: mean squared TD error
    critic_loss = (advantage**2).mean()

    return actor_loss, critic_loss
    # ====================================== #
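
The one-step advantage behind both losses can be worked through with scalars. The sketch below assumes the common formulation in which the bootstrap term is masked at terminal steps; `td_advantage` is an illustrative name, and the numbers are made up for the example.

```python
import math

def td_advantage(reward, value, next_value, done, gamma):
    """A(s, a) ≈ r + gamma * V(s') * (1 - done) - V(s)."""
    bootstrap = 0.0 if done else gamma * next_value
    return reward + bootstrap - value

# Same transition, once mid-episode and once as the final step
adv_mid = td_advantage(reward=1.0, value=2.0, next_value=4.0, done=False, gamma=0.5)
adv_end = td_advantage(reward=1.0, value=2.0, next_value=4.0, done=True, gamma=0.5)

# Losses for the mid-episode case, with the taken action at probability 0.5
critic_loss = adv_mid ** 2
actor_loss = -math.log(0.5) * adv_mid
```

A positive advantage (mid-episode case) pushes the actor toward the taken action, while a negative one (terminal case) pushes it away; the critic is trained to shrink the advantage toward zero.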

In [None]:
def update_policy(self, state, action, reward, next_state, done): 
    """
    Update the actor and critic using the calculated losses.

    Returns:
        Tuple[float, float]: Actor loss and critic loss for this update.
    """
    # ========= put your code here ========= #

    # Compute critic and actor loss
    actor_loss, critic_loss = self.calculate_loss(state, action, reward, next_state, done)
    
    # Backpropagate and update actor network parameters
    self.actor.optimizer.zero_grad()
    actor_loss.backward()
    self.actor.optimizer.step()

    # Backpropagate and update critic network parameters
    self.critic.optimizer.zero_grad()
    critic_loss.backward()
    self.critic.optimizer.step()
    # ====================================== #
    # Return plain floats so callers can accumulate them without keeping the graph alive
    return actor_loss.item(), critic_loss.item()

In [None]:
def learn(self, env, max_steps, num_agents):
    """
    Train the agent for a single episode, updating the networks after every step.

    Args:
        env: The environment in which the agent interacts.
        max_steps (int): Maximum number of steps per episode.
        num_agents (int): Number of agents in the environment.

    Returns:
        Tuple: (num_step, total_reward)
    """

    # ===== Initialize trajectory collection variables ===== #
    # Reset environment to get initial state (tensor)
    # Track total episode return (float)
    # Flag to indicate episode termination (boolean)
    # Step counter (int)
    # ========= put your code here ========= #

    # Reset environment
    state, _ = env.reset()
    total_reward = 0.0
    num_step = 0
    total_actorloss = 0.0
    total_criticloss = 0.0
    # ====================================== #

    for step in range(max_steps):


        # Convert observation to tensor
        state_tensor = torch.tensor([state['policy'][0, i] for i in range(4)], dtype=torch.float32).to(self.device)
        
        # Select action from actor
        action, log_prob = self.select_action(state_tensor)

        # Scale and apply action in the environment
        scaled_action = self.scale_action(action.item()).view(1, -1)
        next_state, reward, terminated, truncated, _ = env.step(scaled_action)
        
        # Process next state
        done = terminated or truncated
        next_state_tensor = torch.tensor([next_state['policy'][0, i] for i in range(4)], dtype=torch.float32).to(self.device)
        
        # Update networks using this transition                        
        actor_loss, critic_loss = self.update_policy(state_tensor, action, reward, next_state_tensor, done)

        # Update state
        state = next_state
        total_reward += reward.item()
        total_actorloss += actor_loss
        total_criticloss += critic_loss
        num_step = step

        if done:
            break

    return num_step, total_reward
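
The `scale_action` helper called in `learn` is defined elsewhere in the notebook. As a rough illustration only, assuming it spreads the discrete action indices evenly over `action_range`, the mapping could look like the sketch below. The name `index_to_force` is hypothetical and is not the method used by the agent.

```python
def index_to_force(index, num_of_action, action_range):
    """Map a discrete action index to an evenly spaced value in action_range.

    Hypothetical helper: assumes a linear mapping from index to force.
    """
    low, high = action_range
    if num_of_action == 1:
        return (low + high) / 2.0
    return low + index * (high - low) / (num_of_action - 1)


# With 7 actions over [-20, 20], the middle index maps to zero force.
forces = [index_to_force(i, 7, [-20.0, 20.0]) for i in range(7)]
print(forces)
```

Under this assumption, an odd number of actions always includes a zero-force action, which lets the agent do nothing when the pole is already balanced.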

In [None]:
def save_w(self, path, filename):
    """
    Save model weight.
    """
    # ========= put your code here ========= #
    os.makedirs(path, exist_ok=True)
    file_path_actor = os.path.join(path, filename)

    # print(self.policy_net.state_dict())
    torch.save(self.actor.state_dict(), file_path_actor)
    print(f"[INFO] Saved model weights to {file_path_actor}")
    # ====================================== #

In [None]:
def load_w(self, path, filename):
    """
    Load model weight.
    """
    # ========= put your code here ========= #
    file_path_actor = os.path.join(path, filename)

    if os.path.exists(file_path_actor):
        self.actor.load_state_dict(torch.load(file_path_actor, map_location=self.device))
        print(f"[INFO] Loaded model weights from {file_path_actor}")
    else:
        raise FileNotFoundError(f"[ERROR] File not found: {file_path_actor}")
    # ====================================== #

## <font color="pink">**Part 3: Training & Playing to stabilize Cart-Pole Agent**</font>

You need to implement the training loop in the train script and main() in the play script (in the "Can be modified" area of both files). Additionally, you must collect data, analyze results, and save models for evaluating agent performance.

- Training the Agent

    - Stabilizing Cart-Pole Task

        ```bash
        python scripts/Function_based/train.py --task Stabilize-Isaac-Cartpole-v0
        ```

    - Swing-up Cart-Pole Task (Optional)

        ```bash
        python scripts/Function_based/train.py --task SwingUp-Isaac-Cartpole-v0
        ```

- Playing

    - Stabilize Cart-Pole Task

        ```bash
        python scripts/Function_based/play.py --task Stabilize-Isaac-Cartpole-v0
        ```

    - Swing-up Cart-Pole Task (Optional)

        ```bash
        python scripts/Function_based/play.py --task SwingUp-Isaac-Cartpole-v0
        ```

### <font color="yellow">**train.py**</font>

Example A2C algorithm

In [None]:
# hyperparameters
n_episodes = 2002
param_grid = {
    "num_of_action":[7],
    "action_range":[20.0],
    "learning_rate": [0.0003],
    "hidden_dim": [128],
    "discount":[0.99]
}

# if GPU is to be used
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

print("device: ", device)

task_name = str(args_cli.task).split('-')[0]  # Stabilize, SwingUp
Algorithm_name = "A2C"

# Create all combinations
grid = list(itertools.product(*param_grid.values()))
param_names = list(param_grid.keys())

# Do Grid Search
for config_idx, values in enumerate(grid):
    config = dict(zip(param_names, values))
    print(f"\n===== Training Config {config_idx+1}/{len(grid)}: {config} =====")

    Experiment = "Discount "+str(config["discount"])

    # Initialize Weights and Biases (wandb) for tracking and logging metrics during training
    wandb.init( # type: ignore
        project='DRL_HW3',  # The name of the project in wandb
        name="A2C_discrete"  # The name of the current run
    )

    # Define Agent
    agent = A2C_Discrete(
        device=device,
        num_of_action=config["num_of_action"],
        action_range=[-config["action_range"], config["action_range"]],
        learning_rate=config["learning_rate"],
        hidden_dim=config["hidden_dim"],
        discount_factor=config["discount"]
    )

    # reset environment
    obs, _ = env.reset()
    timestep = 0
    # simulate environment
    while simulation_app.is_running():
        # run everything in inference mode
        # with torch.inference_mode():
        sum_reward = 0.0
        sum_step = 0
        for episode in tqdm(range(n_episodes)): # type: ignore  

            step, reward = agent.learn(env, max_steps=1000, num_agents=1)

            sum_step += step
            sum_reward += reward
            wandb.log({ # type: ignore
                'num_step': step,
                'reward': reward
            })

            if (episode + 1) % 100 == 0: # type: ignore
                print(sum_step / 100.0)
                
                # Log average step and reward over the last 100 episodes
                wandb.log({ # type: ignore
                    'avg_step': sum_step / 100.0,
                    'avg_reward': sum_reward / 100.0
                })
                sum_step = 0
                sum_reward = 0.0                

            # ================= Save A2C agent Area ===================
            w_file = f"{Algorithm_name}_{episode}_{agent.num_of_action}_{agent.action_range[1]}_{agent.discount_factor}_{agent.lr}_{agent.hidden}.pt" # type: ignore
            full_path = os.path.join(f"w/{task_name}", Algorithm_name)
            agent.save_w(full_path, w_file)

        print('Complete')
        if args_cli.video:
            timestep += 1
            # Exit the play loop after recording one video
            if timestep == args_cli.video_length:
                break

        break

    # Finish the wandb run and save the logged metrics
    wandb.finish() # type: ignore 
# ==================================================================== #

### <font color="yellow">**play.py**</font>

Example A2C Algorithm

In [None]:
# hyperparameters
num_of_action = 7
action_range = [-20.0, 20.0]  
learning_rate = 0.01
n_episodes = 50
discount = 0.01
entropy_coeff = 0.01

# if GPU is to be used
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

print("device: ", device)

agent = A2C_Discrete(
    device=device,
    num_of_action=num_of_action,
    action_range=action_range,
    learning_rate=learning_rate,
    discount_factor = discount,
    entropy_coeff=entropy_coeff
)

task_name = str(args_cli.task).split('-')[0]  # Stabilize, SwingUp
Algorithm_name = "A2C"  
episode = 4900
q_value_file = f"{Algorithm_name}_{episode}_{agent.num_of_action}_{agent.action_range[1]}_{agent.discount_factor}_{agent.learning_rate}_{agent.entropy_coeff}.json"
full_path = os.path.join(f"w/{task_name}", Algorithm_name)
agent.load_w(full_path, q_value_file)

# reset environment
obs, _ = env.reset()
timestep = 0

# List for storing obs
obs_list = []

# simulate environment
while simulation_app.is_running():
    # run everything in inference mode
    with torch.inference_mode():

        sum_step = []

        for episode in range(n_episodes):
            obs, _ = env.reset()
            done = False
            episode_obs = []  # store obs for each episode
            step = 0
            while not done:
                episode_obs.append([float(obs['policy'][0, i]) for i in range(4)])
                obs = torch.tensor([obs['policy'][0, i] for i in range(4)], dtype=torch.float32).to(agent.device)
                # agent stepping
                action, _ = agent.select_action(obs)  # select_action returns (action, log_prob)
                scaled_action = agent.scale_action(action.item()).view(1, -1)

                # env stepping
                next_obs, reward, terminated, truncated, _ = env.step(scaled_action)

                done = terminated or truncated
                obs = next_obs

                step += 1

            # print("Episode:", episode+1, " Step: ",step)
            sum_step.append(step)

            # save obs of this episode
            obs_list.append(episode_obs)            

    if args_cli.video:
        timestep += 1
        # Exit the play loop after recording one video
        if timestep == args_cli.video_length:
            break

    break

Show the pole position for the longest-duration episode.

In [None]:
# save obs in JSON
os.makedirs("saved_obs", exist_ok=True)
obs_file_path = os.path.join("saved_obs", f"{Algorithm_name}_{agent.num_of_action}_{agent.action_range[1]}_{agent.discount_factor}_{agent.learning_rate}_{agent.entropy_coeff}.json")

obs_list = [obs.detach().cpu().tolist() if isinstance(obs, torch.Tensor) else obs for obs in obs_list]

with open(obs_file_path, "w") as f:
    json.dump(obs_list, f, indent=4)

print(f"Observations saved to {obs_file_path}")

# choose the episode with the most steps
max_n = np.argmax(sum_step)

# choose observation of the best episode
sublist = obs_list[max_n]  # change the index as needed

y_values = [item[1] for item in sublist] #*180.0/pi

x_values = list(range(1, len(sublist) + 1))

# plot
plt.figure(figsize=(6, 4))
plt.plot(x_values, y_values, linestyle='-', color='b', label="pole_pose")

plt.xlabel("Step")
plt.ylabel("Pole_pose (rad)")
plt.title(f"{Algorithm_name}_{agent.num_of_action}_{agent.action_range[1]}_{agent.discount_factor}_{agent.learning_rate}_{agent.entropy_coeff}")
plt.ylim(-0.4, 0.4)
plt.legend()
plt.grid(True)

save_dir = "plots"
os.makedirs(save_dir, exist_ok=True)  # create the folder if it does not exist

save_path = os.path.join(save_dir, f"{Algorithm_name}_{agent.num_of_action}_{agent.action_range[1]}_{agent.discount_factor}_{agent.learning_rate}_{agent.entropy_coeff}.png")
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"📸 Saved plot as {save_path}")

plt.show()

## <font color="pink">**Part 4: Evaluate Cart-Pole Agent performance**</font>

You must evaluate the agent's performance in terms of learning efficiency (i.e., how well the agent learns to receive higher rewards) and deployment performance (i.e., how well the agent performs in the Cart-Pole problem). Analyze and visualize the results to determine:

- Which algorithm performs best?

- Why does it perform better than the others?


### <font color="yellow">**Setup**</font>

I use the five reward terms shown below.

In [None]:
class RewardsCfg:
    """Reward terms for the MDP."""

    # (1) Constant running reward
    alive = RewTerm(func=mdp.is_alive, weight=1.0) # type: ignore
    # (2) Failure penalty
    terminating = RewTerm(func=mdp.is_terminated, weight=-2.0) # type: ignore
    # (3) Primary task: keep pole upright
    pole_pos = RewTerm(
        func=mdp.joint_pos_target_l2,
        weight=-1.0,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]), "target": 0.0},
    )
    # (4) Shaping tasks: lower cart velocity
    cart_vel = RewTerm(
        func=mdp.joint_vel_l1, # type: ignore
        weight=-0.01,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"])},
    )
    # (5) Shaping tasks: lower pole angular velocity
    pole_vel = RewTerm(
        func=mdp.joint_vel_l1, # type: ignore
        weight=-0.005,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"])},
    )

Each reward term is multiplied by the environment time step dt, as shown below.

In [None]:
def compute(self, dt: float) -> torch.Tensor:
    """Computes the reward signal as a weighted sum of individual terms.

    This function calls each reward term managed by the class and adds them to compute the net
    reward signal. It also updates the episodic sums corresponding to individual reward terms.

    Args:
        dt: The time-step interval of the environment.

    Returns:
        The net reward signal of shape (num_envs,).
    """
    # reset computation
    self._reward_buf[:] = 0.0
    # iterate over all the reward terms
    for name, term_cfg in zip(self._term_names, self._term_cfgs):
        # skip if weight is zero (kind of a micro-optimization)
        if term_cfg.weight == 0.0:
            continue
        # compute term's value
        value = term_cfg.func(self._env, **term_cfg.params) * term_cfg.weight * dt
        # update total reward
        self._reward_buf += value
        # update episodic sum
        self._episode_sums[name] += value

        # Update current reward for this step.
        self._step_reward[:, self._term_names.index(name)] = value / dt

    return self._reward_buf
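
To make the effect of the `dt` scaling concrete, the sketch below recomputes one step's reward by hand with the weights from `RewardsCfg`. The raw term values and `dt = 1/60` are made-up numbers for illustration, and `step_reward` is a hypothetical helper, not part of the environment code.

```python
# Weights copied from RewardsCfg above.
WEIGHTS = {
    "alive": 1.0,
    "terminating": -2.0,
    "pole_pos": -1.0,
    "cart_vel": -0.01,
    "pole_vel": -0.005,
}


def step_reward(raw_terms, dt):
    """Weighted sum of raw term values, each scaled by dt (hypothetical helper)."""
    return sum(WEIGHTS[name] * value * dt for name, value in raw_terms.items())


# Illustrative raw values for one non-terminal step (assumed, not measured).
raw = {"alive": 1.0, "terminating": 0.0, "pole_pos": 0.04, "cart_vel": 0.5, "pole_vel": 1.0}
dt = 1.0 / 60.0  # assumed environment step size

# (1.0 - 0.04 - 0.005 - 0.005) * dt = 0.95 / 60
print(step_reward(raw, dt))
```

Because every term is multiplied by `dt`, the per-step reward is small, and the episode return grows with the simulated time the pole stays up rather than with the raw step count.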

### <font color="yellow">**Find the best hyperparameters of each algorithm**</font>

I find the best hyperparameters for each algorithm by running a grid search over the hyperparameters I am interested in and picking the combination that gives the best learning curve. The example below uses the Linear Q algorithm.

Linear Q has three hyperparameters: discount factor, learning rate, and epsilon decay.
First, I train with three different discount factors (0.01, 0.5, and 0.99) while keeping the other hyperparameters the same for every run.

<div>
    <img src="plots/LinearQ_discount.png" alt="Image 1" style="width: 30%; height: auto;">
</div>

A discount factor of 0.01 gives the best result, so I fix the discount factor at 0.01 and try different learning rates (0.0001, 0.001, and 0.01) while keeping the epsilon decay fixed.

<div>
    <img src="plots/LinearQ_learning_rate.png" alt="Image 1" style="width: 30%; height: auto;">
</div>

A learning rate of 0.01 gives the best result, so I fix the discount factor at 0.01 and the learning rate at 0.01 and try different epsilon decay values (0.0003, 0.0006, and 0.001).

<div>
    <img src="plots/LinearQ_epsilon_decay.png" alt="Image 1" style="width: 30%; height: auto;">
</div>

An epsilon decay of 0.0003 gives the best result. The best combination of hyperparameters for Linear Q is therefore a discount factor of 0.01, a learning rate of 0.01, and an epsilon decay of 0.0003.
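
The one-hyperparameter-at-a-time procedure described above can be sketched as follows. `sequential_search` is a hypothetical helper, and `toy_score` is a stand-in for a full training run that returns a score for the learning curve (higher is better). Its target values are arbitrary and are not results from this homework.

```python
def sequential_search(search_space, defaults, train_and_score):
    """Sweep one hyperparameter at a time, fixing the best value before moving on."""
    best = dict(defaults)
    for name, candidates in search_space.items():
        scores = {}
        for value in candidates:
            trial = {**best, name: value}
            scores[value] = train_and_score(trial)
        best[name] = max(scores, key=scores.get)
    return best


# Candidate values taken from the Linear Q sweeps described above.
search_space = {
    "discount": [0.01, 0.5, 0.99],
    "learning_rate": [0.0001, 0.001, 0.01],
    "epsilon_decay": [0.0003, 0.0006, 0.001],
}
defaults = {"discount": 0.99, "learning_rate": 0.01, "epsilon_decay": 0.001}


def toy_score(cfg):
    """Toy scoring function (an assumption): peaks at an arbitrary target config."""
    target = {"discount": 0.5, "learning_rate": 0.001, "epsilon_decay": 0.0006}
    return -sum(abs(cfg[k] - target[k]) for k in target)


print(sequential_search(search_space, defaults, toy_score))
```

This needs 9 training runs instead of the 27 required by a full grid over the same values, at the cost of possibly missing interactions between hyperparameters.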

I repeat this procedure for the other algorithms. The best hyperparameters for each algorithm are shown below.

- Every Algorithm

    - num_of_action = 7

    - action_range = [-20.0, 20.0]  

- Linear Q

    - Discount Factor 0.01

    - Learning Rate 0.01

    - Epsilon Decay 0.0003

- DQN

    - Discount Factor 0.01

    - Learning Rate 0.0001
    
    - Epsilon Decay 0.0003

    - Tau 0.001

    - Hidden Dim 128
    
    - Buffer Size 1000

    - Batch Size 32

- MC-REINFORCE

    - Discount Factor 0.01

    - Learning Rate 0.0001
    
    - Hidden Dim 128

- A2C

    - Discount Factor 0.01

    - Learning Rate 0.0005
    
    - Hidden Dim 128

### <font color="yellow">**In terms of learning efficiency**</font>

<div>
    <img src="plots/avg_step.png" alt="Image 1" style="width: 30%; height: auto;">
</div>

MC-REINFORCE is the best algorithm in terms of learning efficiency because:

- MC-REINFORCE uses the full return instead of a value function, so there is no bias from value estimation, and the learning signal comes directly from the actual rewards.

- A2C uses a TD estimate of the advantage. If V(s) has not converged (the critic has not learned well), the advantage is wrong and the actor is updated in the wrong direction.

- DQN has overestimation bias from the max operator, and its target depends only on the argmax action. If the network has not learned well enough, it keeps selecting bad actions.

- Linear Q may still be slow to identify the best action. Even a small weight adjustment can make it learn poorly, so it may need more training.
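
The first two points contrast the learning signals of the two policy-gradient methods. The sketch below computes both on a made-up three-step episode: the Monte Carlo return used by REINFORCE and a one-step TD advantage of the kind commonly used in A2C. The rewards, discount, and critic values are invented numbers, and the function names are hypothetical.

```python
def discounted_returns(rewards, gamma):
    """Monte Carlo return G_t = r_t + gamma * G_{t+1}, computed backwards."""
    returns, g = [], 0.0
    for r in reversed(rewards):
        g = r + gamma * g
        returns.append(g)
    return returns[::-1]


def td_advantages(rewards, values, gamma):
    """One-step TD advantage r_t + gamma * V(s_{t+1}) - V(s_t), with V(terminal) = 0."""
    next_values = values[1:] + [0.0]
    return [r + gamma * nv - v for r, v, nv in zip(rewards, values, next_values)]


rewards = [1.0, 1.0, 1.0]
gamma = 0.5
print(discounted_returns(rewards, gamma))        # [1.75, 1.5, 1.0]

bad_critic = [0.0, 5.0, 0.0]                     # deliberately wrong V(s) estimates
print(td_advantages(rewards, bad_critic, gamma)) # [3.5, -4.0, 1.0]
```

The returns depend only on the observed rewards, whereas the TD advantages change sign with the critic's error, even though every step earned the same reward.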

### <font color="yellow">**In terms of deployment performance**</font>

<div style="display: grid; grid-template-columns: repeat(2, 1fr); gap: 0px;">
    <img src="plots/LinearQ_7_20.0_0.01_0.01_0.0003.png" alt="Image 1" style="width: 100%; height: auto;">
    <img src="plots/DQN_7_20.0_0.01_0.0001_0.0003_128_1000_32_0.001.png" alt="Image 2" style="width: 100%; height: auto;">
    <img src="plots/MC_REINFORCE_7_20.0_0.01_0.001_128.png" alt="Image 3" style="width: 100%; height: auto;">
    <img src="plots/A2C_7_20.0_0.99_0.0005_128.png" alt="Image 4" style="width: 100%; height: auto;"> 
</div>

The best algorithm in terms of deployment performance is Linear Q because:

- The resulting Linear Q policy is relatively simple, so it selects actions that maintain balance and keeps the pole within a narrow range for a long time.

- The MC-REINFORCE policy is stochastic, and the gradient from the full return has high variance, so the final policy is still quite random and the pole keeps oscillating.

- For DQN, the policy from the Q-network may overfit to inconsistent Q-values, so the argmax action selection is sometimes wrong, causing the pole to slowly tilt and fall.

- For A2C, the actor network is driven by the advantage from the critic. If the critic is still biased, the resulting policy is not good enough, so the pole swings less than with DQN or REINFORCE but still falls.
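
The comparison above is based on visual inspection of the plots. One way to make it quantitative is to summarize each saved trajectory with a few numbers. The sketch below assumes the same layout as `obs_list` in play.py (one list of 4-element observations per episode, with the pole angle at index 1, as in the plotting cell). `summarize_episode` and the sample trajectory are hypothetical.

```python
import math


def summarize_episode(episode_obs, angle_index=1):
    """Return episode length, max |pole angle|, and RMS pole angle (radians)."""
    angles = [step[angle_index] for step in episode_obs]
    n = len(angles)
    max_abs = max(abs(a) for a in angles)
    rms = math.sqrt(sum(a * a for a in angles) / n)
    return {"steps": n, "max_abs_angle": max_abs, "rms_angle": rms}


# Made-up trajectory; only index 1 (pole angle) is used, other entries are placeholders.
episode = [
    [0.0, 0.03, 0.0, 0.0],
    [0.0, -0.04, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0],
]
print(summarize_episode(episode))
```

A longer episode with a smaller maximum and RMS angle would support the claim that a policy keeps the pole within a narrow range for a long time.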