# LayerZero DVN Configuration Manager

This notebook provides a comprehensive interface for configuring DVNs (Decentralized Verifier Networks) and Executors for the blockhash oracle system.

Key features:
- Outer loop by chains, inner loop by peers (including read_channel)
- Handles DVN and Executor configurations separately as per LayerZero requirements
- Supports both ULN (send/receive) and Read library configurations
- Fetch, plan, and set configurations in a single pass

## 1. Configuration

In [None]:
%load_ext autoreload
%autoreload 2

# Network type selection.
# Used as the key into chains.json, passed to DeploymentManager to pick the
# deployment set, and decides which private-key loading path is taken.
NETWORK_TYPE = "mainnets"  # "testnets" or "mainnets"

# DVN Selection
# Available: "layerzero-labs", "nethermind", "curve" (if configured)
# Entries are matched against the "id" field of each DVN metadata record.
DVNS_TO_USE = ["layerzero-labs"]

# Execution mode
# When DRY_RUN is truthy, set_config() only prints what it would do and sends
# no transaction.
DRY_RUN = True  # Set to False to actually apply changes
FORCE_RECONFIGURE = False  # Force reconfiguration even if current config matches desired

# Use default executor (0x0000...) or chain-specific executors
# When True the zero address is used and no separate executor config is
# fetched or written for regular (non-read) peers.
USE_DEFAULT_EXECUTOR = True

# Optional Curve Finance DVN addresses, keyed by chain name.
# inject_curve_dvns() appends the matching entry (id "curve") to a chain's DVN
# list when the chain name is one of these keys.
# NOTE(review): every key here is a Sepolia testnet, so presumably nothing is
# injected when NETWORK_TYPE is "mainnets" - confirm this is intended.
CURVE_DVNS = {
    "sepolia": "0x3a8bf25ff10ec52dc7efe32aafaef84072fdcf8c",
    "base-sepolia": "0xfe3c4c5676c04a4ebd9961a7c5934be16beb35df",
    "optimism-sepolia": "0x75d7ad554475008cae51298578cda6936c432d4e",
    "arbitrum-sepolia": "0x4b916807a527fdaa66b3bff5a5307f5129b60f43",
}

## 2. Imports and Setup

In [None]:
import json
import os
import sys
import logging
import time
from pathlib import Path
from typing import Dict, List, Tuple, Optional

from dotenv import load_dotenv
from eth_account import Account
from web3 import Web3
from eth_abi import decode, encode
from web3.middleware import ExtraDataToPOAMiddleware

# Add parent directory to path for imports
sys.path.append(str(Path().resolve().parent))
from ABIs import endpointV2_abi

# Import from deployment folder
from LZMetadata import LZMetadata
from DeploymentManager import DeploymentManager

# Setup logging (INFO and above, timestamped)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load environment variables from a .env file (API keys and private keys are
# read from os.environ further down)
load_dotenv()

# Constants from LayerZero documentation
# READ_CHANNEL_ID is used as the "peer eid" of the read channel.
READ_CHANNEL_ID = 4294967295  # max uint32
CONFIG_TYPE_EXECUTOR = 1  # Executor configuration type
CONFIG_TYPE_ULN = 2  # ULN configuration type for send/receive
# NOTE: CONFIG_TYPE_READ and CONFIG_TYPE_EXECUTOR are the same integer, so the
# two cannot be told apart by comparing config-type values alone; the library
# address / eid the config is requested for determines which layout applies.
CONFIG_TYPE_READ = 1  # Read configuration type (same as executor but different context)

## 3. Helper Functions

In [None]:
def checksum(address: str) -> str:
    """Return ``address`` in checksummed form.

    Thin convenience wrapper around ``Web3.to_checksum_address`` so the rest
    of the notebook can normalise addresses with a short name.
    """
    checksummed = Web3.to_checksum_address(address)
    return checksummed


