Skip to content

feat(ORC-110): Run oracle on refslot #703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ json-stream = "^2.3.2"
oz-merkle-tree = { git = "https://github.com/lidofinance/oz-merkle-tree" }
py-multiformats-cid = "^0.4.4"
conventional-pre-commit = "^4.0.0"
deepdiff = "^8.5.0"

[tool.poetry.group.dev.dependencies]
base58 = "^2.1.1"
225 changes: 180 additions & 45 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,52 @@
import argparse
import re
import sys
from typing import Iterator, cast
from dataclasses import asdict
from typing import Any, Dict, Iterator, Optional, cast

from deepdiff import DeepDiff
from hexbytes import HexBytes
from packaging.version import Version
from prometheus_client import start_http_server

from src import constants
from src import variables
from src import constants, variables
from src.metrics.healthcheck_server import start_pulse_server
from src.metrics.logging import logging
from src.metrics.prometheus.basic import ENV_VARIABLES_INFO, BUILD_INFO
from src.metrics.prometheus.basic import BUILD_INFO, ENV_VARIABLES_INFO
from src.modules.accounting.accounting import Accounting
from src.modules.checks.checks_module import ChecksModule
from src.modules.checks.checks_module import execute_checks
from src.modules.csm.csm import CSOracle
from src.modules.ejector.ejector import Ejector
from src.providers.ipfs import GW3, IPFSProvider, Kubo, MultiIPFSProvider, Pinata, PublicIPFS
from src.types import OracleModule
from src.providers.execution.contracts.base_oracle import BaseOracleContract
from src.providers.ipfs import (
GW3,
IPFSProvider,
Kubo,
MultiIPFSProvider,
Pinata,
PublicIPFS,
)
from src.types import BlockRoot, OracleModule, SlotNumber
from src.utils.blockstamp import build_blockstamp
from src.utils.build import get_build_info
from src.utils.exception import IncompatibleException
from src.web3py.contract_tweak import tweak_w3_contracts
from src.web3py.extensions import (
LidoContracts,
TransactionUtils,
ConsensusClientModule,
KeysAPIClientModule,
LidoValidatorsProvider,
FallbackProviderModule,
KeysAPIClientModule,
LazyCSM,
LidoContracts,
LidoValidatorsProvider,
TransactionUtils,
)
from src.web3py.middleware import add_requests_metric_middleware
from src.web3py.types import Web3

logger = logging.getLogger(__name__)


def main(module_name: OracleModule):
build_info = get_build_info()
logger.info({
'msg': 'Oracle startup.',
'variables': {
**build_info,
'module': module_name,
**variables.PUBLIC_ENV_VARS,
},
})
ENV_VARIABLES_INFO.info(variables.PUBLIC_ENV_VARS)
BUILD_INFO.info(build_info)

logger.info({'msg': f'Start healthcheck server for Docker container on port {variables.HEALTHCHECK_SERVER_PORT}'})
start_pulse_server()

logger.info({'msg': f'Start http server with prometheus metrics on port {variables.PROMETHEUS_PORT}'})
start_http_server(variables.PROMETHEUS_PORT)

