In [98]:
import random as rnd
import logging
import networkx as nx
import numpy as np
import pandas as pd
import math
import pickle as pkl
import copy
import matplotlib.pyplot as plt
import pytest
import time
import networkx as nx
import os
from datetime import datetime
import random
import json
import heapq
from collections import defaultdict
from tqdm import tqdm
import logging

In [None]:
# Create the results directory for this run (standardized location):
#   <cwd>/simulation_results_SA_0608/sensitivity_builder_review_time_per_tx_<timestamp>
# The timestamp comes from datetime.now(), so every execution of this cell
# gets its own sub-folder; exist_ok=True makes re-running harmless.
base_result_folder = os.path.join(os.getcwd(), "simulation_results_SA_0608")
timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
main_experiment_folder = f"sensitivity_builder_review_time_per_tx_{timestamp_str}"
main_result_folder = os.path.join(base_result_folder, main_experiment_folder)
os.makedirs(main_result_folder, exist_ok=True)


# # Default log path (commented out to disable file logging)
# default_log_filename = os.path.join(main_result_folder, 'default_simulation.log')

# Configure the root logger.
# Both the file handler and the console handler are commented out below, so the
# handler list is empty and no handler is attached by this call; force=True
# also removes any handlers a previous run of this cell installed.
# File logging is disabled to avoid inconsistencies in multi-threaded or
# multi-process execution. To see simulation logs, re-enable a handler and
# lower the level.
logging.basicConfig(
    level=logging.CRITICAL,   # Other levels include DEBUG, INFO, WARNING, ERROR
    format='%(asctime)s %(levelname)s [%(name)s]: %(message)s',
    handlers=[
        # logging.FileHandler(default_log_filename, mode='w', encoding='utf-8'),
        # # logging.StreamHandler()     # Console output also disabled
    ],
    force=True
)

# Default logger shared by the simulation classes in this file.
# NOTE: with the root level at CRITICAL, the info message below is filtered out.
logger = logging.getLogger("EthereumPBS")
logger.info("Default logger initialized.")


In [None]:
class Transaction:
    """A single transaction and its proposal/confirmation lifecycle."""

    def __init__(self, rng, tx_id, arrival_time, gas_used, base_fee_per_gas, priority_fee_per_gas, sanctioned_type):
        """
        Build a transaction that has just entered the Mempool.

        - rng: accepted for interface compatibility; not used by this class
        - tx_id: unique transaction ID
        - arrival_time: the time the transaction enters the Mempool
        - gas_used: amount of gas used by the transaction
        - base_fee_per_gas: base fee per gas unit (Gwei/gas)
        - priority_fee_per_gas: priority fee (tip) per gas unit (Gwei/gas)
        - sanctioned_type: 0 = non-sanctioned, 1 = sanctioned

        Derived fee attributes:
        - base_fee = base_fee_per_gas * gas_used
        - priority_fee = priority_fee_per_gas * gas_used
        - gas_fee = base_fee + priority_fee

        Lifecycle attributes (proposed_block_id, proposed_time, confirmed_time,
        builder/relay/validator IDs and policies) start as None and are filled
        in by update_proposed_time / update_confirmed_time.
        transaction_type is always "DeFi".
        """
        # NOTE: attribute creation order is kept stable on purpose, in case
        # downstream code serialises vars(tx).
        self.tx_id = tx_id
        self.arrival_time = arrival_time

        # Lifecycle state: block ID, proposal time, confirmation time
        self.proposed_block_id = self.proposed_time = self.confirmed_time = None
        self.is_proposed = self.is_confirmed = False
        self.transaction_type = "DeFi"

        # Fee accounting
        total_base = base_fee_per_gas * gas_used
        total_tip = priority_fee_per_gas * gas_used
        self.gas_used = gas_used
        self.base_fee_per_gas = base_fee_per_gas
        self.priority_fee_per_gas = priority_fee_per_gas
        self.base_fee = total_base
        self.priority_fee = total_tip
        self.gas_fee = total_base + total_tip

        self.sanctioned_type = sanctioned_type

        # Who built / relayed / proposed the including block
        self.proposed_builder_id = self.selected_relay_id = self.proposer_validator_id = None

        # Censorship policies of those parties at inclusion time
        self.proposed_builder_policy = None
        self.selected_relay_policy = None
        self.proposer_validator_is_censoring = None

    def update_proposed_time(self, block_id, proposed_time,
                             builder_id, selected_relay_id, proposer_validator_id,
                             builder_policy, selected_relay_policy, proposer_validator_is_censoring):
        """
        Record the block and parties that proposed this transaction.

        Raises Exception if the transaction already has a proposal time, so a
        transaction can never be packed into a second block.
        """
        if self.proposed_time is not None:
            raise Exception(f"Transaction {self.tx_id} has already been proposed.")

        self.proposed_block_id = block_id
        self.proposed_time = proposed_time
        self.is_proposed = True

        self.proposed_builder_id = builder_id
        self.selected_relay_id = selected_relay_id
        self.proposer_validator_id = proposer_validator_id

        self.proposed_builder_policy = builder_policy
        self.selected_relay_policy = selected_relay_policy
        self.proposer_validator_is_censoring = proposer_validator_is_censoring

    def update_confirmed_time(self, confirmed_time):
        """
        Record the confirmation time.

        Raises Exception if the transaction has not been proposed yet, or if it
        has already been confirmed.
        """
        if self.proposed_time is None:
            raise Exception(f"Transaction {self.tx_id} has not been proposed yet.")
        if self.confirmed_time is not None:
            raise Exception(f"Transaction {self.tx_id} has already been confirmed.")

        self.confirmed_time = confirmed_time
        self.is_confirmed = True




In [None]:
class Mempool:
    """Transaction pool: generates arrivals and tracks pending vs. proposed transactions."""

    # Assumed fixed base fee (Gwei/gas), eventually burned
    BASE_FEE_PER_GAS = 3.44

    def __init__(self, rng, tx_rate, sanctioned_probs=None):
        """
        Mempool for storing transactions
        - rng: random number generator (used via .exponential, .integers,
          .uniform and .choice, i.e. the numpy Generator interface)
        - tx_rate: transaction generation rate (transactions per second)
        - sanctioned_probs: probabilities of sanctioned_type 0 and 1, in that
          order; None or an empty sequence selects the default [0.995, 0.005]

        Attributes:
        - tx_count: transaction counter to ensure unique `tx_id`
        - pending_transactions: {tx_id: Transaction} for unconfirmed transactions
        - proposed_transactions: {tx_id: Transaction} for proposed transactions
        """
        self.rng = rng
        self.pending_transactions = {}      # Store unconfirmed transactions as a dictionary
        self.proposed_transactions = {}     # Store proposed transactions
        self.tx_rate = tx_rate
        self.tx_count = 0
        # Explicit emptiness test instead of `or`, so numpy arrays (whose truth
        # value is ambiguous) are accepted as well.
        if sanctioned_probs is None or len(sanctioned_probs) == 0:
            sanctioned_probs = [0.995, 0.005]
        self.sanctioned_probs = sanctioned_probs

    def generate_transactions(self, current_time, simulation_time):
        """
        Generate transactions arriving in [current_time, simulation_time),
        using exponentially distributed inter-arrival times with mean
        1 / tx_rate. New transactions are added to pending_transactions.

        Random properties are drawn in batches. The first batch has the
        expected number of arrivals; if its inter-arrival times do not reach
        simulation_time, further batches are drawn so that the end of the
        horizon is not left without arrivals.

        Does nothing when tx_rate is 0 or no time remains.
        Raises ValueError when tx_rate is negative.
        """
        if self.tx_rate < 0:
            raise ValueError(f"tx_rate must be non-negative, got {self.tx_rate}")
        if self.tx_rate == 0 or simulation_time <= current_time:
            return

        mean_interval = 1 / self.tx_rate

        while current_time < simulation_time:
            # Expected number of arrivals in the remaining horizon (at least 1
            # so every pass makes progress).
            batch_size = max(1, int((simulation_time - current_time) * self.tx_rate))

            # Batch-generate inter-arrival times and other random properties.
            # The draw order is kept fixed so seeded runs stay reproducible.
            arrival_intervals = self.rng.exponential(mean_interval, size=batch_size)
            gas_used_batch = self.rng.integers(90_000, 110_000, size=batch_size)
            priority_fee_per_gas_batch = self.rng.uniform(0.1, 5, size=batch_size)   # Priority fee range: [0.1, 5] Gwei
            sanctioned_batch = self.rng.choice([0, 1], size=batch_size, p=self.sanctioned_probs)

            for interval, gas_used, priority_fee_per_gas, sanctioned_type in zip(
                arrival_intervals, gas_used_batch, priority_fee_per_gas_batch, sanctioned_batch
            ):
                current_time += interval
                if current_time >= simulation_time:
                    return  # Horizon reached; discard the rest of the batch

                self.tx_count += 1
                tx = Transaction(
                    self.rng,
                    self.tx_count,
                    current_time,
                    gas_used=gas_used,
                    base_fee_per_gas=self.BASE_FEE_PER_GAS,
                    priority_fee_per_gas=priority_fee_per_gas,
                    sanctioned_type=sanctioned_type,
                )

                # Store in pending transactions dictionary
                self.pending_transactions[tx.tx_id] = tx

    def update_transaction_proposed(self, proposed_block):
        """
        Record proposal and confirmation data for every transaction of
        proposed_block that is still pending, and move it from
        pending_transactions to proposed_transactions.

        Transactions of the block that are not pending are ignored.
        """
        for tx in proposed_block.transactions:
            if tx.tx_id not in self.pending_transactions:
                continue

            tx.update_proposed_time(
                block_id=proposed_block.block_id,
                proposed_time=proposed_block.proposed_time,
                builder_id=proposed_block.builder_id,
                selected_relay_id=proposed_block.selected_relay_id,
                proposer_validator_id=proposed_block.proposer_validator_id,
                builder_policy=proposed_block.builder_policy,
                selected_relay_policy=proposed_block.selected_relay_policy,
                proposer_validator_is_censoring=proposed_block.proposer_validator_is_censoring
            )
            tx.update_confirmed_time(proposed_block.confirmed_time)

            self.proposed_transactions[tx.tx_id] = tx  # Move to proposed_transactions
            del self.pending_transactions[tx.tx_id]     # Remove from pending_transactions
            logger.debug(f"Tx {tx.tx_id} proposed in Block {proposed_block.block_id} at {proposed_block.proposed_time:.4f}s")
            logger.debug(f"Tx {tx.tx_id} is removed from pending transactions to proposed transactions.")