def inject_curve_dvns(dvns_list: List[Dict], chain_key: str) -> List[Dict]:
    """Append the Curve Finance DVN record for ``chain_key`` to ``dvns_list``.

    The list is modified in place and also returned. Nothing is added when
    ``chain_key`` has no entry in ``CURVE_DVNS`` or when a record with the same
    address (compared case-insensitively) is already present.
    """
    try:
        curve_address = CURVE_DVNS[chain_key]
    except KeyError:
        # No Curve DVN known for this chain: hand the list back untouched.
        return dvns_list

    for entry in dvns_list:
        if entry["address"].lower() == curve_address.lower():
            # Already listed; avoid a duplicate record.
            break
    else:
        dvns_list.append(
            {
                "address": curve_address,
                "version": 2,
                "canonicalName": "Curve Finance",
                "id": "curve",
                "lzReadCompatible": True,
            }
        )

    return dvns_list


def decode_dvn_config(hex_data: bytes, config_type: str = "uln") -> Optional[Dict]:
    """Decode an ABI-encoded configuration blob into a plain dict.

    Args:
        hex_data: Raw config as bytes, or as a hex string (with or without
            "0x"). Empty input and the literal "0x" mean "no config".
        config_type: Layout to decode with:
            "read"     -> (address,uint8,uint8,uint8,address[],address[])
            "executor" -> (uint32,address); only the address is returned
            anything else -> ULN layout
                          (uint64,uint8,uint8,uint8,address[],address[])

    Returns:
        A dict with checksummed addresses, or None when there is no data or
        decoding fails (failures are logged as warnings, not raised).
    """
    if not hex_data or hex_data == "0x":
        return None

    # Accept hex strings as well as raw bytes.
    raw = bytes.fromhex(hex_data.replace("0x", "")) if isinstance(hex_data, str) else hex_data

    try:
        if config_type == "executor":
            fields = decode(["(uint32,address)"], raw)[0]
            return {"executor": checksum(fields[1])}

        # Read and ULN layouts differ only in their first field.
        if config_type == "read":
            fields = decode(["(address,uint8,uint8,uint8,address[],address[])"], raw)[0]
            result = {"executor": checksum(fields[0])}
        else:
            fields = decode(["(uint64,uint8,uint8,uint8,address[],address[])"], raw)[0]
            result = {"confirmations": fields[0]}

        result["requiredDVNCount"] = fields[1]
        result["optionalDVNCount"] = fields[2]
        result["optionalDVNThreshold"] = fields[3]
        result["requiredDVNs"] = [checksum(dvn) for dvn in fields[4]]
        result["optionalDVNs"] = [checksum(dvn) for dvn in fields[5]]
        return result
    except Exception as e:
        logging.warning(f"Failed to decode DVN config: {e}")
        return None


def get_current_config(
    endpoint_w3, oapp: str, lib: str, eid: int, config_type: int
) -> Optional[Dict]:
    """Fetch and decode the configuration currently stored on chain.

    Args:
        endpoint_w3: Endpoint contract object exposing ``getConfig``.
        oapp: Address of the OApp whose config is queried.
        lib: Message library address the config belongs to.
        eid: Remote endpoint id, or ``READ_CHANNEL_ID`` for the read channel.
        config_type: One of the ``CONFIG_TYPE_*`` constants.

    Returns:
        The decoded config dict, or None when the call or the decoding fails.

    ``CONFIG_TYPE_READ`` and ``CONFIG_TYPE_EXECUTOR`` are the same integer, so
    the type value alone cannot select a layout. The read layout is used only
    when the config is requested for the read channel; the same type value on
    any other eid is decoded as an executor config. (Previously the executor
    branch was unreachable and executor configs were decoded with the read
    layout.)
    NOTE(review): assumes READ_CHANNEL_ID is the only read channel in use.
    """
    try:
        config_bytes = endpoint_w3.functions.getConfig(oapp, lib, eid, config_type).call()
        if config_type == CONFIG_TYPE_READ and eid == READ_CHANNEL_ID:
            layout = "read"
        elif config_type == CONFIG_TYPE_EXECUTOR:
            layout = "executor"
        else:
            layout = "uln"
        return decode_dvn_config(config_bytes, layout)
    except Exception as e:
        # Best effort: an unreadable config is reported as "not configured".
        logging.debug(f"Failed to get config: {e}")
        return None