def _construct_web3() -> Web3:
logger.info({'msg': 'Initialize multi web3 provider.'})
web3 = Web3(FallbackProviderModule(
variables.EXECUTION_CLIENT_URI,
@@ -92,34 +87,129 @@ def main(module_name: OracleModule):

logger.info({'msg': 'Add metrics middleware for ETH1 requests.'})
add_requests_metric_middleware(web3)
return web3

logger.info({'msg': 'Sanity checks.'})

def _construct_module(web3: Web3, module_name: OracleModule, refslot: Optional[int] = None) -> Accounting | Ejector | CSOracle:
instance: Accounting | Ejector | CSOracle
if module_name == OracleModule.ACCOUNTING:
logger.info({'msg': 'Initialize Accounting module.'})
instance = Accounting(web3)
instance = Accounting(web3, refslot)
elif module_name == OracleModule.EJECTOR:
logger.info({'msg': 'Initialize Ejector module.'})
instance = Ejector(web3)
instance = Ejector(web3, refslot)
elif module_name == OracleModule.CSM:
logger.info({'msg': 'Initialize CSM performance oracle module.'})
instance = CSOracle(web3)
instance = CSOracle(web3, refslot)
else:
raise ValueError(f'Unexpected arg: {module_name=}.')

logger.info({'msg': 'Sanity checks.'})
instance.check_contract_configs()
return instance


def main(module_name: OracleModule):
build_info = get_build_info()
logger.info({
'msg': 'Oracle startup.',
'variables': {
**build_info,
'module': module_name,
**variables.PUBLIC_ENV_VARS,
},
})
ENV_VARIABLES_INFO.info(variables.PUBLIC_ENV_VARS)
BUILD_INFO.info(build_info)
start_pulse_server()

logger.info({'msg': f'Start http server with prometheus metrics on port {variables.PROMETHEUS_PORT}'})
start_http_server(variables.PROMETHEUS_PORT)
web3 = _construct_web3()
instance: Accounting | Ejector | CSOracle = _construct_module(web3, module_name)
if variables.DAEMON:
instance.run_as_daemon()
else:
instance.cycle_handler()


def check():
logger.info({'msg': 'Check oracle is ready to work in the current environment.'})
def get_transactions(w3, contract: BaseOracleContract, limit: int, block_offset: int):
event_abi = "ProcessingStarted(uint256,bytes32)"
event_topic = Web3.to_hex(primitive=Web3.keccak(text=event_abi))

def get_processing_started_logs(from_block: int, to_block: int):
return w3.eth.get_logs({
"fromBlock": from_block,
"toBlock": to_block,
"address": contract.address,
"topics": [event_topic],
})

latest = w3.eth.block_number
logs = get_processing_started_logs(from_block=latest - block_offset, to_block=latest)
tx_hashes = [log['transactionHash'].hex() for log in logs]
print(f"Found {len(tx_hashes)} submitReportData transactions in latest {block_offset} blocks.")

txs = []
for tx_hash in tx_hashes[:limit]:
tx = w3.eth.get_transaction(tx_hash)
_, params = contract.decode_function_input(tx['input'])
data = {
k: HexBytes(v.hex()) if isinstance(v, (bytes, bytearray)) else v
for k, v in params['data'].items()
}
txs.append(data)

print(f"Will process {len(txs)} transactions")
return txs


def run_on_refslot(module_name: OracleModule, limit, block_offset):
logging.getLogger().setLevel(logging.WARNING)
w3 = _construct_web3()
instance: Accounting | Ejector | CSOracle = _construct_module(w3, module_name, True)
instance.check_contract_configs()

return ChecksModule().execute_module()
txs = get_transactions(w3, instance.report_contract, limit, block_offset)
if not txs:
logger.error("No submitReportData transactions found!")
sys.exit(0)

_camel_to_snake_pattern = re.compile(r'(.)([A-Z][a-z]+)')
_camel_to_snake_pattern2 = re.compile(r'([a-z0-9])([A-Z])')

def camel_to_snake(name: str) -> str:
s1 = _camel_to_snake_pattern.sub(r'\1_\2', name)
return _camel_to_snake_pattern2.sub(r'\1_\2', s1).lower()

def normalize_keys(d: Dict[str, Any]) -> Dict[str, Any]:
return {camel_to_snake(k): v for k, v in d.items()}

for tx in txs:
refslot = int(tx['refSlot'])
print("Input data:", tx)
block_root = BlockRoot(w3.cc.get_block_root(SlotNumber(refslot + 3 * 32)).root)
block_details = w3.cc.get_block_details(block_root)
bs = build_blockstamp(block_details)
instance.refslot = refslot
instance.refresh_contracts_if_address_change()
report_blockstamp = instance.get_blockstamp_for_report(bs)
report = instance.build_report(report_blockstamp)

normalized_input = normalize_keys(tx)
report_dict = asdict(report)
report_dict = {
k: HexBytes(v.hex()) if isinstance(v, (bytes, bytearray)) else v
for k, v in report_dict.items()
}
print("Output data:", report_dict)
diff = DeepDiff(normalized_input, report_dict, ignore_order=True)
if diff:
print("🚨 Differences found:")
print(diff)
else:
print("✅ All fields match!")
instance = _construct_module(w3, module_name, refslot)


def check_providers_chain_ids(web3: Web3, cc: ConsensusClientModule, kac: KeysAPIClientModule):
@@ -163,19 +253,64 @@ def ipfs_providers() -> Iterator[IPFSProvider]:
yield PublicIPFS(timeout=variables.HTTP_REQUEST_TIMEOUT_IPFS)


def parse_args():
"""
Parse command-line arguments using argparse.
The 'module' argument is restricted to valid OracleModule values.
"""
valid_modules = [str(item) for item in OracleModule]

parser = argparse.ArgumentParser(description="Run the Oracle module process.")
subparsers = parser.add_subparsers(dest="module", required=True, help=f"Module to run. One of: {valid_modules}")
check_parser = subparsers.add_parser("check", help="Run the check module.")
check_parser.add_argument(
"--name",
"-n",
type=str,
default=None,
help="Module name to check for a refslot execution."
)
check_parser.add_argument(
"--limit",
type=int,
default=10,
help="Maximum number of items to process."
)
check_parser.add_argument(
"--offset",
type=int,
default=100_000,
help="Starting index offset for processing."
)
for mod in OracleModule:
if mod == OracleModule.CHECK:
continue
subparsers.add_parser(
mod.value,
help=f"Run the {mod.value} module."
)

return parser.parse_args()


if __name__ == '__main__':
module_name_arg = sys.argv[-1]
if module_name_arg not in OracleModule:
msg = f'Last arg should be one of {[str(item) for item in OracleModule]}, received {module_name_arg}.'
args = parse_args()
if args.module not in OracleModule:
msg = f'Last arg should be one of {[str(item) for item in OracleModule]}, received {args.module}.'
logger.error({'msg': msg})
raise ValueError(msg)

module = OracleModule(module_name_arg)
module = OracleModule(args.module)
if module is OracleModule.CHECK:
errors = variables.check_uri_required_variables()
variables.raise_from_errors(errors)

sys.exit(check())
if args.name is None:
errors = variables.check_uri_required_variables()
variables.raise_from_errors(errors)
sys.exit(execute_checks())
else:
errors = variables.check_all_required_variables(module)
variables.raise_from_errors(errors)
run_on_refslot(args.name, args.limit, args.offset)
sys.exit(0)

errors = variables.check_all_required_variables(module)
variables.raise_from_errors(errors)
2 changes: 1 addition & 1 deletion src/metrics/healthcheck_server.py
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@
from src import variables
from src.variables import MAX_CYCLE_LIFETIME_IN_SECONDS


_last_pulse = datetime.now()
logger = logging.getLogger(__name__)

@@ -50,6 +49,7 @@ def start_pulse_server():
If bot didn't call pulse for a while (5 minutes but should be changed individually)
Docker healthcheck fails to do request
"""
logger.info({'msg': f'Start healthcheck server for Docker container on port {variables.HEALTHCHECK_SERVER_PORT}'})
server = HTTPServer(('localhost', variables.HEALTHCHECK_SERVER_PORT), RequestHandlerClass=PulseRequestHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
Loading
Oops, something went wrong.