**Q_network**

In [1]:
import tensorflow as tf


class QNetwork(tf.keras.Model):
    """
    A simple fully-connected Q-network for DQN/DDQN.

    Architecture:
      Input:  state vector of dimension `input_dim`
      Hidden layers: sequence of Dense(hidden_units[i]) with ReLU
      Output layer: Dense(output_dim) producing Q-values (linear activation)

    Example usage:
        model = QNetwork(input_dim=100, output_dim=101)
        q_values = model(tf.random.uniform((batch_size, 100)))
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        hidden_units: "list[int] | None" = None,
        name: str = "q_network",
    ):
        """
        Initializes the QNetwork.

        Args:
            input_dim: Dimension of the input state vector. Used by
                `build_graph` to size the dummy input.
            output_dim: Number of actions (size of the Q-value output).
            hidden_units: Number of units in each hidden layer, in order.
                Defaults to two hidden layers of 64 units when omitted.
            name: Optional name for the Keras model.
        """
        super().__init__(name=name)

        # A `None` sentinel replaces the former mutable list default so the
        # default value can never be shared between instances.
        if hidden_units is None:
            hidden_units = [64, 64]

        # Kept so build_graph() can create a correctly sized dummy input.
        self._input_dim = int(input_dim)

        # Hidden layers: Dense + ReLU, named hidden_1, hidden_2, ...
        self.hidden_layers = [
            tf.keras.layers.Dense(
                units,
                activation="relu",
                kernel_initializer="he_uniform",
                name=f"hidden_{position}",
            )
            for position, units in enumerate(hidden_units, start=1)
        ]

        # Output layer (linear activation for Q-values)
        self.output_layer = tf.keras.layers.Dense(
            output_dim,
            activation=None,
            kernel_initializer="he_uniform",
            name="q_values",
        )

    def call(self, inputs: tf.Tensor) -> tf.Tensor:
        """
        Forward pass: computes Q-values for each action.

        Args:
            inputs: Tensor of shape (batch_size, input_dim). It is cast to
                float32 before being fed to the first layer.

        Returns:
            q_values: Tensor of shape (batch_size, output_dim).
        """
        x = tf.cast(inputs, tf.float32)
        for layer in self.hidden_layers:
            x = layer(x)
        return self.output_layer(x)

    def build_graph(self):
        """
        Optional helper: builds the model by passing a dummy input.
        Useful to display the model summary before training.

        A concrete all-zeros batch of shape (1, input_dim) is pushed through
        the model via `__call__` so that every layer creates its variables.
        The previous implementation used an input whose feature dimension was
        `None`, which Dense layers cannot be built from.
        """
        dummy = tf.zeros((1, self._input_dim), dtype=tf.float32)
        self(dummy)


if __name__ == "__main__":
    # Quick smoke test: instantiate the network, run one forward pass on
    # random data and print the resulting shapes and the layer summary.
    input_dim = 100
    output_dim = 101
    batch_size = 32

    model = QNetwork(input_dim=input_dim, output_dim=output_dim)
    # Build the model by calling it once; the layers only create their
    # variables on the first call, which model.summary() below relies on.
    dummy_input = tf.random.uniform((batch_size, input_dim))
    dummy_q = model(dummy_input)

    # Expected: input (batch_size, input_dim), output (batch_size, output_dim).
    print("Input shape:", dummy_input.shape)
    print("Output shape:", dummy_q.shape)
    model.summary()


Input shape: (32, 100)
Output shape: (32, 101)


**Prioritized Replay Buffer**

In [2]:
import random
import numpy as np
from typing import Any, List, Tuple

Transition = Tuple[Any, Any, float, Any, bool]
EPSILON = 1e-6  # small constant to avoid zero priority


class SumSegmentTree:
    """
    Array-backed segment tree supporting point updates, a total-sum query and
    prefix-sum lookup (used for proportional sampling).

    The tree is stored 1-based in a flat array: the root is at index 1, the
    children of node `k` are `2k` and `2k + 1`, and the leaves occupy
    `[n, 2n)` where `n` is the capacity rounded up to a power of two. Leaves
    beyond `capacity` are padding and always hold 0.
    """
    def __init__(self, capacity: int):
        """
        Args:
            capacity: Number of usable leaves; must be positive.

        Raises:
            ValueError: If `capacity` is not positive.
        """
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity}")
        # Next power of two for capacity
        self._n = 1
        while self._n < capacity:
            self._n <<= 1
        self._size = capacity
        # float64 keeps the internal sums accurate; float32 drops small
        # priorities once the running total gets large.
        self._tree = np.zeros(2 * self._n, dtype=np.float64)

    def update(self, idx: int, value: float):
        """
        Set value at leaf idx, then update internal nodes.

        Raises:
            IndexError: If `idx` is outside `[0, capacity)`. Without this
                check a negative index would overwrite an internal node.
        """
        if not 0 <= idx < self._size:
            raise IndexError(f"leaf index {idx} out of range [0, {self._size})")
        tree_idx = idx + self._n
        self._tree[tree_idx] = value
        # Walk up and recompute every ancestor from its two children.
        parent = tree_idx >> 1
        while parent >= 1:
            left = 2 * parent
            self._tree[parent] = self._tree[left] + self._tree[left + 1]
            parent >>= 1

    def sum_total(self) -> float:
        """Returns sum over all leaf values."""
        return float(self._tree[1])

    def find_prefixsum_idx(self, prefix: float) -> int:
        """
        Find the lowest idx such that the cumulative sum of leaves
        `0..idx` (inclusive) is >= prefix.

        `prefix` is clamped to the total sum, and the result is clamped to
        the last usable leaf, so the returned index is always in
        `[0, capacity)` even when `prefix` slightly exceeds the total because
        of floating-point rounding in the caller.
        """
        prefix = min(float(prefix), float(self._tree[1]))
        idx = 1
        while idx < self._n:  # while not at leaf
            left = 2 * idx
            if self._tree[left] >= prefix:
                idx = left
            else:
                # Skip the whole left subtree and look for the remainder
                # in the right one.
                prefix -= self._tree[left]
                idx = left + 1
        return min(idx - self._n, self._size - 1)


class MinSegmentTree:
    """
    Array-backed segment tree supporting point updates and a global minimum
    query over priorities.

    Uses the same 1-based layout as a sum tree: root at index 1, children of
    node `k` at `2k` and `2k + 1`, leaves in `[n, 2n)` with `n` the capacity
    rounded up to a power of two.
    """
    def __init__(self, capacity: int):
        """
        Args:
            capacity: Number of usable leaves; must be positive.

        Raises:
            ValueError: If `capacity` is not positive.
        """
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity}")
        # Next power of two for capacity
        self._n = 1
        while self._n < capacity:
            self._n <<= 1
        self._size = capacity
        # Initialize with +inf so unused leaves don't interfere. float64 so
        # that min() returns exactly the value that was stored.
        self._tree = np.full(2 * self._n, float('inf'), dtype=np.float64)

    def update(self, idx: int, value: float):
        """
        Set value at leaf idx, then update internal nodes with min.

        Raises:
            IndexError: If `idx` is outside `[0, capacity)`. Without this
                check a negative index would overwrite an internal node.
        """
        if not 0 <= idx < self._size:
            raise IndexError(f"leaf index {idx} out of range [0, {self._size})")
        tree_idx = idx + self._n
        self._tree[tree_idx] = value
        # Walk up and recompute every ancestor from its two children.
        parent = tree_idx >> 1
        while parent >= 1:
            left = 2 * parent
            self._tree[parent] = min(self._tree[left], self._tree[left + 1])
            parent >>= 1

    def min(self) -> float:
        """Returns minimum over all leaf values (+inf if nothing was stored)."""
        return float(self._tree[1])


class PrioritizedReplayBuffer:
    """
    Standalone Prioritized Experience Replay Buffer.

    Stores transitions with priorities, supports sampling by priority,
    and updating priorities. Internally uses a SumSegmentTree for
    proportional sampling and a MinSegmentTree for retrieving the
    minimum priority (for importance-sampling weight normalization).

    Transitions are kept in a ring buffer: once `capacity` items have been
    stored, new items overwrite the oldest ones.
    """

    def __init__(self, capacity: int, alpha: float = 0.6):
        """
        Args:
            capacity: Maximum number of transitions to store; must be positive.
            alpha: Priority exponent (0 = uniform sampling, 1 = full prioritization).

        Raises:
            ValueError: If `capacity` is not positive.
        """
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity}")
        self.capacity = capacity
        self.alpha = alpha

        # Segment trees
        self._sum_tree = SumSegmentTree(capacity)
        self._min_tree = MinSegmentTree(capacity)

        # Experience storage (slots hold None until first written)
        self._data: List[Transition] = [None] * capacity
        self._next_idx = 0
        self._size = 0

        # Track maximal priority for new transitions
        self._max_priority = 1.0

    def __len__(self) -> int:
        """Number of transitions currently stored."""
        return self._size
    size = __len__

    def store(self, transition: Transition):
        """
        Adds a new transition to the buffer with maximal priority.

        Args:
            transition: A tuple (state, action, reward, next_state, done).
        """
        idx = self._next_idx
        self._data[idx] = transition

        # New transitions get the largest priority seen so far, so each one
        # is likely to be replayed at least once.
        priority = self._max_priority ** self.alpha
        self._sum_tree.update(idx, priority)
        self._min_tree.update(idx, priority)

        # Advance the ring-buffer pointer
        self._next_idx = (self._next_idx + 1) % self.capacity
        self._size = min(self._size + 1, self.capacity)
    add=store

    def sample(
        self,
        batch_size: int,
        beta: float = 0.4
    ) -> Tuple[List[Transition], List[int], np.ndarray]:
        """
        Samples a batch of transitions with probabilities proportional to priority.
        Returns transitions, their indices, and importance-sampling weights.

        The total priority mass is split into `batch_size` equal segments and
        one value is drawn uniformly from each segment (stratified sampling).

        Args:
            batch_size: Number of transitions to sample; must be positive.
            beta: Importance-sampling exponent (0 = no correction, 1 = full correction).

        Returns:
            transitions: List of sampled transitions.
            indices: List of indices in the buffer.
            weights: Array of shape (batch_size,) of IS weights in [0,1].

        Raises:
            AssertionError: If the buffer is empty.
            ValueError: If `batch_size` is not positive.
        """
        assert self._size > 0, "Cannot sample from an empty buffer"
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        # Total priority mass
        total_sum = self._sum_tree.sum_total()
        segment = total_sum / batch_size

        transitions = []
        indices = []
        weights = np.empty(batch_size, dtype=np.float32)

        # Minimum probability for weight normalization
        min_prob = self._min_tree.min() / total_sum
        max_weight = (min_prob * self._size) ** (-beta)

        leaf_offset = self._sum_tree._n
        # Filled slots are always 0 .. _size - 1. Floating-point rounding can
        # push the drawn prefix just past the stored mass, which would select
        # an unfilled slot (transition None, priority 0, infinite weight), so
        # the index is clamped to the last filled slot.
        last_filled = self._size - 1

        for i in range(batch_size):
            a = segment * i
            b = segment * (i + 1)
            s = random.uniform(a, b)
            idx = min(self._sum_tree.find_prefixsum_idx(s), last_filled)

            transitions.append(self._data[idx])
            indices.append(idx)

            # Compute importance-sampling weight
            p_i = self._sum_tree._tree[idx + leaf_offset] / total_sum
            w = (p_i * self._size) ** (-beta)
            weights[i] = w / max_weight  # normalize to [0, 1]

        return transitions, indices, weights

    def update_priorities(self, indices: List[int], priorities: List[float]):
        """
        Updates the priorities of sampled transitions.

        Args:
            indices: List of buffer indices for the transitions.
            priorities: List of new priority values (e.g. absolute TD errors).

        Raises:
            IndexError: If an index does not refer to a stored transition.
            ValueError: If a priority is NaN or infinite; such a value would
                make the total priority mass non-finite and break sampling.
        """
        for idx, p in zip(indices, priorities):
            if not 0 <= idx < self._size:
                raise IndexError(
                    f"index {idx} does not refer to a stored transition "
                    f"(buffer holds {self._size})"
                )
            magnitude = abs(float(p))
            if not np.isfinite(magnitude):
                raise ValueError(f"priority for index {idx} is not finite: {p!r}")

            # Add a small epsilon and apply alpha exponent
            raw_priority = magnitude + EPSILON
            p_adjusted = raw_priority ** self.alpha
            self._sum_tree.update(idx, p_adjusted)
            self._min_tree.update(idx, p_adjusted)
            # Track max raw priority for new inserts
            self._max_priority = max(self._max_priority, raw_priority)


**Buffer test**

In [3]:
import numpy as np
from collections import defaultdict

# 1) Create a tiny buffer (capacity 8, so it is exactly full after step 2)
buf = PrioritizedReplayBuffer(capacity=8, alpha=0.6)

# 2) Push in some dummy transitions; all of them enter with the same
#    (maximal) priority, so the first sample below is effectively uniform.
for i in range(8):
    # (state, action, reward, next_state, done)
    tr = (np.array([i]), i, float(i), np.array([i+1]), False)
    buf.store(tr)

print("Buffer size (should be 8):", len(buf))

# 3) Sample a batch; sample() returns (transitions, indices, weights)
batch, idxs, weights = buf.sample(batch_size=4, beta=0.4)
print("\nSampled transitions:")
for t, idx, w in zip(batch, idxs, weights):
    print(f" idx={idx:2d}  tr={t}  w={w:.4f}")

# 4) Check shapes
assert len(batch) == 4
assert len(idxs)  == 4
assert weights.shape == (4,)

# 5) Artificially update their priorities (e.g. new TD errors = [1, 2, 3, 4]);
#    the i-th sampled index receives the i-th value.
new_prios = [1.0, 2.0, 3.0, 4.0]
buf.update_priorities(idxs, new_prios)

# 6) Re-sample one transition at a time and count how often each index is
#    drawn, to see whether indices with higher priorities appear more often.
counts = defaultdict(int)           # missing keys start at zero
for _ in range(5000):
    _, (idx,), _ = buf.sample(batch_size=1, beta=1.0)
    counts[idx] += 1
# Now inspect just the indices whose priorities were updated:
for idx in idxs:
    print(f" idx={idx:2d}  count={counts[idx]}")


# If everything works correctly:
#  - No exceptions should be raised.
#  - The counts for the higher-priority indices (those given larger new_prios)
#    should be noticeably larger than for the lower-priority ones.
#  Note: sampling is not seeded here, so the exact counts vary between runs.


Buffer size (should be 8): 8

Sampled transitions:
 idx= 1  tr=(array([1]), 1, 1.0, array([2]), False)  w=1.0000
 idx= 3  tr=(array([3]), 3, 3.0, array([4]), False)  w=1.0000
 idx= 5  tr=(array([5]), 5, 5.0, array([6]), False)  w=1.0000
 idx= 7  tr=(array([7]), 7, 7.0, array([8]), False)  w=1.0000
 idx= 1  count=437
 idx= 3  count=724
 idx= 5  count=869
 idx= 7  count=1132


**DDQN + PER Agent**

In [4]:
import numpy as np
import tensorflow as tf
from q_network import QNetwork
from per_buffer import PrioritizedReplayBuffer


class DDQNPERAgent:
    """
    Double DQN agent with Prioritized Experience Replay.

    Holds an online Q-network and a periodically synced target Q-network,
    an epsilon-greedy behaviour policy, and a prioritized replay buffer whose
    importance-sampling weights scale the per-sample Huber loss.
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        gamma=0.99,
        learning_rate=1e-3,
        epsilon=1.0,
        epsilon_min=0.1,
        epsilon_decay=0.995,
        batch_size=64,
        memory_capacity=100_000,
        alpha=0.6,
        beta=0.4,
        beta_increment=1e-6,
        target_update_freq=1000,
    ):
        """
        Args:
            state_dim: Length of the state vector fed to the Q-networks.
            action_dim: Number of discrete actions.
            gamma: Discount factor used in the TD target.
            learning_rate: Learning rate of the Adam optimizer.
            epsilon: Initial exploration probability.
            epsilon_min: Lower bound that epsilon is decayed towards.
            epsilon_decay: Multiplicative epsilon decay applied per train step.
            batch_size: Number of transitions sampled per train step.
            memory_capacity: Capacity of the replay buffer.
            alpha: Priority exponent passed to the replay buffer.
            beta: Initial importance-sampling exponent.
            beta_increment: Amount added to beta per train step (capped at 1).
            target_update_freq: Train steps between target-network syncs.
        """
        # Dimensions
        self.state_dim = state_dim
        self.action_dim = action_dim

        # Hyperparameters
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.beta = beta
        self.beta_increment = beta_increment
        self.target_update_freq = target_update_freq

        # Replay memory with Prioritized Experience Replay
        self.memory = PrioritizedReplayBuffer(capacity=memory_capacity, alpha=alpha)

        # Q-Networks: online & target
        self.q_network = QNetwork(self.state_dim, self.action_dim)
        self.target_q_network = QNetwork(self.state_dim, self.action_dim)

        # Keras creates layer variables lazily on the first call. Until then
        # get_weights() is empty and copying weights is a no-op, which would
        # leave the two networks with different random initialisations. Run
        # one dummy forward pass through each before syncing.
        dummy_state = tf.zeros((1, self.state_dim), dtype=tf.float32)
        self.q_network(dummy_state)
        self.target_q_network(dummy_state)
        self.update_target_network()

        # Optimizer and loss. reduction="none" keeps one loss value per
        # sample so the importance-sampling weights can be applied to each.
        self.optimizer = tf.keras.optimizers.Adam(learning_rate)
        self.loss_fn = tf.keras.losses.Huber(reduction="none")

        # Training step counter
        self.train_step_count = 0

    def act(self, state):
        """
        Selects an action using epsilon-greedy policy.

        Args:
            state: 1-D state vector (array-like).

        Returns:
            Integer action index in [0, action_dim).
        """
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_dim)
        # Greedy action from Q-network (add a leading batch axis of size 1)
        state_batch = np.asarray(state, dtype=np.float32)[None, ...]
        q_values = self.q_network(tf.convert_to_tensor(state_batch))
        return int(tf.argmax(q_values[0]).numpy())

    def store_transition(self, state, action, reward, next_state, done):
        """
        Adds a transition to the replay buffer.
        """
        transition = (state, action, reward, next_state, done)
        self.memory.add(transition)

    def train_step(self):
        """
        Samples a batch from memory and updates the Q-network.

        Does nothing until the buffer holds at least `batch_size` transitions.
        Otherwise performs one gradient step on the online network, refreshes
        the priorities of the sampled transitions, syncs the target network
        every `target_update_freq` steps, and anneals epsilon and beta.
        """
        # Do not train until enough samples
        if self.memory.size() < self.batch_size:
            return

        # The buffer returns (transitions, indices, weights); the transition
        # tuples are transposed into per-field sequences here.
        transitions, indices, weights = self.memory.sample(
            self.batch_size, beta=self.beta
        )
        states, actions, rewards, next_states, dones = zip(*transitions)

        # Convert to tensors
        states_tf = tf.convert_to_tensor(np.asarray(states, dtype=np.float32))
        next_states_tf = tf.convert_to_tensor(np.asarray(next_states, dtype=np.float32))
        actions_tf = tf.convert_to_tensor(np.asarray(actions, dtype=np.int32))
        rewards_tf = tf.convert_to_tensor(np.asarray(rewards, dtype=np.float32))
        dones_tf = tf.convert_to_tensor(np.asarray(dones, dtype=np.float32))
        weights_tf = tf.convert_to_tensor(weights, dtype=tf.float32)

        # Double DQN target computation
        # 1) Action selection with online network
        q_next_online = self.q_network(next_states_tf)
        best_actions = tf.argmax(q_next_online, axis=1)
        # 2) Q-value evaluation with target network
        q_next_target = self.target_q_network(next_states_tf)
        batch_indices = tf.range(self.batch_size, dtype=tf.int64)
        target_q_values = tf.gather_nd(
            q_next_target, tf.stack([batch_indices, best_actions], axis=1)
        )
        # Compute TD targets: r + gamma * Q_target(s', argmax Q_online).
        # Computed outside the tape, so no gradient flows into the targets.
        targets = rewards_tf + self.gamma * target_q_values * (1 - dones_tf)

        # Train online Q-network
        with tf.GradientTape() as tape:
            q_values = self.q_network(states_tf)
            # Select Q-values for taken actions
            action_mask = tf.one_hot(actions_tf, self.action_dim, dtype=tf.float32)
            q_selected = tf.reduce_sum(q_values * action_mask, axis=1)

            # Keras losses average over the last axis, so a trailing axis of
            # size 1 is added to get one Huber value per sample.
            loss_per_sample = self.loss_fn(
                tf.expand_dims(targets, axis=-1),
                tf.expand_dims(q_selected, axis=-1),
            )
            # Apply importance-sampling weights
            loss = tf.reduce_mean(weights_tf * loss_per_sample)

        # Backpropagation
        grads = tape.gradient(loss, self.q_network.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.q_network.trainable_variables))

        # Update priorities in replay buffer using absolute TD errors
        # (the small offset keeps every priority strictly positive)
        td_errors = tf.abs(targets - q_selected).numpy() + 1e-6
        self.memory.update_priorities(indices, td_errors)

        # Increment training step counter
        self.train_step_count += 1

        # Periodic target network update
        if (self.train_step_count % self.target_update_freq) == 0:
            self.update_target_network()

        # Anneal epsilon (never below epsilon_min) and beta (never above 1)
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        self.beta = min(1.0, self.beta + self.beta_increment)

    def update_target_network(self):
        """
        Copies online network weights to target network.
        """
        self.target_q_network.set_weights(self.q_network.get_weights())

    def save(self, path):
        """
        Saves the online Q-network to the given path.
        """
        self.q_network.save(path)

    def load(self, path):
        """
        Loads the online Q-network from `path` and syncs the target network.

        NOTE(review): this replaces `self.q_network` with whatever
        `tf.keras.models.load_model` returns; whether a subclassed QNetwork
        round-trips through save/load depends on the Keras version and save
        format - confirm for the environment in use.
        """
        self.q_network = tf.keras.models.load_model(path)
        self.target_q_network.set_weights(self.q_network.get_weights())