In [None]:
class CandidateBlock:
    """A block assembled by a Builder, tracked through relay review and proposal."""

    def __init__(self, builder_id, slot_no, parent_block_id, transactions, block_build_finish_time,
                 block_count, block_gas_limit, total_gas_used, total_gas_fee, total_priority_fee, tx_counts):
        """
        Create a candidate block.

        The block ID is "<builder_id>_<block_count>". Gas/fee totals and the
        per-sanction-type transaction counts are supplied by the caller and
        stored as given.

        Raises ValueError when total_gas_used exceeds block_gas_limit.
        """
        # NOTE: attribute creation order is kept stable on purpose, in case
        # downstream code serialises vars(block).
        self.block_id = f"{builder_id}_{block_count}"
        self.builder_id = builder_id
        self.slot_no = slot_no
        self.parent_block_id = parent_block_id              # ID of the parent block
        self.block_build_finish_time = block_build_finish_time
        self.relay_approval_times = dict()                  # {relay_id: approval_time}
        self.transactions = transactions
        self.num_transactions = len(transactions)

        # Aggregates computed by the builder
        self.total_gas_used = total_gas_used
        self.total_gas_fee = total_gas_fee
        self.total_priority_fee = total_priority_fee
        self.tx_counts = tx_counts

        gas_overflow = self.total_gas_used > block_gas_limit
        if gas_overflow:
            raise ValueError("Block gas usage exceeds the limit!")

        # Proposal time, on-chain confirmation time, builder-side review time
        self.proposed_time = self.confirmed_time = self.builder_review_time = None
        self.relay_reject_count = self.proposer_reject_count = 0
        self.is_relay_approved = self.is_proposed = self.is_confirmed = False

        # Selected relay and proposing validator, unknown until proposal
        self.selected_relay_id = self.proposer_validator_id = None

        # Profit split between builder and proposer
        self.builder_profit = self.proposer_profit = 0

        self.builder_policy = None                      # builder policy: 'non', 'weak', 'strict'
        self.selected_relay_policy = None               # relay policy: 'non', 'censoring'
        self.proposer_validator_is_censoring = None     # validator policy: True / False

    def __eq__(self, other):
        """Two candidate blocks are equal when their block IDs match."""
        if not isinstance(other, CandidateBlock):
            return False
        return self.block_id == other.block_id

    def __hash__(self):
        """Hash on block_id, consistent with __eq__."""
        return hash(self.block_id)




In [None]:

class CensorshipPolicy:
    """
    Manage the censorship behavior of different nodes (Builder, Relay) and compute its effects.
    """

    def __init__(self, builder_policy, relay_policy,
                 builder_review_time_per_tx=0.002,
                 relay_review_time_per_tx=0.001,
                 type_1_rejection_prob_builder=1.0,
                 type_1_rejection_prob_relay=1.0,
                 weak_detection_prob=1.0,
                 rng=None):
        """
        Initialize the censorship policy.

        :param builder_policy: Censorship policy of the Builder
            ("non": no censorship, "weak": weak censorship, "strict": strict censorship)
        :param relay_policy: Censorship policy of the Relay
            ("non": no censorship, "censoring": with censorship)
        :param builder_review_time_per_tx: Builder's review time per transaction
        :param relay_review_time_per_tx: Relay's review time per transaction
        :param type_1_rejection_prob_builder: Probability that a "strict" Builder rejects
            a transaction with sanctioned_type == 1 (range 0 to 1)
        :param type_1_rejection_prob_relay: Probability that a "censoring" Relay rejects
            a block because of a transaction with sanctioned_type == 1 (range 0 to 1)
        :param weak_detection_prob: Probability that a "weak" Builder detects a
            sanctioned transaction on a given check (range 0 to 1)
        :param rng: Random number generator exposing .random(). When omitted, a
            fresh unseeded numpy Generator is created for this instance (a
            default evaluated in the signature would be built once and shared
            by every policy object).
        """
        self.builder_policy = builder_policy
        self.relay_policy = relay_policy

        self.builder_review_time_per_tx = builder_review_time_per_tx    # Builder's review time per transaction
        self.relay_review_time_per_tx = relay_review_time_per_tx        # Relay's review time per transaction

        # Rejection probability for transactions involving sanctioned addresses (sanctioned_type == 1)
        self.type_1_rejection_prob_builder = type_1_rejection_prob_builder
        self.type_1_rejection_prob_relay = type_1_rejection_prob_relay
        self.weak_detection_prob = weak_detection_prob

        self.rng = np.random.default_rng() if rng is None else rng

        # Track transaction IDs that have been rejected once under weak censorship
        self.weak_censored_tx_ids = set()

    def builder_censorship(self, tx):
        """
        Apply builder-side censorship to a transaction.
        This affects whether a transaction is included in a block and adds review latency.

        Policy behavior:
        - "non": never rejects, no review delay.
        - "strict": a sanctioned transaction is rejected with probability
          type_1_rejection_prob_builder.
        - "weak": a sanctioned transaction is detected with probability
          weak_detection_prob; the first detection rejects it and records its
          ID, later detections accept it.
        - Non-sanctioned transactions (and unknown policies) are accepted but
          still incur the per-transaction review delay.

        :param tx: Transaction object (with attribute sanctioned_type: 0 = not sanctioned, 1 = sanctioned)
        :return: (is_rejected: bool, review_delay: float)
        """
        logger.debug(f"Builder checking tx {tx.tx_id} with sanctioned_type={tx.sanctioned_type}, priority_fee={tx.priority_fee}. Builder censorship policy {self.builder_policy}")

        if self.builder_policy == "non":
            return False, 0  # No censorship, no delay

        review_time = self.builder_review_time_per_tx

        if tx.sanctioned_type != 1:
            return False, review_time

        if self.builder_policy == "strict":
            # rng.random() lies in [0, 1), so the strict "<" rejects with
            # exactly the configured probability (never when it is 0).
            is_rejected = self.rng.random() < self.type_1_rejection_prob_builder
            return is_rejected, review_time

        if self.builder_policy == "weak":
            # Missed detection with probability 1 - weak_detection_prob
            if self.rng.random() >= self.weak_detection_prob:
                logger.debug(f"  - tx {tx.tx_id} weakly censored but missed detection.")
                return False, review_time
            if tx.tx_id not in self.weak_censored_tx_ids:
                self.weak_censored_tx_ids.add(tx.tx_id)
                logger.debug(f"  - tx {tx.tx_id} weakly censored first time, rejected.")
                return True, review_time        # First time is rejected
            logger.debug(f"  - tx {tx.tx_id} weakly censored before, accepted now.")
            return False, review_time           # Accepted on second and subsequent checks

        return False, review_time

    def relay_censorship(self, block):
        """
        Apply relay-side censorship to a block.
        This affects the block's propagation delay.
        Note: As a network relay, a relay should not reorder blocks but process them in FIFO order.

        Transactions are reviewed in order and each adds relay_review_time_per_tx
        to the delay. Under the "censoring" policy the review stops at the first
        sanctioned transaction that triggers a rejection, so the returned delay
        covers only the transactions reviewed up to that point.

        :param block: Block object (contains a list of transactions, each with a sanctioned_type)
        :return: (is_rejected: bool, review_delay: float)
        """
        logger.debug(f"Relay checking block {block.block_id} with {len(block.transactions)} transactions and {block.total_priority_fee} priority fee. Relay censorship policy {self.relay_policy}")

        if self.relay_policy == "non":
            return False, 0

        total_review_time = 0

        for tx in block.transactions:
            logger.debug(f"  - tx {tx.tx_id} sanctioned_type={tx.sanctioned_type}")

            total_review_time += self.relay_review_time_per_tx

            # A random draw is made only for sanctioned transactions under the
            # "censoring" policy; strict "<" gives exactly the configured probability.
            if (self.relay_policy == "censoring"
                    and tx.sanctioned_type == 1
                    and self.rng.random() < self.type_1_rejection_prob_relay):
                logger.debug(f" Rejected block {block.block_id} due to tx {tx.tx_id}.")
                return True, total_review_time

        logger.debug(f"Block {block.block_id} passed censorship.")

        return False, total_review_time