def get_desired_dvns(
    source_dvns: List[Dict],
    peer_dvns: List[Dict],
    dvn_ids: List[str],
    is_read_channel: bool = False,
) -> Tuple[List[str], List[str]]:
    """Pick the DVN addresses to configure for one route.

    Args:
        source_dvns: DVN records (dicts with "id", "address" and optionally
            "lzReadCompatible") of the chain being configured.
        peer_dvns: DVN records of the remote chain; unused for the read channel.
        dvn_ids: DVN ids that are allowed to be selected.
        is_read_channel: Select for the read channel instead of a regular peer.

    Returns:
        ``(source_addresses, peer_addresses)``.
        Read channel: the source chain's read-compatible DVNs whose id is in
        ``dvn_ids`` (in ``source_dvns`` order) and an empty list.
        Regular peer: for every id in ``dvn_ids`` (in that order) that both
        chains list as a non-read-compatible DVN, the address on each side.

    NOTE(review): records flagged ``lzReadCompatible`` are excluded from the
    regular-peer selection, so a DVN that is only listed as read-compatible is
    never chosen for regular channels - confirm this is intended.
    """
    if is_read_channel:
        selected = []
        for entry in source_dvns:
            if entry.get("lzReadCompatible", False) and entry["id"] in dvn_ids:
                selected.append(entry["address"])
        return selected, []

    def regular_by_id(entries):
        # id -> address for records that are not read-compatible; a later
        # record with the same id replaces an earlier one.
        lookup = {}
        for entry in entries:
            if not entry.get("lzReadCompatible", False):
                key = entry["id"]
                lookup[key] = entry["address"]
        return lookup

    source_lookup = regular_by_id(source_dvns)
    peer_lookup = regular_by_id(peer_dvns)

    source_side = []
    peer_side = []
    for dvn_id in dvn_ids:
        if dvn_id in source_lookup and dvn_id in peer_lookup:
            source_side.append(source_lookup[dvn_id])
            peer_side.append(peer_lookup[dvn_id])

    return source_side, peer_side


def configs_match(
    current: Optional[Dict], required_dvns: List[str], optional_dvns: List[str]
) -> bool:
    """Tell whether ``current`` already lists exactly the desired DVNs.

    Only the "requiredDVNs" and "optionalDVNs" entries are compared, as
    case-insensitive sets (order and duplicates are ignored). A missing or
    empty ``current`` never matches.
    """
    if not current:
        return False

    groups = (
        current.get("requiredDVNs", []),
        current.get("optionalDVNs", []),
        required_dvns,
        optional_dvns,
    )
    have_required, have_optional, want_required, want_optional = (
        {address.lower() for address in group} for group in groups
    )

    return (have_required, have_optional) == (want_required, want_optional)


def encode_dvn_config(
    required_dvns: List[str],
    optional_dvns: List[str],
    config_type: str = "uln",
    executor: Optional[str] = None,
) -> bytes:
    """ABI-encode a DVN configuration for ``setConfig``.

    Args:
        required_dvns: Addresses of the required DVNs.
        optional_dvns: Addresses of the optional DVNs.
        config_type: "read" encodes
            (address executor, uint8, uint8, uint8, address[], address[]);
            any other value encodes the ULN layout
            (uint64 confirmations, uint8, uint8, uint8, address[], address[]).
        executor: Executor address for the read layout; the zero address is
            used when omitted. Ignored for the ULN layout.

    Returns:
        The encoded config bytes.

    The count fields are the list lengths, the optional threshold is set to
    the number of optional DVNs, and ULN confirmations are encoded as 0.
    """

    def _ascending(addresses):
        # DVN lists must be in ascending *numeric* address order. Sorting the
        # checksummed strings directly is wrong because uppercase hex digits
        # sort before lowercase ones ("0xB0.." < "0xa1.." as text).
        return sorted((checksum(addr) for addr in addresses), key=lambda addr: int(addr, 16))

    required_dvns = _ascending(required_dvns)
    optional_dvns = _ascending(optional_dvns)
    optional_threshold = len(optional_dvns)

    # Everything after the first field is shared by both layouts.
    shared_fields = (
        len(required_dvns),
        len(optional_dvns),
        optional_threshold,
        required_dvns,
        optional_dvns,
    )

    if config_type == "read":
        # Read config structure starts with the executor address
        executor = checksum(executor or "0x0000000000000000000000000000000000000000")
        return encode(
            ["(address,uint8,uint8,uint8,address[],address[])"], [(executor, *shared_fields)]
        )

    # ULN config structure for send/receive starts with confirmations (uint64)
    return encode(["(uint64,uint8,uint8,uint8,address[],address[])"], [(0, *shared_fields)])


