# DreamerV3-XP
This notebook contains the commands needed to reproduce the experiments run in this project, along with visualizations and more detailed technical explanations of our extensions.

### Reproduction
1. Select the benchmark and tasks by defining a set containing the benchmark (only one at a time) and a set containing all tasks to run. For example:
    ```
    DEFAULT_DATASETS = {"atari100k"}
    ATARI_TASKS = {"atari100k_krull", "atari100k_battle_zone", "atari100k_boxing"}
    ```
    Pass them to the `run_experiment` function as the `datasets` and `tasks` arguments, respectively.
2. The configurations defined in `presets.py` override those in `configs.yaml`, so make sure they are set as desired.
3. Run the experiment using the following command:

In [None]:
!python experiments/experiment_definitions.py run_standard_dreamer --name "DreamerV3 Baseline" --description "DreamerV3 standard configuration run" --num_seeds 2
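
Step 2 above relies on preset values taking precedence over the base configuration. The snippet below is a minimal sketch of that idea, assuming presets are written as dotted keys (as in the configuration lists in this notebook) and the base configuration is a nested dictionary. The helper `apply_overrides` and the base values are hypothetical and are not taken from `presets.py` or `configs.yaml`.

```python
import copy


def apply_overrides(base, overrides):
    """Return a copy of `base` with dotted-key `overrides` applied (hypothetical helper)."""
    result = copy.deepcopy(base)
    for dotted_key, value in overrides.items():
        *parents, leaf = dotted_key.split(".")
        node = result
        for part in parents:
            # Create intermediate levels if the base config lacks them.
            node = node.setdefault(part, {})
        node[leaf] = value
    return result


# Hypothetical stand-ins for the base configuration and a preset.
base_config = {"replay": {"fracs": {"uniform": 1.0, "priority": 0.0}}}
preset = {"replay.fracs.uniform": 0.0, "replay.fracs.priority": 1.0}

merged = apply_overrides(base_config, preset)
print(merged)
```

Printing the merged result before launching a run is a cheap way to confirm which values are actually in effect.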

### Optimized Replay Buffer
Follow steps 1-3 from the previous section. To activate the prioritized replay buffer, `replay_context` has to be 1. The remaining important configurations we used are listed below:
* `"replay.fracs.uniform"`: `0.0`
* `"replay.fracs.priority"`: `1.0`
* `"replay.fracs.recency"`: `0.0`
* `"replay.prio.exponent"`: `0.8`
* `"replay.prio.maxfrac"`: `0.5`
* `"replay.prio.initial"`: `1.0`
* `"replay.prio.zero_on_sample"`: `False`
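
To give an intuition for `replay.prio.exponent`, the following is a minimal sketch of generic proportional prioritization, in which an item is sampled with probability proportional to its priority raised to the exponent. It is an illustration under that assumption and not the replay buffer code of this repository. The function `sampling_probs` and the example priorities are hypothetical, and `maxfrac` and `zero_on_sample` are not modeled.

```python
import numpy as np


def sampling_probs(priorities, exponent=0.8):
    """Turn non-negative priorities into sampling probabilities (illustrative only)."""
    scaled = np.asarray(priorities, dtype=float) ** exponent
    return scaled / scaled.sum()


rng = np.random.default_rng(0)

# Hypothetical priorities; new items would start at `replay.prio.initial` (1.0).
priorities = np.array([1.0, 1.0, 4.0, 0.25])
probs = sampling_probs(priorities, exponent=0.8)

# Draw a batch of buffer indices according to the priorities.
batch = rng.choice(len(priorities), size=8, p=probs)
print(probs, batch)
```

An exponent of `0` recovers uniform sampling, while an exponent of `1` samples strictly in proportion to the priorities. A value of `0.8` sits between the two.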

<br><br>
Once everything is set up, run:

In [None]:
!python experiments/experiment_definitions.py run_replay_buffer_experiment --name "DreamerV3 Prioritized Replay Buffer" --description "DreamerV3 optimized replay buffer configuration run" --num_seeds 2

### Latent Reward Disagreement (Exp. Decay)
Follow steps 1-3 from the Reproduction section. To activate the latent reward disagreement, set `agent.use_intrinsic` to `True` and `agent.intrinsic.scheduling_strategy` to `"exp_decay"` for exponential decay scheduling. The remaining important configurations we used for our experiments are listed below:
* `"agent.intrinsic.learn_strategy"`: `"joint_mlp"` (other options: `ema`, `perturbed_starts`)
* `"agent.intrinsic.exploration_type"`: `"reward_variance"` (other option: `state_disagreement`)
* `"agent.intrinsic.reward_type"`: `"disagreement"` (other options include `prediction_error` and `max_disagreement`)
* `"agent.intrinsic.scheduling_strategy"`: `"exp_decay"`
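
As a rough illustration of what exponential decay scheduling means, the sketch below shrinks the weight of the intrinsic reward exponentially with the training step. The function names and the values of `initial`, `decay_rate` and `minimum` are assumptions chosen for illustration. They are not the settings or the code used in our experiments.

```python
import math


def intrinsic_weight(step, initial=0.5, decay_rate=1e-4, minimum=0.0):
    """Exponentially decaying weight of the intrinsic reward (hypothetical values)."""
    return max(minimum, initial * math.exp(-decay_rate * step))


def total_reward(r_ext, r_intr, step):
    """Convex combination of extrinsic and intrinsic reward at a given step."""
    w = intrinsic_weight(step)
    return (1.0 - w) * r_ext + w * r_intr


for step in (0, 10_000, 50_000):
    print(step, round(intrinsic_weight(step), 4))
```

With these example values the intrinsic share starts at one half and is close to zero after a few tens of thousands of steps, so late training is driven almost entirely by the extrinsic reward.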

<br><br>
Once everything is set up, run:

In [None]:
!python experiments/experiment_definitions.py run_latent_disagreement_experiment_exp_decay --name "DreamerV3 Latent Reward Disagreement with exponential decay scheduling" --description "DreamerV3 guided by latent reward disagreement with exponential decay scheduling" --num_seeds 2

### Latent Reward Disagreement (Exponential Moving Average Slope)
Follow steps 1-3 from the Reproduction section. To activate the latent reward disagreement, set `agent.use_intrinsic` to `True` and `agent.intrinsic.scheduling_strategy` to `"slope_ema"` for EMA slope scheduling. The remaining important configurations we used for our experiments are listed below:
* `"agent.intrinsic.learn_strategy"`: `"joint_mlp"` (other options: `ema`, `perturbed_starts`)
* `"agent.intrinsic.exploration_type"`: `"reward_variance"` (other option: `state_disagreement`)
* `"agent.intrinsic.reward_type"`: `"disagreement"` (other options include `prediction_error` and `max_disagreement`)
* `"agent.intrinsic.scheduling_strategy"`: `"slope_ema"`

<br><br>
Once everything is set up, run:

In [None]:
!python experiments/experiment_definitions.py run_latent_disagreement_experiment_ema --name "DreamerV3 Latent Reward Disagreement with EMA slope scheduling" --description "DreamerV3 guided by latent reward disagreement with EMA slope scheduling" --num_seeds 2

### Results
The results are logged in the logdir. For plotting the results, please refer to the readme.

## Individual Contributions
Most of the ideation behind our extensions took place in brainstorming sessions that all team members attended. All team members contributed to all parts of this work, even where a part was not their main contribution. Parts of the implementation were done in peer-coding sessions.
- Lukas Bierling: Major efforts on the implementation of all extensions and their variants. Collaboration on the ideation and interpretation of results. Coordinated workstreams and repository use. General collaboration on ideation & implementation as indicated above.
- Davide Paserio: Design & parts of the Prioritized Replay Buffer implementation, contribution to the implementation of the latent reward disagreement. General collaboration on ideation & implementation as indicated above.
- Jan Henrik Bertrand: Design & parts of the Latent Reward Disagreement implementation. Design & implementation of the experimental framework. Running the experiments. General collaboration on ideation & implementation as indicated above.
- Kiki van Gerwen: Design of the custom plotting tool, contribution to the implementation of the latent reward disagreement. General collaboration on ideation & implementation as indicated above.

The code was committed from the Snellius system under Jan's GitHub account due to shared access.

# Appendix

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import imageio
from IPython.display import Image, display
import io
import matplotlib.animation as animation
from scipy.special import expit as sigmoid
import base64

# Latent reward disagreement
Inspired by the "disagreement" over latent states predicted by an ensemble of world models in Plan2Explore (Sekar et al., 2020), we use the disagreement over reward predictions from an ensemble of world models. To quantify the disagreement, we take the variance over the predicted rewards and add it to the mean of the predicted rewards, which incentivizes trajectories that are expected to be rewarding. This sum of mean and variance is our intrinsic reward.
Each ensemble member $k \in \{1, \dots, K\}$, parameterized by $w_k$, recurrently predicts (i.e., "imagines") future deterministic latent states $h_{t'}^{w_k}$ over an imagination horizon $L$, with $t'$ being a timestep within the horizon. The standard reward predictor then predicts the corresponding reward $\hat{r}_{k, t'} \sim p_\phi(\hat{r}_{t'} \mid h_{t'}^{w_k}, z_{t'})$. Formally,
$$
r_t^{\text{intr}} = \frac{1}{L} \sum_{t'=t}^{t+L} \left[ \bar{r}_{t'} + \frac{1}{K} \sum_{k=1}^{K} (\hat{r}_{k,t'} - \bar{r}_{t'})^2 \right]
$$
where $\bar{r}_{t'}$ is the mean predicted reward across all ensemble members at timestep $t'$ of the imagination. High variance indicates epistemic uncertainty over the predicted reward, and thus encourages exploration of the associated state. The final reward used for training is a convex combination of extrinsic and intrinsic rewards: 
$$r_t^{\text{total}} = \lambda r_t^{\text{ext}} + (1-\lambda) r_t^{\text{intr}}$$
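
The two formulas above can be sketched in NumPy as follows. This is a minimal illustration that assumes the ensemble's reward predictions are already available as an array of shape `(K, L)`. The function names and the example numbers are hypothetical, and the sketch simply averages over the imagined steps it is given.

```python
import numpy as np


def intrinsic_reward(pred_rewards):
    """Mean plus ensemble variance of predicted rewards, averaged over the horizon.

    pred_rewards: array of shape (K, L), one row per ensemble member.
    """
    pred_rewards = np.asarray(pred_rewards, dtype=float)
    mean = pred_rewards.mean(axis=0)  # mean prediction per imagined step
    var = pred_rewards.var(axis=0)    # population variance (1/K) per imagined step
    return float((mean + var).mean())


def total_reward(r_ext, r_intr, lam):
    """Convex combination of extrinsic and intrinsic reward."""
    return lam * r_ext + (1.0 - lam) * r_intr


# Two ensemble members, two imagined steps (hypothetical numbers).
preds = np.array([[0.0, 1.0],
                  [2.0, 1.0]])
r_intr = intrinsic_reward(preds)
print(r_intr)                         # 1.5
print(total_reward(0.0, r_intr, 0.5))  # 0.75
```

In the example, the members disagree on the first step (variance 1) and agree on the second (variance 0), so only the first step contributes an exploration bonus on top of the mean prediction.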

The following code produces a simple visualization of why intrinsic motivation alongside the extrinsic reward helps in sparse-reward settings, compared to relying on environmental rewards alone.

In [None]:
# Maze settings
maze_size = (7, 7)
goal_pos = (6, 6)
start_pos = (0, 0)
num_steps = 30

# Intrinsic reward settings
intr_reward_low, intr_reward_high = 0.08, 0.13

def run_agent(record_intrinsic):
    pos = list(start_pos)
    trajectory = [tuple(pos)]
    rewards = []
    intrinsic_rewards = []
    visited = set()
    for step in range(num_steps):
        visited.add(tuple(pos))
        # Random valid move
        moves = []
        for dx, dy in [(-1,0),(1,0),(0,-1),(0,1)]:
            nx, ny = pos[0]+dx, pos[1]+dy
            if 0 <= nx < maze_size[0] and 0 <= ny < maze_size[1]:
                moves.append((nx, ny))
        if moves:
            if record_intrinsic:
                unvisited = [m for m in moves if m not in visited]
                if unvisited:
                    pos = list(unvisited[np.random.randint(len(unvisited))])
                else:
                    pos = list(moves[np.random.randint(len(moves))])
            else:
                pos = list(moves[np.random.randint(len(moves))])
        trajectory.append(tuple(pos))
        # Reward logic
        extrinsic = 1 if tuple(pos) == goal_pos else 0
        if record_intrinsic:
            if extrinsic:
                intr = 0
            elif tuple(pos) not in trajectory[:-1]:  # novel state
                intr = np.random.uniform(intr_reward_low, intr_reward_high)
            else:
                intr = 0
            rewards.append(extrinsic + intr)
            intrinsic_rewards.append(intr)
        else:
            rewards.append(extrinsic)
            intrinsic_rewards.append(0)
    return trajectory, rewards, intrinsic_rewards

# The baseline agent records only zeros as intrinsic rewards; keep them under an explicit name
extrinsic_traj, extrinsic_rewards, baseline_intrinsic = run_agent(record_intrinsic=False)
intrinsic_traj, intrinsic_total_rewards, intrinsic_rewards = run_agent(record_intrinsic=True)

# Generate GIF frames
frames = []
extrinsic_return = 0
intrinsic_return = 0

for t in range(num_steps+1):
    fig, axs = plt.subplots(1,2, figsize=(9,4.5))
    for idx, (ax, traj, rewards, intrinsic, label) in enumerate(
        zip(
            axs,
            [extrinsic_traj, intrinsic_traj],
            [extrinsic_rewards, intrinsic_total_rewards],
            [baseline_intrinsic, intrinsic_rewards],
            ['Sparse (Extrinsic) Reward', 'Intrinsic + Extrinsic Reward']
        )
    ):
        ax.set_title(label, fontsize=11)
        ax.set_xlim(-0.5, maze_size[0]-0.5)
        ax.set_ylim(-0.5, maze_size[1]-0.5)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_aspect('equal')

        # Draw grid
        for i in range(maze_size[0]+1):
            ax.plot([i-0.5, i-0.5], [-0.5, maze_size[1]-0.5], color='gray', linewidth=0.5)
            ax.plot([-0.5, maze_size[0]-0.5], [i-0.5, i-0.5], color='gray', linewidth=0.5)

        # Draw goal
        ax.add_patch(patches.Rectangle(
            (goal_pos[0]-0.5, goal_pos[1]-0.5), 1, 1, color='gold', alpha=0.7, zorder=0
        ))
        ax.text(goal_pos[0], goal_pos[1], "Goal", ha='center', va='center', fontsize=8, color='black')

        # Draw agent's trail and rewards
        for i, (x, y) in enumerate(traj[:t+1]):
            if i == 0:
                color = 'gray'
                intrinsic_val = 0
            else:
                color = (
                    'royalblue' if label.startswith('Intrinsic') and intrinsic[i-1] > 0
                    else ('red' if i != len(traj)-1 else 'green')
                )
                intrinsic_val = intrinsic[i-1]

            # Trail
            ax.plot(x, y, 'o', color=color, markersize=7, alpha=0.7 if i < t else 1.0, zorder=1)
            # Reward marker
            if (x, y) == goal_pos:
                ax.text(x, y+0.3, "+1", ha='center', va='bottom', color='green', fontsize=10, fontweight='bold')
            elif label.startswith('Intrinsic') and i > 0 and intrinsic_val > 0:
                ax.text(x, y+0.2, f"+{intrinsic_val:.2f}", ha='center', va='bottom', color='blue', fontsize=8, fontweight='bold')
            elif label.startswith('Sparse') and i > 0 and (x, y) != goal_pos:
                ax.text(x, y+0.2, "0", ha='center', va='bottom', color='gray', fontsize=8, fontweight='bold')

        # Draw robot agent (last)
        x, y = traj[t]
        ax.text(x, y, "🤖", fontsize=23, ha='center', va='center', zorder=2)

        # Show reward at current step
        if t > 0:
            if (x, y) == goal_pos:
                ax.text(x, y+0.6, "+1", ha='center', va='bottom', color='green', fontsize=13, fontweight='bold')
            elif label.startswith('Intrinsic') and intrinsic[t-1] > 0:
                ax.text(x, y+0.6, f"+{intrinsic[t-1]:.2f}", ha='center', va='bottom', color='blue', fontsize=12, fontweight='bold')
            elif label.startswith('Sparse') and (x, y) != goal_pos:
                ax.text(x, y+0.6, "0", ha='center', va='bottom', color='gray', fontsize=12, fontweight='bold')

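        # Compute return so far: rewards[k] is received on arriving at traj[k+1],
        # so at frame t only the first t rewards have been collected
        curr_return = np.sum(rewards[:t])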
        if idx == 0:
            extrinsic_return = curr_return
        else:
            intrinsic_return = curr_return

        # Draw cumulative return
        ax.text(0, -1.2, f"Return: {curr_return:.2f}", fontsize=12, ha='left', color='black')

    plt.tight_layout()
    fig.subplots_adjust(bottom=0.18)
    fig.canvas.draw()
    buf = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
    w, h = fig.canvas.get_width_height()
    frame = buf.reshape((h, w, 4))[..., :3]
    frames.append(frame)
    plt.close(fig)

# Save the GIF to an in-memory buffer
gif_buffer = io.BytesIO()
imageio.mimsave(gif_buffer, frames, format='gif', duration=1.01)
gif_buffer.seek(0)

# Display in Jupyter notebook
display(Image(data=gif_buffer.read(), format='gif'))

## EMA gradient as a scheduler for the importance of the intrinsic reward in the total reward

To balance exploration and exploitation, we combine extrinsic and intrinsic rewards using a weighting factor $\lambda$, where $\lambda$ weights the extrinsic reward and $1-\lambda$ the intrinsic reward. We experiment with two strategies for adapting this weighting over time. First, we apply exponential decay, gradually reducing the influence of intrinsic rewards as training progresses. Second, we explore a dynamic adjustment using the gradient of an exponential moving average (EMA) of the episode return: the intrinsic share $1-\lambda$ is decreased when performance tends to improve and increased when learning stagnates or regresses. This encourages exploration when necessary and promotes exploitation when training is stable.

The following code generates a simple visualization of how the slope of the EMA of the episode return schedules the weight of the intrinsic reward in the total reward.

In [None]:
np.random.seed(42)
steps = 5000

# --- 1. Reward pattern: down, up, plateau, down, big up ---
segments = [
    np.linspace(40_000, 15_000, 600),              # down
    np.linspace(15_000, 45_000, 1000),             # up
    np.ones(700) * 45_000,                         # plateau
    np.linspace(45_000, 20_000, 900),              # down
    np.linspace(20_000, 90_000, steps - 3200)      # big up
]
reward = np.concatenate(segments)
reward = reward[:steps]
reward += np.random.normal(0, 15_000, size=steps)  # Add heavy Gaussian noise
reward = np.clip(reward, 0, None)

# --- 2. EMA ---
def compute_ema(data, alpha=0.01):
    ema = np.zeros_like(data)
    ema[0] = data[0]
    for t in range(1, len(data)):
        ema[t] = alpha * data[t] + (1 - alpha) * ema[t - 1]
    return ema

ema = compute_ema(reward, alpha=0.01)

window = 500
n_frames = steps - window

slopes = np.zeros(n_frames)
lambdas = np.zeros(n_frames)

for i in range(n_frames):
    ema_window = ema[i:i+window]
    ema_min, ema_max = ema_window.min(), ema_window.max()
    if ema_max - ema_min == 0:
        scaled = np.zeros_like(ema_window)
    else:
        scaled = (ema_window - ema_min) / (ema_max - ema_min)
    slope = scaled[-1] - scaled[0]
    slopes[i] = slope
    lambdas[i] = sigmoid(slope)

# --- 3. Animation ---
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(6, 4), sharex=True, gridspec_kw={'height_ratios':[2,1]})

n_gif_frames = 60
frame_indices = np.linspace(0, n_frames-1, n_gif_frames).astype(int)

def animate(frame):
    ax1.clear()
    ax2.clear()
    start = frame
    end = frame + window

    # Top plot: EMA (scaled), current window
    full_scaled = (ema - ema.min()) / (ema.max() - ema.min())
    ax1.plot(np.arange(steps), full_scaled, color='blue', alpha=0.3, lw=1, label='EMA of rewards')
    window_ema = ema[start:end]
    win_min, win_max = window_ema.min(), window_ema.max()
    if win_max - win_min == 0:
        window_scaled = np.zeros_like(window_ema)
    else:
        window_scaled = (window_ema - win_min) / (win_max - win_min)
    ax1.plot(np.arange(start, end), window_scaled, color='red', lw=1.5, label='Current window')
    ax1.scatter([start, end-1], [window_scaled[0], window_scaled[-1]], color='black', zorder=10, s=24)
    # Draw slope as orange line
    ax1.plot(
        [start, end-1],
        [window_scaled[0], window_scaled[-1]],
        color='orange', lw=2, label='Slope'
    )
    ax1.set_ylabel('EMA (window min-max scaled)')
    ax1.legend(loc='upper left', fontsize=8)
    ax1.set_title(f'Window: {start}-{end-1} | Slope: {slopes[frame]:.2f} | Lambda: {lambdas[frame]:.2f}')
    ax1.set_ylim(-0.2, 1.2)

    # Bottom plot: Lambda
    ax2.plot(np.arange(frame+1), lambdas[:frame+1], color='green', lw=1.5, label='Lambda')
    ax2.set_xlim(0, n_frames)
    ax2.set_ylim(0, 1)
    ax2.set_ylabel('Lambda')
    ax2.set_xlabel('Step')
    ax2.legend(loc='upper left', fontsize=8)
    ax2.axvline(frame, color='grey', lw=1, alpha=0.3)
    plt.tight_layout()

ani = animation.FuncAnimation(
    fig, animate, frames=frame_indices, interval=300, repeat=False
)

gif_path = "lambda_animation.gif"
ani.save(gif_path, writer='pillow', fps=4)
plt.close(fig)  # Prevent extra empty plot

# Display the GIF in the notebook
display(Image(filename=gif_path))