# Reinforcement Learning

The **agent** and **enviroment**

The environment is an entity that the agent can interact with, e.g. Pong game.

The agent controls the paddle to hit the ball back and forth. An agent can “interact” with the environment by using a predefined **action set**: $A = \{A_1, A_2, ... \}$ (all possible actions)

<div style="background-color: #54c7ec; color: #fff; font-weight: 700; padding-left: 10px; padding-top: 5px; padding-bottom: 5px"><strong>NOTE:</strong></div>
<div style="background-color: #f3f4f7; padding-left: 10px; padding-top: 10px; padding-bottom: 10px; padding-right: 10px">
<p>The goal of reinforcement learning algorithms is to teach the agent how to interact “well” with the environment so that the agent is able to obtain a good score under a predefined evaluation metric</p>
</div>

The agent receive a reward $r$ of $1$ when the ball hits the wall on the opposite side. 

At an arbitrary time step (a point at which observations can be made), $t$, the agent first observes the current state of the environment, $S_t$, and the corresponding reward value, $R_t$.

The agent then decides what to do next based on the state and reward information. The action the agent intends to perform, $A_t$, gets fed back into the environment such that we can obtain the new state $S_{t+1}$ and reward $R_{t+1}$.

$$(S_t,R_t) \rightarrow A_t \rightarrow (S_{t+1},R_{t+1})$$

The observation of the environment state $s$($s$ is a general representation of state regardless of time step $t$) from the agent’s perspective does not always contain all the information about the environment. 

If the observation only contains partial state information, the environment is *partially observable*. Nevertheless, if the observation contains the complete state information of the environment, the environment is *fully observable*.

The action $a$ ($a$ is a general representation of action regardless of time step $t$) is usually conditioned on the state $s$ to represent the behavior of the agent (Under assumption of fully observable environments.)

To provide feedback from the environment to the agent, a reward function $R$ generates an immediate reward $R_t$ according to the environment status and sends it to the agent at every time step. $R_t=R(S_t)$.

> Trajectory: 
> 
> A **trajectory** is defined: 
> 
> $\tau = (S_0, R_0,  A_0, S_1, R_1, A_1,...)$
> 
> A *trajectory*, being referred to also as an *episode*, is a sequence that goes from the initial state to the terminal state (for finite cases)

The initial state in a trajectory, $S_0$, is randomly sampled from the *start-state distribution*, denoted by $ρ_0$, in which:

$$S_0 \sim ρ_0(.)$$

The transition from a state (after taking an action) to the next state can be either *deterministic transition process* or *stochastic transition process*

For the *deterministic transition*, the next state $S_{t+1}$ is governed by a deterministic function:

$$S_{t+1} = f(S_t, A_t)$$

For the *stochastic transition* process, the next state $S_{t+1}$ is described as a probabilistic distribution:

$$S_{t+1} \sim p(S_{t+1}|S_t, A_t)$$

**Exploitation** means maximizing the agent performance using the existing knowledge, and its performance is usually evaluated by the expected reward. Given the actual knowledge, the agent doesn't take risk to explore.

The policy he took here is the **greedy policy**, which means the agent constantly performs the action that yields the highest expected reward based on current information, rather than taking risky trials which may lead to lower expected rewards.

**Exploration** means increasing existing knowledge by taking actions and interacting with the environment.

### Markov Decision Process

A Markov process (MP) is a *discrete stochastic process* with *Markov property*.

<center>
<img src="assets/markov-process-example.png" height=360>
</center>

This graph simulates how a person works on two tasks and goes to bed in the end.

If I'm doing the "Task 1" and exists the `30%` probability of stoping and go to play a "Game" and then, the probability of returning to do the "Task 1" is only `10%` and the probability of keep playing is `90%`.

Graphical Model of a Markov Process
<center>
<img src="assets/graphical-model-mp.png" height=80> </center>

$a \rightarrow b$ indicates the variable $b$ is depended on a vaiable $a$

The probabilistic graphical model can help us to have a more intuitive sense of the relationships between variables in reinforcement learning, as well as providing rigorous references when we derive the gradients with respect to different variables along the MP chains

MP follows the assumption of **Markov chain** (*memoryless property*) where the next state $S_{t+1}$
is only dependent on the current state $S_t$, with the probability of a state
jumping to the next state described as follows:

$$P(S_{t+1}|S_{t}) = P(S_{t+1}|S_0, ..., S_{t})$$

Also the *time homogeneus property*

$$P(S_{t+1}|S_{t}) = P(S_{t+2}|S_{t+1})$$

Given a finite *state set* $S$, we can have *state transition matrix* $P$

$ Ｓ= \{g, t_1, t_2, r, p, b\}$



<center>
<img src="assets/matrix-transition.PNG" height=170>
</center>

Where $P_{i,j}$ represents the probability of transferring the current state $S_i$ to the next state $S_j$.

$$ P(s=t_1|s=g)=10\%$$

The sum of each row must be equal to $1$ and the $P$ is always a square matrix.

A Markov Process can be represented by a tuple $<Ｓ, P>$

The next state is sample from $P$:

$$S_{t+1} \sim P_{S_t} $$

For continuous case a finite matrix can not be used to represent the transition

#### Markov Reward Process

We need to add feedback from the enviroment to the agent, so we extent

$<Ｓ, P>$ to $<Ｓ, P, R, \gamma>$, in which $R$ represent the *reward function* and $\gamma$ *reward discount factor*.

$$R_t = R(S_t)$$

So the Graphical Model of a Markov Process is updated to 
<center>
<img src="assets/graphical-model-mp-reward.png" height=170>
</center>

But, if we are considering to move to the next state, we should take account the rewards of the following next states that we will take in order to reach the $T$.

If a single trajectory $\tau$ has $T$ time steps, and the current time step $t=0$, then **the return** is the cummulative reward discounted by $\gamma \in (0,1)$  of a trajectory.

$$G_{t=0:T} = G_{t=0}^{(T)} = R(\tau)_{t=0}^{(T)} =  R_1 + \gamma R_2 + ... + \gamma^{T-1} R_T$$

<!-- $\gamma R_1 + ... + \gamma^{T} R_T$ is the discount factor of the future rewards that we get from the current time step $t=0$ following a set of actions ($a_1, a_2, ..., a_{T}$) -->

where $R_t$ is the **immediate reward** at time step $t$, and $T$ represents the time step of the terminal state and $r$ as a general representation of immediate reward value.

The discounted factor is especially critical when handling with infinite MRP cases

The value function $V(s)$ represents the expected return from the state $S_0$ that take the state value $s$ until final state.

<!-- $$V(s) = E[R_t|S_0=s]$$ -->
$$V(s) = E[G_{t=0}^{(T)}|S_0=s]$$

A simple way to estimate the $V(s)$ is Monte Carlo method, we can randomly sample a large number of trajectories starting from state $s$ according to the given state transition matrix $P$.

If the agent acts according to the policy $π$, we denote the *value function* as $V^π(s)$


<!-- tasks = ['Bed', 'Game', 'Pass', 'Rest', 'Task1', 'Task2']
rewards = {'Bed':0, 'Game':-1, 'Pass':10, 'Rest':1, 'Task1':-2, 'Task2':-2}

transition = np.array(
    [
        [1.0, 0.0, 1.0, 0.0, 0.0, 0.3],
        [0.0, 0.9, 0.0, 0.0, 0.3, 0.0],
        [0.0, 0.0, 0.0, 0.0, 0.0, 0.6],
        [0.0, 0.0, 0.0, 0.0, 0.0, 0.1],
        [0.0, 0.1, 0.0, 0.1, 0.0, 0.0],
        [0.0, 0.0, 0.0, 0.9, 0.7, 0.0]
    ]
)



for tk in  tasks:
    n_iter = 1
    print("V(S={})=".format(tk), end='')
    while n_iter<=10_000:    
        array = []
        task = tk
        target = 'Bed'
        G = 0
        gamma = 0.9
        t = 0
        while True:
            idx = tasks.index(task)
            reward = rewards[task]
            G = G + (gamma** t)*reward
            t+=1
            prob = transition[:, idx].copy()
            task = np.random.choice(tasks, p=prob)
            if task == target:
                array.append(G)
                n_iter+=1
                break

    print("{:.3f}".format(sum(array)/len(array)))} -->

In [1]:
import numpy as np
import matplotlib.pyplot as plt

In [179]:
gamma = 0.9
tasks = ['Bed', 'Game', 'Pass', 'Rest', 'Task1', 'Task2']
rewards = {'Bed':0, 'Game':-1, 'Pass':10, 'Rest':1, 'Task1':-2, 'Task2':-2}
value_function = {'Bed':0, 'Game':0, 'Pass':0, 'Rest':0, 'Task1':0, 'Task2':0}

transition = np.array(
    [
        [1.0, 0.0, 1.0, 0.0, 0.0, 0.3],
        [0.0, 0.9, 0.0, 0.0, 0.3, 0.0],
        [0.0, 0.0, 0.0, 0.0, 0.0, 0.6],
        [0.0, 0.0, 0.0, 0.0, 0.0, 0.1],
        [0.0, 0.1, 0.0, 0.1, 0.0, 0.0],
        [0.0, 0.0, 0.0, 0.9, 0.7, 0.0]
    ]
)

In [180]:
def get_trajectory(task:str, target:str):
    def trajectory(task:str, target:str, data:list):
        data.append(task)
        if task == target:
            return data
        else:
            idx = tasks.index(task)
            next_task =np.random.choice(tasks, p=transition[:, idx])
            return trajectory(next_task, target, data)

    return trajectory(task, target, [])

In [None]:
for task in tasks:
    n_iter = 10_000
    v = 0
    while n_iter > 0:
        # 1. Simulating trajectory
        simulated_trajectory = get_trajectory(task, 'Bed')

        # 2. Simulating rewards
        simulated_rewards = np.array([rewards[x] for x in simulated_trajectory])

        # 3. Computing return
        calculated_return = np.sum(
            np.cumprod(
                np.ones_like(simulated_rewards)*gamma)/gamma *simulated_rewards)
        v = v + calculated_return
        n_iter -= 1
    n_iter = 10_000
    value_function[task] = v/n_iter

In [191]:
value_function

{'Bed': 0.0,
 'Game': -5.922879947584755,
 'Pass': 10.0,
 'Rest': 3.9446244226534253,
 'Task1': -1.199615908058622,
 'Task2': 3.7855726236614347}

The agent policy usually selects the next state with higher value.

E.g. if we are in the *Game* state, we only have, two options: *Game* or *Task1*, we should choice *Task1* (since the higher value of $V(S=\text{Task1})$). And if we are in th *Task1* state, we would have two options: *Game* or *Task2*.  We should choice *Task2* (since the higher value of $V(S=\text{Task2})$) and so on.

$$\text{Game} \rightarrow \text{Task1} \rightarrow \text{Task2} \rightarrow \text{Pass} \rightarrow \text{Bed}$$

#### Markov Decision Process (MDP)

The actions taken above only depend on the expected value of discounted reward  given a state. The action that maximize this expected value is taken. 

But we do more granularity this expected value and compute at level of action

In the tuple $$< Ｓ, P, R, \gamma>$$ we add $Ａ$

$$< Ｓ, Ａ, P, R, \gamma>$$

$$ P(s'|s,a) = P(S_{t+1} = s'|S_{t}=s, A_t=a)$$

Where $Ａ$ represent *finite action set* $\{a_0, a_1, a_2, a_3, ..., a_{T-1}\}$ and the immediate reward becomes:

$$R_{t+1} = R(S_t=s, A_t=a, S_{t+1}=s')$$

<!-- Reward Function ($ R(s, a, s') $):  -->
The immediate reward received when transitioning from state $ s $ to state $ s' $ after action $ a $.


A *policy* $\pi$ represents the way in which the agent behaves based on its observations of the enviroment.

$$\pi(a|s) = p(A_t=a|S_t = s)$$

**Expected return** is the expectation of returns over all possible trajectories under a policy. Therefore, **the goal of reinforcement learning is to find the higher expected return by optimizing the policy**.

The probability of the T-step trajectory for MDP is:
Based on a behavior of the agent will generate a path or trajectory.

$$p(\tau|\pi)_{t=0}^{T-1} = p_0(S_0) \prod_{t=0}^{T-1} p(S_{t+1}|S_t, A_t)\pi(A_t|S_t)$$

If is given the initial state $S_0$, the probability of the T-step trajectory for MDP will updated to:

$$ p(\tau|\pi)_{t=0}^{T-1} = \prod_{t=0}^{T-1} p(S_{t+1}|S_t, A_t)\pi(A_t|S_t)$$


Given state $S_t$ and action $A_t$, if only exists way to go state $S_{t+1}$ the probability of the T-step trajectory is reduced to:

$$ \prod_{t=0}^{T-1}p(S_{t+1}|S_t, A_t)\pi( A_t|S_t) =  \prod_{t=0}^{T-1}p(S_{t+1}|S_t, A_t)$$

<!-- $$p(S_{t+1}|S_t, A_t)\pi( A_t|S_t) = \prod_{t=0}^{T-1} \pi( A_t|S_t)$$ -->

Given the reward function $R$ and all possible trajectories $τ$, the expected return $J(π)$ starting from time step $t=0$ is defined as follows


$$J(\pi) = \sum_{\tau} P(\tau|\pi)_{t=0}^{T-1} R(\tau)_{t=0}^{T} = E_{\tau \sim \pi}[R(\tau)_{t=0}^{T}]$$


The RL optimization problem is to improve the policy for maximizing the expected return with optimization methods. 

The **optimal policy** $π^∗$ can be expressed as

$$\pi^* = \argmax_{\pi} J(\pi)$$

Given policy $\pi$, the **value function** $V(s)$ can be defined as:

$$V^{\pi} (s) = E_{\tau \sim \pi}[R(\tau)_{t}^{T}|S_{t}=s]$$

where $τ ∼ π$ means the trajectories $τ$ are sampled given the policy $π$

In MDP, given an action, we have the **action-value function** $Q^\pi(s, a)$, which depends on both the state and the action just taken

$$Q^{\pi} (s, a) = E_{\tau \sim \pi}[R(\tau)_{t}^{T}|S_{t}=s, A_{t}=a]$$

We need to keep in mind that the $Q^π(s, a)$ depends on $π$, as the
estimation of the value is an expectation over the trajectories by the policy
$π$. This also indicates if the $π$ changes, the corresponding $Q^π(s, a)$ will also change.

We therefore usually call the value function estimated with a specific policy *the on-policy value function* (with lower case $q$), for the distinction from
the optimal value function estimated with the optimal policy.

$$q_{\pi}(s,a) = E_{\tau \sim \pi}[R(\tau)_{t}^{T}|S_t=s, A_t=a]$$

$$v_{\pi}(s) = E_{a \sim \pi}[q_{\pi}(s,a)] $$

$$v_{\pi}(s) = \sum_{a} \pi(a|s)  q_{\pi}(s,a) $$
<!-- =E_{\tau \sim \pi}[R(\tau)|S_0=s] -->

We can use *Monte Carlo method* to estimate

Bellman Equation for **Value Function**

Given state $s$ in time step $t$ ($s'$ in time step $t+1$) with a policy $\pi$, and the all trajectories $\tau$ upto time step $T$ with respective rewards, we can define  the value function  $v^{\pi} (s)$

$$v^{\pi} (s) = E_{\tau \sim \pi}[R(\tau)_{t=t}^{T}|S_{t}=s] = \sum_{\tau} P(\tau|\pi)_{t=t}^{T-1} R(\tau)_{t=t}^{T} $$



We can split the above into 
1.  $P(\tau|\pi)_{t=t}^{T-1} =  P(s'|s, a) \pi(a|s) P(\tau|\pi)_{t=t+1}^{T-1}$

2. $R(\tau)_{t=t}^{T} = R_{t+1} + \gamma R(\tau)_{t=t+1}^{T}$ (*Return* after take action $a$ in the time step $t$)


$$v^{\pi} (s) = \sum_{\tau} P(s'|s,a) \pi(a|s) P(\tau|\pi)_{t=t+1}^{T-1} [R_{t+1} + \gamma R(\tau)_{t=t+1}^{T}]$$

$$v^{\pi} (s) = \sum_{s'} P(s'|s,a) \sum_{a} \pi(a|s) \sum_{\tau} P(\tau|\pi)_{t=t+1}^{T-1} [R_{t+1} + \gamma R(\tau)_{t=t+1}^{T}]$$

$$v^{\pi} (s) = \sum_{s'} P(s'|s,a) \sum_{a} \pi(a|s)[R_{t+1} + \gamma \sum_{\tau} P(\tau|\pi)_{t=t+1}^{T-1} R(\tau)_{t=t+1}^{T}]$$

$$v^{\pi} (s) = \sum_{s'} P(s'|s,a) \sum_{a} \pi(a|s)[R_{t+1} + \gamma v^{\pi} (s')]$$


Bellman Equation for **Value Action Function**

Let be state $s$ in time step $t$ (state $s'$ and action $a'$ in time step $t$) with a policy $\pi$, and the all trajectories $\tau$ upto time step $T$ with respective rewards, we can define  the value function  $q^{\pi} (s, a)$

$$q^{\pi} (s, a) = E_{\tau \sim \pi}[R(\tau)_{t=t+1}^{T}|S_{t}=s, A_{t}=a] = \sum_{\tau} P(\tau|\pi)_{t=t+1}^{T-1} R(\tau)_{t=t+1}^{T} $$



We can split the above into 
1.  $P(\tau|\pi)_{t=t+1}^{T-1} =  P(s'|s,a) \pi(a|s) P(\tau|\pi)_{t=t+1}^{T-1}$

2. $R(\tau)_{t=t+1}^{T} = R_0 + \gamma R(\tau)_{t=t+1}^{T}$

3. Since $s$  and $a$ are given, $\pi(a|s) = 1$


$$q^{\pi} (s,a) = \sum_{\tau} P(s'|s,a) \pi(a|s) P(\tau|\pi)_{t=t+1}^{T-1} [R_{t+1} + \gamma R(\tau)_{t=t+1}^{T}]$$

$$q^{\pi} (s,a) = \sum_{s'} P(s'|s,a) \sum_{\tau} P(\tau|\pi)_{t=t+1}^{T-1}[ R_{t+1} + \gamma R(\tau)_{t=t+1}^{T}]$$

$$q^{\pi} (s,a) = \sum_{s'} P(s'|s,a) [R_{t+1} + \gamma \sum_{\tau} P(\tau|\pi)_{t=t+1}^{T-1} R(\tau)_{t=t+1}^{T}]$$

$$q^{\pi} (s, a) = \sum_{s'} P(s'|s,a) [R_{t+1} + \gamma v^{\pi} (s')]$$

$$q^{\pi} (s, a) = \sum_{s'} P(s'|s,a) [R_{t+1} + \gamma \sum_{a'} \pi(a'|s') q^{\pi} (s',a')]$$

<!-- #### Bellman Ecuation and Optimality

Assumptions
- Let $x_t$ be the state at same time $t$
- The initial decision begin at $t=0$, so the intial state is $x_0$
- The set of available actions  that depends on current state $a_t \in \Gamma(x_t)$
- The next state after taken the action $a_t$ is $x_{t+1}=T(x_t, a_t)$
- The payoff from taking the action $a_t$ is $F(x_t,a_t )$
- Discount factor $0< \beta<1$

$V(x_0)$ denote the *optimal value* that can be obtained by maximizing this *objetive function* subject to contraints.

$$V(x_0) = \max_{\{ a_t\}_{t=0}^{\infty}} \sum_{t=0}^{\infty} \beta^t F(x_t, a_t)$$

*Principle of Optimality*: An optimal policy has the property that whatever the initial state and initial decision are, the remaining decisions must constitute an optimal policy with regard to the state resulting from the first decision. (See Bellman, 1957, Chap. III.3.)
$$V(x_0) = \max_{\{ a_0\}}[ F(x_0, a_0) +  \max_{\{ a_t\}_{t=1}^{\infty}}\sum_{t=1}^{\infty} \beta^t F(x_t, a_t)]$$

$$V(x_0) = \max_{\{ a_0\}}[F(x_0, a_0)+V(x_1)]$$

It reads from inner to outer, first maximize from the step $t=1$ to next, then add the payoff of the initial state and maximize it.  -->


#### Optimal Value Functions

Since on-policy **value functions** are estimated with respect to the policy
itself, different policies will lead to different value functions, even for the
same set of states and actions. Among all those different value functions,
we define the optimal value function as

$$v_*(s) = \max_{\pi} v_{\pi}(s)$$

For **action-value function**

$$q_{*}(s, a) = \max_{\pi} q_{\pi}(s, a)  $$

We will update the policy $\pi$ such that it converges to the optimal policy. We will update the matrix transition.

<!-- #### Bellman Optimality Equation

Bellman equation for optimal value functions (*Optimal value function* and *Optimal action-value function*)

We choose the action (in which we can choose) that maximizes the expected return.

Bellman equation for optimal value function

$$v^{\pi}_{*} (s) = \max_{a} \sum_{s'} P(s'|s,a) \sum_{a} \pi(a|s)[R_{t+1} + \gamma \max_{a'} v^{\pi}_{*} (s')]$$

Bellman equation for optimal action-value function

The action $a$ is given for time $t$ so we can not maximize it.

$$q^{\pi}_{*} (s, a) = \sum_{s'} P(s'|s,a) [R_{t+1} + \gamma \sum_{a'} \pi(a'|s') \max_{a'} q^{\pi}_{*} (s',a')]$$ -->

#### Dynamic Programming

"Dynamic" means that the problem has sequential or temporal components.

"Programming" 

DP solves a complex problem by breaking it down into smaller subproblems.

<!-- ### 📘 Full Explanation of **Dynamic Programming (DP)** in Reinforcement Learning (RL)

**Dynamic Programming (DP)** is a fundamental technique in **Reinforcement Learning (RL)** used to solve problems where the agent can model the environment completely — that is, when the agent knows all the **transition probabilities** and **reward functions**.

It is one of the earliest and most classical approaches to solving **Markov Decision Processes (MDPs)**. -->

<!-- ---

## 🔍 What Is Dynamic Programming?

In the context of RL:

> **Dynamic Programming (DP)** refers to a collection of algorithms that can compute optimal policies **given a perfect model of the environment**, described as a **Markov Decision Process (MDP)**.

--- -->

Prerequisites for Using DP

To apply DP methods in RL, you need:

1. **Full knowledge of the environment**
   - Transition probability $ P(s' | s, a) $
   - Reward function $ R(s, a, s') $
2. The problem must be expressible as an **MDP**
3. Sufficient computational resources (DP scales poorly with large state spaces)

Two properties that a *problem* must have for DP to be applicable:
1. *Optimal substructure*: Optimal solution can be decomposed into optimal solutions for its sub-problems:
2. *Overlapping sub-problems*: implies that the number of sub-problems is finite and the problem ocurr recursively so sub-solutions can be cached and reused.

> Recursion is a programming technique that uses functions to solve problems by breaking them down into smaller, more manageable problems. It's a method that involves a function calling itself repeatedly until a base case is reached

Model a problem as an MDP and using Bellman's recursion equation allow us meet the two properties of DP.



<!-- --- -->

<!-- ## 🧠 Core Concepts in DP for RL

| Concept | Description |
|--------|-------------|
| **Policy Evaluation** | Estimating the value function $ V^\pi(s) $ for a given policy $ \pi $ |
| **Policy Improvement** | Improving a policy using the value function |
| **Policy Iteration** | Alternating between policy evaluation and improvement |
| **Value Iteration** | Directly computing the optimal value function without explicitly maintaining a policy | -->

<!-- Markov Decision Process (MDP) Recap

An MDP is defined by:

- A set of **states**: $ \mathcal{S} $
- A set of **actions**: $ \mathcal{A} $
- A **transition probability function**:  
  $ P(s' | s, a) = \text{Probability of moving to state } s' \text{ from state } s \text{ after action } a $
- A **reward function**:  
  $ R(s, a, s') = \text{Expected reward received after transitioning to } s' \text{ from } s \text{ via } a $
- A **discount factor**:  
  $ \gamma \in [0, 1] $ — how much future rewards are valued compared to immediate ones

---

Bellman Equations: Foundation of DP

DP relies on recursive equations known as **Bellman equations**.

Value Function Under a Policy $ \pi $

$$
V^\pi(s) = \sum_{a} \pi(a|s) \sum_{s'} P(s' | s, a) \left[ R(s, a, s') + \gamma V^\pi(s') \right]
$$

This equation expresses the value of being in a state $ s $ under policy $ \pi $ as the expected sum of current and discounted future rewards. -->

<!-- --- -->



Types of Dynamic Programming Algorithms

**Policy Evaluation**

- **Goal**: Evaluate how good a policy is — estimate its value function.
- **Method**: Iteratively apply the Bellman expectation backup until convergence:
  <!-- $$
  V_{k+1}(s) = \sum_a \pi(a|s) \sum_{s'} P(s'|s,a)\left[R_0 + \gamma V_k(s')\right]
  $$ -->

  $$
  V(s) \Leftarrow \sum_a \pi(a|s) \sum_{s'} P(s'|s,a)\left[R_{t+1} + \gamma V(s')\right]
  $$

  If the taken action only have one possible next state, then the Bellman expectation backup can be simplified to:

  $$
  V(s) \Leftarrow  \sum_{s'} P(s'|s, a)\left[R_{t+1} + \gamma V(s')\right]
  $$


**Policy Improvement**

- **Goal**: Improve a policy based on its value function.
- **Method**: For each state $ s $, choose the action that maximizes expected return:
  $$
  \pi'(s) = \arg\max_a \sum_{s'} P(s'|s,a)\left[R_{t+1} + \gamma V^\pi(s')\right]
  $$

**Policy Iteration** 
<!-- or *generalize policy iteration* (GPI) -->

- **Goal**: Find the optimal policy through repeated policy evaluation and improvement.
- **Steps**:
  1. Initialize a random policy $ \pi_0 $
  2. While policy not converged:
     - Evaluate $ V^{\pi}(s) $
     - Improve $ \pi $ using $ V^{\pi} $
- **Result**: Converges to the **optimal policy** $ \pi^* $

**Value Iteration**

- **Goal**: Compute the optimal value function directly without needing to evaluate intermediate policies.
- **Method**:
  <!-- $$
  V_{k+1}(s) = \max_a \sum_{s'} P(s'|s,a)\left[R_0 + \gamma V_k(s')\right]
  $$ -->

  $$
  V(s) \Leftarrow \max_a \sum_{s'} P(s'|s,a)\left[R_{t+1} + \gamma V(s')\right]
  $$
- After convergence, extract the optimal policy:
  $$
  \pi^*(s) = \arg\max_a \sum_{s'} P(s'|s,a)\left[R_{t+1} + \gamma V^*(s')\right]
  $$


Advantages of Dynamic Programming

| Advantage | Description |
|----------|-------------|
| **Guaranteed Convergence** | If the MDP is finite and the updates are done correctly, DP converges to the optimal solution |
| **Mathematically Exact** | Solves Bellman equations exactly (within numerical precision) |
| **Basis for Approximate Methods** | Many modern RL algorithms (like Q-learning, DQN) are inspired by DP concepts |


Disadvantages of Dynamic Programming

| Disadvantage | Description |
|--------------|-------------|
| **Requires Full Model** | Needs full knowledge of transitions and rewards — rarely available in real-world scenarios |
| **Computationally Expensive** | Infeasible for large state spaces (curse of dimensionality) |
| **Not Suitable for Partial Observability** | Assumes the environment is fully observable (MDP), not applicable to POMDPs |


In [1]:
import numpy as np
import random
from pprint import pprint
import time
import pandas as pd

# Example of a grid world environment
# Method: Every Visit

class Env:
    def __init__(self):
        self.n_episodes = 1000
        self.n_states = 16
        self.n_actions = 4
        self.gamma = 0.9
        self.theta = 1e-6
        self.target_state = (0, 2)
        self.value = np.zeros((self.n_states//4, self.n_states//4))
        self.prob = np.ones((self.n_states//4, self.n_states//4, self.n_actions), 
                            dtype=np.int32)
        self.states = [(i, j) for i in range(4) for j in range(4)]
        self.actions = np.arange(self.n_actions)
        self.blocked_states_01 = [(0, 1), (0, 2), (0, 3)]
        self.blocked_states_02 = [(2, 0), (2, 1), (2, 2)]
        self.state = (3, 0)
        self.rewards = np.ones((self.n_states//4, self.n_states//4), dtype=np.int32) * -1
        self.rewards[self.target_state] = 5
        # for state in self.blocked_states_01:
        #    self.rewards[state] = -10
        # for state in  self.blocked_states_02:
        #    self.rewards[state] = -10
    
    def reset(self):
        self.state = (3, 0)
        # self.value = np.zeros((self.n_states//5, self.n_states//5))
        # return self.state
    
    def show_action(self, action:int):
        if action == 0: 
            return "→"
        elif action == 1: 
            return "←"
        elif action == 2: 
            return "↓"
        elif action == 3: 
            return "↑"

    def policy(self):
        return random.choice(range(4))
        
    def step(self, action):
        x, y = self.state
        if action == 0: y += 1  # Right
        elif action == 1: y -= 1  # Left
        elif action == 2: x += 1  # Down
        elif action == 3: x -= 1  # Up
        # The the agent keep in the same state if 
        # the action inplies over the max or min values of the enviroment
        next_state = (max(0, min(x, 3)), max(0, min(y, 3)))

        # next_reward = 1 if next_state == self.goal else -0.01
        # reward = 1 if self.state == self.goal else -0.01
        reward = self.rewards[next_state]
        # reward = self.rewards[self.state]
        # reward = self.rewards[next_state] - 5 if self.state == next_state else self.rewards[next_state]
        # reward, next_reward,
        # done = next_state == self.goal
        return next_state, reward #,  done
    
    def extract_optimal_policy(self):
        data = np.empty_like(self.value)
        data = data.astype(np.object_)
        for state in self.states:
            prob = self.prob[state]/sum(self.prob[state])
            action = np.argmax(prob)
            data[state] = self.show_action(action)
        print(data)

In [2]:
def get_policy_iteration():
    env = Env()
    converge = False
    start = time.time()
    
    # 1. Policy Evaluation
    # Evaluate for all states and actions
    while not converge:
        
        delta = 0
        for state in env.states:
            values = []
            prob = env.prob[state]/np.sum(env.prob[state])
            env.state = state
            for action in env.actions:
                next_state, reward = env.step(action)
                values.append(prob[action]*( 
                    reward + 
                    env.gamma * env.value[next_state]))
            
            delta = max(delta, abs(sum(values) - env.value[state]))
            env.value[state] = sum(values)
        if delta < env.theta:
            # print('Break Evaluation')
            # break
            # 2. Policy Improvement
            # 2.1. Update Policy
            delta = 0
            
            for state in env.states:
                values = []
                prob = env.prob[state]/sum(env.prob[state])
                env.state = state
                for action in env.actions:
                    
                    next_state, reward = env.step(action)
                    values.append(prob[action]*( 
                            reward +  
                            env.gamma * env.value[next_state]))
                action_max = np.argmax(values)
                env.prob[state][action_max]+=1
                delta = max(delta, abs(
                    env.prob[state][action_max]/sum(env.prob[state]) - prob[action_max])
                    )
            
            if delta < 0.001:
                # print('Break Policy')
                converge=True
            else:
                pass

    end = time.time()
    print('Time: {:0>2}:{:0>2}:{:0>2}'.format(round((end-start)//60, 0),  round(((end-start)%60)//1, 0), round((end-start)%1, 0)))
    return env

def get_value_iterarion():
    env = Env()
    converge = False
    
    # 1. Policy Evaluation
    # Evaluate for all states and actions
    start = time.time()
    while not converge:
        
        delta = 0
        for state in env.states:
            values = []
            prob = env.prob[state]/np.sum(env.prob[state])
            env.state = state
            for action in env.actions:
                # env.state = state
                next_state, reward = env.step(action)
                values.append(prob[action]*( 
                    reward + 
                    env.gamma * env.value[next_state]))
                
            action_max=np.argmax(values)
            next_state, reward = env.step(action_max)
            value = prob[action_max]*(
                    reward + 
                    env.gamma * env.value[next_state])
            
            delta = max(delta, abs(value - env.value[state]))
            env.value[state] = value
            env.prob[state][action_max]+=1
        if delta < env.theta:
            converge=True
            # print('Break Evaluation')
            break
    end = time.time()
    print('Time: {:0>2}:{:0>2}:{:0>2}'.format(int(round((end-start)//60, 0)),  
                                               int(round(((end-start)%60)//1,0)), 
                                               int(round((end-start)%1, 0))))
    return env

In [3]:
policy_iteration = get_policy_iteration()
policy_iteration.extract_optimal_policy()

value_iteration = get_value_iterarion()
value_iteration.extract_optimal_policy()

Time: 0.0:0.0:1.0
[['→' '→' '↑' '←']
 ['→' '↑' '↑' '↑']
 ['↑' '↑' '↑' '↑']
 ['↑' '↑' '↑' '↑']]
Time: 00:08:01
[['→' '→' '↑' '←']
 ['↑' '↑' '↑' '←']
 ['↑' '↑' '↑' '←']
 ['↑' '↑' '↑' '←']]


#### Monte Carlo

- MC doesn't require full knowledge of the environment, only a little prior knowledge about it, then using sampling (implies it has a component of uncertainty) we can create experiences to generate learning.
- When using MC in reinforcement learning, we will
average the returns for each state-action pair from different episodes
- Initial estimations are not good, but they improve as we get more data
- Also, we assume that the problem is episodic and an episode will always terminate regardless of the actions taken.

#### Sample Average Method

The estimated value of action a at time $t$, denoted $Q_t(s, a)$, is computed as the average of all rewards received so far when action $a$ was taken.

$Q_t(s, a) = (\text{Sum of rewards when action a was taken prior to time t}) / (\text{Number of times action a was taken prior to time t}) $

<!-- $$Q_k(s,a) = \frac{G_1 + G_2 + ..., G_{k-1}}{k-1} $$ -->
$$Q_t(s,a) = \frac{G₁ + G₂ + ... + G_{t-1}}{N_{t-1}(s,a)}$$

❓ Why do we say “prior to time $t$” when defining $Q_t(s,a)$?

This is not arbitrary — it’s a deliberate convention to ensure we’re using only past data to make decisions at time $t$.

When the agent is about to act at time $t$, it must choose $A_t$ using the knowledge it has so far — that is, from time steps $1$ through $t−1$. It hasn’t seen $R_t$ yet — that reward comes after taking the action.

Why Use Sample Average?

> - *Unbiased Estimator*: By the Law of Large Numbers, as the number of samples → ∞, Q_t(a) → Q*(a).
> - *No Hyperparameters (for step size)*: The step size 1/n is determined by the number of samples — no need to tune a learning rate.

##### Incremental Monte Carlo Prediction

Exists a more efficient computational method that allow us get rid of the list of observed returns and simplify the mean calculation step.

Using the estimation $Q_t(s,a)$ we can chose the best action $a$ at time step $t$, and then update the estimation $Q_{t+1}(s,a)$ after receiving the reward $R_t$.
<!-- Let $Q(S_k, A_k)$ be the estimation of the state-action value after it ( the tuple $(S, A)$) has been selected for $k-1$ times can be rewitten as: -->

$$Qₜ₊₁(s,a) = Qₜ(s,a) + (1/N_{t}(s,a)) × (Gₜ − Qₜ(s,a))$$

<!-- $$Q_{k} = \frac{G_1 + G_2 + ..., G_{k-1}}{k-1} $$ -->

<!-- It is only the estimation of $Q_{k}$, since the actual value is $Q_{k} = \frac{G_1 + G_2 + ..., G_{k}}{k} $ -->

<!-- This can be computed by the following:

$$Q_{k+1} = \frac{1}{k} \sum_{i=1}^{k}G_i$$
$$=\frac{1}{k}(G_k +  \frac{(k-1)}{(k-1)}\sum_{i=1}^{k-1}G_i)$$
$$=\frac{1}{k}(G_k +  (k-1)Q_{k})$$
$$=Q_{k} + \frac{1}{k}(G_k - Q_{k})$$

More general form: -->

$$ \text{NewEstimate} \Leftarrow \text{OldEstimate} + \text{StepSize}.(\text{Target} - \text{OldEstimate}) $$

The “StepSize” is a parameter that controls how fast the estimate is being updated.

##### State-Value Prediction

*Task*: estimate the state-value function for a given policy $π$, that means averaging the returns from a particular policy. But we can use incremental update to estimate the state-value function.

There are two types of estimations, *First-visit MC* and *every-visit MC*. The *First-visit MC* only considers the return of the first visit to state $s$ in the whole episode, however, every-visit MC considers every visit to state $s$ in the episode

**Policy Iteration** (Monte Carlo Control) for MC

Policy iteration is a method for finding the optimal policy in a Markov decision process (MDP). It works by iteratively improving the policy until it converges to the optimal policy.

*Policy iteration* has two steps:

1. *Policy Evaluation*: Compute the value function for the current policy (using sample average method, ).
2. *Policy Improvement*: Improve the policy based on the value function.

To improve the policy we will use the greedy policy. The greedy policy will always choose the *action* that
has maximal action-value function for a given *state*.

$$\pi(s) = \argmax_a q(s, a) $$

For each policy improvement, we will need to construct $π_{t+1}$ based on $\pi_t$ .


$$ \pi_{t+1}(s) =\argmax_a q_{\pi_t}(s, a) $$

<!-- $$q_{\pi_t}(s, \pi_{t+1}(s)) = \argmax_a q(s, a) \ge q_{\pi_t}(s, \pi_t(s)) $$ -->



In [423]:
import numpy as np
import random
from pprint import pprint
import time
import pandas as pd
from itertools import product

In [493]:
class Env:
    def __init__(self):
        self.n_episodes = 1000
        self.n_states = 16
        self.n_actions = 5
        self.gamma = 0.9
        self.theta = 1e-6
        self.epsilon_decay_rate = 0.995
        self.epsilon = 1.0
        self.states = [(i, j) for i in range(4) for j in range(4)]
        self.actions = np.arange(self.n_actions)
        self.target_state = (0, 2)
        self.state = (
            random.choice(range(0,3)), 
            random.choice(range(0,3))
            )
        # (3, 0)
        self.q_values = np.zeros((self.n_states//4, self.n_states//4, self.n_actions))
        self.counts = np.ones((self.n_states//4, self.n_states//4, self.n_actions))
        self.rewards = np.ones((self.n_states//4, self.n_states//4), dtype=np.int32) * -1
        self.rewards[self.target_state] = 1
        self.selected = np.ones(self.n_actions)
    
    def reset(self):
        self.state = (
            random.choice(range(0,3)), 
            random.choice(range(0,3))
            )
    
    def show_action(self, action:int):
        if action == 0: 
            return "→"
        elif action == 1: 
            return "←"
        elif action == 2: 
            return "↓"
        elif action == 3: 
            return "↑"
        elif action == 4: 
            return "*"
        
    def available_actions(self):
        x, y = self.state
        actions = []
        if (y+1)<=3:
            actions.append(0)
        if (y-1)>=0:
            actions.append(1)
        if (x+1)<=3:
            actions.append(2)
        if (x-1)>=0:
            actions.append(3)
        if self.state == self.target_state:
            actions.append(4)
        return actions

    def policy(self):
        actions = self.available_actions()
        if random.random() < self.epsilon:
            return random.choice(actions)
        else:
            # Policy Improment (Greedy)
            q_values = self.q_values[self.state].copy()
            q_values = [q_value if idx in actions else -float('inf') for idx, q_value in enumerate(q_values)]
            action = int(
                    round(np.argmax(q_values), 0))
            # self.selected[action]+=1
            # prob = self.selected/self.selected.sum()
            return action # np.random.choice(self.actions, p=prob)
        
    def step(self, action:int):
        x, y = self.state
        if action == 0: y += 1  # Right
        elif action == 1: y -= 1  # Left
        elif action == 2: x += 1  # Down
        elif action == 3: x -= 1  # Up
        elif action == 4: pass # Keep
        # The the agent keep in the same state if 
        # the action inplies over the max or min values of the enviroment
        next_state = x, y # (max(0, min(x, 3)), max(0, min(y, 3)))
        reward = self.rewards[next_state]
        # if x>3 or y>3 or x<0 or y<0:
        #    reward-=1
        if (self.state == self.target_state) and (action == 4):
            reward+=0.5
        done = False
        if next_state == self.target_state:
            done=True
        return next_state, reward,  done

In [494]:
env = Env()
n_episodes = env.n_episodes
trajectory = []

while n_episodes > 0:
    env.reset()
    # state = env.state
    while True:
        action = env.policy()
        # print(env.state, action, env.available_actions())
        next_state, reward, done = env.step(action)
        trajectory.append((env.state ,action, reward))
        env.state = next_state # type: ignore
        if done:
            break

    # Policy Evaluation (Update the Q-values)
    G = 0
    visited = []
    for item in reversed(trajectory):
        state, action, reward = item
        G = reward + env.gamma * G
        if (state, action) not in visited:
            env.q_values[state][action] =  (
                env.q_values[state][action] + (
                    1/env.counts[state][action])*(G - env.q_values[state][action]))
            env.counts[state][action] +=1
    n_episodes-=1
    env.epsilon*=env.epsilon_decay_rate
    
zeros = np.empty((env.n_states//4, env.n_states//4), dtype=np.object_)

for state in product([0, 1, 2, 3],
                     [0, 1, 2, 3]):
    
    env.state = state
    actions = env.available_actions()
    q_values = env.q_values[state].copy()
    q_values = [q_value if idx in actions else -float('inf') for idx, q_value in enumerate(q_values)]
    action = int(round(np.argmax(q_values),0))
    zeros[state] = env.show_action(action)
print(zeros)

[['→' '→' '*' '←']
 ['↑' '↑' '↑' '↑']
 ['↑' '→' '↑' '↑']
 ['→' '→' '↑' '↑']]


#### Temporal Difference Learning

- Temporal Difference (TD) combines ideas from DP and MC.

- Similar to DP, TD use bootstrapping in the estimation (updating estimates based on other estimates (we use $R_{t} + \gamma V_t({s'})$ instead of $G_t$), rather than waiting for the final outcome), however, like MC, it does not require full knowegde of the enviroment in the learning process, but applies a sampling-based optimization approach.

TD utilizes the error, the difference between the *target value* and the *estimate value* at different time step TD(0).

$$V_{t}(s) \leftarrow V_t(s) + \alpha[R_t(s,a) + \gamma V_t({s'}) - V_t(s)]$$

If we observe carefully, the target value during update for MC is $G_t$ which is known only after one episode, so we need to wait for the final outcome to update the estimation, however, for TD we use bootstrapping (*updating estimates based on other estimates*).

An estimate can be an metric that summarize the values of $G$ previously at $t$ and previous episodes.
like $R_t(s,s') + \gamma V_t(s')$. when I take action $a$ at state $s$ we receive the immediate reward $R_t(s,s')$ and move to state $s'$, then we can use the estimation of $V_t(s')$ to estimate the return from state $s'$ and no wait for the final outcome.

This reduce variance of the estimator


**Sarsa: On-Policy TD Control**

The update rule can, therefore, be framed as

$$Q_t(s, a) \leftarrow Q_t(s, a) + \alpha[R_{t}(s,s') + \gamma Q_t(s', a') - Q_t(s, a)]$$





<table>
    <thead>
        <tr>
            <th>Aspect</th>
            <th>On-Policy (e.g., SARSA)</th>
            <th>Off-Policy (e.g., Q-Learning)</th>
        </tr>
    </thead>
    <tbody>
        <tr>
            <td><strong>Behavior vs. Target Policy</strong></td>
            <td>Same policy for learning and acting (&#949-greede policy for both, for example)</td>
            <td>Different policies for learning and acting (&#949-greede policy for one and Max for another, for example)</td>
        </tr>
        <tr>
            <td><strong>Exploration</strong></td>
            <td>Directly updates using exploratory actions</td>
            <td>Updates using the greedy (optimal) action</td>
        </tr>
        <tr>
            <td><strong>Learning Stability</strong></td>
            <td>More stable and robust</td>
            <td>Can be unstable without proper safeguards</td>
        </tr>
        <tr>
            <td><strong>Learning Speed</strong></td>
            <td>Slower due to learning from exploratory actions</td>
            <td>Faster because it learns the optimal policy</td>
        </tr>
        <tr>
            <td><strong>Sample Efficiency</strong></td>
            <td>Lower</td>
            <td>Higher</td>
        </tr>
        <tr>
            <td><strong>Use Case</strong></td>
            <td>Evaluating or improving the current behavior</td>
            <td>Learning the best policy, even while exploring</td>
        </tr>
    </tbody>
</table>

**Q-Learning: Off-Policy TD Control**

Q-learning is an off-policy TD method that is very similar to Sarsa and plays
an important role in deep reinforcement learning application such as the
deep Q-network

At step $t$


$$Q_t(s, a) \leftarrow Q_t(s, a) + \alpha[R_{t}(s,a) + \gamma \max_a(Q_t(s', a)) - Q_t(s, a)]$$

The main difference that Q-learning has from Sarsa is that the target value now is no longer dependent on the policy being used (how the action is chosen) but only on the state-action function (max).

In [None]:
# ----------------------------------------------------
# ----------- Q-LEARNING IMPLEMENTATION --------------
# ----------------------------------------------------

# Initialize environment and Q-table
env = Env()

# Initialize Q-table
# state:[0, 0, 0, 0], point out the Q-values are
# zeros for each state and for each action.
# The index of [0, 0, 0, 0] represent the actions

# Training parameters
learning_rate = 0.1

for episode in range(500):
    env.reset()
    while True:
        action = env.policy()
        next_state, reward, done = env.step(action)
        # 1. Let `state` be the current state
        # 2. Based on this we take the action `action`
        # 3. This return `next_state` and its `reward` 
        # 4. Instead take the same action (like SARSA), take the action that max Q
        #    not like current policy
        env.q_values[env.state][action] += learning_rate * (
            reward + env.gamma * max(env.q_values[next_state]) - env.q_values[env.state][action])
        env.state = next_state
        if done:
            break
    # print(episode)
    env.epsilon*=env.epsilon_decay_rate

zeros = np.empty((env.n_states//4, env.n_states//4), dtype=np.object_)

for state in product([0, 1, 2, 3],
                     [0, 1, 2, 3]):
    
    env.state = state
    actions = env.available_actions()
    q_values = env.q_values[state].copy()
    q_values = [q_value if idx in actions else -float('inf') for idx, q_value in enumerate(q_values)]
    action = int(round(np.argmax(q_values),0))
    zeros[state] = env.show_action(action)
print(zeros)

[['→' '→' '*' '←']
 ['↑' '↑' '↑' '←']
 ['↑' '↑' '↑' '←']
 ['→' '→' '↑' '←']]


In [None]:
# ----------------------------------------------------
# ----------- SARSA IMPLEMENTATION -------------------
# ----------------------------------------------------

# Initialize environment and Q-table
env = Env()

# Initialize Q-table
# state:[0, 0, 0, 0], point out the Q-values are
# zeros for each state and for each action.
# The index of [0, 0, 0, 0] represent the actions

# Training parameters
learning_rate = 0.1

for episode in range(500):
    env.reset()
    state = env.state
    while True:
        action = env.policy()
        next_state, reward, done = env.step(action)
        env.state = next_state
        next_action = env.policy()
        # 1. Let `state` be the current state
        # 2. Based on this we take the action `action`
        # 3. This return `next_state` and its `reward` 
        # 4. Instead take the same action (like SARSA), take the action that max Q
        #    not like current policy
        env.q_values[state][action] += learning_rate * (
            reward + env.gamma * env.q_values[next_state][next_action] - env.q_values[state][action])
        state= next_state
        
        if done:
            break
    # print(episode)
    env.epsilon*=env.epsilon_decay_rate

zeros = np.empty((env.n_states//4, env.n_states//4), dtype=np.object_)

for state in product([0, 1, 2, 3],
                     [0, 1, 2, 3]):
    
    env.state = state
    actions = env.available_actions()
    q_values = env.q_values[state].copy()
    q_values = [q_value if idx in actions else -float('inf') for idx, q_value in enumerate(q_values)]
    action = int(round(np.argmax(q_values),0))
    zeros[state] = env.show_action(action)
print(zeros)

[['→' '→' '*' '←']
 ['→' '↑' '↑' '←']
 ['↑' '↑' '↑' '↑']
 ['→' '↑' '↑' '←']]


https://arxiv.org/pdf/2012.13773

https://github.com/wassname/rl-portfolio-management?tab=readme-ov-file

#### About *Done*

1. *Episode Terminatio*n: It signifies that the current sequence of interactions between the agent and the environment has concluded. An "episode" is a complete run from an initial state until a terminal state is reached.

2. *Achieving a Goal*: In many tasks, "done" means the agent has successfully achieved its objective. For example:

    * In a game of Chess, reaching checkmate.

    * In a robotic arm task, picking up the object.

    * In Frozen Lake, reaching the "Goal" (G) tile.

3. *Failing the Task*: "Done" can also mean the agent has failed or entered an undesirable state, leading to the end of the episode. For example:

    * Falling into a hole in Frozen Lake.

    * Crashing a self-driving car.

4. *Running out of moves or time in a game*.

    * Reaching a Maximum Limit: Environments often have a maximum number of steps or a time limit per episode. If the agent hasn't reached a goal or failed state but hits this limit, the episode is also considered "done." This is often distinguished as "truncated" in newer Gym/Gymnasium APIs (more on that below).


*Done* vs. *Truncated* (in Gymnasium / Newer Gym versions):
With the evolution of OpenAI Gym (now Gymnasium), the concept of "done" has been split into two distinct signals:

* *done* (or terminated): This specifically indicates that the episode ended because the agent reached a natural terminal state (e.g., goal achieved, fell in a hole, crashed).

* *truncated*: This indicates that the episode ended because a time limit or step limit was reached, even if the agent was still in a valid (non-terminal) state. The agent could have continued if the limit wasn't imposed.

## Policy Optimization

In reinforcement learning, the ultimate goal of the agent is to improve its policy to acquire better rewards. *Policy improvement* in the optimization domain is called *policy optimization*.

Apart from some linear or non linear methods, the *parameterization* of *value functions* with *deep neural networks* is one way of achieving **value function approximation**, and it’s the most popular way in the modern deep reinforcement learning domain.

Value function approximation is useful because we cannot always acquire the true value function easily, and
actually we cannot get the true function for most cases in practice.

The gradient-based optimization methods can be used for improving parameterized policies, usually through the method called **policy gradient** in reinforcement learning terminology.

The methods in policy optimization fall into two main categories: **(1) value-based optimization** methods like Q-learning, DQN, etc., which optimize the action-value function to obtain the preferences for the action choice, and **(2) policy-based optimization methods** like REINFORCE, the cross-entropy method, etc., which directly
optimize the policy according to the sampled reward values.

A combination of these two categories was found to be a more effective approach by people which forms one of the most widely used architecture in model-free reinforcement learning called **actor-critic**. Actor-critic methods employ the optimization of value function as the guidance of policy improvement

### Value-Based Optimization

A value-based optimization method always needs to alternate between
value function estimation under the current policy and policy improvement with the estimated value function

From the previous sections, we see that the Q-learning can be used for
solving some simple tasks in reinforcement learning. However, the realworld applications or even the quasi-real-world applications may have
much larger and complicated state and action spaces, and the action is
usually continuous in practice

For example, the Go game has 10170 states. In these cases, the traditional lookup table method in Q-learning cannot
work well with the *limitation of its scalability*, because each state will have an entry V (s) and each state-action pair will need an entry Q(s, a). The values in the table are updated one-by-one in practice. Therefore the *requirement of the memory and computational resources will be huge with tabular-based Q-learning*

#### Value Function Approximation

In order to apply the value-based reinforcement learning in relatively largescale tasks, function approximators are applied to handle the above
limitations
- **Linear Methods**:
    - Linear function approximators are used to approximate the value function, which is a linear combination of the features extracted from the state or state-action pairs.
    - The linear function approximator can be expressed as:
    
    $$
    V(s) = \theta^T \phi(s)
    $$
    
    where $\theta$ is the weight vector and $\phi(s)$ is the feature vector for state $s$.
    - Polinomial function approximators are used to approximate the value function, which is a polynomial function of the features extracted from the state or state-action pairs.
    - The polynomial function approximator can be expressed as:
    $$    V(s) = \sum_{i=0}^{n} \theta_i \phi_i(s)$$

    - Foureier basis function approximators are used to approximate the value function, which is a linear combination of the Fourier basis functions.
    - The Fourier basis function approximator can be expressed as:
    $$    V(s) = \sum_{i=0}^{n} \theta_i \cos(\omega_i s + \phi_i)$$
    for $s ∈ [0, 1]$ and $i = 0, …, n$

    - Coarse coding is a method that uses a coarse representation of the state space to approximate the value function. It is used to reduce the dimensionality of the state space and make the value function approximation more efficient.
    - The coarse coding method can be expressed as:
    $$    V(s) = \sum_{i=0}^{n} \theta_i \phi_i(s)$$
    where $\phi_i(s)$ is the coarse representation of the state $s$ and $\theta_i$ is the weight vector. And the coase representation is defined as:
    $$\phi_i(s) = \begin{cases}
        1 & \text{if } s \in C_i \\
        0 & \text{otherwise}
    \end{cases}$$
    
    - Radial basis functions (RBF) are used to approximate the value function, which is a linear combination of the radial basis functions.
    - The RBF function approximator can be expressed as:
    $$    V(s) = \sum_{i=0}^{n} \theta_i \phi_i(s)$$
    where $\phi_i(s)$ is the radial basis function for state $s$ and $\theta_i$ is the weight vector. The radial basis function is defined as:
    $$\phi_i(s) = \exp\left(-\frac{||s - c_i||^2}{2\sigma_i^2}\right)$$
    where $c_i$ is the center of the radial basis function and $\sigma_i$ is the width of the radial basis function.

* **Non-linear methods**
  - Artificial neural networks: different from the above function approximation methods, artificial neural networks are widely used as
non-linear function approximators, which are proven to have universal approximation ability under certain conditions.

* **Other methods**
  - Decision trees: decision trees can be used to approximate the value function by partitioning the state space into different regions and assigning a value to each region.

The benefit of using function approximation:
- **Scalability**: Function approximation can handle large state and action spaces, which is essential for real-world applications.
- **Generalization**: Function approximation can generalize the value function to unseen states, which is crucial for tasks with continuous or high-dimensional state spaces.
- **Efficiency**: Function approximation can reduce the memory and computational requirements, making it feasible to apply reinforcement learning to complex tasks.

(ANN  also redice the need for manually defined features, which is a common requirement in traditional machine learning methods)

Types of Value-Based Optimization Methods
- **Model-free methods**: These methods do not require a model of the environment and learn the value function directly from the data. They are often used in reinforcement learning tasks where the environment is complex and difficult to model. The parameters w of the approximators can be updated with Monte Carlo (MC) or TD learning
- **Model-based methods**: These methods require a model of the environment and use it to learn the value function. They are often used in reinforcement learning tasks where the environment is simple and easy to model.


Model-free agents learn the optimal policy through trial and error, while model-based agents first learn a model of the environment and then use that model to plan

The *most practical approximation method* for present DRL algorithms is using the *neural network*, for its great **scalability** and **generalization** for
various specific functions. 

A neural network is a *differential method* with *gradient-based optimization*, which has a guarantee of *convergence* to optimum within convex cases and can achieve near-optimal solutions for some non-convex functions approximation.

Drawbacks: it may require a large amount of data for training in practice and may cause other difficulties.

Challenges of useing Value Function Approximation
- Most supervised learning methods are not suitable for reinforcement learning tasks, because the data is not independent and identically distributed (i.i.d.) and the data is not labeled.
- The data is generated by the agent's interaction with the environment, which is not i.i.d. (The next row depends on the previous rows).
- The data is not stationary, which means the distribution of the data changes over time. The value function is estimated with current policy, or at least the state-visitfrequency determined by current policy, and the policy is updated all the
time during training

##### Gradient-Based Value Function Approximation

Considering the value function is parameterized as 

$$
V^{\pi}(s) = V^{\pi}(s;\theta)
$$

or 

$$
Q^{\pi}(s,a) = Q^{\pi}(s,a;\theta)
$$

The optimization objective is set to MSE of approximated value function ($V^{\pi}(s;\theta)$ or $Q^{\pi}(s,a;\theta)$) and target value function ($V_{\pi}(s)$ or $Q_{\pi}(s,a)$)

$$
J(\theta) = \mathbb{E}_{\pi} \left[ (V^\pi(s;\theta) - V_\pi(s))^2 \right]
$$

or

$$
J(\theta) = \mathbb{E}_{\pi} \left[ (Q^\pi(s,a;\theta) - Q_\pi(s,a))^2 \right]
$$

Therefore, the gradient of the objective function is given by

$$
\nabla J(\theta) = \alpha (Q^\pi(s,a;\theta) - Q_\pi(s,a)) \nabla Q^\pi(s,a;\theta) 
$$
or 

$$
\nabla J(\theta) = \alpha (V^\pi(s;\theta) - V_\pi(s)) \nabla V^\pi(s;\theta) 
$$

where the gradients are estimated with each sample in the batch and the
weights are updated in a stochastic manner. 

The target (true) value
functions $V^\pi(s)$ or $Q^\pi(s,a)$ in above equations are usually estimated, sometimes
with a target network (DQN)


#### Deep Q-Network

Let us consider the function approximation in Q-learning with some
parameter $θ$.

The update rule  can be expressed as

$$y = R(s,a) + γ \max_{a'} Q(s',a';θ)$$

$$L(Q(s,a;\theta), y)= \frac{1}{2}(y - Q(s, a;\theta))^2$$
<!-- $$\theta \leftarrow \argmin_{\theta} L(Q(s,a;\theta), y) $$ -->

$$\theta \leftarrow \theta + \alpha \nabla L  $$

where $y$ is the target Q-value and $L$ is the loss function, usually MSE.

**Deep Q-Learning (DQL)**

Deep Q-Learning replace Q-Table with a Deep Neural Network (DNN) that approximates the *Q-function*

Instead storing Q-values in a table, a NN takes a state a input and output Q-values for all possibles actions.

$$Q(s,a; \theta)$$

where $\theta$ represent the weights.

DQL Algorithm steps:
1. Initialize a NN $Q(s, a; \theta)$ randomly
2. For each episode:
   * Observe the current state $s$
   * Choose an action using an $\epsilon -\text{greedy}\; \text{policy}$ (explotation vs exploration)
   * Excecute $a$, receive reward $r$ and the next state $s'$ 
   * Sample a batch from the Replay Buffer.
   * Compute the Target-Q-value: $y = r + \gamma \max_{a'} Q(s', a';\theta)$
   * Compute the Loss: $L = (y - Q(s, a;\theta))^2$
   * Update NN weights $\theta$ using backpropagation
   * Periodically update target NN $Q(s, a;\theta)$ to stabilize learning.


> **Replay Buffer**: A memory that stores past experiences $(s, a, r, s')$ to break correlation between consecutive samples and improve learning stability.It draws a mini-batch of samples from this buffer uniformly to apply the Q-learning update

> **Target Network**: A separate NN $Q(s, a; \theta^-)$ with weights $\theta^-$ that are periodically updated from the main network. This helps to stabilize training by providing a fixed target for a number of steps.

Improvements over basic DQL:
- Dueling DQN
- Double DQN
- Prioritized Experience Replay
- Noisy Nets
- Distributional RL
- Rainbow DQN (combines several improvements)
- Multi-step Learning

In [None]:
import gymnasium as gym
import random
import numpy as np
import collections
import cv2
import torch
import torch.nn as nn
import torch.optim as optim

# -------------------------
# Preprocessing wrapper
# -------------------------
class PreprocessFrame(gym.ObservationWrapper):
    def __init__(self, env):
        super(PreprocessFrame, self).__init__(env)
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(84, 84, 1), dtype=np.uint8)

    def observation(self, obs):
        # Convert to grayscale and resize
        obs = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        obs = cv2.resize(obs, (84, 110))
        obs = obs[18:102, :]   # crop
        obs = np.expand_dims(obs, -1)
        return obs.astype(np.uint8)


class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        super(FrameStack, self).__init__(env)
        self.k = k
        self.frames = collections.deque(maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(shp[0], shp[1], k), dtype=np.uint8)

    def reset(self):
        obs = self.env.reset()[0]
        obs = self.env.observation(obs)
        for _ in range(self.k):
            self.frames.append(obs)
        return self._get_obs(), {}

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        obs = self.env.observation(obs)
        self.frames.append(obs)
        return self._get_obs(), reward, terminated, truncated, info

    def _get_obs(self):
        return np.concatenate(list(self.frames), axis=-1)


# -------------------------
# Replay Buffer
# -------------------------
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*transitions)
        return (np.array(states),
                np.array(actions),
                np.array(rewards, dtype=np.float32),
                np.array(next_states),
                np.array(dones, dtype=np.uint8))

    def __len__(self):
        return len(self.buffer)


# -------------------------
# Deep Q-Network
# -------------------------
class DQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(DQN, self).__init__()
        c, h, w = input_shape
        self.net = nn.Sequential(
            nn.Conv2d(c, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(self._feature_size(input_shape), 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )

    def _feature_size(self, shape):
        o = torch.zeros(1, *shape)
        return self.net[:-3](o).view(1, -1).size(1)

    def forward(self, x):
        return self.net(x)


# -------------------------
# Training Loop
# -------------------------
def train_dqn(env, num_episodes=1000, buffer_size=100000, batch_size=32,
              gamma=0.99, lr=1e-4, start_eps=1.0, end_eps=0.1,
              eps_decay=1000000, target_update=1000):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    replay_buffer = ReplayBuffer(buffer_size)

    n_actions = env.action_space.n
    state_shape = (4, 84, 84)  # 4 stacked frames
    policy_net = DQN(state_shape, n_actions).to(device)
    target_net = DQN(state_shape, n_actions).to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=lr)

    steps_done = 0

    def select_action(state, eps_threshold):
        if random.random() < eps_threshold:
            return random.randrange(n_actions)
        with torch.no_grad():
            state = torch.tensor(np.array([state]), device=device, dtype=torch.float32).permute(0,3,1,2)
            return policy_net(state).argmax(1).item()

    for episode in range(num_episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            eps_threshold = end_eps + (start_eps - end_eps) * \
                            np.exp(-1.0 * steps_done / eps_decay)

            action = select_action(state, eps_threshold)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            replay_buffer.push(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            steps_done += 1

            # Train
            if len(replay_buffer) > batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

                states_v = torch.tensor(states, device=device, dtype=torch.float32).permute(0,3,1,2)
                actions_v = torch.tensor(actions, device=device, dtype=torch.int64)
                rewards_v = torch.tensor(rewards, device=device, dtype=torch.float32)
                next_states_v = torch.tensor(next_states, device=device, dtype=torch.float32).permute(0,3,1,2)
                dones_t = torch.tensor(dones, device=device, dtype=torch.float32)

                state_action_values = policy_net(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)
                next_state_values = target_net(next_states_v).max(1)[0]
                expected_state_action_values = rewards_v + gamma * next_state_values * (1 - dones_t)

                loss = nn.MSELoss()(state_action_values, expected_state_action_values.detach())

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Update target net
            if steps_done % target_update == 0:
                target_net.load_state_dict(policy_net.state_dict())

        print(f"Episode {episode}, reward {total_reward}")

    return policy_net


if __name__ == "__main__":
    env = gym.make("ALE/Breakout-v5", render_mode=None)
    env = PreprocessFrame(env)
    env = FrameStack(env, 4)

    trained_model = train_dqn(env, num_episodes=1000)


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

# Define Deep Q-Network (DQN)
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, output_dim)
        )
    
    def forward(self, x):
        return self.fc(x)

# Custom environment
class CustomEnv:
    def __init__(self):
        self.states = [(i, j) for i in range(5) for j in range(5)]
        self.goal = (4, 4)
        self.state = (0, 0)
    
    def reset(self):
        self.state = (0, 0)
        return self._state_to_tensor()

    def step(self, action):
        x, y = self.state
        if action == 0: y += 1  # Right
        elif action == 1: y -= 1  # Left
        elif action == 2: x += 1  # Down
        elif action == 3: x -= 1  # Up
        self.state = (max(0, min(x, 4)), max(0, min(y, 4)))
        reward = 1 if self.state == self.goal else -0.01
        done = self.state == self.goal
        return self._state_to_tensor(), reward, done
    
    def _state_to_tensor(self):
        return torch.tensor([self.state[0] / 4, self.state[1] / 4], dtype=torch.float32)

# Hyperparameters
epsilon = 0.1  # Exploration factor
learning_rate = 0.001
discount_factor = 0.9
batch_size = 32
buffer_capacity = 1000
episodes = 500

# Initialize environment and networks
env = CustomEnv()
state_dim = 2  # (x, y) coordinates
num_actions = 4

policy_net = DQN(state_dim, num_actions)
target_net = DQN(state_dim, num_actions)

# Both start with the same parameters
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
loss_fn = nn.MSELoss()
replay_buffer = deque(maxlen=buffer_capacity)

# Training loop
for episode in range(episodes):
    state = env.reset()
    done = False
    while not done:
        # Epsilon-greedy action selection
        if random.random() < epsilon:
            action = random.choice(range(num_actions))
        else:
            with torch.no_grad():
                action = torch.argmax(policy_net(state)).item()
        
        next_state, reward, done = env.step(action)
        replay_buffer.append((state, action, reward, next_state, done))
        state = next_state
        
        # Training the network
        if len(replay_buffer) >= batch_size:
            batch = random.sample(replay_buffer, batch_size)
            states, actions, rewards, next_states, dones = zip(*batch)
            
            states = torch.stack(states)
            actions = torch.tensor(actions, dtype=torch.long)
            rewards = torch.tensor(rewards, dtype=torch.float32)
            next_states = torch.stack(next_states)
            dones = torch.tensor(dones, dtype=torch.float32)
            # state => Q_values => Q_value of chosen action (target action)
            # Similar to Q_values[state][action]
            q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
            # state => Q_values => max Q_value 
            # Similar to max(Q_values[next_state])
            
            next_q_values = target_net(next_states).max(1)[0]
            # As reward and discount_factor are constant the only parameter
            # to update is theta
            target_q_values = rewards + discount_factor * next_q_values * (1 - dones)
            
            # Try to fit the parameter in order to q_values => target_q_values.detach()
            # as target_q_values contains the target actions
            loss = loss_fn(q_values, target_q_values.detach())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
    # Update target network every 10 episodes
    if episode % 10 == 0:
        target_net.load_state_dict(policy_net.state_dict())
        print(f"Episode {episode}, Loss: {loss.item():.4f}")

print("Training complete!")

Episode 0, Loss: 0.0000
Episode 10, Loss: 0.0188
Episode 20, Loss: 0.0001
Episode 30, Loss: 0.0001
Episode 40, Loss: 0.0010
Episode 50, Loss: 0.0011
Episode 60, Loss: 0.0002
Episode 70, Loss: 0.0002
Episode 80, Loss: 0.0003
Episode 90, Loss: 0.0002
Episode 100, Loss: 0.0002
Episode 110, Loss: 0.0000
Episode 120, Loss: 0.0000
Episode 130, Loss: 0.0000
Episode 140, Loss: 0.0001
Episode 150, Loss: 0.0000
Episode 160, Loss: 0.0000
Episode 170, Loss: 0.0000
Episode 180, Loss: 0.0000
Episode 190, Loss: 0.0000
Episode 200, Loss: 0.0000
Episode 210, Loss: 0.0000
Episode 220, Loss: 0.0000
Episode 230, Loss: 0.0000
Episode 240, Loss: 0.0000
Episode 250, Loss: 0.0000
Episode 260, Loss: 0.0000
Episode 270, Loss: 0.0000
Episode 280, Loss: 0.0000
Episode 290, Loss: 0.0000
Episode 300, Loss: 0.0000
Episode 310, Loss: 0.0000
Episode 320, Loss: 0.0000
Episode 330, Loss: 0.0000
Episode 340, Loss: 0.0000
Episode 350, Loss: 0.0000
Episode 360, Loss: 0.0000
Episode 370, Loss: 0.0000
Episode 380, Loss: 0.00

#### Reinforcement Learning with RNNs using PyTorch

Reinforcement Learning (RL) combined with Recurrent Neural Networks (RNNs) is powerful for sequential decision-making tasks where the agent needs memory of past states. This combination is useful in environments with partial observability or when actions depend on historical patterns.

Use Cases
* Trading Algorithms: Remembering market trends over time

* Robot Control: Handling sensor inputs with temporal dependencies

* Game AI: For games where remembering past states is crucial

* Conversational Agents: Maintaining context in dialogues

* Inventory Management: Predicting demand patterns over time

Implementation Example: Simple Trading Agent
Let's implement a trading agent that uses RL with an RNN (LSTM) to decide whether to buy, hold, or sell an asset based on price history.

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random

# Environment Simulation
class TradingEnvironment:
    def __init__(self, data_length=1000):
        # Generate synthetic price data (sin wave + noise + slight upward trend)
        self.prices = [100 + 10 * np.sin(i/20) + np.random.normal(0, 2) + i/100 for i in range(data_length)]
        self.reset()
    
    def reset(self):
        self.current_step = 0
        self.position = 0  # 0: no position, 1: long
        self.cash = 10000
        self.portfolio_value = 10000
        self.done = False
        return self._get_state()
    
    def _get_state(self):
        # Return last 10 prices and current position
        start_idx = max(0, self.current_step - 10)
        price_history = self.prices[start_idx:self.current_step]
        # Pad with zeros if not enough history
        if len(price_history) < 10:
            price_history = [0]*(10-len(price_history)) + price_history
        return torch.FloatTensor([price_history + [self.position]])
    
    def step(self, action):
        """ action: 0=hold, 1=buy, 2=sell """
        current_price = self.prices[self.current_step]
        reward = 0
        
        if action == 1 and self.position == 0:  # Buy
            self.position = 1
            self.buy_price = current_price
            reward = -1  # Small penalty for transaction
        
        elif action == 2 and self.position == 1:  # Sell
            self.position = 0
            reward = (current_price - self.buy_price) / self.buy_price * 100  # % return
        
        # Move to next time step
        self.current_step += 1
        if self.current_step >= len(self.prices) - 1:
            self.done = True
        
        # Calculate portfolio value
        if self.position == 1:
            self.portfolio_value = self.cash + (current_price / self.buy_price) * self.cash
        else:
            self.portfolio_value = self.cash
        
        next_state = self._get_state()
        return next_state, reward, self.done, {'portfolio_value': self.portfolio_value}

# RNN-based Q-Network
class RNNQNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(RNNQNetwork, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x, hidden=None):
        # x shape: (batch, seq_len, input_size)
        batch_size = x.size(0)
        
        if hidden is None:
            h_0 = torch.zeros(1, batch_size, self.hidden_size)
            c_0 = torch.zeros(1, batch_size, self.hidden_size)
            hidden = (h_0, c_0)
        
        out, hidden = self.lstm(x, hidden)
        out = self.fc(out[:, -1, :])  # Take last output
        return out, hidden

# Replay Buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)

# Training Parameters
BATCH_SIZE = 32
GAMMA = 0.99
EPS_START = 1.0
EPS_END = 0.01
EPS_DECAY = 0.995
TARGET_UPDATE = 10
LEARNING_RATE = 0.001
MEMORY_CAPACITY = 1000
NUM_EPISODES = 100

# Initialize environment and networks
env = TradingEnvironment()
input_size = 11  # 10 prices + position
hidden_size = 32
output_size = 3  # 3 actions

policy_net = RNNQNetwork(input_size, hidden_size, output_size)
target_net = RNNQNetwork(input_size, hidden_size, output_size)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=LEARNING_RATE)
memory = ReplayBuffer(MEMORY_CAPACITY)

# Training loop
for episode in range(NUM_EPISODES):
    state = env.reset()
    hidden = None
    total_reward = 0
    eps_threshold = EPS_END + (EPS_START - EPS_END) * EPS_DECAY**episode
    
    while not env.done:
        # Select action
        if random.random() < eps_threshold:
            action = random.randint(0, 2)
        else:
            with torch.no_grad():
                q_values, hidden = policy_net(state.unsqueeze(0), hidden)
                action = q_values.argmax().item()
        
        # Execute action
        next_state, reward, done, _ = env.step(action)
        total_reward += reward
        
        # Store transition
        memory.push(state, action, reward, next_state, done)
        
        # Move to next state
        state = next_state
        
        # Train if enough samples
        if len(memory) >= BATCH_SIZE:
            transitions = memory.sample(BATCH_SIZE)
            batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(*transitions)
            
            batch_state = torch.stack(batch_state)
            batch_next_state = torch.stack(batch_next_state)
            batch_action = torch.tensor(batch_action, dtype=torch.long)
            batch_reward = torch.tensor(batch_reward, dtype=torch.float)
            batch_done = torch.tensor(batch_done, dtype=torch.float)
            
            # Current Q values
            current_q, _ = policy_net(batch_state)
            current_q = current_q.gather(1, batch_action.unsqueeze(1))
            
            # Target Q values
            with torch.no_grad():
                next_q, _ = target_net(batch_next_state)
                max_next_q = next_q.max(1)[0]
                target_q = batch_reward + GAMMA * max_next_q * (1 - batch_done)
            
            # Compute loss and optimize
            loss = nn.MSELoss()(current_q.squeeze(), target_q)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
    # Update target network
    if episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())
    
    print(f"Episode {episode}, Total Reward: {total_reward:.2f}, Portfolio Value: {env.portfolio_value:.2f}")

# Test the trained agent
state = env.reset()
hidden = None
portfolio_values = []

while not env.done:
    with torch.no_grad():
        q_values, hidden = policy_net(state.unsqueeze(0), hidden)
        action = q_values.argmax().item()
    
    state, _, _, info = env.step(action)
    portfolio_values.append(info['portfolio_value'])

print(f"Final Portfolio Value: {portfolio_values[-1]:.2f}")

Episode 0, Total Reward: -179.77, Portfolio Value: 20000.00
Episode 1, Total Reward: -126.33, Portfolio Value: 20000.00
Episode 2, Total Reward: -160.90, Portfolio Value: 10000.00
Episode 3, Total Reward: -160.98, Portfolio Value: 10000.00
Episode 4, Total Reward: -179.05, Portfolio Value: 20000.00
Episode 5, Total Reward: -152.42, Portfolio Value: 20191.61
Episode 6, Total Reward: -163.44, Portfolio Value: 20524.05
Episode 7, Total Reward: -213.66, Portfolio Value: 20383.07
Episode 8, Total Reward: -174.49, Portfolio Value: 10000.00
Episode 9, Total Reward: -102.97, Portfolio Value: 10000.00
Episode 10, Total Reward: -170.37, Portfolio Value: 20000.00
Episode 11, Total Reward: -160.51, Portfolio Value: 10000.00
Episode 12, Total Reward: -146.67, Portfolio Value: 10000.00
Episode 13, Total Reward: -132.03, Portfolio Value: 20000.00
Episode 14, Total Reward: -181.49, Portfolio Value: 19814.03
Episode 15, Total Reward: -173.33, Portfolio Value: 19814.03
Episode 16, Total Reward: -148.65,

### Policy-Based Optimization

### Combination of Policy-Based and Value-Based Method

In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import defaultdict
import math

# Application



- QMIX: Monotonic Value Function Factorisation for Deep Multi-Agent Reinforcement Learning
https://arxiv.org/pdf/1803.11485

- *Reinforcement Learning 5*: Function Approximation and Deep Reinforcement Learning:
    https://www.youtube.com/watch?v=wAk1lxmiW4c&list=PLqYmG7hTraZBKeNJ-JE_eyJHZ7XgBoAyb&index=5
- *Reinforcement Learning 10*: Classic Games Case Study:
    https://www.youtube.com/watch?v=ld28AU7DDB4

- *Monte Carlo Tree Search*: https://www.youtube.com/results?search_query=mc+tree+search

- *AlphaZero*: https://github.com/michaelnny/alpha_zero

- *Actor-Critic*: https://www.youtube.com/watch?v=K2qjAixgLqk

# Monte Carlo Tree Search (MCTS)

Monte Carlo Tree Search (MCTS) is a **heuristic search algorithm** for decision-making in environments with large or infinite state spaces, particularly effective in **games with perfect information** (like Go, Chess, Hex) and increasingly used in imperfect information games and general planning problems.

> In Reinforcement Learning (RL), planning problems refer to cases where the model of the environment is known — i.e., you know the transition probabilities $𝑃(𝑠′∣𝑠,𝑎)P(s′∣s,a)$ and the reward function $𝑅(𝑠,𝑎)$.


Instead of exhaustively exploring the entire game tree (which is infeasible for complex games), MCTS **selectively builds a search tree** by:
1. **Randomly simulating** playouts from the current state
2. **Gradually focusing** on the most promising branches using statistical estimates
3. **Balancing exploration vs. exploitation** using the **Upper Confidence Bound (UCB)** principle


**The Four Phases of MCTS**

MCTS operates in iterative cycles, each consisting of four phases:

1. **Selection**
- Start from the root node (current game state)
- Traverse down the tree by selecting child nodes using a **tree policy** (typically UCB1)
- Continue until reaching a **leaf node** (a node that hasn't been fully expanded)

**UCB1 Formula** (for maximizing player):
```
UCB1(node) = Q(node)/N(node) + c * √(ln(N(parent)) / N(node))
```
- `Q(node)`: Total reward from simulations through this node
- `N(node)`: Number of visits to this node
- `c`: Exploration parameter (typically √2 for games)
- The first term = **exploitation** (average win rate)
- The second term = **exploration** (uncertainty bonus)

2. **Expansion**
- If the leaf node represents a **non-terminal state**, add one or more child nodes (legal moves) to the tree
- Select one of the newly added children for simulation

3. **Simulation (Rollout)**
- From the selected node, perform a **random playout** to a terminal state
- Use a **default policy** (often completely random moves)
- Record the outcome (win/loss/draw or reward value)

4. **Backpropagation**
- Propagate the simulation result back up the tree to the root
- Update statistics for all nodes along the path:
  - Increment visit count `N(node)`
  - Add simulation result to total reward `Q(node)`


Key Properties

**Asymmetric Tree Growth**
- Unlike minimax, MCTS doesn't explore all branches equally
- Focuses computational resources on **promising lines of play**
- Tree grows deeper in critical variations

**Anytime Algorithm**
- Can be stopped at any time and return the best move found so far
- Quality of solution improves with more computation time
- Perfect for real-time decision making

**No Domain Knowledge Required** (in basic form)
- Only needs:
  - Game rules (to generate legal moves)
  - Terminal state detection
  - Outcome evaluation
- However, **domain-specific enhancements** significantly improve performance


**Convergence Properties**
- **Theoretical guarantee**: With infinite simulations, MCTS converges to the optimal move
- **Practical reality**: Works well with finite resources due to efficient exploration


Famous Applications

 **AlphaGo (2016)**
- Combined MCTS with **deep neural networks**
- **Policy Network**: Guided tree expansion (reduced branching factor)
- **Value Network**: Evaluated positions (reduced need for rollouts)
- **Defeated world champion Lee Sedol** in Go

 **AlphaZero (2017)**
- Generalized AlphaGo to **Chess, Shogi, and Go**
- Learned entirely through **self-play**
- Used MCTS for both **training** (generating data) and **inference** (making moves)

 **Other Applications**
- **Video games**: NPC decision making, strategy games
- **Automated planning**: Robotics, logistics
- **Theorem proving**: Guiding proof search
- **Optimization problems**: Combinatorial optimization


**Enhancements & Variants**


**Domain-Specific Improvements**
- **Progressive Bias**: Incorporate heuristic evaluation into UCB
- **RAVE (Rapid Action Value Estimation)**: Share statistics between similar moves
- **Transposition Tables**: Cache previously evaluated positions

**Neural Network Integration**
- **Neural MCTS**: Use networks to guide selection and evaluation
- **Policy Prior**: Replace uniform expansion with learned move probabilities
- **Value Estimation**: Replace rollouts with neural network predictions

**Parallel MCTS**
- **Tree Parallelization**: Multiple threads share the same tree
- **Root Parallelization**: Each thread builds separate tree, combine results
- **Leaf Parallelization**: Parallelize rollouts from leaf nodes


<!-- Advantages vs. Disadvantages

**Advantages**
✅ **No evaluation function needed** (basic version)  
✅ **Anytime algorithm** - interruptible with good results  
✅ **Handles large branching factors** better than minimax  
✅ **Naturally handles stochastic environments**  
✅ **Memory efficient** - only stores explored portion of tree  

**Disadvantages**
❌ **Computationally expensive** per iteration  
❌ **Can be misled by rare but critical events** (requires many simulations)  
❌ **Struggles with "trap" positions** where one mistake loses immediately  
❌ **Less effective in games with high randomness** (without modifications)  
❌ **No guaranteed optimality** with finite computation   -->


Modern Developments

**MuZero (2019)**
- Extends AlphaZero to **unknown environments**
- Learns a **world model** (dynamics, reward, representation)
- Uses MCTS with learned model for planning

**MCTS in Large Language Models**

- Used for **reasoning and planning** in LLMs
- **Tree of Thoughts** prompting technique
- **Self-refine** and **chain-of-thought** improvements

**Real-World Planning**
- **Autonomous vehicles**: Trajectory planning
- **Healthcare**: Treatment planning
- **Finance**: Portfolio optimization

When to Use MCTS

**Use MCTS when:**
- State space is too large for exhaustive search
- You can simulate the environment cheaply
- You need an anytime algorithm
- Perfect information is available (or can be handled)
- No good heuristic evaluation function exists

**Consider alternatives when:**
- Computation time is extremely limited
- The game has very high randomness
- You have a very strong evaluation function (minimax might be better)
- Memory is severely constrained

<!-- Key Takeaways

1. **MCTS is a simulation-based search algorithm** that builds a tree asymmetrically
2. **UCB1 balances exploration and exploitation** during tree traversal
3. **Four phases**: Selection → Expansion → Simulation → Backpropagation
4. **Revolutionized AI in games** through AlphaGo/AlphaZero
5. **Highly flexible** and applicable beyond games to general planning problems
6. **Combines well with machine learning** for state-of-the-art performance -->

MCTS represents a beautiful marriage of **Monte Carlo methods** (random sampling) and **tree search** (structured exploration), making it one of the most important algorithms in modern AI decision-making.

Pure MCTS implementation
This code will:
1. Define a small Game interface (RoomLog implementation)
2. Implement a pure MCTS algorithm (Selection, Expansion, Simulation, Backpropagation)
3. Demonstrate a single MCTS decision from the empty board and play one game where MCTS plays vs Random

Notes:
- This is intentionally small and didactic, not hyper-optimized.
- The rollout policy is purely random (pure MCTS).
- The UCT formula uses an exploration constant c (default sqrt(2)).

In [26]:
import copy
import json
from pathlib import Path
import numpy as np
import pandas as pd
from collections import defaultdict, deque
import math, random, time
import concurrent.futures

In [32]:
class DataSet:
    def __init__(self, path):
        self.path = path
        self.dim_aulas = None
        self.dim_periodo_franja = None
        self.dim_frecuencia = None
        self.dim_horario = None
        self.items = None
        self.items_bimestral = None
        self.load_all()

    def read_json(self, name: str):
        with open(self.path / name, "r") as f:
            return json.load(f)

    def dim_aulas_loader(self):
        self.dim_aulas = self.read_json("dim_aulas.json")

    def dim_periodo_franja_loader(self):
        self.dim_periodo_franja = self.read_json("dim_periodo_franja.json")

    def dim_frecuencia_loader(self):
        self.dim_frecuencia = self.read_json("dim_frecuencia.json")

    def dim_horario_loader(self):
        self.dim_horario = self.read_json("dim_horario.json")

    def items_loader(self):
        self.items = self.read_json("items.json")

    def items_bimestral_loader(self):
        self.items_bimestral = self.read_json("items_bimestral.json")

    def load_all(self):
        self.dim_horario_loader()
        self.dim_aulas_loader()
        self.dim_frecuencia_loader()
        self.dim_periodo_franja_loader()
        self.items_loader()
        self.items_bimestral_loader()


class RoomLog:
    def __init__(self, dataset, sede: str):
        self.dataset = dataset
        self.sede = sede
        self.dim_aulas = dataset.dim_aulas[sede].copy()
        self.dim_periodo_franja = dataset.dim_periodo_franja.copy()
        self.dim_frecuencia = dataset.dim_frecuencia.copy()
        self.dim_horario = dataset.dim_horario.copy()
        self.items = dataset.items[sede].copy()
        self.items_bimestral = dataset.items_bimestral[sede].copy()
        self.roomlog = self.get_roomlog()
        self.idx_item = 0
        self.n_items = len(self.items)
        self.n_assignments = 0
        self.stats = {'[Conflict]': 0,
                      '[Aforo-Alum<0]': 0,
                      '[Aforo=Alumn]|[Aforo-Alumn<=2]': 0,
                      '[Aforo-Alumn>2]': 0}
        self.n_aulas = len(self.roomlog.keys())

    def get_roomlog(self):
        # self = env_01
        aulas = self.dim_aulas['AULA']
        room_log = {}
        for aula in aulas:
            room_log[aula] = {}
            for periodo_franja in self.dim_periodo_franja.keys():
                franjas = self.dim_periodo_franja[periodo_franja]['FRANJAS']
                dias = self.dim_periodo_franja[periodo_franja]['DIAS']
                for dia in dias:
                    room_log[aula][dia] = {}
                    for franja in franjas:
                        room_log[aula][dia][franja] = 0

        for item in self.items_bimestral:
            # conflict = 0
            dias = self.dim_frecuencia[item['FRECUENCIA']]['DIAS']
            franjas = self.dim_horario[item['HORARIO']]
            for dia in dias:
                for franja in franjas:
                    room_log[item['AULA']][dia][franja] = 1
        return room_log


    def step(self, action: int):
        if self.idx_item >= self.n_items:
            return None, None

        item = self.items[self.idx_item].copy()
        aula = self.dim_aulas['AULA'][action]
        aforo = self.dim_aulas['AFORO'][action]
        roomlog = self.roomlog.copy()

        if (aforo - item['ALUMN']) < 0:
            reward = aforo - item['ALUMN'] - 2
            response = '[Aforo-Alum<0]'
            
        elif ((aforo - item['ALUMN']) >= 0) and ((aforo - item['ALUMN']) <= 2):
            reward = 1 + (item['ALUMN']/aforo)
            response = '[Aforo=Alumn]|[Aforo-Alumn<=2]'
        else:
            reward = 0
            response = '[Aforo-Alumn>2]'

        dias = self.dim_frecuencia[item['FRECUENCIA']]['DIAS']
        franjas = self.dim_horario[item['HORARIO']]

        for dia in dias:
            for franja in franjas:
                roomlog[aula][dia][franja] = 1

        self.roomlog = roomlog.copy()
        self.idx_item += 1
        return reward, response

    def get_available_actions(self):
        if self.idx_item >= self.n_items:
            return []

        item = self.items[self.idx_item].copy()
        aulas = self.dim_aulas['AULA']
        roomlog = self.roomlog.copy()
        dias = self.dim_frecuencia[item['FRECUENCIA']]['DIAS']
        franjas = self.dim_horario[item['HORARIO']]

        available = []
        for idx, aula in enumerate(aulas):
            conflict = []
            for dia in dias:
                for franja in franjas:
                    conflict.append(True if roomlog[aula][dia][franja] == 1 else False)
            if not any(conflict):
                available.append(idx)
        return available

    def is_terminal(self):
        return self.idx_item >= self.n_items | (len(self.get_available_actions()) == 0)


class Node:
    def __init__(self, move=None, parent=None, untried_actions=None):
        self.move = move                  # the move that led to this node (from parent)
        self.parent = parent              # parent node
        self.children = []                # list of child nodes
        self.w = 0.0                   # number of wins for player_just_moved
        self.visits = 0                   # visit count
        self.untried_actions = [] if untried_actions is None else untried_actions[:]  # moves not expanded yet
        # self.player_just_moved = player_just_moved  # who moved to get to this node

    def uct_select_child(self, c_param=math.sqrt(2)):
        # Select a child according to UCT (upper confidence bound applied to trees)
        # If a child has 0 visits we consider its UCT value infinite to ensure it's visited.
        best = max(self.children, key=lambda child: (
            float('inf') if child.visits == 0 else
            (child.w / child.visits) + c_param * math.sqrt(math.log(self.visits) / child.visits)
        ))
        return best

    def add_child(self, move, untried_actions):
        child = Node(move=move, parent=self, untried_actions=untried_actions)
        self.untried_actions.remove(move)
        self.children.append(child)
        return child

    def update(self, reward):
        self.visits += 1
        self.w += reward


def UCT(state, iter_max=5000, c_param=math.sqrt(2)):
    # PATH = Path("project")
    # SEDE = 'Ica'
    # iter_max = 5000
    # c_param=math.sqrt(2)
    # dataset_01 = DataSet(PATH)
    # state = RoomLog(dataset_01, SEDE)
    root_node = Node(move=None,
                     parent=None,
                     untried_actions=state.get_available_actions())

    idx_item = state.idx_item
    roomlog = copy.deepcopy(state.roomlog.copy())

    for i in range(iter_max):
        # i = 0
        rewards = []
        node = root_node
        state.idx_item = idx_item
        state.roomlog = copy.deepcopy(roomlog.copy())

        # 1. Selection: descend until we find a node with untried actions or a leaf (no children)
        while node.untried_actions == [] and node.children:
            node = node.uct_select_child(c_param)
            reward, _ = state.step(node.move)
            rewards.append(reward)

        # 2. Expansion: if we can expand (i.e. state not terminal) pick an untried action
        if node.untried_actions:
            action = random.choice(node.untried_actions)
            reward, _ = state.step(action)
            
            rewards.append(reward)

            child_untried = state.get_available_actions()
            node = node.add_child(move=action,
                                  untried_actions=child_untried)

        # 3. Simulation: play randomly until the game ends
        while not state.is_terminal():
            possible_moves = state.get_available_actions()
            reward, _ = state.step(random.choice(possible_moves))
            rewards.append(reward)

        # 4. Backpropagation: update node statistics with simulation result
        n_items = min(max(state.idx_item, 1), state.n_items)

        while node is not None:
            node.update(sum(rewards)/n_items)
            node = node.parent
            # # for reward in rewards:

    # return the move that was most visited
    best_child = max(root_node.children, key=lambda c: c.visits)
    state.idx_item = idx_item
    state.roomlog = copy.deepcopy(roomlog.copy())

    return best_child.move, root_node, state  # also return root node so the caller can inspect children statistics


def UCT_worker(state, iter_max, c_param):
    # Clone state for isolation
    cloned_state = RoomLog(state.dataset, state.sede)
    cloned_state.idx_item = state.idx_item
    cloned_state.roomlog = copy.deepcopy(state.roomlog)

    move, root, _ = UCT(cloned_state, iter_max=iter_max, c_param=c_param)
    return root


def parallel_UCT(state, iter_max=5000, c_param=math.sqrt(2), n_workers=4):
    # split iterations across workers
    iters_per_worker = iter_max // n_workers
    roots = []

    with concurrent.futures.ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(UCT_worker, state, iters_per_worker, c_param)
                   for _ in range(n_workers)]
        for f in concurrent.futures.as_completed(futures):
            roots.append(f.result())

    # merge children statistics from workers
    merged_root = Node(move=None, parent=None,
                       untried_actions=state.get_available_actions())
    move_to_node = {}

    for r in roots:
        for child in r.children:
            if child.move not in move_to_node:
                move_to_node[child.move] = Node(move=child.move,
                                                parent=merged_root,
                                                untried_actions=[])
            move_to_node[child.move].visits += child.visits
            move_to_node[child.move].w += child.w

    merged_root.children = list(move_to_node.values())
    best_child = max(merged_root.children, key=lambda c: c.visits)

    return best_child.move, merged_root, state

In [41]:
from collections import namedtuple

In [57]:
def run():
    assignamnent =namedtuple('Assignamnent', ['CODIGO_DE_CURSO', 'HORARIO', 'FRECUENCIA', 'AULA', 'ALUMN',
       'ASSIGNMENTS'])
    # --- Demo: use UCT on an empty RoomLog board ---
    path = Path("project")
    sede = 'Ica'
    dataset_01 = DataSet(path)
    state = RoomLog(dataset_01, sede)
    n_assigments = 0
    n_no_assigments = 0
    aulas = []
    while state.idx_item < len(state.items):
        start_time = time.time()
        result = copy.deepcopy(state.items[state.idx_item].copy())
        if len(state.get_available_actions()) == 0:
            result['ASSIGNMENTS'] = {'AULA': None,
                                     'AFORO': None}
            
            state.idx_item += 1
            n_no_assigments +=1
        else:
            move, root_node, state = UCT(state, iter_max=5000)
            # move, root_node, state = parallel_UCT(state, iter_max=5000, n_workers=7)
            result['ASSIGNMENTS'] = {'AULA':state.dim_aulas['AULA'][move],
                                     'AFORO':state.dim_aulas['AFORO'][move]}
            n_assigments+=1
            state.step(move)
        # print(assignamnent(**result))
        aulas.append(assignamnent(**result))
        duration = time.time() - start_time
        print(f"> Duration of Searching: {duration:.2f}s",
               f"\n # Assigments: {n_assigments} | # No Assigments: {n_no_assigments}")
    # df = pd.DataFrame(aulas)
    # df.to_excel('sususu.xlsx', index=False)
    # return df

In [58]:
run()

> Duration of Searching: 14.80s 
 # Assigments: 1 | # No Assigments: 0
> Duration of Searching: 18.78s 
 # Assigments: 2 | # No Assigments: 0
> Duration of Searching: 17.95s 
 # Assigments: 3 | # No Assigments: 0
> Duration of Searching: 18.45s 
 # Assigments: 4 | # No Assigments: 0
> Duration of Searching: 16.80s 
 # Assigments: 5 | # No Assigments: 0
> Duration of Searching: 11.03s 
 # Assigments: 6 | # No Assigments: 0
> Duration of Searching: 11.98s 
 # Assigments: 7 | # No Assigments: 0
> Duration of Searching: 11.97s 
 # Assigments: 8 | # No Assigments: 0
> Duration of Searching: 10.72s 
 # Assigments: 9 | # No Assigments: 0
> Duration of Searching: 10.20s 
 # Assigments: 10 | # No Assigments: 0
> Duration of Searching: 10.31s 
 # Assigments: 11 | # No Assigments: 0
> Duration of Searching: 9.93s 
 # Assigments: 12 | # No Assigments: 0
> Duration of Searching: 9.73s 
 # Assigments: 13 | # No Assigments: 0
> Duration of Searching: 9.77s 
 # Assigments: 14 | # No Assigments: 0
> Du

# AlphaZero