def encode_executor_config(eid: int, executor: str) -> bytes:
    """ABI-encode an executor configuration as a ``(uint32,address)`` tuple.

    ``eid`` is written into the uint32 slot and the checksummed ``executor``
    into the address slot.

    NOTE(review): LayerZero's executor config is presumably
    ``(uint32 maxMessageSize, address executor)``; if so, passing the eid here
    sets the maximum message size to the eid value - confirm against the
    send library's ExecutorConfig struct.
    """
    payload = [(eid, checksum(executor))]
    return encode(["(uint32,address)"], payload)


def send_tx(w3, func, acc, value=0, gas=0):
    """Build, sign and broadcast a contract call and wait for its receipt.

    Args:
        w3: Connected Web3 instance.
        func: Prepared contract function call to send.
        acc: Account providing ``address`` and ``key``.
        value: Native value to attach to the transaction.
        gas: Explicit gas limit; when not positive the limit is estimated and
            padded by 20%.

    Returns:
        The transaction hash.

    Raises:
        Exception: "Transaction failed" when the receipt status is 0; gas
            estimation errors other than "no data" are re-raised.

    There is no retry of the send itself: the only fallbacks are rebuilding
    the transaction with an explicit ``gasPrice`` (110% of the node's current
    price) when the first build fails, and a fixed 1,000,000 gas limit when
    estimation fails with a "no data" error.
    """

    def base_params():
        # The nonce is looked up each time the parameters are built.
        return {
            "from": acc.address,
            "nonce": w3.eth.get_transaction_count(acc.address),
            "value": value,
        }

    try:
        tx = func.build_transaction(base_params())
    except Exception:
        tx = func.build_transaction({**base_params(), "gasPrice": int(1.1 * w3.eth.gas_price)})

    if gas <= 0:
        try:
            gas = int(w3.eth.estimate_gas(tx) * 1.2)
        except Exception as err:
            if "no data" not in str(err):
                raise
            gas = 1_000_000
    tx["gas"] = gas

    signed = w3.eth.account.sign_transaction(tx, private_key=acc.key)
    tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction)
    if w3.eth.wait_for_transaction_receipt(tx_hash).status == 0:
        raise Exception("Transaction failed")
    return tx_hash


def set_config(
    endpoint_w3,
    w3,
    account,
    oapp: str,
    lib: str,
    eid: int,
    config_type_enum: int,
    config_bytes: bytes,
    description: str,
) -> bool:
    """Write one configuration entry through the endpoint's ``setConfig``.

    Args:
        endpoint_w3: Endpoint contract object.
        w3: Web3 instance used to send the transaction.
        account: Signing account.
        oapp: OApp address the config applies to.
        lib: Message library address.
        eid: Remote endpoint id (or the read channel id).
        config_type_enum: Numeric config type passed to the endpoint.
        config_bytes: ABI-encoded config payload.
        description: Human-readable label used in the printed status lines.

    Returns:
        True when the config was sent (or would be sent in dry-run mode),
        False when anything raised; the error is printed, not propagated.
    """
    try:
        func = endpoint_w3.functions.setConfig(
            _oapp=oapp, _lib=lib, _params=[(eid, config_type_enum, config_bytes)]
        )

        if DRY_RUN:
            print(f"    [DRY RUN] Would set {description}")
            return True

        # Chain id 5000 gets a fixed, very large gas limit instead of an
        # estimate (presumably Mantle's gas accounting - confirm).
        custom_gas = 3_000_000_000 if w3.eth.chain_id == 5000 else 0
        tx_hash = send_tx(w3, func, account, gas=custom_gas)
        print(f"    ✓ Set {description}: {tx_hash.hex()}")
        time.sleep(1)  # Small delay to avoid nonce issues
    except Exception as e:
        print(f"    ✗ Failed to set {description}: {str(e)}")
        return False
    return True

