
# RL for Blockchain Resource Optimization — **Algorithm Blueprint**
This notebook is a **clean, publication-ready DRL blueprint** that documents the exact algorithmic structure used:
- **DDPG** (continuous actions)
- **P‑DQN** (mixed discrete + continuous)
- **Prioritized Experience Replay**
- **SimPy discrete‑event environment** (`reset`, `step`)

It includes **mathematical definitions, interfaces, and code scaffolds** so your client can understand and cite the implementation while core logic remains protected pending final handover.



## Pipeline at a Glance
1. `SimEnv.reset()` → initial state vector \(s_0\)  
2. Agent policy \(\pi_\theta\) selects action \(a_t\)  
3. `SimEnv.step(a_t)` → \((s_{t+1}, r_t, done, info)\)  
4. Store \((s_t, a_t, r_t, s_{t+1}, done)\) in **Prioritized Replay**  
5. Sample minibatches, compute TD targets, update networks  
6. Soft-update target networks \(\theta^- \leftarrow \tau \theta + (1-\tau)\theta^-\)
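
Step 6 can be illustrated with plain NumPy arrays standing in for network parameters. This is a minimal sketch: the helper name `soft_update_params` and the list-of-arrays parameter representation are assumptions for illustration, not the withheld implementation.

```python
import numpy as np

def soft_update_params(online, target, tau=0.005):
    """Polyak-average each target array toward its online counterpart, in place."""
    for w, w_tgt in zip(online, target):
        # theta^- <- tau * theta + (1 - tau) * theta^-
        w_tgt *= (1.0 - tau)
        w_tgt += tau * w
    return target

# Toy "networks": one weight matrix and one bias vector each (hypothetical shapes).
online = [np.ones((2, 2)), np.ones(2)]
target = [np.zeros((2, 2)), np.zeros(2)]

soft_update_params(online, target, tau=0.1)
print(target[0])  # every entry has moved 10% of the way from 0 toward 1
```

A small \(\tau\) makes the target networks trail the online networks slowly, which tends to stabilize the TD targets.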



## Reward (MAP‑style)
\[
r_t = -\alpha \cdot \text{latency}_t \;+\; \beta \cdot \text{throughput}_t \;-\; \gamma \cdot \text{orphan\_rate}_t
\]
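Typical ranges: \(\alpha\in[0.2,2],\ \beta\in[10^{-4},10^{-2}],\ \gamma\in[1,20]\). Here \(\alpha, \beta, \gamma\) are reward weights; they are distinct from the PER exponents \(\alpha, \beta\) and the discount factor \(\gamma\) used in later sections.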
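
The reward formula translates directly into a small function. The sketch below is illustrative only: the function name `compute_reward`, the default weights (picked from inside the ranges above), and the example metric values are assumptions, and `gamma_w` is named that way to avoid confusion with the discount factor.

```python
def compute_reward(latency, throughput, orphan_rate,
                   alpha=1.0, beta=1e-3, gamma_w=5.0):
    """r_t = -alpha * latency + beta * throughput - gamma_w * orphan_rate."""
    return -alpha * latency + beta * throughput - gamma_w * orphan_rate

# Hypothetical metrics for one step (units are assumed, not taken from the simulator).
r = compute_reward(latency=2.0, throughput=1500.0, orphan_rate=0.02)
print(round(r, 6))  # -2.0 + 1.5 - 0.1 = -0.6
```

The small \(\beta\) range compensates for throughput being numerically much larger than latency or orphan rate, so no single term dominates the reward.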


## SimPy Environment — Interface & Contract

In [None]:

class SimEnv:
    """
    Discrete-event blockchain simulator (SimPy-based in full implementation).

    State vector s_t (example):
        [latency_to_peers_mean, mempool_depth, available_bandwidth,
         recent_orphan_rate, recent_throughput]

    Action vector a_t (DDPG):
        [neighbors_mask[0:K], block_size_norm, block_interval_norm]

    Mixed action (P-DQN):
        discrete: strategy_id in {0..K-1}
        continuous: [block_size_norm, block_interval_norm]

    Methods
    -------
    reset() -> state
        Initialize network, return s0.

    step(action) -> (state_prime, reward, done, info)
        Advance simulation using gossip + block params from 'action'.
        Returns:
            state_prime : next state vector
            reward : scalar as per reward function
            done   : episode termination flag
            info   : dict for logging (latency, throughput, orphan_rate, etc.)
    """
    def __init__(self, config):
        self.config = config
        # NOTE: Full SimPy processes (tx arrivals, mining, delays) are withheld.
        # Hooks below define the public contract used by the agents.

    def reset(self):
        # TODO: initialize internal SimPy processes and metrics
        raise NotImplementedError("Withheld in teaser. Provided on final handover.")

    def step(self, action):
        # TODO: apply action, run events for Δt, compute metrics & reward
        raise NotImplementedError("Withheld in teaser. Provided on final handover.")
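
Until the SimPy processes are delivered, the `reset`/`step` contract can be exercised with a stand-in environment. The `ToyEnv` class below is hypothetical: it returns random 5-dimensional states in the order listed in the docstring above and uses unit reward weights; it does not model gossip, mining, or delays.

```python
import numpy as np

class ToyEnv:
    """Hypothetical stand-in obeying the reset/step contract; NOT the SimPy simulator."""

    def __init__(self, horizon=5, seed=0):
        self.horizon = horizon                 # steps per episode
        self.rng = np.random.default_rng(seed)
        self.t = 0

    def _observe(self):
        # [latency_to_peers_mean, mempool_depth, available_bandwidth,
        #  recent_orphan_rate, recent_throughput], all random in [0, 1)
        return self.rng.random(5).astype(np.float32)

    def reset(self):
        self.t = 0
        return self._observe()

    def step(self, action):
        # The action is ignored here; a real environment would apply it.
        self.t += 1
        state = self._observe()
        info = {"latency": float(state[0]),
                "throughput": float(state[4]),
                "orphan_rate": float(state[3])}
        # Unit weights, purely for illustration.
        reward = -info["latency"] + info["throughput"] - info["orphan_rate"]
        done = self.t >= self.horizon
        return state, reward, done, info

env = ToyEnv(horizon=3)
s = env.reset()
steps, done = 0, False
while not done:
    s, r, done, info = env.step(np.zeros(2, dtype=np.float32))
    steps += 1
print(steps, s.shape, sorted(info))
```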


## Prioritized Experience Replay (PER)


Priority for transition \(i\): \(p_i = (|\delta_i| + \epsilon)^\alpha\) where \(\delta_i\) is TD‑error.  
Sampling prob: \(P(i)=\frac{p_i}{\sum_j p_j}\).  
Importance‑sampling weight: \(w_i = \left(\frac{1}{N}\cdot\frac{1}{P(i)}\right)^\beta\), then normalized as \(w_i \leftarrow \frac{w_i}{\max_j w_j}\) so that all weights lie in \((0, 1]\).
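
A short numeric walk-through of the three formulas above may help. The TD errors below are made-up values chosen only to show the effect; the exponents match the defaults used in the buffer further down.

```python
import numpy as np

td_errors = np.array([0.5, 2.0, 0.1, 1.0])   # hypothetical TD errors
alpha, beta, eps = 0.6, 0.4, 1e-6

# p_i = (|delta_i| + eps)^alpha
priorities = (np.abs(td_errors) + eps) ** alpha

# P(i) = p_i / sum_j p_j
probs = priorities / priorities.sum()

# w_i = (1/N * 1/P(i))^beta, then divide by the largest weight
N = len(td_errors)
weights = (N * probs) ** (-beta)
weights /= weights.max()

print(np.round(probs, 3))
print(np.round(weights, 3))
```

The transition with the largest TD error is sampled most often and receives the smallest weight, while the rarely sampled transition receives weight 1; this is how the importance-sampling correction offsets the bias introduced by non-uniform sampling.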


In [None]:

import numpy as np

class PrioritizedReplayBuffer:
    """
    Prioritized replay with proportional prioritization.

    Parameters
    ----------
    capacity : int
    alpha : float   # priority exponent
    beta0 : float   # initial IS exponent
    beta_inc : float  # beta schedule per sampling step
    eps : float     # small constant to avoid zero priority
    """
    def __init__(self, capacity=100_000, alpha=0.6, beta0=0.4, beta_inc=1e-4, eps=1e-6):
        self.capacity = capacity
        self.alpha = alpha
        self.beta = beta0
        self.beta_inc = beta_inc
        self.eps = eps
        self.ptr = 0
        self.size = 0
        self.states = []
        self.actions = []
        self.rewards = []
        self.next_states = []
        self.dones = []
        self.priorities = np.zeros((capacity,), dtype=np.float32)

    def add(self, s, a, r, s2, d, td_error=None):
        if self.size < self.capacity:
            self.states.append(s); self.actions.append(a)
            self.rewards.append(r); self.next_states.append(s2)
            self.dones.append(d)
        else:
            self.states[self.ptr] = s; self.actions[self.ptr] = a
            self.rewards[self.ptr] = r; self.next_states[self.ptr] = s2
            self.dones[self.ptr] = d

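        # New transitions default to the current maximum priority (1.0 while the
        # buffer is empty) so each is likely to be replayed at least once;
        # an explicit TD error overrides this default.
        if td_error is not None:
            self.priorities[self.ptr] = (abs(td_error) + self.eps) ** self.alpha
        elif self.size > 0:
            self.priorities[self.ptr] = self.priorities[:self.size].max()
        else:
            self.priorities[self.ptr] = 1.0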

        self.ptr = (self.ptr + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size):
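        # Sampling below is without replacement, so a full batch must be available.
        assert self.size >= batch_size, "Not enough transitions to sample a batch"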

        probs = self.priorities[:self.size]
        probs = probs / probs.sum()

        idx = np.random.choice(self.size, size=batch_size, p=probs, replace=False)

        # Importance-sampling weights
        self.beta = min(1.0, self.beta + self.beta_inc)
        weights = (self.size * probs[idx]) ** (-self.beta)
        weights = weights / weights.max()

        batch = dict(
            states=[self.states[i] for i in idx],
            actions=[self.actions[i] for i in idx],
            rewards=[self.rewards[i] for i in idx],
            next_states=[self.next_states[i] for i in idx],
            dones=[self.dones[i] for i in idx],
            idx=idx,
            weights=weights.astype(np.float32)
        )
        return batch

    def update_priorities(self, idx, td_errors):
        for i, e in zip(idx, td_errors):
            self.priorities[i] = (abs(e) + self.eps) ** self.alpha


## Deep Deterministic Policy Gradient (DDPG)


Actor \(\mu_\theta(s)\) outputs continuous action \(a\).  
Critic \(Q_\phi(s,a)\) approximates state‑action value.  
Targets: \(y = r + \gamma\,(1-d)\,Q_{\phi^-}(s', \mu_{\theta^-}(s'))\), where \(d\) is the stored `done` flag, so terminal transitions do not bootstrap.  
Soft update: \(\theta^- \leftarrow \tau\theta + (1-\tau)\theta^-\), similarly for \(\phi^-\).
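
The critic target and the importance-weighted critic loss can be sketched with NumPy arrays in place of network outputs. The function names `td_targets` and `weighted_critic_loss` are hypothetical, and the `(1 - done)` mask is a common convention assumed here so that terminal transitions do not bootstrap; the actual update logic is withheld.

```python
import numpy as np

def td_targets(rewards, next_q, dones, gamma=0.99):
    """y = r + gamma * (1 - done) * Q_target(s', mu_target(s'))."""
    rewards = np.asarray(rewards, dtype=np.float32)
    next_q = np.asarray(next_q, dtype=np.float32)
    dones = np.asarray(dones, dtype=np.float32)
    return rewards + gamma * (1.0 - dones) * next_q

def weighted_critic_loss(q_pred, targets, is_weights):
    """Mean of w_i * delta_i^2; also returns delta_i for priority updates."""
    td_errors = targets - np.asarray(q_pred, dtype=np.float32)
    w = np.asarray(is_weights, dtype=np.float32)
    return float(np.mean(w * td_errors ** 2)), td_errors

# Two hypothetical transitions; the second one is terminal.
y = td_targets(rewards=[1.0, -0.5], next_q=[2.0, 3.0], dones=[0.0, 1.0])
loss, td = weighted_critic_loss(q_pred=[2.5, 0.0], targets=y, is_weights=[1.0, 0.5])
print(y, td, loss)
```

The returned TD errors are what a prioritized buffer would consume in `update_priorities` after each learning step.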


In [None]:

import numpy as np

class DDPGAgent:
    """
    DDPG skeleton with exploration noise and soft target updates.

    NOTE: Network architectures and optimizers are intentionally omitted.
    Replace the `raise NotImplementedError` stubs with the model code at handover.
    """
    def __init__(self, state_dim, action_dim, cfg):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.cfg = cfg
        # self.actor = ...
        # self.critic = ...
        # self.actor_tgt = ...
        # self.critic_tgt = ...
        # self.opt_actor = ...
        # self.opt_critic = ...

        # Exploration parameters
        self.noise_sigma = cfg.get("noise_sigma", 0.2)
        self.noise_decay = cfg.get("noise_decay", 0.999)
        self.tau = cfg.get("tau", 0.005)
        self.gamma = cfg.get("gamma", 0.99)

    def select_action(self, state, explore=True):
        # a = self.actor(state)
        # placeholder: zero vector + noise
        a = np.zeros(self.action_dim, dtype=np.float32)
        if explore:
            # Gaussian exploration noise, cast so the action stays float32
            noise = np.random.normal(0, self.noise_sigma, size=self.action_dim)
            a = a + noise.astype(np.float32)
        return a

    def learn(self, batch):
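        """
        Update critic and actor from
        batch = {states, actions, rewards, next_states, dones, idx, weights}.
        `weights` are the PER importance-sampling weights; `idx` identifies the
        sampled transitions so their priorities can be refreshed afterwards.
        """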
        raise NotImplementedError("Core update logic withheld for final delivery.")

    def soft_update(self, online, target):
        # for online_param, target_param in zip(online.parameters(), target.parameters()):
        #     target_param.data.copy_(self.tau * online_param.data + (1 - self.tau) * target_param.data)
        raise NotImplementedError("Withheld.")

    def decay_exploration_noise(self):
        self.noise_sigma *= self.noise_decay


## Parameterized DQN (P‑DQN)


Two components:  
1) Discrete Q‑head \(Q(s,k)\) for strategy \(k\in\{0,\dots,K-1\}\) (ε‑greedy for exploration).  
2) Parameter networks output continuous \(\theta_k(s)\) (e.g., block size, interval).  
Action = \((k, \theta_k(s))\).  
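Target: \(y = r + \gamma\,(1-d)\,\max_{k'} Q^-(s',k')\), where \(d\) is the `done` flag. With the double‑DQN option, \(k'\) is selected by the online network and evaluated by the target network \(Q^-\).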
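
The difference between the plain target and the double-DQN option is easiest to see on a tiny batch. The sketch below follows the simplified \(Q(s,k)\) notation of this section and takes the next-state Q-values as given arrays; the function names, the `(1 - done)` mask, and the numbers are illustrative assumptions.

```python
import numpy as np

def dqn_targets(rewards, dones, q_target_next, gamma=0.99):
    """Plain target: max over strategies of the target network's Q-values."""
    next_q = np.asarray(q_target_next, dtype=np.float64).max(axis=1)
    return np.asarray(rewards) + gamma * (1.0 - np.asarray(dones)) * next_q

def double_dqn_targets(rewards, dones, q_online_next, q_target_next, gamma=0.99):
    """Double DQN: the online net selects k', the target net evaluates it."""
    q_online_next = np.asarray(q_online_next, dtype=np.float64)
    q_target_next = np.asarray(q_target_next, dtype=np.float64)
    best_k = q_online_next.argmax(axis=1)                    # selection
    next_q = q_target_next[np.arange(len(best_k)), best_k]   # evaluation
    return np.asarray(rewards) + gamma * (1.0 - np.asarray(dones)) * next_q

rewards = np.array([0.0, 1.0])
dones = np.array([0.0, 0.0])
q_online_next = [[1.0, 2.0], [0.5, 0.1]]   # hypothetical Q(s', k) from the online net
q_target_next = [[3.0, 1.5], [0.2, 0.9]]   # hypothetical Q^-(s', k) from the target net

y_plain = dqn_targets(rewards, dones, q_target_next)
y_double = double_dqn_targets(rewards, dones, q_online_next, q_target_next)
print(y_plain, y_double)
```

In this example the double-DQN targets are lower than the plain ones, which reflects the usual motivation for the option: decoupling selection from evaluation reduces overestimation from the `max` operator.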


In [None]:

import numpy as np

class PDQNAgent:
    """
    Parameterized DQN skeleton for mixed action spaces.

    NOTE: Neural modules are deliberately abstracted.
    """
    def __init__(self, state_dim, num_strategies, param_dim, cfg):
        self.state_dim = state_dim
        self.num_strategies = num_strategies  # K
        self.param_dim = param_dim            # int, e.g., 2 for [block_size, block_interval]
        self.cfg = cfg
        self.eps = cfg.get("eps_start", 1.0)
        self.eps_min = cfg.get("eps_min", 0.05)
        self.eps_decay = cfg.get("eps_decay", 0.995)
        self.gamma = cfg.get("gamma", 0.99)
        # self.q_net = ...
        # self.q_tgt = ...
        # self.param_nets = [...]

    def select_action(self, state, explore=True):
        if explore and np.random.rand() < self.eps:
            k = np.random.randint(self.num_strategies)
        else:
            # k = argmax_k Q(s,k)  # placeholder
            k = 0
        # theta = param_nets[k](s)  # placeholder
        theta = np.zeros(self.param_dim, dtype=np.float32)
        return (k, theta)

    def learn(self, batch):
        raise NotImplementedError("Core PDQN update logic withheld for final delivery.")

    def decay_epsilon(self):
        self.eps = max(self.eps_min, self.eps * self.eps_decay)
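
To choose `eps_decay`, it helps to know how many calls to `decay_epsilon` are needed before exploration bottoms out at `eps_min`. The helper below is a hypothetical convenience, not part of the agent; it solves \(\epsilon_0 \cdot d^{\,n} \le \epsilon_{\min}\) for the smallest integer \(n\), using the default values from the skeleton above.

```python
import math

def decays_to_eps_min(eps_start=1.0, eps_min=0.05, eps_decay=0.995):
    """Smallest n such that eps_start * eps_decay**n <= eps_min."""
    return math.ceil(math.log(eps_min / eps_start) / math.log(eps_decay))

n = decays_to_eps_min()
print(n)  # 598 with the default settings
```

If `decay_epsilon` is called once per episode, the default settings therefore imply roughly 600 episodes of decaying exploration before the floor is reached.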


## Training Loop — Contract (Pseudo-Executable Skeleton)

In [None]:

def training_loop(env, agent, replay, metrics, episodes=10, batch_size=64):
    """
    Contract of the main loop (structure only).
    """
    for ep in range(episodes):
        s = env.reset()
        done = False
        while not done:
            a = agent.select_action(s, explore=True)
            s2, r, done, info = env.step(a)
            replay.add(s, a, r, s2, float(done))

            if replay.size >= batch_size:
                batch = replay.sample(batch_size)
                agent.learn(batch)

            if metrics is not None:
                metrics.record(r, info)  # e.g., MetricsTracker.record
            s = s2

        # agent.soft_update(...); agent.decay_exploration_noise()
        # if isinstance(agent, PDQNAgent): agent.decay_epsilon()


## Evaluation & Plots (Interfaces)

In [None]:

class MetricsTracker:
    def __init__(self):
        self.history = []  # append dicts with {'reward':..., 'latency':..., 'throughput':..., 'orphan_rate':...}

    def record(self, reward, info):
        row = {'reward': reward}
        row.update(info or {})
        self.history.append(row)

    def to_dataframe(self):
        import pandas as pd
        return pd.DataFrame(self.history)

    def plot_learning_curve(self, df=None):
        import matplotlib.pyplot as plt
        if df is None:
            df = self.to_dataframe()
        plt.figure()
        plt.plot(df['reward'])
        plt.title('Reward vs. step')
        plt.xlabel('step'); plt.ylabel('reward')
        plt.show()

    def plot_throughput_vs_latency(self, df=None):
        import matplotlib.pyplot as plt
        if df is None:
            df = self.to_dataframe()
        plt.figure()
        plt.scatter(df['latency'], df['throughput'])
        plt.title('Throughput vs. Confirmation Latency')
        plt.xlabel('latency'); plt.ylabel('throughput')
        plt.show()

    def plot_orphan_rate_over_time(self, df=None):
        import matplotlib.pyplot as plt
        if df is None:
            df = self.to_dataframe()
        plt.figure()
        plt.plot(df['orphan_rate'])
        plt.title('Orphan Block Rate Over Time')
        plt.xlabel('step'); plt.ylabel('orphan_rate')
        plt.show()
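
Per-step rewards are usually noisy, so a smoothed curve can be easier to read than the raw plot. The `moving_average` helper below is a hypothetical addition, not part of `MetricsTracker`; it could be applied to `df['reward']` before plotting.

```python
import numpy as np

def moving_average(values, window=3):
    """Simple moving average; returns len(values) - window + 1 points."""
    values = np.asarray(values, dtype=np.float64)
    if window < 1 or window > len(values):
        raise ValueError("window must be between 1 and len(values)")
    kernel = np.ones(window) / window
    # 'valid' keeps only positions where the window fully overlaps the data
    return np.convolve(values, kernel, mode="valid")

smoothed = moving_average([1.0, 2.0, 3.0, 4.0, 5.0], window=3)
print(smoothed)  # [2. 3. 4.]
```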
