## Setup RPC & deployer

In [None]:
from dotenv import load_dotenv
import os
import boa
from eth_account import Account
from web3 import Web3
import logging
import subprocess
import time

load_dotenv()
chain_handle = "base-sepolia"

PRIVATE_KEY = os.environ.get("WEB3_TESTNET_PK")
RPCS = {
    "sepolia": "https://sepolia.drpc.org",
    "base-sepolia": "https://sepolia.base.org",
    "optimism-sepolia": "https://sepolia.optimism.io",
    "arbitrum-sepolia": "https://sepolia-rollup.arbitrum.io/rpc",
}

RPC_URL = RPCS[chain_handle]

deployer = Account.from_key(PRIVATE_KEY)
eth_env = boa.set_network_env(RPC_URL)
# boa.set_env(eth_env)
w3 = Web3(Web3.HTTPProvider(RPC_URL))

# this automatically sets the eoa as the deployer
boa.env.add_account(deployer)
print(f"Deploying with {deployer.address} on Chain id {boa.env.evm.patch.chain_id}")
print(f"Chain balance is {w3.eth.get_balance(deployer.address)/1e18 :.3f} ETH")

## Setup LZ

In [None]:
from LZDeployments import LZDeployments
import json

lz = LZDeployments()
chain_data = lz.get_chain_metadata(chain_handle)
# print(json.dumps(chain_data, indent=2))
LZ_ENDPOINT = chain_data["metadata"]["endpointV2"]
LZ_EID = chain_data["metadata"]["eid"]
SEND_LIB = chain_data["metadata"].get("sendUln302", "unavailable")
RECEIVE_LIB = chain_data["metadata"].get("receiveUln302", "unavailable")
READ_LIB = chain_data["metadata"].get("readLib1002", "unavailable")
EXECUTOR = chain_data["metadata"].get("executor", "0x0000000000000000000000000000000000000000")
print(f"Operating on: {chain_handle}")
print(f"Chain eID: {LZ_EID}\nEndpoint address: {LZ_ENDPOINT}")
dvns_all = chain_data["dvns"]
dvns_lzread = chain_data["dvns_lzread"]
print(f"DVNs: {len(dvns_all)}, Read DVNs: {len(dvns_lzread)}")
print(f"Send lib: {SEND_LIB}\nReceive lib: {RECEIVE_LIB}\nRead lib: {READ_LIB}")
print(f"Executor: {EXECUTOR}")

# I. Deployment

### 1. ExampleMessenger

In [None]:
contract_deployer = boa.load_partial("../examples/OAppExample.vy")

contract = contract_deployer(
    LZ_ENDPOINT,  # endpoint on sepolia base
)

print(f"Example OApp deployed at {contract.address}")

## II. Post-deployment interactions 
## (web3py to simulate real interactions)

### 0. Prepare infra

In [None]:
from ABIs import endpoint_abi


def get_vyper_abi():
    command = ["vyper", "../examples/OAppExample.vy", "-f", "abi_python"]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        return f"Error: {e.stderr}"


logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
# RPC endpoints

CONTRACT_ADDRESS = contract.address

contract_w3 = w3.eth.contract(address=CONTRACT_ADDRESS, abi=get_vyper_abi())
endpoint_w3 = w3.eth.contract(address=LZ_ENDPOINT, abi=endpoint_abi)
account = w3.eth.account.from_key(deployer.key)


def send_tx_single(func, acc, value=0):
    tx = func.build_transaction(
        {
            "from": account.address,
            "nonce": w3.eth.get_transaction_count(account.address),
            "value": value,
        }
    )
    tx["gas"] = int(5 * w3.eth.estimate_gas(tx))
    signed_tx = w3.eth.account.sign_transaction(tx, private_key=account.key)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.raw_transaction)
    return tx_hash


def send_tx(func, acc, value=0):
    success = False
    while not success:
        try:
            tx_hash = send_tx_single(func, acc, value)
            success = True
        except Exception as e:
            if "replacement transaction underpriced" in str(e) or "nonce too low" in str(e):
                print(str(e), "Retrying...")
                success = False
                time.sleep(0.5)
            else:
                raise e
    return tx_hash

## Prepare Libs

In [None]:
clean = 0
# libs are per EID (here just for self, but must set for all routes, or opt for default)

# send lib
func = endpoint_w3.functions.setSendLibrary(
    contract_w3.address,
    LZ_EID,
    SEND_LIB if not clean else "0x0000000000000000000000000000000000000000",
)
tx_hash1 = send_tx(func, account)
print(f"Added send lib: {tx_hash1.hex()}")
# receive lib
func = endpoint_w3.functions.setReceiveLibrary(
    contract_w3.address,
    LZ_EID,
    RECEIVE_LIB if not clean else "0x0000000000000000000000000000000000000000",
    0,
)
tx_hash2 = send_tx(func, account)
print(f"Added receive lib: {tx_hash2.hex()}")

In [None]:
from ABIs import lzreadlib_abi

lzreadlib = w3.eth.contract(address=READ_LIB, abi=json.loads(lzreadlib_abi))

