# **Part 2: Setting Up the `Cart-Pole` Agent**


- **`Name`** : **Pavaris Asawakijtananont**

- **`Number`** : **65340500037**

## **Base Class**

##### **q**
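- Returns the action value estimated by the linear approximator used in Linear Q-Learning. With `a=None` it returns the values of all actions in the state; otherwise it returns the value of action `a` only.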

```python
    def q(self, obs, a=None):
        """Returns the linearly-estimated Q-value for a given state and action."""
        obs_val = obs['policy'][0].detach().cpu().numpy()
        if a is None:
            # Get q values from all action in state
            return np.dot(obs_val, self.w)
        else:
            # Get q values given action & state
            return np.dot(obs_val, self.w[:, a])
```

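##### **Scale Action**
- Maps a discrete action index in `[0, num_of_action - 1]` onto an evenly spaced value in `action_range` and wraps it in a `[1, 1]` tensor for the environment.
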
```python
    def scale_action(self, action):
        return torch.tensor([[action * ((self.action_range[1] - self.action_range[0]) / (self.num_of_action-1 )) + self.action_range[0]]])
```
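As a quick check of the mapping, here is a minimal sketch in plain Python. The function name `scale_action_index` and the guard against a single action are assumptions for illustration; the method above returns a tensor and has no such guard.

```python
def scale_action_index(index, n_actions, low, high):
    """Map a discrete index in [0, n_actions - 1] onto evenly spaced values in [low, high]."""
    if n_actions < 2:
        # Assumption: with a single action the step size would divide by zero
        raise ValueError("need at least two actions to span a range")
    step = (high - low) / (n_actions - 1)
    return low + index * step


# Five actions spread over the default range [-2.5, 2.5]
forces = [scale_action_index(i, 5, -2.5, 2.5) for i in range(5)]
print(forces)
```

With the default `num_of_action = 2`, the only two outputs are the end points of `action_range`.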

##### **Select Action**
- Selects an action with an **$\epsilon$-greedy** policy: with probability $\epsilon$ it explores by sampling a random action, and otherwise it exploits by taking the action with the highest estimated action value (argmax).

```python
    def select_action(self, state):
        """ Select an action based on an epsilon-greedy policy. """
        if np.random.rand() < self.epsilon:
            return np.random.randint(0, self.num_of_action)
        else:
            # Exploitation: choose the action with the highest estimated Q-value
            return np.argmax(self.q(state))
```
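The same rule can be sketched without NumPy or the agent class. The helper `epsilon_greedy` and its `rng` parameter are hypothetical names, and it takes a plain list of action values instead of an observation.

```python
import random


def epsilon_greedy(q_values, epsilon, rng=random):
    """Pick a random action with probability epsilon, otherwise the greedy one."""
    if rng.random() < epsilon:
        # Exploration: uniform over all action indices
        return rng.randrange(len(q_values))
    # Exploitation: index of the largest action value
    return max(range(len(q_values)), key=lambda a: q_values[a])


greedy_action = epsilon_greedy([0.1, 0.9, 0.3], epsilon=0.0)
random_action = epsilon_greedy([0.1, 0.9, 0.3], epsilon=1.0)
```

With `epsilon=0.0` the choice is always greedy, and with `epsilon=1.0` it is always random, which matches the two ends of the schedule used during training.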

##### **Decay Epsilon**
- Linearly decays $\epsilon$ down to `final_epsilon`, shifting the agent from exploration toward exploitation as training progresses.

```python
    def decay_epsilon(self):
        """ Decay epsilon value to reduce exploration over time. """
        self.epsilon = max(self.final_epsilon, self.epsilon-self.epsilon_decay)
```
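To get a feel for how long exploration lasts, the schedule can be simulated on its own. This is a sketch with a hypothetical helper `decay_schedule`; it assumes one decay step per episode, as in the `learn` method of this document.

```python
def decay_schedule(initial, decay, final, episodes):
    """Return the epsilon used in each episode under linear decay with a floor."""
    eps = initial
    history = []
    for _ in range(episodes):
        history.append(eps)
        eps = max(final, eps - decay)
    return history


# Constructor defaults used in this document
history = decay_schedule(initial=1.0, decay=1e-3, final=0.001, episodes=1500)
first_floor_episode = next(i for i, e in enumerate(history) if e == 0.001)
```

With these defaults the floor is reached after roughly a thousand episodes, so a shorter training run never becomes fully greedy.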

## **Linear Q Learning**


##### **Constructor**

- Initializes the Linear Q-Learning class with the following hyperparameters:
    - Learning rate
    - Initial Epsilon
    - Epsilon Decay
    - Final Epsilon
    - Discount Factor
    
```python
class Linear_QN(BaseAlgorithm):
    def __init__(
            self,
            num_of_action: int = 2,
            action_range: list = [-2.5, 2.5],
            learning_rate: float = 0.01,
            initial_epsilon: float = 1.0,
            epsilon_decay: float = 1e-3,
            final_epsilon: float = 0.001,
            discount_factor: float = 0.95,
    ) -> None:
```

##### **Updating**
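- Updates the weights of the selected action by a gradient step. Because the approximator is linear, the gradient of the action value with respect to the weights is the state (observation) vector itself.
- The error term uses the maximum action value of the next state as the target, as in standard Q-Learning. For terminal transitions the target is the reward alone.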


```python
    def update(self,obs,action: int,reward: float,next_obs,next_action: int,terminated: bool
    ):
        """
        Updates the weight vector using the Temporal Difference (TD) error 
        in Q-learning with linear function approximation.
        """
        # Current estimate Q(s, a)
        q_curr = self.q(obs=obs, a=action)
        if terminated:
            # No bootstrapping from a terminal state
            target = reward
        else:
            # Bootstrap from the greedy action value of the next state
            target = reward + self.discount_factor * np.max(self.q(next_obs))
    
        error = target - q_curr
        self.training_error.append(error)
        # Gradient descent update
        self.w[:, action] += self.lr * error * obs['policy'][0].detach().cpu().numpy()
```
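A single update can be traced by hand with plain lists. The sketch below is not the class method: `linear_q_update` is a hypothetical stand-alone function, `weights[i][a]` is assumed to be laid out like `self.w` (features by actions), and the observation is already a flat list of floats.

```python
def linear_q_update(weights, features, action, reward, next_features,
                    terminated, lr=0.01, gamma=0.95):
    """Apply one Q-learning step to a linear approximator and return the TD error."""

    def q_values(x):
        # Q(s, a) = sum_i x[i] * weights[i][a] for every action a
        n_actions = len(weights[0])
        return [sum(xi * row[a] for xi, row in zip(x, weights))
                for a in range(n_actions)]

    q_curr = q_values(features)[action]
    target = reward if terminated else reward + gamma * max(q_values(next_features))
    td_error = target - q_curr
    # The gradient of Q(s, a) w.r.t. column `action` is the feature vector
    for i, xi in enumerate(features):
        weights[i][action] += lr * td_error * xi
    return td_error


w = [[0.0, 0.0], [0.0, 0.0]]
err = linear_q_update(w, [1.0, 2.0], action=0, reward=1.0,
                      next_features=[0.0, 0.0], terminated=True, lr=0.1)
```

Only the column of the chosen action changes; the weights of the other action stay at zero.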

##### **Learn**
- Runs one episode of interaction with the environment, updating the weights at every timestep and decaying $\epsilon$ once at the end of the episode. It returns the cumulative reward and the number of steps.

```python
    def learn(self, env):
        """
        Train the agent for a single episode.
        """
        obs, _ = env.reset()
        cumulative_reward = 0.0
        done = False
        step = 0
        while not done:
            action = self.select_action(obs)
            next_obs, reward, terminated, truncated, _ = env.step(self.scale_action(action))
            reward_value = reward.item()
            terminated_value = terminated.item() 
            cumulative_reward += reward_value
            self.update(
                obs=obs,
                action=action,
                reward=reward_value,
                next_obs=next_obs,
                next_action=action,
                terminated=terminated_value
            )
            done = terminated or truncated
            obs = next_obs
            step += 1
        self.decay_epsilon()
        return cumulative_reward , step
```

## **Deep Q Network**

##### **Neural Network**
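- Defines the neural network that approximates the action values.
- The network is fully connected with one hidden layer.
- The `forward` function computes the action-value estimates for an input state. Note that it applies ReLU and dropout to the output layer as well, so the estimates are never negative.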

```python

class DQN_network(nn.Module):
    """ Neural network model for the Deep Q-Network algorithm. """
    def __init__(self, n_observations, hidden_size, n_actions, dropout):
        super(DQN_network, self).__init__()
        self.fc1 = nn.Linear(n_observations, hidden_size) # Input -> hidden layer
        self.fc2 = nn.Linear(hidden_size, n_actions) # Hidden -> output layer
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """ Forward pass through the network."""
        val = x
        val = F.relu(self.fc1(val))
        val = self.dropout(val)
        val = F.relu(self.fc2(val))
        val = self.dropout(val)

        return val
```

##### **Constructor**

- Initializes the hyperparameters of the Deep Q Network:
    - `tau` : interpolation constant for the soft update of the target network
    - `hidden_dim` : number of neurons in the hidden layer
    - `learning_rate` : step size of the gradient update
    - `dropout` : probability of zeroing a neuron during training
    - `buffer_size` : capacity of the replay buffer that stores experience
    - `batch_size` : number of transitions sampled for each network update
    
```python
class DQN(BaseAlgorithm):
    def __init__(
            self,
            device = None,
            num_of_action: int = 2,
            action_range: list = [-2.5, 2.5],
            n_observations: int = 4,
            hidden_dim: int = 64,
            dropout: float = 0.5,
            learning_rate: float = 0.005,
            tau: float = 0.005,
            initial_epsilon: float = 1.0,
            epsilon_decay: float = 1e-3,
            final_epsilon: float = 0.001,
            discount_factor: float = 0.95,
            buffer_size: int = 1000,
            batch_size: int = 1,
    ) -> None:
```



##### **Calculate Loss**

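- Calculates the DQN loss. The target $y_j$ bootstraps from the target network $\theta^-$ (and reduces to $r_j$ for terminal transitions), and the loss is the squared error against the policy network's estimate, averaged over the batch:

$$
y_j = r_j + \gamma \max_{a'}Q(\phi_{j+1} , a' ; \theta^-)
$$

$$
L = (y_j - Q(\phi_j , a_j ; \theta))^2
$$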

```python
    def calculate_loss(self, non_final_mask, non_final_next_states, state_batch, action_batch, reward_batch):
        """ Computes the loss for policy optimization. """
        # Q(s, a) of the actions actually taken: [batch_size, 1]
        q = self.policy_net(state_batch).gather(1, action_batch)
        # Next-state Q-values stay zero for terminal transitions
        q_next = torch.zeros(size=(self.batch_size, self.num_of_action), device=self.device)
        if non_final_next_states.size(0) > 0:
            with torch.no_grad():
                q_next_values = self.target_net(non_final_next_states)
                # squeeze(1) turns the mask from [batch_size, 1] into [batch_size], also when batch_size == 1
                q_next[non_final_mask.squeeze(1)] = q_next_values
        # TD target: reward plus discounted maximum next-state Q-value, [batch_size, 1]
        q_expected = (torch.max(q_next, dim=1)[0].unsqueeze(1) * self.discount_factor) + reward_batch
        loss = F.mse_loss(input=q, target=q_expected)
        return loss
```
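The target and loss arithmetic can be verified on plain Python lists. This sketch is not the tensor code above: `dqn_targets` and `mse_loss_sketch` are hypothetical helpers, and a terminal transition is marked with `None` instead of a boolean mask.

```python
def dqn_targets(rewards, next_q_values, gamma=0.95):
    """Build TD targets; next_q_values[j] is None when transition j is terminal."""
    return [
        r if nq is None else r + gamma * max(nq)
        for r, nq in zip(rewards, next_q_values)
    ]


def mse_loss_sketch(predictions, targets):
    """Mean squared error between predicted Q(s, a) and the TD targets."""
    return sum((p - t) ** 2 for p, t in zip(predictions, targets)) / len(targets)


# Two transitions: the first bootstraps, the second is terminal
targets = dqn_targets([1.0, 1.0], [[0.5, 2.0], None], gamma=0.5)
loss = mse_loss_sketch([1.0, 0.0], targets)
print(targets, loss)
```

The terminal transition contributes only its reward to the target, which is what the zero-initialized `q_next` achieves in the method above.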

##### **Generate Sample**
- Draws a random batch of `batch_size` state transitions from the replay memory and converts it into tensors for the update.

```python
    def generate_sample(self, batch_size):
        """
        Generates a batch sample from memory for training.

        Returns:
            Tuple: A tuple containing:
                - non_final_mask (Tensor): A boolean mask indicating which states are non-final.
                - non_final_next_states (Tensor): The next states that are not terminal.
                - state_batch (Tensor): The batch of current states.
                - action_batch (Tensor): The batch of actions taken.
                - reward_batch (Tensor): The batch of rewards received.
        """
        # Ensure there are enough samples in memory before proceeding
        # sample for training with batch size
        if len(self.memory) < batch_size:
            return None
        batch = self.memory.sample()         
        state_batch = torch.stack([torch.tensor(batch[i].state, dtype=torch.float) for i in range(self.batch_size)]).to(self.device)
        next_states_batch = torch.stack([torch.tensor(batch[i].next_state, dtype=torch.float) for i in range(self.batch_size)]).to(self.device)
        action_batch = torch.stack([torch.tensor(batch[i].action, dtype=torch.int64) for i in range(self.batch_size)]).to(self.device)
        reward_batch = torch.stack([torch.tensor(batch[i].reward, dtype=torch.float) for i in range(self.batch_size)]).to(self.device)
        non_final_mask = torch.stack([torch.tensor(not batch[i].done, dtype=torch.bool) for i in range(self.batch_size)]).to(self.device)
        non_final_next_states = next_states_batch[non_final_mask]
        # Return All dimension : [batch_size , 1]
        return (non_final_mask.unsqueeze(1), non_final_next_states.squeeze(1), state_batch.squeeze(1), action_batch, reward_batch.unsqueeze(1))
```
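The replay memory itself is not shown in this section. For orientation only, here is a minimal sketch of a buffer that would fit the calls above. The class name `ReplayBufferSketch`, its `add` method, and the `Transition` tuple are assumptions, chosen so that the fields match the `state`, `action`, `reward`, `next_state`, and `done` attributes read by `generate_sample`.

```python
import random
from collections import deque, namedtuple

# Hypothetical record type; field names follow the attributes used above
Transition = namedtuple("Transition", ["state", "action", "reward", "next_state", "done"])


class ReplayBufferSketch:
    """Fixed-capacity buffer that samples uniformly without replacement."""

    def __init__(self, capacity, batch_size):
        self.memory = deque(maxlen=capacity)  # oldest entries are dropped first
        self.batch_size = batch_size

    def add(self, state, action, reward, next_state, done):
        self.memory.append(Transition(state, action, reward, next_state, done))

    def sample(self):
        return random.sample(list(self.memory), self.batch_size)

    def __len__(self):
        return len(self.memory)


buffer = ReplayBufferSketch(capacity=3, batch_size=2)
for t in range(5):
    buffer.add(state=t, action=0, reward=1.0, next_state=t + 1, done=False)
batch = buffer.sample()
```

Uniform sampling from a buffer breaks the correlation between consecutive timesteps, which is the usual motivation for experience replay in DQN.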

##### **Update Policy Network**
- Updates the policy network by gradient descent: it samples a batch, computes the loss, and takes one optimizer step.

```python
    def update_policy(self):
        if len(self.memory) < self.batch_size:
            return
        sample = self.generate_sample(self.batch_size)
        if sample is None:
            return
        non_final_mask, non_final_next_states, state_batch, action_batch, reward_batch = sample
        loss = self.calculate_loss(non_final_mask, non_final_next_states, state_batch, action_batch, reward_batch)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()
```

##### **Update Target Network**
- Soft-updates the target network: each target weight moves toward the corresponding policy weight by a fraction `tau`, that is $\theta^- \leftarrow \tau\theta + (1-\tau)\theta^-$, so the target network changes slowly and the bootstrapped targets stay stable.

```python
    def update_target_networks(self):
        target_net_state_dict = self.target_net.state_dict() # get target network weights
        policy_net_state_dict = self.policy_net.state_dict()
        for key in target_net_state_dict:
            target_net_state_dict[key] = self.tau * policy_net_state_dict[key] + (1.0 - self.tau) * target_net_state_dict[key]
        self.target_net.load_state_dict(target_net_state_dict)
```
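The interpolation is easy to see on scalar weights. The sketch below uses plain dictionaries in place of PyTorch state dicts; `soft_update` is a hypothetical helper name.

```python
def soft_update(target_params, policy_params, tau):
    """Blend policy weights into target weights: tau * policy + (1 - tau) * target."""
    return {
        key: tau * policy_params[key] + (1.0 - tau) * target_params[key]
        for key in target_params
    }


target = {"w": 0.0}
policy = {"w": 1.0}
blended = soft_update(target, policy, tau=0.25)
copied = soft_update(target, policy, tau=1.0)
```

A small `tau` such as the default `0.005` moves the target only slightly per update, while `tau = 1.0` degenerates into a hard copy of the policy network.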


## **MC REINFORCE**

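##### **Neural Network**
- Defines the policy network: a fully connected network with one hidden layer whose softmax output is a probability distribution over the actions.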

```python
class MC_REINFORCE_network(nn.Module):
    """ Neural network for the MC_REINFORCE algorithm. """

    def __init__(self, n_observations, hidden_size, n_actions, dropout):
        super(MC_REINFORCE_network, self).__init__()
        self.fc1 = nn.Linear(n_observations, hidden_size) # Input -> hidden layer
        self.fc2 = nn.Linear(hidden_size, n_actions) # Hidden -> output layer
        self.softmax = nn.Softmax(dim=1)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """ Forward pass through the network. """
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.softmax(x)
        return x

```

##### **Constructor**
- Initializes the MC_REINFORCE class. Most hyperparameters are the same as in Linear Q-Learning; the constructor also builds the policy network and its AdamW optimizer.

```python
class MC_REINFORCE(BaseAlgorithm):
    def __init__(
            self,
            device = None,
            num_of_action: int = 2,
            action_range: list = [-2.5, 2.5],
            n_observations: int = 4,
            hidden_dim: int = 64,
            dropout: float = 0.5,
            learning_rate: float = 0.01,
            discount_factor: float = 0.95,
    ) -> None:
        """
        Initialize the CartPole Agent.
        """     
        self.LR = learning_rate

        self.policy_net = MC_REINFORCE_network(n_observations, hidden_dim, num_of_action, dropout).to(device)
        self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=learning_rate)
        self.device = device
        self.steps_done = 0
        self.episode_durations = []
        super(MC_REINFORCE, self).__init__(
            num_of_action=num_of_action,
            action_range=action_range,
            learning_rate=learning_rate,
            discount_factor=discount_factor,
        )
```

##### **Calculate Return**
- Calculates the discounted return of every timestep by accumulating the rewards backward through the episode, then L2-normalizes the returns.

```python
    def calculate_stepwise_returns(self, rewards):
        """
        Compute stepwise returns for the trajectory.

        Args:
            rewards (list): List of rewards obtained in the episode.
        
        Returns:
            list: L2-normalized stepwise returns, one per timestep.
        """
        stepwise_return = 0
        stepwise_return_arr = []
        for r in reversed(rewards):
            stepwise_return = stepwise_return*self.discount_factor + r
            stepwise_return_arr.append(stepwise_return)
        returns = torch.tensor(list(reversed(stepwise_return_arr)), dtype=torch.float)
        # F.normalize divides by the L2 norm of the whole vector (no mean subtraction)
        tensor_norm = F.normalize(input=returns, dim=0)
        return tensor_norm.tolist()

```
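The backward accumulation and the normalization can be reproduced without PyTorch. In this sketch `discounted_returns` and `l2_normalize` are hypothetical helpers, and `l2_normalize` is assumed to mirror the default behavior of `F.normalize` (division by the L2 norm, with a small `eps` floor).

```python
import math


def discounted_returns(rewards, gamma):
    """Return G_t = r_t + gamma * G_{t+1} for every timestep, in episode order."""
    g = 0.0
    out = []
    for r in reversed(rewards):
        g = r + gamma * g
        out.append(g)
    out.reverse()
    return out


def l2_normalize(values, eps=1e-12):
    """Scale the vector to unit L2 norm; eps guards against division by zero."""
    norm = math.sqrt(sum(v * v for v in values))
    return [v / max(norm, eps) for v in values]


returns = discounted_returns([1.0, 1.0, 1.0], gamma=0.5)
normalized = l2_normalize(returns)
print(returns)
```

Earlier timesteps receive larger returns because they collect more future reward. L2 normalization rescales the returns but keeps their signs, so with all-positive rewards every action in the episode is still reinforced.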