In [None]:
class Builder:
    """Selects pending transactions, assembles candidate blocks and submits them to relays."""

    def __init__(self, mempool, rng, builder_id, censorship_policy, relays,
                 block_gas_limit=16_000_000, skip_probability=0.1, is_censoring=False, base_block_build_time=4.0, proposer_reward_ratio=0.95, model=None):
        """
        Set up a builder and its local chain view (starting at a genesis block).

        - mempool: Mempool object the builder reads pending transactions from
        - rng: random number generator (its .random() drives transaction skipping)
        - builder_id: unique identifier for the builder
        - censorship_policy: policy object consulted when is_censoring is True
        - relays: list of available Relays to submit blocks to
        - block_gas_limit: gas limit applied to every block this builder assembles
        - skip_probability: probability of randomly skipping a mempool transaction (default: 10%)
        - is_censoring: whether the builder performs censorship
        - base_block_build_time: base time to build a block, before review time
        - proposer_reward_ratio: share of the total priority fee assigned to the proposer
        - model: stored on the instance; not used by the methods of this class
        """
        self.mempool = mempool
        self.rng = rng
        self.builder_id = builder_id
        self.block_gas_limit = block_gas_limit
        self.skip_probability = skip_probability
        self.block_count = 0                    # Number of blocks built so far
        self.is_censoring = is_censoring
        self.relays = relays
        self.base_block_build_time = base_block_build_time
        self.model = model
        self.proposer_reward_ratio = proposer_reward_ratio

        # Transactions already rejected once under weak censorship. The policy
        # object is pointed at this very set, replacing whatever set it held.
        self.weak_censored_tx_ids = set()
        self.censorship_policy = censorship_policy
        self.censorship_policy.weak_censored_tx_ids = self.weak_censored_tx_ids

        # Local blockchain starts with an empty genesis block
        self.local_blockchain = [
            CandidateBlock(
                builder_id="GENESIS",
                slot_no=0,
                parent_block_id=None,
                transactions=[],
                block_build_finish_time=0,
                block_count=0,
                block_gas_limit=0,
                total_gas_used=0,
                total_gas_fee=0,
                total_priority_fee=0,
                tx_counts={0: 0, 1: 0},
            )
        ]

    def get_sorted_transactions_from_mempool(self, current_time):
        """
        Return pending transactions ordered by descending priority fee per gas.

        - Transactions arriving after current_time are excluded
        - Each remaining transaction is kept only if a fresh random draw
          exceeds skip_probability (simulates imperfect competition); draws
          are made in sorted order
        """
        def fee_per_gas(tx):
            return tx.priority_fee_per_gas

        arrived = [
            tx for tx in self.mempool.pending_transactions.values()
            if tx.arrival_time <= current_time
        ]
        arrived.sort(key=fee_per_gas, reverse=True)

        kept = []
        for tx in arrived:
            if self.rng.random() > self.skip_probability:
                kept.append(tx)
        return kept

    def get_latest_chain_head(self):
        """
        Return the ID of the last block in the local chain, used as the parent
        of the next candidate block.
        """
        head = self.local_blockchain[-1]
        return head.block_id

    def build_candidate_block(self, current_time, slot_no):
        """
        Construct a candidate block using transactions from the Mempool.

        1. Walk transactions in descending order of priority fee per gas
        2. Skip transactions that would exceed the gas limit; stop after 5
           such skips in a row
        3. When censoring, review each fitting transaction (accumulating
           review time) and drop the rejected ones

        Returns (candidate_block, total_build_time), where total_build_time is
        the base build time plus the accumulated review time. When no
        transaction was selected, candidate_block is None.
        """
        parent_block_id = self.get_latest_chain_head()
        gas_limit = self.block_gas_limit

        chosen = []
        gas_sum = 0
        fee_sum = 0
        tip_sum = 0
        per_type = {0: 0, 1: 0}
        misses_in_a_row = 0         # Consecutive transactions that did not fit
        builder_review_time = 0     # Total time spent on censorship review

        for tx in self.get_sorted_transactions_from_mempool(current_time):
            fits = not (gas_sum + tx.gas_used > gas_limit)
            if not fits:
                misses_in_a_row += 1
                if misses_in_a_row >= 5:
                    break
                continue

            if self.is_censoring:
                is_rejected, review_time = self.censorship_policy.builder_censorship(tx)
                logger.debug(f"[{current_time:.4f}], Builder {self.builder_id} censorship check: tx={tx.tx_id}, is_rejected={is_rejected}, review_time={review_time}")
                builder_review_time += review_time
                if is_rejected:
                    continue

            chosen.append(tx)
            gas_sum += tx.gas_used
            fee_sum += tx.gas_fee
            tip_sum += tx.priority_fee
            per_type[tx.sanctioned_type] += 1
            misses_in_a_row = 0

        # Base build time plus whatever was spent reviewing
        total_build_time = self.base_block_build_time + builder_review_time

        if len(chosen) == 0:
            logger.debug(f"[{current_time:.4f}]. Builder {self.builder_id} no tx available at {current_time:.4f}."
                         f"Next try at {current_time + self.base_block_build_time:.4f}")
            return None, total_build_time

        self.block_count += 1

        candidate_block = CandidateBlock(
            builder_id=self.builder_id,
            slot_no=slot_no,
            parent_block_id=parent_block_id,
            transactions=chosen,
            block_build_finish_time=current_time + total_build_time,
            block_count=self.block_count,
            block_gas_limit=gas_limit,
            total_gas_used=gas_sum,
            total_gas_fee=fee_sum,
            total_priority_fee=tip_sum,
            tx_counts=per_type,
        )

        # Review time, profit split and the builder's policy label
        ratio = self.proposer_reward_ratio
        candidate_block.builder_review_time = builder_review_time
        candidate_block.proposer_profit = tip_sum * ratio
        candidate_block.builder_profit = tip_sum * (1 - ratio)
        candidate_block.builder_policy = self.censorship_policy.builder_policy

        logger.debug(
            f"[{current_time:.4f}]. Builder {self.builder_id} built block {candidate_block.block_id} "
            f"(Slot {candidate_block.slot_no}, TxCount: {candidate_block.num_transactions}, Gas Used: {candidate_block.total_gas_used}, "
            f"GasFee: {candidate_block.total_gas_fee:.2f} Gwei), PriorityFee: {candidate_block.total_priority_fee} Gwei. Finish Time: {candidate_block.block_build_finish_time:.4f} sec. "
        )

        return candidate_block, total_build_time

    def submit_block_to_relays(self, candidate_block, submit_time, model):
        """
        Immediately submit the candidate block to every relay in self.relays.
        """
        targets = self.relays
        for target in targets:
            target.receive_block(candidate_block, submit_time, model)
    
    

In [None]:
class Relay:
    """Receives candidate blocks from builders, reviews them and offers approved blocks to the proposer."""

    def __init__(self, relay_id, censorship_policy, is_censoring=False, base_relay_process_time=1.0, rng=None):
        """
        Relay receives CandidateBlocks submitted by multiple Builders.
        - relay_id: identifier of this relay
        - censorship_policy: policy object whose relay_censorship() reviews blocks
        - is_censoring: whether the relay performs censorship (controlled externally)
        - base_relay_process_time: base time required to process a block
        - rng: optional random number generator exposing .integers() (numpy
          Generator interface) used to break ties between equally profitable
          blocks. Pass the simulation's seeded generator for reproducible
          runs; when omitted, the global `random` module is used as before.
        """
        self.relay_id = relay_id
        self.censorship_policy = censorship_policy          # Associated censorship policy
        self.is_censoring = is_censoring        # Whether this relay applies censorship
        self.received_blocks = []           # Blocks awaiting censorship
        self.ready_blocks = []          # Blocks that passed review and are ready for proposer selection
        self.base_relay_process_time = base_relay_process_time          # Base time required to process a block
        self.rng = rng                  # Tie-break generator (None = global `random`)

    def clear_received_ready_blocks(self):
        """Clear old blocks at the beginning of a new slot"""
        self.received_blocks.clear()
        self.ready_blocks.clear()
        logger.debug(f"Relay {self.relay_id} cleared old blocks")

    def receive_block(self, candidate_block, current_time, model):
        """
        A Builder submits a block to the Relay; Relay queues it for future processing.
        current_time and model are accepted for interface compatibility and not used here.
        """
        self.received_blocks.append(candidate_block)
        logger.debug(f"Relay {self.relay_id} received block {candidate_block.block_id} from Builder {candidate_block.builder_id}")

    def process_received_blocks(self, current_time, model):
        """
        Relay processes received blocks when RelayProcessingEvent is triggered.
        Applies censorship if enabled and schedules approval results.

        All queued blocks are treated as reviewed in parallel: their outcomes
        are finalized together at current_time plus the longest per-block
        processing time (base time + review time), via model.schedule_event.
        """
        if not self.received_blocks:
            return    # No blocks to process

        # Take a snapshot and empty the queue so later submissions form a new batch
        blocks_to_process = self.received_blocks.copy()
        self.received_blocks.clear()

        block_results = []
        max_process_time = 0

        for block in blocks_to_process:
            if self.is_censoring:
                is_rejected, review_time = self.censorship_policy.relay_censorship(block)
            else:
                is_rejected, review_time = False, 0     # No censorship = no review delay

            total_process_time = self.base_relay_process_time + review_time
            max_process_time = max(max_process_time, total_process_time)

            block_results.append((block, is_rejected))

        # Schedule all block review outcomes to be finalized at the same future time
        ready_time = current_time + max_process_time
        model.schedule_event(ready_time, self.finalize_block_processing, ready_time, block_results)
        logger.debug(f"[{current_time:.4f}]. Relay {self.relay_id} started parallel processing {len(block_results)} blocks, "
                      f"expected finish at {ready_time:.4f}")

    def finalize_block_processing(self, current_time, block_results):
        """
        Finalize approval/rejection of all reviewed blocks.

        Approved blocks are stamped with this relay's approval time and added
        to ready_blocks; rejected blocks only get their reject counter bumped.
        """
        for block, is_rejected in block_results:
            if is_rejected:
                block.relay_reject_count += 1
                logger.debug(f"[{current_time:.4f}], Relay {self.relay_id} rejected block {block.block_id}")
                continue

            block.is_relay_approved = True
            block.relay_approval_times[self.relay_id] = current_time
            self.ready_blocks.append(block)
            logger.debug(f"[{current_time:.4f}], Relay {self.relay_id} approved block {block.block_id}")

    def get_best_ready_block(self, latest_chain_head, current_time):
        """
        Select the best approved block whose parent matches the latest chain head and is fully approved.

        A block is eligible when its parent_block_id equals latest_chain_head
        and this relay approved it at or before current_time. Among eligible
        blocks the one with the highest proposer_profit wins; ties are broken
        uniformly at random (with self.rng when provided, otherwise with the
        global `random` module).

        Returns None when no block is eligible.
        """
        valid_blocks = []

        for block in self.ready_blocks:
            # Ensure correct parent block
            if block.parent_block_id != latest_chain_head:
                logger.debug(
                    f"Relay {self.relay_id} skipped block {block.block_id} due to invalid parent block. "
                    f"Expected {latest_chain_head}, got {block.parent_block_id}."
                )
                continue

            # Ensure approval is complete and not in the future
            approval_time = block.relay_approval_times.get(self.relay_id)
            if approval_time is None or approval_time > current_time:
                logger.debug(
                    f"Relay {self.relay_id} skipped block {block.block_id} due to incomplete approval. "
                    f"Approval time: {approval_time}, current time: {current_time}."
                )
                continue

            valid_blocks.append(block)

        if not valid_blocks:
            return None

        max_profit = max(block.proposer_profit for block in valid_blocks)
        best_blocks = [block for block in valid_blocks if block.proposer_profit == max_profit]

        if self.rng is None:
            return random.choice(best_blocks)
        # Index draw (rather than rng.choice) avoids converting block objects to an array
        return best_blocks[int(self.rng.integers(len(best_blocks)))]
    
    