## 4. Load State and Initialize

In [None]:
# Load deployment state for the selected network type
deployment_manager = DeploymentManager()
deployed_contracts = deployment_manager.get_all_deployed_contracts(NETWORK_TYPE)

if not deployed_contracts:
    raise ValueError(f"No deployments found for {NETWORK_TYPE}")

logging.info(f"Found deployments for {len(deployed_contracts)} chains")

# Load chains configuration
with open("../chain-parse/chains.json", "r") as f:
    chains_config = json.load(f)

all_chains = chains_config[NETWORK_TYPE]

# Find main chain: the first entry flagged with "is_main_chain"
main_chain = next(
    (name for name, chain_cfg in all_chains.items() if chain_cfg.get("is_main_chain", False)),
    None,
)

if not main_chain:
    raise ValueError(f"No main chain defined for {NETWORK_TYPE}")

logging.info(f"Main chain: {main_chain}")

# Initialize LayerZero metadata
lz = LZMetadata()

# Get account
if NETWORK_TYPE == "testnets":
    # Testnets use a plain private key from the environment
    private_key = os.environ.get("WEB3_TESTNET_PK")
    if not private_key:
        raise ValueError("WEB3_TESTNET_PK not found in environment")
    account = Account.from_key(private_key)
else:
    # Other network types use an encrypted key unlocked with a password that
    # is prompted for interactively
    sys.path.append(os.path.expanduser("~/projects/keys/scripts"))
    from secure_key_utils import get_web3_account
    from getpass import getpass

    ENCRYPTED_PK = os.environ.get("ENCRYPTED_PK")
    # Fail before prompting for a password, mirroring the testnet check above
    if not ENCRYPTED_PK:
        raise ValueError("ENCRYPTED_PK not found in environment")
    account = get_web3_account(ENCRYPTED_PK, getpass())

logging.info(f"Deployer address: {account.address}")

# Load ABI for LZBlockRelay
try:
    with open("../../contracts/messengers/LZBlockRelay.abi", "r") as f:
        relay_abi = json.load(f)
except (OSError, ValueError) as e:
    # Missing/unreadable file (OSError) or invalid JSON (ValueError)
    print(f"Error loading LZBlockRelay ABI: {e}")
    # Fallback to getting ABI from vyper
    import subprocess

    # check=True surfaces a failed compiler run as CalledProcessError instead
    # of an unrelated JSON parsing error on empty output
    result = subprocess.run(
        ["vyper", "../../contracts/messengers/LZBlockRelay.vy", "-f", "abi"],
        capture_output=True,
        text=True,
        check=True,
    )
    relay_abi = json.loads(result.stdout)

## 5. Main Configuration Loop

In [None]:
# Initialize state dictionary: chain name -> everything needed to configure it
state_dict = {}

# API keys and RPC preference order do not depend on the chain, so read them once
ankr_key = os.environ.get("ANKR_API_KEY")
drpc_key = os.environ.get("DRPC_API_KEY")
rpc_order = ["public", "drpc", "ankr"]
rpc_api_keys = {"ankr": ankr_key, "drpc": drpc_key}