**Agent test**

In [5]:
# scripts/smoke_test_agent.py

import os, sys

# Locate the directory this code runs from. `__file__` is not defined when
# the code is executed as a notebook cell, in which case the current working
# directory is used instead.
try:
    here = os.path.dirname(__file__)
except NameError:
    here = os.getcwd()

# Put the parent of that directory at the front of sys.path (if it is not
# already listed) so the imports below are resolved from there.
# NOTE(review): presumably the parent is the project root holding
# ddqn_per_agent.py and pbn_env.py - confirm against the repo layout.
root = os.path.abspath(os.path.join(here, os.pardir))
if root not in sys.path:
    sys.path.insert(0, root)


import numpy as np
from ddqn_per_agent import DDQNPERAgent
from pbn_env import make_env


def smoke_test():
    """
    End-to-end smoke test: build the environment and a small agent, run 20
    act/step/store/train iterations, and print progress plus final stats.
    """
    # Environment with a fixed seed
    env = make_env(seed=0)

    # Read the problem dimensions from the spaces
    state_dim = env.observation_space.n  # e.g. 100
    action_dim = env.action_space.n      # e.g. 101
    print(f"State dim = {state_dim}, Action dim = {action_dim}")

    # Small agent configuration so the test stays quick
    agent_config = dict(
        state_dim=state_dim,
        action_dim=action_dim,
        batch_size=32,
        memory_capacity=1000,
        target_update_freq=50,
    )
    agent = DDQNPERAgent(**agent_config)

    # Initial observation
    state = env.reset()[0]
    print("Initial state (first 10 bits):", state[:10].astype(int))

    # Interaction loop
    n_steps = 20
    for step in range(n_steps):
        # Epsilon-greedy action, then one environment transition
        action = agent.act(state)
        step_result = env.step(action)
        observation_after, reward, is_terminal, hit_limit = step_result[:4]

        # Record the transition (only true termination counts as done) and learn
        agent.store_transition(state, action, reward, observation_after, is_terminal)
        agent.train_step()

        # Start a fresh episode when the current one ended either way
        episode_over = is_terminal or hit_limit
        state = env.reset()[0] if episode_over else observation_after

        print(f"Step {step:2d}: a={action:3d}, r={reward:+.1f}, eps={agent.epsilon:.3f}")

    # Final sanity output
    print("Final epsilon:", agent.epsilon)
    print("Replay buffer size:", agent.memory.size())

    print("✅ Smoke test completed without errors.")


if __name__ == "__main__":
    smoke_test()


FileNotFoundError: [Errno 2] No such file or directory: 'data_prepared/gene_names_safe.txt'