print(f"Read lib type: {lzreadlib.functions.messageLibType().call()}")
print(f"Read lib version: {lzreadlib.functions.version().call()}")
eid_check = [4294967295, 4294967294]
supported_eid = None
for eid in eid_check:
    support = lzreadlib.functions.isSupportedEid(eid).call()
    print(f"Supports {eid}: {support}")
    if support and not supported_eid:
        supported_eid = eid

READ_CHANNEL = 4294967295
# read direction
func = endpoint_w3.functions.setSendLibrary(
    contract_w3.address,
    READ_CHANNEL,
    READ_LIB if not clean else "0x0000000000000000000000000000000000000000",
)
tx_hash3 = send_tx(func, account)
print(f"Added read lib: {tx_hash3.hex()}")

# receive direction
func = endpoint_w3.functions.setReceiveLibrary(
    contract_w3.address,
    READ_CHANNEL,
    READ_LIB if not clean else "0x0000000000000000000000000000000000000000",
    0,
)
tx_hash4 = send_tx(func, account)
print(f"Added receive lib: {tx_hash4.hex()}")

## Set up DVNs

In [None]:
import eth_abi


def checksum(address):
    return Web3.to_checksum_address(address)


read_channel = READ_CHANNEL
CONFIG_TYPE_READ = 1
CONFIG_TYPE_ULN = 2
# list all dvns
print("Listing all dvns")
for dvn in dvns_all:
    print(dvn)

oapp = contract.address
confirmations = 1
required_dvns = [dvn["address"] for dvn in dvns_all if dvn["id"] == "layerzero-labs"]
required_dvns = required_dvns[0:1]  # of all eid dvns we pick first one with layerzero id
optional_dvns = [dvn["address"] for dvn in dvns_all if dvn["address"] not in required_dvns]
# all other dvs are optional, with a threshold of 2 (or less is there is not enough)
optional_dvn_threshold = min(1, len(optional_dvns))  # 2 or 1

# sort dvns alphabetically (important, otherwise lz will fail!)
required_dvns.sort()
optional_dvns.sort()
required_dvns = [checksum(dvn) for dvn in required_dvns]
optional_dvns = [checksum(dvn) for dvn in optional_dvns]

print("DVNs for send/receive:")
print(f"Required: {required_dvns}")
print(f"Optional: {optional_dvns}")
print(f"Optional threshold: {optional_dvn_threshold}")

ULNConfig = (
    confirmations,  # confirmations
    len(required_dvns),  # required_dvn_count
    len(optional_dvns),  # optional_dvn_count
    optional_dvn_threshold,  # optional_dvn_threshold
    required_dvns,  # required_dvns
    optional_dvns,  # optional_dvns
)

# ULN_bytes = eth_abi.encode(["(uint64,uint8,uint8,uint8,address[],address[])"], [ULNConfig])
# config_param = (LZ_EID, CONFIG_TYPE_ULN, ULN_bytes)

# for lib in [SEND_LIB, RECEIVE_LIB]:
#     func = endpoint_w3.functions.setConfig(_oapp=oapp, _lib=lib, _params=[config_param])
#     tx_hash = send_tx(func, account)
#     print(f"Added ULN config: {tx_hash.hex()}")

### READ DVNS

required_dvns = [
    dvn["address"]
    for dvn in dvns_all
    if dvn["id"] == "layerzero-labs" and dvn.get("lzReadCompatible", False)
]
optional_dvns = [
    dvn["address"]
    for dvn in dvns_all
    if dvn["address"] not in required_dvns and dvn.get("lzReadCompatible", False)
]
optional_dvn_threshold = min(1, len(optional_dvns))  # 2 or 1 or 0
print("DVNs for read:")
print(f"Required: {required_dvns}")
print(f"Optional: {optional_dvns}")
print(f"Optional threshold: {optional_dvn_threshold}")
required_dvns.sort()
optional_dvns.sort()
required_dvns = [checksum(dvn) for dvn in required_dvns]
optional_dvns = [checksum(dvn) for dvn in optional_dvns]
ULNReadConfig = (
    EXECUTOR,  # address
    len(required_dvns),  # required_dvn_count
    len(optional_dvns),  # optional_dvn_count
    optional_dvn_threshold,  # optional_dvn_threshold
    required_dvns,  # required_dvns
    optional_dvns,  # optional_dvns
)

ULNREAD_bytes = eth_abi.encode(["(address,uint8,uint8,uint8,address[],address[])"], [ULNReadConfig])
read_config_param = (READ_CHANNEL, CONFIG_TYPE_READ, ULNREAD_bytes)
func = endpoint_w3.functions.setConfig(_oapp=oapp, _lib=READ_LIB, _params=[read_config_param])
tx_hash = send_tx(func, account)
print(f"Added ULN-Read config: {tx_hash.hex()}")

## 1. LZ Send & Receive

In [None]:
# add self as peer
from eth_utils import to_bytes

func = contract_w3.functions.setPeer(
    LZ_EID, to_bytes(hexstr=contract_w3.address).rjust(32, b"\x00")
)
tx_hash = send_tx(func, account)
logging.info(f"Added self as peer tx: {tx_hash.hex()}")