# Load basic info for all chains first
print("Loading chain information...")
for chain_name in deployed_contracts.keys():
    config = all_chains[chain_name]

    # Setup RPC: take the first configured endpoint in preference order.
    # rpc_url is reset for every chain so a chain without any endpoint can
    # never silently inherit the previous chain's URL.
    rpc_url = None
    for rpc_type in rpc_order:
        rpc_template = config.get(rpc_type)
        if rpc_template is None:
            continue
        if rpc_type in rpc_api_keys:
            # Keyed providers carry a placeholder for the API key
            rpc_url = rpc_template.format(rpc_api_keys[rpc_type])
        else:
            rpc_url = rpc_template
        break

    if rpc_url is None:
        logging.warning(f"No RPC endpoint configured for {chain_name}; skipping")
        continue

    # Get LayerZero metadata; chains whose metadata cannot be loaded are
    # logged and left out of state_dict
    try:
        lz_metadata = lz.get_chain_metadata(chain_name)
        state_dict[chain_name] = {
            "config": config,
            "rpc_url": rpc_url,
            "contracts": deployed_contracts[chain_name],
            "eid": lz_metadata["metadata"]["eid"],
            "endpoint": lz_metadata["metadata"]["endpointV2"],
            "send_lib": lz_metadata["metadata"].get("sendUln302", "unavailable"),
            "receive_lib": lz_metadata["metadata"].get("receiveUln302", "unavailable"),
            "read_lib": lz_metadata["metadata"].get("readLib1002", "unavailable"),
            "dvns": inject_curve_dvns(lz_metadata["dvns"].copy(), chain_name),
            "executor": lz_metadata["metadata"].get(
                "executor", "0x0000000000000000000000000000000000000000"
            ),
        }
    except Exception as e:
        logging.warning(f"Failed to get LZ metadata for {chain_name}: {e}")

# Build EID to name mapping
eid_to_name = {chain_state["eid"]: name for name, chain_state in state_dict.items()}

print(f"\nLoaded {len(state_dict)} chains")
print(f"DVNs to use: {', '.join(DVNS_TO_USE)}")
print(f"Mode: {'DRY RUN' if DRY_RUN else 'LIVE'}")
print(f"Executor: {'DEFAULT (0x0000...)' if USE_DEFAULT_EXECUTOR else 'CHAIN-SPECIFIC'}")
print("\n" + "=" * 80)