In [None]:
class Proposer:
    """
    Chooses the block to propose in each slot and keeps a local copy of the chain.

    Attributes:
        local_blockchain: list of accepted blocks; index 0 is the genesis block
        empty_block_counter: number of empty blocks created by this proposer
    """

    def __init__(self):
        """Initialise the local chain with a transaction-free genesis block."""
        self.empty_block_counter = 0
        self.local_blockchain = [
            CandidateBlock(
                builder_id="GENESIS",
                slot_no=0,
                parent_block_id=None,
                transactions=[],
                block_build_finish_time=0,
                block_count=0,
                block_gas_limit=0,
                total_gas_used=0,
                total_gas_fee=0,
                total_priority_fee=0,
                tx_counts={0: 0, 1: 0},
            )
        ]

    def get_latest_chain_head(self):
        """Return the `block_id` of the last block in the local chain."""
        head = self.local_blockchain[-1]
        return head.block_id

    def propose_block(self, relays, selected_validator, current_time, slot_no):
        """
        Pick the block for this slot.

        Each relay is asked for its best ready block on top of the current
        chain head. The offer with the highest `proposer_profit` wins (random
        tie-break). With no offers, slot 0 yields `None` and any later slot
        yields a freshly created empty block.

        :param relays: Relays to query
        :param selected_validator: Validator proposing in this slot
        :param current_time: Current simulation time
        :param slot_no: Slot number being proposed
        :return: The proposed block, or `None` for slot 0 without offers
        """
        head_id = self.get_latest_chain_head()
        offers = []
        offered_by = defaultdict(list)  # block_id -> relay_ids that offered it

        for relay in relays:
            offer = relay.get_best_ready_block(head_id, current_time)
            if not offer:
                continue
            offers.append(offer)
            offered_by[offer.block_id].append(relay.relay_id)

        if offers:
            return self._propose_best_offer(
                offers, offered_by, relays, selected_validator, current_time, slot_no
            )

        if slot_no == 0:
            # Nothing can have been built yet in the initial slot
            logger.debug(f"[{current_time:.4f}], Slot {slot_no} (initial slot), no blocks available yet. Returning None.")
            return None

        return self._propose_empty_block(head_id, selected_validator, current_time, slot_no)

    def _propose_best_offer(self, offers, offered_by, relays, selected_validator, current_time, slot_no):
        """Mark the most profitable offer as proposed and count the others as rejected."""
        best_profit = max(offer.proposer_profit for offer in offers)
        finalists = [offer for offer in offers if offer.proposer_profit == best_profit]

        # Random tie-break on the block, then on the relay it is taken from
        winner = random.choice(finalists)
        winner.selected_relay_id = random.choice(offered_by[winner.block_id])

        # Proposal is stamped 1 time unit and confirmation 12 time units after now
        winner.is_proposed = True
        winner.proposed_time = current_time + 1
        winner.is_confirmed = True
        winner.confirmed_time = current_time + 12
        winner.proposer_validator_id = selected_validator.validator_id

        # Look up the chosen relay object so its policy can be recorded
        for relay in relays:
            if relay.relay_id == winner.selected_relay_id:
                source_relay = relay
                break
        else:
            raise Exception(f"Relay {winner.selected_relay_id} not found in provided relays list!")

        winner.selected_relay_policy = source_relay.censorship_policy.relay_policy
        winner.proposer_validator_is_censoring = selected_validator.is_censoring

        logger.debug(f"[{current_time:.4f}]. Slot {slot_no}, Validator {selected_validator.validator_id} proposed block {winner.block_id} from Relay {winner.selected_relay_id}.")

        # Every other offer counts as rejected by the proposer
        for offer in offers:
            if offer != winner:
                offer.proposer_reject_count += 1

        return winner

    def _propose_empty_block(self, head_id, selected_validator, current_time, slot_no):
        """Create, stamp and return an empty block built by the proposer itself."""
        self.empty_block_counter += 1
        empty_block = CandidateBlock(
            builder_id="PROPOSER",
            slot_no=slot_no,
            parent_block_id=head_id,
            transactions=[],
            block_build_finish_time=current_time,
            block_count=self.empty_block_counter,
            block_gas_limit=0,
            total_gas_used=0,
            total_gas_fee=0,
            total_priority_fee=0,
            tx_counts={0: 0, 1: 0},
        )

        # No builder or relay was involved, so profits are zero and policies unset
        empty_block.builder_profit = 0
        empty_block.proposer_profit = 0
        empty_block.proposer_validator_id = selected_validator.validator_id
        empty_block.is_proposed = True
        empty_block.proposed_time = current_time + 1
        empty_block.is_confirmed = True
        empty_block.confirmed_time = current_time + 12
        empty_block.selected_relay_id = None
        empty_block.builder_policy = None
        empty_block.selected_relay_policy = None
        empty_block.proposer_validator_is_censoring = selected_validator.is_censoring

        logger.info(f"[{current_time:.4f}]. Slot {slot_no}, Validator {selected_validator.validator_id} proposed EMPTY block {empty_block.block_id}.")
        return empty_block

    def update_local_blockchain(self, proposed_block):
        """Append `proposed_block` to the local chain."""
        self.local_blockchain.append(proposed_block)
        logger.debug(f"Proposer added Block {proposed_block.block_id} to local blockchain.")


In [107]:
class Validator:
    """
    Plain record describing one validator.

    :param validator_id: Identifier of the validator
    :param connected_relays: Relays this validator is connected to
    :param is_censoring: Censoring flag; per the original note, True if the
                         validator is connected to a censoring relay
    """

    def __init__(self, validator_id, connected_relays, is_censoring=False):
        self.is_censoring = is_censoring
        self.connected_relays = connected_relays
        self.validator_id = validator_id


In [None]:
class FixedTimeEvent:
    """Base class for events that fire repeatedly on the simulation timeline."""

    def __init__(self, model, interval=None, start_time=0, offset=0, rng=None):
        """
        Set up the event and compute the time of its first firing.

        :param model: Object stored as `self.model` for use by subclasses
        :param interval: Fixed time between firings; `None` means the subclass's
                         `event()` is responsible for setting `next_event`
        :param start_time: Base time of the first firing (default 0)
        :param offset: Delay added to `start_time` for the first firing
        :param rng: Optional random number generator for subclasses
        :raises ValueError: If `interval` is given but is not positive
        """
        has_interval = interval is not None
        if has_interval and interval <= 0:
            raise ValueError("Interval must be positive or None.")

        self.model = model
        self.rng = rng
        self.interval = interval
        self.offset = offset

        # Trigger counter, initialised to 0; this base class never increments it
        self.counter = 0

        # First firing happens `offset` after `start_time`
        self.next_event = start_time + offset

    def event(self):
        """Run the event's action; subclasses must override this."""
        raise NotImplementedError("Subclasses must implement this method.")

    def reset(self, start_time=0):
        """
        Set the next firing time to `start_time`.

        NOTE(review): unlike the constructor, `offset` is not added here and
        `counter` is left untouched; confirm this is intended.
        """
        self.next_event = start_time