In [None]:
gas_limit = 500_000
value = int(0.001 * 10**18)
msg = "170475436437825620930817601234267694881687829390282260281137596999800372275961"

fee = contract_w3.functions.quote_message_fee(
    LZ_EID,
    CONTRACT_ADDRESS,
    msg,
    _gas_limit=gas_limit,
    _value=value,
).call()
logging.info(f"Fee: {fee}")

func = contract_w3.functions.send_message(
    _dst_eid=LZ_EID, _receiver=CONTRACT_ADDRESS, _message=msg, _gas_limit=gas_limit, _value=value
)
tx_hash = send_tx(func, account, 2 * fee[0])
logging.info(f"Tx: {tx_hash.hex()}")
# after we sent the message we can check it on testnet.layerzeroscan.com (use address or txid)
# shortly after, it should be pinged back (we send to ourselves, and event be fired)

## 2. LzRead

In [None]:
# set self as read peer
func = contract_w3.functions.setPeer(
    READ_CHANNEL, to_bytes(hexstr=contract_w3.address).rjust(32, b"\x00")
)
tx_hash = send_tx(func, account)
logging.info(f"Set peer tx: {tx_hash.hex()}")

In [None]:
# prepare reading calldata
from vyper.utils import method_id

method_str = "dummy_endpoint(uint256)"
num = 34710
calldata = method_id(method_str) + boa.util.abi.abi_encode("(uint256)", (num,))

# read the contract itself
res = contract_w3.functions.dummy_endpoint(num).call()

logging.info(f"Read: {res}")

value = int(0.001 * 10**18)
# now quote lzread fee
fee = contract_w3.functions.quote_read_fee(
    READ_CHANNEL, LZ_EID, CONTRACT_ADDRESS, calldata, _gas_limit=gas_limit, _value=value
).call()
logging.info(f"Fee: {fee}")

# now request read
func = contract_w3.functions.request_read(
    READ_CHANNEL, LZ_EID, CONTRACT_ADDRESS, calldata, _gas_limit=gas_limit, _value=value
)
tx_hash = send_tx(func, account, 1 * fee[0])
logging.info(f"Tx: {tx_hash.hex()}")

In [None]:
func = contract_w3.functions.withdraw_eth(w3.eth.get_balance(contract_w3.address))
tx_hash = send_tx(func, account)
logging.info(f"Tx: {tx_hash.hex()}")

In [None]:
# test calldata
import requests

print(calldata.hex())

rpc_endpoint = "https://sepolia.base.org"
# perform eth_call request to contract using calldata
r = requests.post(
    rpc_endpoint,
    json={
        "jsonrpc": "2.0",
        "method": "eth_call",
        "params": [{"to": CONTRACT_ADDRESS, "data": "0x" + calldata.hex()}, "latest"],
        "id": 1,
    },
)
response = r.json()["result"]
# print(r.json()['result'])
print(response)
contract_code = """
@external
@view
def message_as_string(_message: Bytes[128]) -> String[128]:
    message: String[128] = convert(_message, String[128])
    return message
"""
with boa.swap_env(boa.Env()):
    tmp_contract = boa.loads(contract_code)

    res = tmp_contract.message_as_string(bytes.fromhex(response[2:]))
    print(res)

In [None]:
contract_code = """
@external
@view
def foo():
    val_1: uint128 = 123
    val_2: uint128 = 0
    val1_b16: bytes16 = convert(1, bytes16)
    val2_b16: bytes16 = convert(2, bytes16)
    # val_b32: bytes32 = convert(val_uint, bytes32)
    # a16: Bytes[16] = empty(Bytes[16])
    # a32: Bytes[32] = empty(Bytes[32])
    val_B32: Bytes[32] = concat(val1_b16, b'')
    # # a1: bytes16 = convert(1, bytes16)
    print(len(val_B32))
"""
with boa.swap_env(boa.Env()):
    tmp_contract = boa.loads(contract_code)

    res = tmp_contract.foo()
    # print(res)

In [None]:
import boa

mock_contract = """
struct SomeStruct:
    a: uint8
    b: uint8
    c: DynArray[uint8, 10]

@external
def encode() -> Bytes[1024]:
    return abi_encode(SomeStruct(a=1,b=2,c=[]),ensure_tuple=False)
"""

with boa.swap_env(boa.Env()):
    c = boa.loads(mock_contract)
    res_c = c.encode()
    print(res_c)
    res_b = boa.util.abi.abi_encode("(uint8,uint8,uint8[])", (1, 2, []))
    print(res_b)
    print(res_c == res_b)

In [None]:
import eth_abi

ethstd_bytes = boa.util.abi.abi_encode("((uint8,uint8[]))", ((1, [1]),))
ethabi_bytes = eth_abi.encode(["(uint8,uint8[])", "uint8[]"], [(1, [1]), [12]])

print(f"Encodings are equal: {ethstd_bytes==ethabi_bytes}")

print(ethstd_bytes.hex())
print([ethabi_bytes[i * 32 : (i + 1) * 32].hex() for i in range(len(ethabi_bytes) // 32)])

In [None]:
contract.peers(LZ_EID)