In [None]:
# Statistics accumulated by the configuration loop
total_configs_checked = 0
total_configs_updated = 0
total_configs_failed = 0
# Only chains named here are processed by the configuration loop
CHAINS_TO_CHECK = ["mantle"]
# Chains for which at least one configuration call failed
problematic_chains = []
# DRY_RUN is deliberately NOT reassigned in this cell: overriding it here would
# silently defeat the dry-run safety switch from the configuration section and
# send live transactions. Change DRY_RUN in the configuration cell instead.
# Main configuration loop - outer loop by chains, inner loop by configured peers.
# For every selected chain it compares the on-chain send/receive (and read
# channel) configuration with the desired DVN set and writes what differs.
for chain_name in sorted(deployed_contracts.keys()):
    if chain_name == main_chain:
        continue
    if chain_name not in CHAINS_TO_CHECK:
        continue
    if "LZBlockRelay" not in deployed_contracts[chain_name]:
        continue
    if chain_name not in state_dict:
        # A deployed chain can be missing from state_dict when its chain
        # information could not be loaded; report it instead of raising KeyError.
        print(f"\nSkipping {chain_name}: chain information was not loaded")
        problematic_chains.append(chain_name)
        continue

    chain_state = state_dict[chain_name]
    # Peers of this chain for which at least one setConfig call failed
    chain_failed_peers = []

    print(f"\n{'='*60}")
    print(f"Processing {chain_name}...")
    print(f"{'='*60}")

    # Setup Web3 connection
    w3 = Web3(Web3.HTTPProvider(chain_state["rpc_url"]))
    w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0)

    # Setup contracts
    relay_address = deployed_contracts[chain_name]["LZBlockRelay"]
    relay_w3 = w3.eth.contract(address=relay_address, abi=relay_abi)
    endpoint_w3 = w3.eth.contract(address=chain_state["endpoint"], abi=endpointV2_abi)

    oapp = checksum(relay_address)
    chain_dvns = chain_state["dvns"]
    is_read_enabled = chain_state["read_lib"] != "unavailable"

    # Determine executor to use
    executor = (
        "0x0000000000000000000000000000000000000000"
        if USE_DEFAULT_EXECUTOR
        else chain_state["executor"]
    )

    # Get all configured peers (including read channel)
    configured_peers = []

    # Check regular peers: a peer counts as configured when the relay stores
    # a non-zero 32-byte value for its eid
    for peer_chain in state_dict.keys():
        if peer_chain == chain_name or peer_chain == main_chain:
            continue

        peer_eid = state_dict[peer_chain]["eid"]
        try:
            peer_bytes = relay_w3.functions.peers(peer_eid).call()
            if peer_bytes.hex() != "0" * 64:
                configured_peers.append((peer_eid, peer_chain))
        except Exception as e:
            # Best effort: an unreadable peer is reported and skipped
            print(f"Error checking peer {peer_chain}: {e}")

    # Check read channel if read-enabled
    if is_read_enabled:
        try:
            peer_bytes = relay_w3.functions.peers(READ_CHANNEL_ID).call()
            if peer_bytes.hex() != "0" * 64:
                configured_peers.append((READ_CHANNEL_ID, "READ_CHANNEL"))
        except Exception as e:
            print(f"Error checking read channel: {e}")

    print(f"\nFound {len(configured_peers)} configured peers")

    # Inner loop by peers
    for peer_eid, peer_name in configured_peers:
        print(f"\n  Checking {chain_name} <-> {peer_name} (EID {peer_eid})...")

        is_read_channel = peer_eid == READ_CHANNEL_ID

        # Get peer info if not read channel
        if not is_read_channel:
            peer_dvns = state_dict[peer_name]["dvns"]
            peer_is_read_enabled = state_dict[peer_name]["read_lib"] != "unavailable"
        else:
            peer_dvns = []
            peer_is_read_enabled = False

        # Process SEND configuration (chain -> peer)
        if is_read_enabled or is_read_channel:
            total_configs_checked += 1

            # Determine library and config type
            if is_read_channel:
                lib = checksum(chain_state["read_lib"])
                config_type_enum = CONFIG_TYPE_READ
                config_type_str = "read"
            else:
                lib = checksum(chain_state["send_lib"])
                config_type_enum = CONFIG_TYPE_ULN
                config_type_str = "uln"

            # Get current DVN config
            current_dvn = get_current_config(endpoint_w3, oapp, lib, peer_eid, config_type_enum)

            # Get current executor config (only for non-read channels)
            current_executor = None
            if not is_read_channel and not USE_DEFAULT_EXECUTOR:
                current_executor = get_current_config(
                    endpoint_w3, oapp, lib, peer_eid, CONFIG_TYPE_EXECUTOR
                )

            # Get desired DVNs (peer_dvns is empty for the read channel)
            required_dvns, _ = get_desired_dvns(
                chain_dvns, peer_dvns, DVNS_TO_USE, is_read_channel
            )
            optional_dvns = []

            # Check if DVN update needed; nothing is written when no DVN was selected
            dvn_needs_update = bool(required_dvns) and (
                FORCE_RECONFIGURE or not configs_match(current_dvn, required_dvns, optional_dvns)
            )

            # Check if executor update needed (only for send configs on non-read channels)
            executor_needs_update = False
            if (
                not is_read_channel
                and not USE_DEFAULT_EXECUTOR
                and executor != "0x0000000000000000000000000000000000000000"
            ):
                executor_needs_update = (
                    not current_executor
                    or current_executor.get("executor", "").lower() != executor.lower()
                )

            if dvn_needs_update or executor_needs_update:
                print("    Send config needs update:")

                # Set executor config first if needed
                if executor_needs_update:
                    print(f"      Executor: {executor[:10]}...")
                    executor_bytes = encode_executor_config(peer_eid, executor)
                    if set_config(
                        endpoint_w3,
                        w3,
                        account,
                        oapp,
                        lib,
                        peer_eid,
                        CONFIG_TYPE_EXECUTOR,
                        executor_bytes,
                        f"executor config for {peer_name}",
                    ):
                        total_configs_updated += 1
                    else:
                        total_configs_failed += 1
                        chain_failed_peers.append(peer_name)

                # Set DVN config
                if dvn_needs_update:
                    if current_dvn:
                        print(
                            f"      Current DVNs: R: {current_dvn.get('requiredDVNs', [])}, O: {current_dvn.get('optionalDVNs', [])}"
                        )
                    else:
                        print("      Current DVNs: Not configured")
                    print(f"      Desired DVNs: R: {required_dvns}, O: {optional_dvns}")

                    # Encode and set config (the executor is only part of the read layout)
                    config_bytes = encode_dvn_config(
                        required_dvns,
                        optional_dvns,
                        config_type_str,
                        executor if is_read_channel else None,
                    )

                    if set_config(
                        endpoint_w3,
                        w3,
                        account,
                        oapp,
                        lib,
                        peer_eid,
                        config_type_enum,
                        config_bytes,
                        f"send DVN config to {peer_name}",
                    ):
                        total_configs_updated += 1
                    else:
                        total_configs_failed += 1
                        chain_failed_peers.append(peer_name)
            else:
                print("    Send config already correct")

        # Process RECEIVE configuration (peer -> chain)
        if not is_read_channel and peer_is_read_enabled:
            total_configs_checked += 1

            lib = checksum(chain_state["receive_lib"])
            config_type_enum = CONFIG_TYPE_ULN

            # Get current config
            current = get_current_config(endpoint_w3, oapp, lib, peer_eid, config_type_enum)

            # Get desired DVNs (from peer's perspective): the second list holds
            # this chain's addresses for the DVNs both sides share
            _, required_dvns = get_desired_dvns(peer_dvns, chain_dvns, DVNS_TO_USE, False)
            optional_dvns = []

            # Check if update needed
            receive_needs_update = bool(required_dvns) and (
                FORCE_RECONFIGURE or not configs_match(current, required_dvns, optional_dvns)
            )
            if receive_needs_update:
                print("    Receive config needs update:")
                if current:
                    print(
                        f"      Current DVNs: R: {current.get('requiredDVNs', [])}, O: {current.get('optionalDVNs', [])}"
                    )
                else:
                    print("      Current DVNs: Not configured")
                print(f"      Desired DVNs: R: {required_dvns}, O: {optional_dvns}")

                # Encode and set config
                config_bytes = encode_dvn_config(required_dvns, optional_dvns, "uln")

                if set_config(
                    endpoint_w3,
                    w3,
                    account,
                    oapp,
                    lib,
                    peer_eid,
                    config_type_enum,
                    config_bytes,
                    f"receive DVN config from {peer_name}",
                ):
                    total_configs_updated += 1
                else:
                    total_configs_failed += 1
                    chain_failed_peers.append(peer_name)

            else:
                print("    Receive config already correct")

    if chain_failed_peers:
        # Name the peers that actually failed (each once, in order of failure)
        # rather than whichever peer happened to be iterated last.
        failed_names = ", ".join(dict.fromkeys(chain_failed_peers))
        print(f"    Failed to set config for {chain_name} <-> {failed_names}")
        problematic_chains.append(chain_name)


# Final report: totals gathered by the configuration loop, followed by a
# reminder of whether anything was actually written.
for summary_line in (
    "\n" + "=" * 80,
    "CONFIGURATION SUMMARY",
    "=" * 80,
    f"Total configurations checked: {total_configs_checked}",
    f"Configurations updated: {total_configs_updated}",
    f"Configurations failed: {total_configs_failed}",
    f"Configurations already correct: {total_configs_checked - total_configs_updated - total_configs_failed}",
):
    print(summary_line)

if not DRY_RUN:
    print("\n✅ Configuration complete!")
else:
    print("\n⚠️  DRY RUN MODE - No actual changes were made")
    print("Set DRY_RUN = False to apply changes")