In [None]:
class BlockBuildingEvent(FixedTimeEvent):
    """Self-rescheduling event that makes one Builder build and submit blocks."""

    def __init__(self, builder, model, rng=None, offset=4):
        """
        :param builder: Builder driven by this event
        :param model: Simulation model providing `schedule_event` and `all_candidate_blocks`
        :param rng: Optional random number generator
        :param offset: Time of the first build (no fixed interval is used)
        """
        super().__init__(model=model, interval=None, start_time=0, offset=offset, rng=rng)
        self.builder = builder

    def event(self):
        """
        Build one candidate block and schedule what follows.

        If a block was built, its submission to the relays and its
        registration in `model.all_candidate_blocks` are scheduled for the
        moment the build finishes. The next build is scheduled for that same
        moment, whether or not a block was produced.
        """
        now = self.next_event

        # Builders build for the next slot, so add 1 to the current 12-unit slot index
        target_slot = int(now // 12 + 1)

        block, build_duration = self.builder.build_candidate_block(now, target_slot)
        finish_time = now + build_duration

        if block:
            schedule = self.model.schedule_event

            # Scheduling order matters: events at equal times run in insertion order
            schedule(finish_time, self.builder.submit_block_to_relays, block, finish_time, self.model)
            logger.debug(f"Builder {self.builder.builder_id} submits block {block.block_id} to Relay at {finish_time:.4f}")

            schedule(finish_time, self.model.all_candidate_blocks.append, block)
            logger.debug(f"Builder {self.builder.builder_id} added block {block.block_id} to all_candidate_blocks at {finish_time:.4f}")

        # The next build starts as soon as this one is finished
        self.next_event = finish_time
        self.model.schedule_event(finish_time, self.event)

        logger.debug(f"Builder {self.builder.builder_id}, current time {now:.4f}, next event at {finish_time:.4f} (after {build_duration:.4f} sec)")



In [None]:
class RelayProcessingEvent(FixedTimeEvent):
    """Self-rescheduling event that makes one Relay process its received blocks."""

    def __init__(self, relay, model, interval=2, rng=None, offset=6.5):
        """
        :param relay: Relay driven by this event
        :param model: Simulation model providing `schedule_event`
        :param interval: Time between two processing rounds
        :param rng: Optional random number generator
        :param offset: Time of the first processing round
        """
        super().__init__(model=model, interval=interval, start_time=0, offset=offset, rng=rng)
        self.relay = relay

    def event(self):
        """Let the relay process its received blocks, then schedule the next round."""
        fired_at = self.next_event
        self.relay.process_received_blocks(fired_at, self.model)

        # Next round is one interval later
        upcoming = fired_at + self.interval
        self.next_event = upcoming
        self.model.schedule_event(upcoming, self.event)

        logger.debug(f"Relay {self.relay.relay_id} processed blocks at {fired_at:.4f} and scheduled next processing at {upcoming:.4f}")



In [None]:
class SlotStartEvent(FixedTimeEvent):
    """Event fired every 12 time units to run the proposal step of a slot."""

    def __init__(self, proposer, builders, relays, validators, mempool, rng=None, model=None):
        """
        :param proposer: Proposer that selects the block for each slot
        :param builders: Builders whose local chain is refreshed after a proposal
        :param relays: Relays whose block queues are cleared after a proposal
        :param validators: Pool from which one validator is drawn per slot
        :param mempool: Mempool that is told which block was proposed
        :param rng: Random number generator used to draw the validator
        :param model: Simulation model providing `schedule_event` and `all_candidate_blocks`
        """
        super().__init__(model=model, interval=12, start_time=0, rng=rng)
        self.model = model
        self.mempool = mempool
        self.validators = validators
        self.relays = relays
        self.builders = builders
        self.proposer = proposer

    def event(self):
        """
        Run one slot: draw a validator, let the proposer pick a block, apply it,
        and schedule the next slot.

        :raises RuntimeError: If no block is proposed in any slot other than slot 0
        """
        now = self.next_event
        slot_no = int(now // self.interval)
        logger.info(f"Slot {slot_no} starts at {now:.4f}")

        # Step 1: draw the validator for this slot; only its relays are queried
        validator = self.rng.choice(self.validators)
        reachable_relays = validator.connected_relays
        reachable_ids = [relay.relay_id for relay in reachable_relays]
        logger.info(f"Slot {slot_no}: selected Validator {validator.validator_id} "
                    f"(censoring={validator.is_censoring}) connected relays: {reachable_ids}")

        # Step 2: the proposer selects a block from those relays
        logger.debug(f"[{now:.4f}], Proposer selects block from Relays at Slot {slot_no}")
        block = self.proposer.propose_block(reachable_relays, validator, now, slot_no)

        # Step 3: apply the outcome
        if not block:
            if slot_no != 0:
                # Every slot after the first is expected to yield a block
                logger.critical(f"[{now:.4f}], Critical: Proposer has no block at Slot {slot_no}")
                raise RuntimeError(f"[{now:.4f}], Proposer has no block to propose at Slot {slot_no}")
            logger.warning(f"[{now:.4f}], Slot {slot_no}, no block available yet, which is expected.")
        else:
            self._apply_proposed_block(block, now, slot_no)

        # Schedule the next slot start
        self.next_event = now + self.interval
        self.model.schedule_event(self.next_event, self.event)

    def _apply_proposed_block(self, block, now, slot_no):
        """Propagate a freshly proposed block to mempool, proposer, builders and relays."""
        self.mempool.update_transaction_proposed(block)
        logger.debug(f"[{now:.4f}], Mempool updates transactions' proposed information after proposing block at Slot {slot_no}")
        logger.debug(f"[{now:.4f}], Mempool updates transactions' comfirmed information in advance after proposing block at Slot {slot_no}")

        # A block not yet in the global list is registered here
        # (the log below labels that case as an empty block)
        known_blocks = self.model.all_candidate_blocks
        if block not in known_blocks:
            known_blocks.append(block)
            logger.debug(f"[{now:.4f}], Proposer generates EMPTY block {block.block_id} at Slot {slot_no}")

        self.proposer.update_local_blockchain(block)
        logger.debug(f"[{now:.4f}], Proposer updates local blockchain after proposing block at Slot {slot_no}")

        # Each builder receives its own copy of the proposer's chain list
        for builder in self.builders:
            builder.local_blockchain = self.proposer.local_blockchain.copy()
            logger.debug(f"[{now:.4f}], Builder {builder.builder_id} updates local blockchain after proposing block at Slot {slot_no}")

        for relay in self.relays:
            relay.clear_received_ready_blocks()
        logger.debug(f"[{now:.4f}], Relay clears old blocks at Slot {slot_no}")



In [None]:
class Model:
    """
    Discrete-event simulation driver.

    Holds the agents, a heap-based event queue and the list of all candidate
    blocks. `run` processes events in time order and then appends the
    transaction and block records to CSV files in
    `experiment_params["result_folder"]`.
    """

    # Transaction attributes exported to CSV, in column order (after "run_id")
    _TX_ATTRS = (
        "tx_id",
        "arrival_time",
        "transaction_type",
        "sanctioned_type",
        "gas_used",
        "base_fee_per_gas",
        "priority_fee_per_gas",
        "priority_fee",
        "gas_fee",
        "proposed_block_id",
        "proposed_builder_id",
        "proposed_builder_policy",
        "selected_relay_id",
        "selected_relay_policy",
        "proposer_validator_id",
        "proposer_validator_is_censoring",
        "proposed_time",
        "confirmed_time",
        "is_proposed",
        "is_confirmed",
    )
    _TX_COLUMNS = ("run_id",) + _TX_ATTRS

    # Block attributes exported to CSV; the two per-type transaction counts
    # are derived from `block.tx_counts` and sit between head and tail
    _BLOCK_ATTRS_HEAD = (
        "block_id",
        "builder_id",
        "builder_policy",
        "slot_no",
        "parent_block_id",
        "num_transactions",
    )
    _BLOCK_ATTRS_TAIL = (
        "total_gas_used",
        "total_gas_fee",
        "total_priority_fee",
        "proposed_time",
        "confirmed_time",
        "builder_review_time",
        "relay_reject_count",
        "proposer_reject_count",
        "is_relay_approved",
        "is_proposed",
        "is_confirmed",
        "selected_relay_id",
        "selected_relay_policy",
        "proposer_validator_id",
        "proposer_validator_is_censoring",
        "builder_profit",
        "proposer_profit",
    )
    _BLOCK_COLUMNS = (
        ("run_id",)
        + _BLOCK_ATTRS_HEAD
        + ("num_non_sanctioned_tx", "num_sanctioned_tx")
        + _BLOCK_ATTRS_TAIL
    )

    def __init__(
        self,
        proposer,
        mempool,
        builders=None,
        relays=None,
        validators=None,
        rng=None,
        run_id=0,
        stop_time=0,
        experiment_name="non_censoring",
        experiment_params=None,
        builder_relay_mapping=None,
        validator_relay_mapping=None
    ):
        """
        Store the agents and register the initial events.

        :param proposer: Proposer used by the slot event
        :param mempool: Mempool holding `proposed_transactions` and `pending_transactions`
        :param builders: Builders to drive; `None` is treated as no builders
        :param relays: Relays to drive; `None` is treated as no relays
        :param validators: Validators handed to the slot event
        :param rng: Random number generator; a fresh NumPy generator if omitted
        :param run_id: Identifier written to every CSV row
        :param stop_time: Stored as `self.stop_time`; `run()` takes its own `stop_time`
        :param experiment_name: Stored label of the experiment
        :param experiment_params: Experiment parameters; must contain "result_folder" for the CSV export
        :param builder_relay_mapping: Externally built builder-to-relay mapping (stored only)
        :param validator_relay_mapping: Externally built validator-to-relay mapping (stored only)
        """
        self.rng = rng if rng is not None else np.random.default_rng()
        self.time = 0
        self.mempool = mempool
        self.proposer = proposer
        self.all_candidate_blocks = []          # Every candidate block seen during the run
        self.run_id = run_id
        self.stop_time = stop_time
        self.experiment_name = experiment_name
        self.experiment_params = experiment_params or {}
        self.event_queue = []                   # Heap of (time, count, function, args)
        self.event_count = 0                    # Tie-breaker so equal times pop in insertion order

        # Builders and relays are iterated below, so a missing list becomes an empty one
        self.builders = builders if builders is not None else []
        self.relays = relays if relays is not None else []
        self.validators = validators

        # Mappings are built by the caller and only stored here
        self.builder_relay_mapping = builder_relay_mapping
        self.validator_relay_mapping = validator_relay_mapping

        logger.info("Builder-Relay and Validator-Relay mappings were set externally and remain stable.")

        # All builders share one fixed start offset, and so do all relays
        self.builder_events = [
            BlockBuildingEvent(builder, self, rng=self.rng, offset=5.5)
            for builder in self.builders
        ]
        self.relay_events = [
            RelayProcessingEvent(relay, self, interval=2, rng=self.rng, offset=10.5)
            for relay in self.relays
        ]
        self.slot_event = SlotStartEvent(
            proposer, self.builders, self.relays, self.validators, mempool, self.rng, model=self
        )

        self.events = [self.slot_event] + self.builder_events + self.relay_events

        # Register the first firing of every event with the scheduler
        for event in self.events:
            self.schedule_event(event.next_event, event.event)

    def schedule_event(self, event_time, func, *args):
        """
        Queue `func(*args)` to run at `event_time`.

        Events with equal times run in the order they were scheduled.
        """
        heapq.heappush(self.event_queue, (event_time, self.event_count, func, args))
        self.event_count += 1

    def _transaction_row(self, tx):
        """Build one CSV row (dict) for a transaction."""
        row = {"run_id": self.run_id}
        row.update((name, getattr(tx, name)) for name in self._TX_ATTRS)
        return row

    def _block_row(self, block):
        """Build one CSV row (dict) for a block."""
        row = {"run_id": self.run_id}
        row.update((name, getattr(block, name)) for name in self._BLOCK_ATTRS_HEAD)
        row["num_non_sanctioned_tx"] = block.tx_counts.get(0, 0)
        row["num_sanctioned_tx"] = block.tx_counts.get(1, 0)
        row.update((name, getattr(block, name)) for name in self._BLOCK_ATTRS_TAIL)
        return row

    def _append_csv(self, rows, columns, filename_prefix):
        """
        Append `rows` to `<result_folder>/<filename_prefix>.csv` and return the path.

        The header is written only when the file is missing or empty. Passing
        the columns explicitly keeps the header correct even when `rows` is empty.

        :raises KeyError: If `experiment_params` has no "result_folder" entry
        """
        folder = self.experiment_params["result_folder"]
        if folder:
            # Make sure results are not lost because the folder is missing
            os.makedirs(folder, exist_ok=True)
        filename = os.path.join(folder, f"{filename_prefix}.csv")

        has_content = os.path.exists(filename) and os.path.getsize(filename) > 0
        frame = pd.DataFrame(rows, columns=list(columns))
        # Mode "a" also creates the file when it does not exist yet
        frame.to_csv(filename, mode='a', header=not has_content, index=False)
        return filename

    def save_transactions_to_csv(self, filename_prefix="transactions"):
        """Append all proposed and pending transactions of the mempool to the transactions CSV."""
        all_transactions = list(self.mempool.proposed_transactions.values())
        all_transactions.extend(self.mempool.pending_transactions.values())

        rows = [self._transaction_row(tx) for tx in all_transactions]
        filename = self._append_csv(rows, self._TX_COLUMNS, filename_prefix)

        logger.info(f"All transaction data appended to {os.path.abspath(filename)}")

    def save_blocks_to_csv(self, filename_prefix="blocks"):
        """Append every block in `all_candidate_blocks` to the blocks CSV."""
        rows = [self._block_row(block) for block in self.all_candidate_blocks]
        filename = self._append_csv(rows, self._BLOCK_COLUMNS, filename_prefix)

        logger.info(f"All candidate blocks data saved to {os.path.abspath(filename)}")

    def run(self, stop_time, max_event_count=None):
        """
        Process queued events in time order, then save the results to CSV.

        The loop ends when the queue is empty, when the next event is at or
        after `stop_time`, or when `max_event_count` events have been
        processed. The first event at or after `stop_time` stays in the
        queue, so a later `run` call with a larger `stop_time` continues
        from it.

        :param stop_time: Events at or after this time are not executed
        :param max_event_count: Optional cap on the number of processed events
        """
        processed = 0
        while self.event_queue and self.time < stop_time:
            # Peek first: an event that is out of range must not be popped and lost
            if self.event_queue[0][0] >= stop_time:
                logger.info(f"No events left before stop_time={stop_time}. Ending simulation at time={self.time:.4f}")
                break

            event_time, _, event_callable, args = heapq.heappop(self.event_queue)
            self.time = event_time

            event_callable(*args)
            processed += 1

            if max_event_count is not None and processed >= max_event_count:
                logger.critical(f"Reached max_event_count={max_event_count}. Stopping at time={self.time:.4f}")
                break

        # Save results after the simulation ends
        self.save_transactions_to_csv()
        self.save_blocks_to_csv()



In [None]:
class AgentSetup:
    """
    Creates the simulation agents (builders, relays, validators) and wires
    builders and validators to relays.

    Every random decision is drawn from the generator passed to the constructor,
    so a fixed seed yields the same agents and the same connection topology.
    The generator must support ``choice(seq)`` and
    ``choice(seq, size=k, replace=False)`` returning an array
    (e.g. ``numpy.random.default_rng``).
    """

    def __init__(self, rng):
        self.rng = rng

    def _sample_relays(self, pool, already_connected, count):
        """
        Draw up to ``count`` distinct relays from ``pool`` that are not in
        ``already_connected``.

        The sample size is capped by the number of relays that are actually still
        available, so sampling without replacement can never request more items
        than exist. Returns a (possibly empty) list.
        """
        candidates = [r for r in pool if r not in already_connected]
        return self.rng.choice(
            candidates,
            size=min(count, len(candidates)),
            replace=False
        ).tolist()

    def initialize_agents(
        self,
        num_builders=20,
        num_relays=10,
        num_validators=10,
        builder_censorship_ratio=0.0,
        strict_builder_ratio=0.0,               # Proportion of strict builders among censoring builders
        relay_censorship_ratio=0.0,
        validator_censorship_ratio=0.0,
        base_block_build_time=4.0,
        base_relay_process_time=1.0,
        builder_skip_probability=0.1,
        builder_review_time_per_tx=0.00315,
        relay_review_time_per_tx=0.00315,
        type_1_rejection_prob_builder=1.0,
        type_1_rejection_prob_relay=1.0,
        weak_detection_prob=1.0,
        proposer_reward_ratio=0.95
    ):
        """
        Instantiate all builders, relays and validators.

        Censoring agents always occupy the lowest ids of their kind:
        - builders ``0 .. int(num_builders * builder_censorship_ratio) - 1`` censor;
          among them the first ``int(num_censoring * strict_builder_ratio)`` use
          the "strict" policy and the rest the "weak" policy,
        - relays ``0 .. int(num_relays * relay_censorship_ratio) - 1`` censor,
        - validators ``0 .. int(num_validators * validator_censorship_ratio) - 1`` censor.

        Builders are created without a mempool and without relays, validators
        without connected relays; those are attached later.

        Returns:
            Tuple ``(builders, relays, validators)`` of lists.
        """
        num_censoring_builders = int(num_builders * builder_censorship_ratio)
        num_strict = int(num_censoring_builders * strict_builder_ratio)

        builders = []
        for i in range(num_builders):
            if i < num_strict:
                policy = "strict"
            elif i < num_censoring_builders:
                policy = "weak"
            else:
                policy = "non"

            is_censoring = policy in ["weak", "strict"]

            builder = Builder(
                mempool=None,
                rng=self.rng,
                builder_id=i,
                censorship_policy=CensorshipPolicy(
                    builder_policy=policy,
                    relay_policy="non",
                    type_1_rejection_prob_builder=type_1_rejection_prob_builder,
                    builder_review_time_per_tx=builder_review_time_per_tx,
                    weak_detection_prob=weak_detection_prob,
                    rng=self.rng
                ),
                relays=[],
                is_censoring=is_censoring,
                base_block_build_time=base_block_build_time,
                skip_probability=builder_skip_probability,
                proposer_reward_ratio=proposer_reward_ratio,
            )

            builders.append(builder)

        num_censoring_relays = int(num_relays * relay_censorship_ratio)
        relays = [
            Relay(
                relay_id=i,
                censorship_policy=CensorshipPolicy(
                    builder_policy="non",
                    relay_policy="censoring" if i < num_censoring_relays else "non",
                    type_1_rejection_prob_relay=type_1_rejection_prob_relay,
                    relay_review_time_per_tx=relay_review_time_per_tx,
                    rng=self.rng
                ),
                is_censoring=(i < num_censoring_relays),
                base_relay_process_time=base_relay_process_time
            ) for i in range(num_relays)
        ]

        num_censoring_validators = int(num_validators * validator_censorship_ratio)
        validators = [
            Validator(
                validator_id=i,
                connected_relays=[],
                is_censoring=(i < num_censoring_validators)
            ) for i in range(num_validators)
        ]

        return builders, relays, validators

    def construct_builder_relay_mapping(
        self,
        builders,
        relays,
        avg_relays_per_builder=4,
        censoring_builder_relay_ratio=0.75
    ):
        """
        Connect builders to relays (mutating ``builder.relays``) and return the mapping.

        With ``censoring_builder_relay_ratio = 0.75`` the intended split is:
        - Censoring builders connect to: 75% censoring relays + 25% non-censoring relays
        - Non-censoring builders connect to: 75% non-censoring relays + 25% censoring relays

        Step 1 attaches every relay to one randomly chosen builder of the same
        censoring type (any builder if no such builder exists). Step 2 tops each
        builder up towards ``avg_relays_per_builder`` relays using the ratio above;
        if a relay group has too few unused relays, the builder simply ends up with
        fewer connections.

        Returns:
            Dict ``builder_id -> [relay_id, ...]``.
        """
        builder_relay_mapping = {}

        censoring_relays = [r for r in relays if r.is_censoring]
        non_censoring_relays = [r for r in relays if not r.is_censoring]

        # Step 1: Ensure each relay is connected to at least one builder
        for relay in relays:
            possible_builders = [b for b in builders if relay.is_censoring == b.is_censoring]
            if not possible_builders:
                possible_builders = builders                # If no matching type, allow cross-type connection
            chosen_builder = self.rng.choice(possible_builders)
            chosen_builder.relays.append(relay)

        # Step 2: Then fill in remaining connections according to ratio
        for builder in builders:
            current_relays = set(builder.relays)
            remaining_relays_needed = max(0, avg_relays_per_builder - len(current_relays))

            if remaining_relays_needed > 0:
                # The builder's own type gets the larger share (at least one relay)
                if builder.is_censoring:
                    num_censoring = max(1, int(remaining_relays_needed * censoring_builder_relay_ratio)) if censoring_relays else 0
                    num_non_censoring = remaining_relays_needed - num_censoring
                else:
                    num_non_censoring = max(1, int(remaining_relays_needed * censoring_builder_relay_ratio)) if non_censoring_relays else 0
                    num_censoring = remaining_relays_needed - num_non_censoring

                # If either group is empty, select all from the other group
                if not censoring_relays:
                    num_non_censoring = remaining_relays_needed
                    num_censoring = 0
                if not non_censoring_relays:
                    num_censoring = remaining_relays_needed
                    num_non_censoring = 0

                # Censoring relays are drawn first to keep the random stream order fixed
                selected_censoring = self._sample_relays(censoring_relays, current_relays, num_censoring)
                selected_non_censoring = self._sample_relays(non_censoring_relays, current_relays, num_non_censoring)

                builder.relays.extend(selected_censoring + selected_non_censoring)

            builder_relay_mapping[builder.builder_id] = [relay.relay_id for relay in builder.relays]

        return builder_relay_mapping

    def construct_validator_relay_mapping(
        self,
        validators,
        relays,
        avg_relays_per_validator=4,
        censoring_validator_relay_ratio_censoring=1.0,          # For clarity only; not directly used
        censoring_validator_relay_ratio_non_censoring=0.25      # Ratio for non-censoring validators connecting to censoring relays
    ):
        """
        Connect validators to relays (mutating ``validator.connected_relays``)
        and return the mapping.

        - Censoring validators (e.g. 20%) connect only to censoring relays
          (censoring_validator_relay_ratio = 1); if there are no censoring relays
          they fall back to non-censoring ones.
        - Non-censoring validators (e.g. 80%) connect to 75% non-censoring relays
          + 25% censoring relays (censoring_validator_relay_ratio = 0.25); if only
          one relay group exists, all connections come from that group.

        Step 1 attaches every relay to one randomly chosen validator of the same
        censoring type (any validator if none matches). Step 2 tops each validator
        up towards ``avg_relays_per_validator`` relays. The number of relays drawn
        is always capped by the relays not yet connected to that validator.

        Returns:
            Dict ``validator_id -> [relay_id, ...]``.
        """
        validator_relay_mapping = {}

        censoring_relays = [r for r in relays if r.is_censoring]
        non_censoring_relays = [r for r in relays if not r.is_censoring]

        # Step 1: Ensure each relay is connected to at least one validator
        for relay in relays:
            possible_validators = [v for v in validators if relay.is_censoring == v.is_censoring]
            if not possible_validators:
                possible_validators = validators
            chosen_validator = self.rng.choice(possible_validators)
            chosen_validator.connected_relays.append(relay)

        # Step 2: Fill remaining connections for each validator
        for validator in validators:
            current_relays = set(validator.connected_relays)
            remaining_relays_needed = max(0, avg_relays_per_validator - len(current_relays))

            if remaining_relays_needed > 0:
                if validator.is_censoring:
                    # Censoring validators connect only to censoring relays when any exist
                    pool = censoring_relays if censoring_relays else non_censoring_relays
                    selected_relays = self._sample_relays(pool, current_relays, remaining_relays_needed)

                elif non_censoring_relays and censoring_relays:
                    num_censoring = max(1, int(remaining_relays_needed * censoring_validator_relay_ratio_non_censoring))
                    num_non_censoring = remaining_relays_needed - num_censoring

                    # Censoring relays are drawn first to keep the random stream order fixed
                    selected_censoring = self._sample_relays(censoring_relays, current_relays, num_censoring)
                    selected_non_censoring = self._sample_relays(non_censoring_relays, current_relays, num_non_censoring)

                    selected_relays = selected_censoring + selected_non_censoring

                elif censoring_relays:
                    # Only censoring relays exist; cap by what is still unconnected
                    selected_relays = self._sample_relays(censoring_relays, current_relays, remaining_relays_needed)

                else:
                    # Only non-censoring relays (or no relays at all) exist
                    selected_relays = self._sample_relays(non_censoring_relays, current_relays, remaining_relays_needed)

                validator.connected_relays.extend(selected_relays)

            validator_relay_mapping[validator.validator_id] = [relay.relay_id for relay in validator.connected_relays]

        return validator_relay_mapping



In [None]:
def simulate(
    builders,      # Instances of Builder
    relays,          # Instances of Relay
    validators,         # Instances of Validator
    builder_censorship_ratio,
    strict_builder_ratio,
    relay_censorship_ratio,
    validator_censorship_ratio,
    builder_relay_mapping,
    validator_relay_mapping,
    base_block_build_time,
    base_relay_process_time,
    avg_relays_per_builder,
    censoring_builder_relay_ratio,
    avg_relays_per_validator,
    censoring_validator_relay_ratio_non_censoring,
    builder_skip_probability,
    builder_review_time_per_tx,
    relay_review_time_per_tx,
    type_1_rejection_prob_builder,
    type_1_rejection_prob_relay,
    weak_detection_prob,
    proposer_reward_ratio,
    tx_rate=13,
    sanctioned_probs=None,
    rng_seed=None,
    run_id=1,
    stop_time=3601.1,
    max_event_count=10000,
    experiment_name="non_censoring",
    result_folder=None
):
    """
    Run the Ethereum PBS simulation by initializing the 'Model' and executing it for 'stop_time' seconds.

    The agents and their relay mappings are expected to be fully constructed by
    the caller; this function creates the mempool (with all transactions
    pre-generated up to ``stop_time``), attaches it to every builder, builds the
    ``Model`` and runs it. Most scalar arguments are not used for computation
    here; they are recorded in the returned parameter dictionary.

    Args:
        sanctioned_probs: Passed to ``Mempool``; defaults to ``[0.995, 0.005]``
            when omitted.
        rng_seed: Seed of the random generator used by the mempool and the model.
        max_event_count: Upper bound on processed events (``None`` = unlimited).
        result_folder: Existing directory for the run's output; required.

    Returns:
        Dict with every experiment parameter of this run.

    Raises:
        ValueError: If ``result_folder`` is not provided.
        FileNotFoundError: If ``result_folder`` does not exist.
    """
    # Validate with explicit exceptions (an assert would vanish under `python -O`)
    # and do it first, before any expensive transaction generation.
    if result_folder is None:
        raise ValueError("result_folder does not exist or is not provided!")
    if not os.path.exists(result_folder):
        raise FileNotFoundError(f"result_folder does not exist or is not provided! ({result_folder})")

    # Fresh list per call instead of a shared mutable default argument
    if sanctioned_probs is None:
        sanctioned_probs = [0.995, 0.005]

    logger.info(f"Result output directory: {result_folder}")

    rng = np.random.default_rng(rng_seed)  # Fix the random seed for reproducibility

    # Create the mempool
    mempool = Mempool(rng, tx_rate, sanctioned_probs=sanctioned_probs)

    # Pre-generate all transactions
    mempool.generate_transactions(current_time=0, simulation_time=stop_time)

    for builder in builders:
        builder.mempool = mempool

    # Create the Proposer
    proposer = Proposer()

    # Store all experimental parameters into a dictionary
    experiment_params = {
        "num_builders": len(builders),
        "num_relays": len(relays),
        "num_validators": len(validators),
        "builder_censorship_ratio": builder_censorship_ratio,
        "strict_builder_ratio": strict_builder_ratio,
        "relay_censorship_ratio": relay_censorship_ratio,
        "validator_censorship_ratio": validator_censorship_ratio,
        "builder_relay_mapping": builder_relay_mapping,
        "validator_relay_mapping": validator_relay_mapping,
        "base_block_build_time": base_block_build_time,
        "base_relay_process_time": base_relay_process_time,
        "avg_relays_per_builder": avg_relays_per_builder,
        "censoring_builder_relay_ratio": censoring_builder_relay_ratio,
        "avg_relays_per_validator": avg_relays_per_validator,
        "censoring_validator_relay_ratio_non_censoring": censoring_validator_relay_ratio_non_censoring,
        "builder_skip_probability": builder_skip_probability,
        "builder_review_time_per_tx": builder_review_time_per_tx,
        "relay_review_time_per_tx": relay_review_time_per_tx,
        "type_1_rejection_prob_builder": type_1_rejection_prob_builder,
        "type_1_rejection_prob_relay": type_1_rejection_prob_relay,
        "weak_detection_prob": weak_detection_prob,
        "proposer_reward_ratio": proposer_reward_ratio,
        "tx_rate": tx_rate,
        "sanctioned_probs": sanctioned_probs,
        "result_folder": result_folder,
        "rng_seed": rng_seed,
        "run_id": run_id,
        "stop_time": stop_time,
        "max_event_count": max_event_count,
        "experiment_name": experiment_name
    }

    # Initialize the simulation model
    model = Model(
        proposer=proposer,
        mempool=mempool,
        builders=builders,
        relays=relays,
        validators=validators,
        rng=rng,
        run_id=run_id,
        stop_time=stop_time,
        experiment_name=experiment_name,
        experiment_params=experiment_params,
        builder_relay_mapping=builder_relay_mapping,
        validator_relay_mapping=validator_relay_mapping
    )

    logger.info(f"Simulation starts: run_id={run_id}, experiment_name={experiment_name}.")

    # Run the simulation
    model.run(stop_time, max_event_count=max_event_count)

    logger.info(f"Simulation finished: run_id={run_id}.")

    return experiment_params



In [None]:
def save_experiment_params(params_dict, folder, filename_prefix="experiment_params.json"):
    """
    Dump the experiment parameter dictionary as indented UTF-8 JSON into `folder`.

    Note that, despite its name, `filename_prefix` is used verbatim as the file
    name inside `folder`; an existing file with that name is overwritten.
    """
    destination = os.path.join(folder, filename_prefix)

    with open(destination, mode='w', encoding='utf-8') as out_file:
        json.dump(params_dict, out_file, indent=4, ensure_ascii=False)

    logger.info(f"Experiment parameters saved to {destination}")

In [None]:

# Baseline model parameters shared by every experiment of this sweep
params = {
    "num_builders": 20,
    "num_relays": 10,
    "num_validators": 10,
    "builder_censorship_ratio": 0.5,
    "strict_builder_ratio": 0.5,
    "relay_censorship_ratio": 0.5,
    "validator_censorship_ratio": 0.2,
    "base_block_build_time": 4.0,
    "base_relay_process_time": 1.0,
    "avg_relays_per_builder": 4,
    "censoring_builder_relay_ratio": 0.75,
    "avg_relays_per_validator": 4,
    "censoring_validator_relay_ratio_non_censoring": 0.25,
    "builder_skip_probability": 0.1,
    "builder_review_time_per_tx": 0.00315,
    "relay_review_time_per_tx": 0.00315,
    "type_1_rejection_prob_builder": 1.0,
    "type_1_rejection_prob_relay": 1.0,
    "weak_detection_prob": 1.0,
    "proposer_reward_ratio": 0.95,
    "tx_rate": 13,
    "sanctioned_probs": [0.995, 0.005]
}

# Seed used for agent creation / topology; each run's simulation seed is base_seed + run_id
base_seed = 10

# Values of the swept parameter (seconds of builder review time per transaction)
builder_review_times_per_tx = [
    0.000, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009, 0.010,
    0.015, 0.020, 0.025, 0.030, 0.035, 0.040, 0.045, 0.050, 0.060, 0.070, 0.080, 0.090, 0.100
]

# Agent counts go to initialize_agents only; simulate() derives them from the agent lists
agent_count_keys = ("num_builders", "num_relays", "num_validators")

# Parameters forwarded to AgentSetup.initialize_agents
agent_param_keys = agent_count_keys + (
    "builder_censorship_ratio",
    "strict_builder_ratio",
    "relay_censorship_ratio",
    "validator_censorship_ratio",
    "base_block_build_time",
    "base_relay_process_time",
    "builder_skip_probability",
    "builder_review_time_per_tx",
    "relay_review_time_per_tx",
    "type_1_rejection_prob_builder",
    "type_1_rejection_prob_relay",
    "weak_detection_prob",
    "proposer_reward_ratio",
)

for builder_review_time_per_tx in builder_review_times_per_tx:
    # One timestamped subfolder of the main result folder per swept value
    date_str = datetime.now().strftime('%Y%m%d_%H%M%S')
    experiment_name = f"builder_review_time_per_tx_{builder_review_time_per_tx:.3f}_{date_str}"
    result_folder = os.path.join(main_result_folder, experiment_name)
    os.makedirs(result_folder, exist_ok=True)

    # Per-experiment file logging is intentionally disabled (no handler is attached here).

    # Apply the swept value, then derive the keyword set accepted by simulate()
    params["builder_review_time_per_tx"] = builder_review_time_per_tx
    params_for_simulate = {
        key: value for key, value in params.items() if key not in agent_count_keys
    }

    progress_label = f"Simulations Progress: builder_review_time_per_tx_{builder_review_time_per_tx:.3f}"
    for run_id in tqdm(range(1, 11), desc=progress_label):

        # Re-seeding with base_seed gives every run identical agents and mappings
        rng = np.random.default_rng(base_seed)
        agent_setup = AgentSetup(rng)

        # Fresh agents for each run
        builders, relays, validators = agent_setup.initialize_agents(
            **{key: params[key] for key in agent_param_keys}
        )

        # Wire builders and validators to relays and keep the resulting mappings
        builder_relay_mapping = agent_setup.construct_builder_relay_mapping(
            builders=builders,
            relays=relays,
            avg_relays_per_builder=params["avg_relays_per_builder"],
            censoring_builder_relay_ratio=params["censoring_builder_relay_ratio"]
        )
        validator_relay_mapping = agent_setup.construct_validator_relay_mapping(
            validators=validators,
            relays=relays,
            avg_relays_per_validator=params["avg_relays_per_validator"],
            censoring_validator_relay_ratio_censoring=1.0,
            censoring_validator_relay_ratio_non_censoring=params["censoring_validator_relay_ratio_non_censoring"]
        )

        # Only the simulation seed differs between runs
        experiment_params = simulate(
            builders=builders,
            relays=relays,
            validators=validators,
            builder_relay_mapping=builder_relay_mapping,
            validator_relay_mapping=validator_relay_mapping,
            **params_for_simulate,
            rng_seed=base_seed + run_id,
            run_id=run_id,
            stop_time=3601.1,
            max_event_count=None,
            experiment_name=experiment_name,
            result_folder=result_folder
        )

        # Written with the default file name, so each run replaces the previous
        # run's parameter file in this experiment folder
        save_experiment_params(experiment_params, result_folder)

    logger.info(f"Completed simulation for builder_review_time_per_tx = {builder_review_time_per_tx:.3f}")



Simulations Progress: relay_censorship_ratio_0.000: 100%|██████████| 10/10 [13:33<00:00, 81.32s/it]
Simulations Progress: relay_censorship_ratio_0.100: 100%|██████████| 10/10 [12:26<00:00, 74.62s/it]
Simulations Progress: relay_censorship_ratio_0.200: 100%|██████████| 10/10 [12:09<00:00, 72.98s/it]
Simulations Progress: relay_censorship_ratio_0.300: 100%|██████████| 10/10 [12:34<00:00, 75.41s/it]
Simulations Progress: relay_censorship_ratio_0.400: 100%|██████████| 10/10 [12:13<00:00, 73.39s/it]
Simulations Progress: relay_censorship_ratio_0.500: 100%|██████████| 10/10 [11:25<00:00, 68.58s/it]
Simulations Progress: relay_censorship_ratio_0.600: 100%|██████████| 10/10 [10:01<00:00, 60.15s/it]
Simulations Progress: relay_censorship_ratio_0.700: 100%|██████████| 10/10 [12:02<00:00, 72.30s/it]
Simulations Progress: relay_censorship_ratio_0.800: 100%|██████████| 10/10 [14:12<00:00, 85.25s/it]
Simulations Progress: relay_censorship_ratio_0.900: 100%|██████████| 10/10 [12:32<00:00, 75.25s/it]


In [None]:
# def run_simulation_parallel(params_tuple):
#     builder_time, relay_time, run_id, result_folder = params_tuple
    
#     # Explicitly disable default subprocess terminal output and avoid extra log files
#     logging.getLogger().handlers.clear()
#     logging.basicConfig(level=logging.CRITICAL)

#     # # Dynamically add logger handler (disabled: no longer writing log files)
#     # file_handler = logging.FileHandler(os.path.join(result_folder, 'simulation.log'), mode='a', encoding='utf-8')
#     # file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s [%(name)s]: %(message)s'))
    
#     # logger.handlers.clear()
#     # logger.addHandler(file_handler)
    
#     logger = logging.getLogger("EthereumPBS")
#     logger.setLevel(logging.CRITICAL)
    
#     logger.info(f"Simulation started. Results are saved to {result_folder}")

#     # RNG for mapping (shared seed)
#     mapping_rng_seed = 10
#     mapping_rng = np.random.default_rng(mapping_rng_seed)
#     agent_setup = AgentSetup(mapping_rng)

#     # Separate RNG for each simulation run
#     simulation_rng_seed = mapping_rng_seed + run_id

#     # Deep copy of params to fully isolate for each thread/process
#     local_params = copy.deepcopy(params)  
#     local_params["builder_review_time_per_tx"] = builder_time
#     local_params["relay_review_time_per_tx"] = relay_time

#     builders, relays, validators = agent_setup.initialize_agents(
#         num_builders=local_params["num_builders"],
#         num_relays=local_params["num_relays"],
#         num_validators=local_params["num_validators"],
#         builder_censorship_ratio=local_params["builder_censorship_ratio"],
#         strict_builder_ratio=local_params["strict_builder_ratio"],
#         relay_censorship_ratio=local_params["relay_censorship_ratio"],
#         validator_censorship_ratio=local_params["validator_censorship_ratio"],
#         base_block_build_time=local_params["base_block_build_time"],
#         base_relay_process_time=local_params["base_relay_process_time"],
#         builder_skip_probability=local_params["builder_skip_probability"],
#         builder_review_time_per_tx=local_params["builder_review_time_per_tx"],
#         relay_review_time_per_tx=local_params["relay_review_time_per_tx"],
#         type_1_rejection_prob_builder=local_params["type_1_rejection_prob_builder"],
#         type_1_rejection_prob_relay=local_params["type_1_rejection_prob_relay"],
#         weak_detection_prob=local_params["weak_detection_prob"],
#         proposer_reward_ratio=local_params["proposer_reward_ratio"]
#     )

#     builder_relay_mapping = agent_setup.construct_builder_relay_mapping(
#         builders=builders,
#         relays=relays,
#         avg_relays_per_builder=local_params["avg_relays_per_builder"],
#         censoring_builder_relay_ratio=local_params["censoring_builder_relay_ratio"]
#     )

#     validator_relay_mapping = agent_setup.construct_validator_relay_mapping(
#         validators=validators,
#         relays=relays,
#         avg_relays_per_validator=local_params["avg_relays_per_validator"],
#         censoring_validator_relay_ratio_censoring=1.0,
#         censoring_validator_relay_ratio_non_censoring=local_params["censoring_validator_relay_ratio_non_censoring"]
#     )

#     params_for_simulate = local_params.copy()
#     params_for_simulate.pop('num_builders')
#     params_for_simulate.pop('num_relays')
#     params_for_simulate.pop('num_validators')

#     experiment_params = simulate(
#         builders=builders,
#         relays=relays,
#         validators=validators,
#         builder_relay_mapping=builder_relay_mapping,
#         validator_relay_mapping=validator_relay_mapping,
#         **params_for_simulate,
#         rng_seed=simulation_rng_seed, 
#         run_id=run_id,
#         stop_time=361.1,
#         max_event_count=None,
#         experiment_name=f"builder_review_{builder_time}_relay_review_{relay_time}",
#         result_folder=result_folder
#     )

#     save_experiment_params(experiment_params, result_folder)

#     logger.info(f"Completed simulation (run_id={run_id}).")
#     # logger.removeHandler(file_handler)
#     # file_handler.close()


In [None]:
# all_params_list = []

# builder_review_times_per_tx = np.linspace(0.000, 0.01, 11)
# relay_review_times_per_tx = np.linspace(0.000, 0.01, 11)

# for builder_time in builder_review_times_per_tx:
#     for relay_time in relay_review_times_per_tx:
#         date_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
#         experiment_name = f"builder_review_{builder_time:.3f}_relay_review_{relay_time:.3f}_{date_str}"
#         result_folder = os.path.join(main_result_folder, experiment_name)
#         os.makedirs(result_folder, exist_ok=True)

#         for run_id in range(1, 11):
#             all_params_list.append((builder_time, relay_time, run_id, result_folder))


In [None]:
# """
# Run simulations in parallel using thread pool
# """
# from concurrent.futures import ThreadPoolExecutor, as_completed
# import threading
# from tqdm import tqdm

# num_cores = max(1, os.cpu_count() - 1)
# lock = threading.Lock()

# progress_bar = tqdm(total=len(all_params_list), desc="Total Simulations Progress", dynamic_ncols=True)

# with ThreadPoolExecutor(max_workers=num_cores) as executor:
#     futures = {executor.submit(run_simulation_parallel, param_tuple): param_tuple for param_tuple in all_params_list}

#     for future in as_completed(futures):
#         param = futures[future]
#         with lock:
#             progress_bar.update(1)
#             progress_bar.set_postfix_str(f"builder_review={param[0]:.3f}, relay_review={param[1]:.3f}, run_id={param[2]}")

# progress_bar.close()



In [None]:
# """
# Run simulations in parallel using process pool
# """
# from concurrent.futures import ProcessPoolExecutor, as_completed
# from tqdm import tqdm
# import multiprocessing

# num_cores = max(1, multiprocessing.cpu_count() - 1)

# progress_bar = tqdm(total=len(all_params_list), desc="Total Simulations Progress", dynamic_ncols=True)

# with ProcessPoolExecutor(max_workers=num_cores) as executor:
#     futures = {executor.submit(run_simulation_parallel, param_tuple): param_tuple for param_tuple in all_params_list}

#     for future in as_completed(futures):
#         param = futures[future]
#         progress_bar.update(1)
#         progress_bar.set_postfix_str(f"builder_review={param[0]:.3f}, relay_review={param[1]:.3f}, run_id={param[2]}")

# progress_bar.close()


In [None]:
# Diagnostic dump of the connection topology and agent parameters left over from
# the most recent run of the sweep cell (uses its module-level variables).

# Builder → Relay connections
print("Builder → Relay Mapping:")
for builder_id, relay_ids in builder_relay_mapping.items():
    print(f"Builder {builder_id} → Relays {relay_ids}")

# Relay → Builder connections: start with every relay, then invert builder.relays
relay_to_builders = {}
for relay in relays:
    relay_to_builders[relay.relay_id] = []
for builder in builders:
    for relay in builder.relays:
        relay_to_builders[relay.relay_id].append(builder.builder_id)

print("\nRelay → Builder Mapping:")
for relay_id in relay_to_builders:
    builder_ids = relay_to_builders[relay_id]
    print(f"Relay {relay_id} → Builders {builder_ids}")

# Validator → Relay connections
print("\nValidator → Relay Mapping:")
for validator_id, relay_ids in validator_relay_mapping.items():
    print(f"Validator {validator_id} → Relays {relay_ids}")

# Relay → Validator connections: same inversion for validator.connected_relays
relay_to_validators = {}
for relay in relays:
    relay_to_validators[relay.relay_id] = []
for validator in validators:
    for relay in validator.connected_relays:
        relay_to_validators[relay.relay_id].append(validator.validator_id)

print("\nRelay → Validator Mapping:")
for relay_id in relay_to_validators:
    validator_ids = relay_to_validators[relay_id]
    print(f"Relay {relay_id} → Validators {validator_ids}")


# Per-builder configuration
print("\nBuilders' Parameters:")
for builder in builders:
    policy = builder.censorship_policy
    print(f"Builder {builder.builder_id}:\n"
          f"  - is_censoring: {builder.is_censoring}\n"
          f"  - builder_policy: {policy.builder_policy}\n"
          f"  - type_1_rejection_prob_builder: {policy.type_1_rejection_prob_builder}\n"
          f"  - weak_detection_prob: {policy.weak_detection_prob}\n"
          f"  - base_block_build_time: {builder.base_block_build_time}\n"
          f"  - builder_review_time_per_tx: {policy.builder_review_time_per_tx}\n"
          f"  - skip_probability: {builder.skip_probability}\n"
          f"  - proposer_reward_ratio: {builder.proposer_reward_ratio}")

# Per-relay configuration
print("\nRelays' Parameters:")
for relay in relays:
    policy = relay.censorship_policy
    print(f"Relay {relay.relay_id}:\n"
          f"  - is_censoring: {relay.is_censoring}\n"
          f"  - relay_policy: {policy.relay_policy}\n"
          f"  - type_1_rejection_prob_relay: {policy.type_1_rejection_prob_relay}\n"
          f"  - base_relay_process_time: {relay.base_relay_process_time}\n"
          f"  - relay_review_time_per_tx: {policy.relay_review_time_per_tx}")

# Per-validator configuration
print("\nValidators' Parameters:")
for validator in validators:
    print(f"Validator {validator.validator_id}:\n"
          f"  - is_censoring: {validator.is_censoring}")
