diff --git a/.env.example b/.env.example index 8fa1da2..71c3efe 100644 --- a/.env.example +++ b/.env.example @@ -9,6 +9,9 @@ RUST_LOG=none,dria_oracle=info # example: ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80 SECRET_KEY=your-secret-key +# Coordinator address (optional) +COORDINATOR_ADDRESS= + ## Arweave configurations # path to wallet, only required if your BYTE_LIMIT is enough that # you may do an Arweave upload to store a large value on-chain diff --git a/Cargo.lock b/Cargo.lock index 84748fb..0660aa4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1951,7 +1951,7 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" [[package]] name = "dria-oracle" -version = "0.2.0" +version = "0.2.1" dependencies = [ "alloy", "alloy-chains", @@ -1967,7 +1967,6 @@ dependencies = [ "futures-util", "hex", "hex-literal", - "lazy_static", "log", "rand 0.8.5", "reqwest 0.12.9", diff --git a/Cargo.toml b/Cargo.toml index fa59318..e7b5010 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dria-oracle" description = "Dria Knowledge Network Oracle Node" -version = "0.2.0" +version = "0.2.1" edition = "2021" license = "Apache-2.0" readme = "README.md" @@ -19,7 +19,6 @@ tokio = { version = "1.39.2", features = [ "signal", ] } tokio-util = "0.7.13" -lazy_static = "1.5.0" # workflows dkn-workflows = { git = "https://github.com/firstbatchxyz/dkn-compute-node" } diff --git a/README.md b/README.md index d8dce1e..41ac00d 100644 --- a/README.md +++ b/README.md @@ -23,19 +23,31 @@ cargo install --git https://github.com/firstbatchxyz/dria-oracle-node Create an `.env` file by copying `.env.example`. You have to fill the following variables: -- Get an RPC URL from a provider such as Alchemy or Infura, and set it as `RPC_URL`. -- Provide an Ethereum wallet secret koy to `SECRET_KEY`, make sure it has funds to pay for gas and tokens. +- Get an RPC URL from a provider such as [Alchemy](https://www.alchemy.com/) or [Infura](https://www.infura.io/), and set it as `RPC_URL`. +- Provide an Ethereum wallet secret key to `SECRET_KEY`, make sure it has funds to pay for gas and tokens. -Optionally, you can save gas costs using Arweave: +> [!NOTE] +> +> The contract addresses are determined with respect to the chain connected via RPC URL, but you can override it via `COORDINATOR_ADDRESS` environment variable. +> In any case, you should not need to do this. + +### Arweave + +You can save gas costs using [Arweave](https://arweave.org/): -- Provide an Arweave wallet so that you can use Arweave for large results. Alternatively, dont provide a wallet but instead set `ARWEAVE_BYTE_LIMIT` to a very large value. TODO: this should be done automatically if wallet does not exist +- Provide an Arweave wallet via `ARWEAVE_WALLET_PATH` variable so that you can use Arweave for large results. You can create one [here](https://arweave.app/). +- You can set `ARWEAVE_BYTE_LIMIT` to determine the byte length threshold, beyond which values are uploaded to Arweave. It defaults to 1024, so any data less than that many bytes will be written as-is. + +If you omit Arweave, it will only use the client for downloading things from Arweave, but will never upload. + +### LLM Providers As for the LLM providers: - If you are using Ollama, make sure it is running and the host & port are correct. -- If you are using OpenAI, make sure you provide the `OPENAI_API_KEY`. -- If you are using Gemini, make sure you provide the `GEMINI_API_KEY`. -- If you are using OpenRouter, make sure you provide the `OPENROUTER_API_KEY`. +- If you are using OpenAI, provide the `OPENAI_API_KEY`. +- If you are using Gemini, provide the `GEMINI_API_KEY`. +- If you are using OpenRouter, provide the `OPENROUTER_API_KEY`. ## Usage @@ -52,6 +64,11 @@ The CLI provides several methods to interact with the oracle contracts. - [Viewing Tasks](#viewing-tasks) - [Balance & Rewards](#balance--rewards) +> [!TIP] +> +> By default logs will be `info` level, but you can add a `DEBUG=1` env variable and it will use `debug` level instead. +> You can set `RUST_LOG` variable yourself as well. + ### Registration To serve oracle requests, you **MUST** first register as your desired oracle type, i.e. `generator` or `validator`. These are handled by the registration commands `register` and `unregister` which accepts multiple arguments to register at once. You can then see your registrations with `registrations` command. diff --git a/misc/arweave.js b/misc/arweave.js index 8c9c81d..7761e3a 100644 --- a/misc/arweave.js +++ b/misc/arweave.js @@ -1,32 +1,46 @@ /** - * A helper script to print the content of an Arweave transaction, where transaction id is hex-encoded. - * This means that the input is a 64-char hexadecimal. + * A helper script to print the content of an Arweave transaction. * * Usage: * - * bun run ./misc/arweave.js 0x30613233613135613236663864663332366165306137663863633636343437336238373463353966333964623436366665316337313531393634623734393231 + * ```sh + * # calldata as-is + * bun run ./misc/arweave.js 0x7b2261727765617665223a224d49555775656361634b417a62755442335a6a57613463784e6461774d71435a704550694f71675a625a63227d * - * Tip: + * # as an object (with escaped quotes) + * bun run ./misc/arweave.js "{\"arweave\":\"MIUWuecacKAzbuTB3ZjWa4cxNdawMqCZpEPiOqgZbZc\"}" * - * Can be piped to `pbcopy` on macOS to copy the output to clipboard. + * # base64 txid + * bun run ./misc/arweave.js MIUWuecacKAzbuTB3ZjWa4cxNdawMqCZpEPiOqgZbZc + * ``` + * + * Can be piped to `pbcopy` on macOS to copy the output to clipboard. */ // parse input let input = process.argv[2]; if (!input) { console.error("No input provided."); + return; } -// get rid of 0x +let arweaveTxId; if (input.startsWith("0x")) { - input = input.slice(2); + // if it starts with 0x, we assume its all hex + arweaveTxId = JSON.parse( + Buffer.from(input.slice(2), "hex").toString() + ).arweave; +} else if (input.startsWith("{")) { + // if it starts with {, we assume its a JSON string + console.log("input", input); + arweaveTxId = JSON.parse(input).arweave; +} else { + // otherwise, we assume its a base64 txid + arweaveTxId = input; } -const inputDecoded = Buffer.from(input, "hex").toString(); -const obj = JSON.parse(inputDecoded); +// construct the URL // download the actual response from Arweave -const url = `https://arweave.net/${obj.arweave}`; -console.log(url); +const url = `https://arweave.net/${arweaveTxId}`; const res = await fetch(url); - console.log(await res.text()); diff --git a/src/cli/commands/coordinator.rs b/src/cli/commands/coordinator.rs index a52fc61..93d711f 100644 --- a/src/cli/commands/coordinator.rs +++ b/src/cli/commands/coordinator.rs @@ -9,7 +9,6 @@ use crate::{ use alloy::{ eips::BlockNumberOrTag, primitives::{utils::format_ether, U256}, - rpc::types::Log, }; use dkn_workflows::{DriaWorkflowsConfig, Model, ModelProvider}; use eyre::{eyre, Context, Result}; @@ -45,6 +44,7 @@ impl DriaOracle { } } } + log::info!( "Running as: {}", kinds @@ -122,8 +122,12 @@ impl DriaOracle { next = event_stream.next() => { match next { Some(Ok((event, log))) => { - self.handle_event_log(event, log, &kinds, &model_config) - .await + log::debug!( + "Handling task {} (tx: {})", + event.taskId, + log.transaction_hash.unwrap_or_default() + ); + self.handle_event_log(event, &kinds, &model_config).await } Some(Err(e)) => log::error!("Could not handle event: {}", e), None => { @@ -141,19 +145,16 @@ impl DriaOracle { async fn handle_event_log( &self, event: StatusUpdate, - log: Log, kinds: &[OracleKind], - model_config: &DriaWorkflowsConfig, + workflows: &DriaWorkflowsConfig, ) { let task_id = event.taskId; - log::debug!( - "Handling task {} (tx: {})", - task_id, - log.transaction_hash.unwrap_or_default() - ); + let Ok(status) = TaskStatus::try_from(event.statusAfter) else { + log::error!("Could not parse task status: {}", event.statusAfter); + return; + }; - // handle request - match handle_request(self, kinds, model_config, event).await { + match handle_request(self, kinds, workflows, status, event.taskId, event.protocol).await { Ok(Some(receipt)) => { log::info!( "Task {} processed successfully. (tx: {})", @@ -171,7 +172,7 @@ impl DriaOracle { async fn handle_previous_tasks( &self, from_block: BlockNumberOrTag, - model_config: &DriaWorkflowsConfig, + workflows: &DriaWorkflowsConfig, kinds: &[OracleKind], ) -> Result<()> { log::info!( @@ -183,19 +184,30 @@ impl DriaOracle { .await?; for (event, log) in prev_tasks { + let status_before = TaskStatus::try_from(event.statusBefore)?; + let status_after = TaskStatus::try_from(event.statusAfter)?; let task_id = event.taskId; log::info!( "Previous task: {} ({} -> {})", task_id, - TaskStatus::try_from(event.statusBefore).unwrap_or_default(), - TaskStatus::try_from(event.statusAfter).unwrap_or_default() + status_before, + status_after ); log::debug!( "Handling task {} (tx: {})", task_id, log.transaction_hash.unwrap_or_default() ); - match handle_request(self, kinds, model_config, event).await { + match handle_request( + self, + kinds, + workflows, + status_after, + event.taskId, + event.protocol, + ) + .await + { Ok(Some(receipt)) => { log::info!( "Task {} processed successfully. (tx: {})", @@ -212,6 +224,7 @@ impl DriaOracle { Ok(()) } + pub(in crate::cli) async fn view_task_events( &self, from_block: impl Into + Clone, @@ -233,12 +246,13 @@ impl DriaOracle { let task_events = self.get_tasks_in_range(from_block, to_block).await?; - for (event, _) in task_events { + for (event, log) in task_events { log::info!( - "Task: {} ({} -> {})", + "Task {} changed from {} to {} at block {}", event.taskId, TaskStatus::try_from(event.statusBefore).unwrap_or_default(), - TaskStatus::try_from(event.statusAfter).unwrap_or_default() + TaskStatus::try_from(event.statusAfter).unwrap_or_default(), + log.block_number.unwrap_or_default() ); } @@ -290,6 +304,42 @@ impl DriaOracle { Ok(()) } + pub(in crate::cli) async fn process_task( + &self, + workflows: &DriaWorkflowsConfig, + kinds: &[OracleKind], + task_id: U256, + ) -> Result<()> { + log::info!("Processing task {}.", task_id); + let request = self.get_task_request(task_id).await?; + + log::info!( + "Request Information:\nRequester: {}\nStatus: {}\nInput: {}\nModels: {}", + request.requester, + TaskStatus::try_from(request.status)?, + bytes_to_string(&request.input)?, + bytes_to_string(&request.models)? + ); + + // TODO: !!! + let status = TaskStatus::try_from(request.status)?; + match handle_request(self, kinds, workflows, status, task_id, request.protocol).await { + Ok(Some(receipt)) => { + log::info!( + "Task {} processed successfully. (tx: {})", + task_id, + receipt.transaction_hash + ) + } + Ok(None) => { + log::info!("Task {} ignored.", task_id) + } + Err(e) => log::error!("Could not process task: {:?}", e), + } + + Ok(()) + } + pub async fn request_task( &self, input: &str, diff --git a/src/cli/commands/mod.rs b/src/cli/commands/mod.rs index 00fb812..5c47ee2 100644 --- a/src/cli/commands/mod.rs +++ b/src/cli/commands/mod.rs @@ -43,7 +43,19 @@ pub enum Commands { models: Vec, }, /// View status of a given task. - View { task_id: U256 }, + View { + #[arg(help = "Task id.", required = true)] + task_id: U256, + }, + /// Process a single task. + Process { + #[arg(help = "Task id.", required = true)] + task_id: U256, + #[arg(help = "The oracle kinds to handle the task as.", required = false)] + kinds: Vec, + #[arg(short, long = "model", help = "The models to use for this task.", required = true, value_parser = parse_model)] + models: Vec, + }, /// View tasks between specific blocks. Tasks { #[arg(long, help = "Starting block number, defaults to 'earliest'.", value_parser = parse_block_number_or_tag)] diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 3cd9a5f..d67bd2e 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -1,7 +1,10 @@ mod commands; +use std::time::Duration; + use commands::Commands; mod parsers; +use dkn_workflows::DriaWorkflowsConfig; use parsers::*; use crate::{DriaOracle, DriaOracleConfig}; @@ -25,6 +28,9 @@ struct Cli { /// Ethereum wallet's secret (private) key. #[arg(short, long, env = "SECRET_KEY", value_parser = parse_secret_key)] secret_key: B256, + + #[arg(short, long, env = "TX_TIMEOUT_SECS", default_value = "30")] + tx_timeout: Option, } /// Main CLI entry point. @@ -39,7 +45,8 @@ pub async fn cli() -> Result<()> { // create node let config = DriaOracleConfig::new(&secret_key, rpc_url) - .wrap_err("could not create oracle configuration")?; + .wrap_err("could not create oracle configuration")? + .with_tx_timeout(Duration::from_secs(30)); // timeout is 30secs by default let node = DriaOracle::new(config) .await .wrap_err("could not create oracle node")?; @@ -89,6 +96,14 @@ pub async fn cli() -> Result<()> { } } Commands::View { task_id } => node.view_task(task_id).await?, + Commands::Process { + task_id, + kinds, + models, + } => { + node.process_task(&DriaWorkflowsConfig::new(models), &kinds, task_id) + .await? + } Commands::Tasks { from, to } => { node.view_task_events( from.unwrap_or(BlockNumberOrTag::Earliest), diff --git a/src/compute/generation/execute.rs b/src/compute/generation/execute.rs index ea1c43f..d43a1f6 100644 --- a/src/compute/generation/execute.rs +++ b/src/compute/generation/execute.rs @@ -156,7 +156,7 @@ mod tests { // cargo test --package dria-oracle --lib --all-features -- compute::generation::execute::tests::test_raw_workflow --exact --show-output --ignored dotenvy::dotenv().unwrap(); - let contract_result = hex_literal::hex!("7b2261727765617665223a2239397a4252676c4c663443696b35676c57444f667542463736456e417a4a6344303431545a614c6d6f6934227d"); + let contract_result = hex_literal::hex!("7b2261727765617665223a224d49555775656361634b417a62755442335a6a57613463784e6461774d71435a704550694f71675a625a63227d"); let request = GenerationRequest::try_parse_bytes(&contract_result.into()) .await .unwrap(); diff --git a/src/compute/generation/postprocess/swan.rs b/src/compute/generation/postprocess/swan.rs index 987bff2..35ba752 100644 --- a/src/compute/generation/postprocess/swan.rs +++ b/src/compute/generation/postprocess/swan.rs @@ -49,10 +49,15 @@ impl PostProcess for SwanPurchasePostProcessor { } // then, do post processing on them to cast them to `Address` - // TODO: handle error let addresses = shopping_list_lines .into_iter() - .map(|line| Address::from_str(line).unwrap()) + .filter_map(|line| match Address::from_str(line) { + Ok(address) => Some(address), + Err(e) => { + log::warn!("Could not parse address from {}: {}", line, e); + None + } + }) .collect::>(); // `abi.encode` the list of addresses to be decodable by contract @@ -74,7 +79,7 @@ mod tests { use super::*; #[test] - fn test_swan_purchase_post_processor() { + fn test_swan_post_processor_encoding() { const INPUT: &str = r#" some blabla here and there @@ -113,7 +118,7 @@ some more blabla here } #[test] - fn test_swan_post_processor_2() { + fn test_swan_post_processor_encoding_2() { const INPUT: &str = r#" 0x36f55f830D6E628a78Fcb70F73f9D005BaF88eE3 @@ -138,6 +143,26 @@ some more blabla here assert_eq!(addresses, expected_addresses, "must have listed addresses"); } + #[test] + fn test_swan_post_processor_with_fails() { + // only the 3rd one shall pass here + const INPUT: &str = r#" + +0x36f55f830D6E628a78Fcb70F73f9D005BaF +im not even an address lol +0x26F5B12b67D5F006826824A73F58b88D6bdAA74B +00 0 00 0 0 0 0 00 0\t\t\t\t + +"#; + + let post_processor = SwanPurchasePostProcessor::new("", ""); + + let (output, _, _) = post_processor.post_process(INPUT.to_string()).unwrap(); + let addresses = >::abi_decode(&output, true).unwrap(); + let expected_addresses = vec![address!("26F5B12b67D5F006826824A73F58b88D6bdAA74B")]; + assert_eq!(addresses, expected_addresses, "must have listed addresses"); + } + #[tokio::test] #[ignore = "requires OpenAI api key"] async fn test_swan_purchase_workflow() -> Result<()> { diff --git a/src/compute/handler.rs b/src/compute/handler.rs index 30e9fca..411d7fd 100644 --- a/src/compute/handler.rs +++ b/src/compute/handler.rs @@ -1,8 +1,11 @@ use crate::{ - contracts::{OracleCoordinator::StatusUpdate, OracleKind, TaskStatus}, + contracts::{OracleKind, TaskStatus}, DriaOracle, }; -use alloy::rpc::types::TransactionReceipt; +use alloy::{ + primitives::{FixedBytes, U256}, + rpc::types::TransactionReceipt, +}; use dkn_workflows::DriaWorkflowsConfig; use eyre::Result; @@ -16,41 +19,43 @@ pub async fn handle_request( node: &DriaOracle, kinds: &[OracleKind], workflows: &DriaWorkflowsConfig, - event: StatusUpdate, + status: TaskStatus, + task_id: U256, + protocol: FixedBytes<32>, ) -> Result> { - log::debug!("Received event for task {} ()", event.taskId); + log::debug!("Received event for task {} ()", task_id); // we check the `statusAfter` field of the event, which indicates the final status of the listened task - let response_tx_hash = match TaskStatus::try_from(event.statusAfter)? { + let response_tx_hash = match status { TaskStatus::PendingGeneration => { if kinds.contains(&OracleKind::Generator) { - handle_generation(node, workflows, event.taskId, event.protocol).await? + handle_generation(node, workflows, task_id, protocol).await? } else { log::debug!( "Ignoring generation task {} as you are not generator.", - event.taskId + task_id ); return Ok(None); } } TaskStatus::PendingValidation => { if kinds.contains(&OracleKind::Validator) { - handle_validation(node, event.taskId).await? + handle_validation(node, task_id).await? } else { log::debug!( "Ignoring generation task {} as you are not validator.", - event.taskId + task_id ); return Ok(None); } } TaskStatus::Completed => { - log::debug!("Task {} is completed.", event.taskId); + log::debug!("Task {} is completed.", task_id); return Ok(None); } // this is kind of unexpected, but we dont have to return an error just for this TaskStatus::None => { - log::error!("None status received in an event: {}", event.taskId); + log::error!("None status received in an event: {}", task_id); return Ok(None); } }; diff --git a/src/configurations/mod.rs b/src/configurations/mod.rs index 7d602a9..9779130 100644 --- a/src/configurations/mod.rs +++ b/src/configurations/mod.rs @@ -26,7 +26,7 @@ impl Default for DriaOracleConfig { impl DriaOracleConfig { pub fn new(secret_key: &B256, rpc_url: Url) -> Result { let signer = - PrivateKeySigner::from_bytes(secret_key).wrap_err("Could not parse private key")?; + PrivateKeySigner::from_bytes(secret_key).wrap_err("could not parse private key")?; let wallet = EthereumWallet::from(signer); Ok(Self { @@ -50,6 +50,8 @@ impl DriaOracleConfig { /// Required environment variables: /// - `SECRET_KEY` /// - `RPC_URL` + /// - `TX_TIMEOUT_SECS` (optional) + #[deprecated = "do this in tests instead"] pub fn new_from_env() -> Result { // parse private key let private_key_hex = env::var("SECRET_KEY").wrap_err("SECRET_KEY is not set")?; @@ -89,6 +91,7 @@ impl DriaOracleConfig { } /// Enables `env_logger`. + #[deprecated = "do this in tests instead"] pub fn enable_logs(self) -> Self { if let Err(e) = env_logger::try_init() { log::error!("Error during env_logger::try_init: {}", e); diff --git a/src/contracts/addresses.rs b/src/contracts/addresses.rs index f673a54..363c43e 100644 --- a/src/contracts/addresses.rs +++ b/src/contracts/addresses.rs @@ -1,12 +1,9 @@ use alloy::primitives::{address, Address}; -use alloy_chains::{ - Chain, - NamedChain::{AnvilHardhat, Base, BaseSepolia}, -}; -use lazy_static::lazy_static; -use std::collections::HashMap; +use alloy_chains::NamedChain; /// Contract addresses. +/// +/// All contracts can be derived from the `coordinator` contract. #[derive(Debug, Clone)] pub struct ContractAddresses { /// Token used within the registry and coordinator. @@ -27,41 +24,18 @@ impl std::fmt::Display for ContractAddresses { } } -lazy_static! { - /// Contract addresses per chain-id. - pub static ref ADDRESSES: HashMap = { - let mut contracts = HashMap::new(); - - // localhost - contracts.insert( - AnvilHardhat.into(), - ContractAddresses { - token: address!("5FbDB2315678afecb367f032d93F642f64180aa3"), - registry: address!("e7f1725E7734CE288F8367e1Bb143E90bb3F0512"), - coordinator: address!("9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0"), - }, - ); - - // base-sepolia - contracts.insert( - BaseSepolia.into(), - ContractAddresses { - token: address!("4200000000000000000000000000000000000006"), - registry: address!("408d245a853137e44a2465d5c66061f97582eae9"), - coordinator: address!("13f977bde221b470d3ae055cde7e1f84debfe202"), - }, - ); - - // base mainnet - contracts.insert( - Base.into(), - ContractAddresses { - token: address!("4200000000000000000000000000000000000006"), - registry: address!("7645eef691ad9dc0f29b6abfc73cca4c8be44051"), - coordinator: address!("17b6d1eddcd5f9ca19bb2ffed2f3deb6bd74bd20"), - }, - ); - - contracts +/// Returns the coordinator contract address for a given chain. +/// +/// Will return an error if the chain is not supported, i.e. a coordinator address +/// is not deployed there. +pub fn get_coordinator_address(chain: NamedChain) -> eyre::Result
{ + + let addresses = match chain { + NamedChain::AnvilHardhat => address!("9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0"), + NamedChain::BaseSepolia => address!("13f977bde221b470d3ae055cde7e1f84debfe202"), + NamedChain::Base => address!("17b6d1eddcd5f9ca19bb2ffed2f3deb6bd74bd20"), + _ => return Err(eyre::eyre!("Chain {} is not supported", chain)), }; + + Ok(addresses) } diff --git a/src/node/mod.rs b/src/node/mod.rs index 92ea614..746e3d1 100644 --- a/src/node/mod.rs +++ b/src/node/mod.rs @@ -7,6 +7,7 @@ mod anvil; use super::DriaOracleConfig; use crate::contracts::*; +use alloy::hex::FromHex; use alloy::providers::fillers::{ BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, WalletFiller, }; @@ -59,12 +60,41 @@ impl DriaOracle { .get_chain_id() .await .wrap_err("could not get chain id")?; - let chain = Chain::from_id(chain_id_u64); + let chain = Chain::from_id(chain_id_u64) + .named() + .expect("expected a named chain"); log::info!("Connected to chain: {}", chain); + // get coordinator address from static list or the environment + // address within env can have 0x at the start, or not, does not matter + let coordinator_address = if let Ok(addr) = env::var("COORDINATOR_ADDRESS") { + Address::from_hex(addr).wrap_err("could not parse coordinator address in env")? + } else { + get_coordinator_address(chain)? + }; + + // create a coordinator instance and get token & registry addresses + let coordinator = OracleCoordinator::new(coordinator_address, &provider); + let token_address = coordinator + .feeToken() + .call() + .await + .wrap_err("could not get token address from the coordinator")? + ._0; + let registry_address = coordinator + .registry() + .call() + .await + .wrap_err("could not get registry address from the coordinator")? + ._0; + let node = Self { config, - addresses: ADDRESSES[&chain].clone(), + addresses: ContractAddresses { + coordinator: coordinator_address, + registry: registry_address, + token: token_address, + }, provider, }; @@ -158,10 +188,12 @@ impl core::fmt::Display for DriaOracle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "Dria Oracle Node v{}\nAddress: {}\nRPC URL: {}", + "Dria Oracle Node v{}\nOracle Address: {}\nRPC URL: {}\nCoordinator: {}\nTx timeout: {}s", env!("CARGO_PKG_VERSION"), self.address(), self.config.rpc_url, + self.addresses.coordinator, + self.config.tx_timeout.map(|t| t.as_secs()).unwrap_or_default() ) } } diff --git a/src/storage/arweave.rs b/src/storage/arweave.rs index 3f2ca5d..8dfa077 100644 --- a/src/storage/arweave.rs +++ b/src/storage/arweave.rs @@ -1,6 +1,3 @@ -use crate::bytes_to_string; - -use super::traits::IsExternalStorage; use alloy::primitives::Bytes; use async_trait::async_trait; use bundlr_sdk::{currency::arweave::ArweaveBuilder, tags::Tag, BundlrBuilder}; @@ -8,21 +5,34 @@ use eyre::{eyre, Context, Result}; use reqwest::{Client, Url}; use std::{env, path::PathBuf}; -const DEFAULT_BASE_URL: &str = "https://node1.bundlr.network"; // "https://gateway.irys.xyz"; -const DEFAULT_WALLET_PATH: &str = "./secrets/wallet.json"; +use crate::bytes_to_string; + +use super::IsExternalStorage; + +const DEFAULT_UPLOAD_BASE_URL: &str = "https://node1.bundlr.network"; +const DEFAULT_DOWNLOAD_BASE_URL: &str = "https://arweave.net"; const DEFAULT_BYTE_LIMIT: usize = 1024; // 1KB +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct ArweaveKey { + /// The base64url encoded key, can be used to download data directly. + pub arweave: String, +} + /// External data storage for Arweave. /// /// - `put` corresponds to uploading (via Irys) /// - `get` corresponds to downloading pub struct ArweaveStorage { /// Path to Arweave keypair (usually JSON) - wallet: PathBuf, - /// Base URL for Arweave gateway, e.g: - /// - - /// - - base_url: Url, + wallet: Option, + /// Base URL for uploading data on Arweave, e.g.: + /// - https://gateway.irys.xyz + /// - https://node1.bundlr.network + upload_base_url: Url, + /// Base URL for downloading data from Arweave, e.g.: + /// - https://arweave.net + download_base_url: Url, /// Reqwest client for downloads. client: Client, /// Byte limit for the data to be considered for Arweave. @@ -32,21 +42,53 @@ pub struct ArweaveStorage { byte_limit: usize, } -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct ArweaveKey { - /// The base64url encoded key, can be used to download data directly. - pub arweave: String, -} - impl ArweaveStorage { - /// Creates a new Arweave instance. - pub fn new(base_url: &str, wallet: &str, byte_limit: usize) -> Result { - Ok(Self { - wallet: PathBuf::from(wallet), - base_url: Url::parse(base_url).wrap_err("could not parse base URL")?, + /// Creates an Arweave storage client with the given wallet's path. + pub fn new(wallet: &str) -> Result { + let ar = Self::new_readonly(); + Ok(ar.with_wallet(wallet)) + } + + /// Creates an Arweave storage client without a wallet, used only for downloads. + pub fn new_readonly() -> Self { + Self { + wallet: None, + upload_base_url: Url::parse(DEFAULT_UPLOAD_BASE_URL).unwrap(), + download_base_url: Url::parse(DEFAULT_DOWNLOAD_BASE_URL).unwrap(), + byte_limit: DEFAULT_BYTE_LIMIT, client: Client::new(), - byte_limit, - }) + } + } + + /// Sets the wallet path for the Arweave storage. + pub fn with_wallet(mut self, wallet: &str) -> Self { + self.wallet = Some(PathBuf::from(wallet)); + self + } + + /// Sets the byte limit for the data to be considered for Arweave, default is 1024 bytes (1KB). + /// + /// - If the data exceeds this limit, it will be uploaded to Arweave. + /// - Otherwise, it will be stored as is. + /// + /// If this is too large, you may spend quite a bit of gas fees. + pub fn with_upload_byte_limit(mut self, limit: usize) -> Self { + self.byte_limit = limit; + self + } + + /// Sets the download base URL for Arweave. + /// + /// We don't need to change this usually, as `http://arweave.net` is enough. + pub fn with_download_base_url(mut self, url: &str) -> Result { + self.download_base_url = Url::parse(url).wrap_err("could not parse download base URL")?; + Ok(self) + } + + /// Sets the upload base URL for Arweave. + pub fn with_upload_base_url(mut self, url: &str) -> Result { + self.upload_base_url = Url::parse(url).wrap_err("could not parse upload base URL")?; + Ok(self) } /// Parses a given bytes input to a string, @@ -58,7 +100,7 @@ impl ArweaveStorage { // then, check storage if let Some(key) = ArweaveStorage::is_key(&input_string) { // if its a txid, we download the data and parse it again - let input_bytes_from_arweave = ArweaveStorage::default() + let input_bytes_from_arweave = ArweaveStorage::new_readonly() .get(key) .await .wrap_err("could not download from Arweave")?; @@ -72,22 +114,28 @@ impl ArweaveStorage { /// Creates a new Arweave instance from the environment variables. /// - /// Required environment variables: - /// - /// - `ARWEAVE_WALLET_PATH` - /// - `ARWEAVE_BASE_URL` - /// - `ARWEAVE_BYTE_LIMIT` + /// - `ARWEAVE_WALLET_PATH` is required + /// - `ARWEAVE_BASE_URL` is optional + /// - `ARWEAVE_BYTE_LIMIT` is optional /// /// All these variables have defaults if they are missing. pub fn new_from_env() -> Result { - let wallet = env::var("ARWEAVE_WALLET_PATH").unwrap_or(DEFAULT_WALLET_PATH.to_string()); - let base_url = env::var("ARWEAVE_BASE_URL").unwrap_or(DEFAULT_BASE_URL.to_string()); - let byte_limit = env::var("ARWEAVE_BYTE_LIMIT") - .unwrap_or(DEFAULT_BYTE_LIMIT.to_string()) - .parse::() - .wrap_err("could not parse ARWEAVE_BYTE_LIMIT")?; - - Self::new(&base_url, &wallet, byte_limit) + // use wallet from env + let wallet = + env::var("ARWEAVE_WALLET_PATH").wrap_err("could not read wallet path from env")?; + let mut ar = Self::new(&wallet)?; + + // get base url if it exists + if let Ok(base_url) = env::var("ARWEAVE_BASE_URL") { + ar = ar.with_upload_base_url(&base_url)?; + } + + // update upload byte limit if needed + if let Ok(byte_limit) = env::var("ARWEAVE_BYTE_LIMIT") { + ar = ar.with_upload_byte_limit(byte_limit.parse().unwrap_or(DEFAULT_BYTE_LIMIT)); + } + + Ok(ar) } /// Puts the value if it is larger than the byte limit. @@ -109,20 +157,13 @@ impl ArweaveStorage { } } -impl Default for ArweaveStorage { - fn default() -> Self { - Self::new(DEFAULT_BASE_URL, DEFAULT_WALLET_PATH, DEFAULT_BYTE_LIMIT) - .expect("Failed to create Default Arweave instance") - } -} - #[async_trait(?Send)] impl IsExternalStorage for ArweaveStorage { type Key = ArweaveKey; type Value = Bytes; async fn get(&self, key: Self::Key) -> Result { - let url = self.base_url.join(&key.arweave)?; + let url = self.download_base_url.join(&key.arweave)?; log::debug!("Fetching from Arweave: {}", url); let response = self @@ -141,6 +182,11 @@ impl IsExternalStorage for ArweaveStorage { } async fn put(&self, value: Self::Value) -> Result { + let wallet_path = self + .wallet + .as_ref() + .ok_or_else(|| eyre!("Wallet path is not set"))?; + #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] #[allow(unused)] @@ -158,8 +204,8 @@ impl IsExternalStorage for ArweaveStorage { // ensure that wallet exists // NOTE: we do this here instead of `new` so that we can work without any wallet // in case we only want to download data. - if !self.wallet.try_exists()? { - return Err(eyre!("Wallet does not exist at {}.", self.wallet.display())); + if !wallet_path.try_exists()? { + return Err(eyre!("Wallet does not exist at {}.", wallet_path.display())); } // create tag @@ -170,12 +216,12 @@ impl IsExternalStorage for ArweaveStorage { // create Arweave currency instance let currency = ArweaveBuilder::new() - .keypair_path(self.wallet.clone()) + .keypair_path(wallet_path.clone()) .build()?; // create the Bundlr instance let bundlr = BundlrBuilder::new() - .url(self.base_url.clone()) + .url(self.upload_base_url.clone()) .currency(currency) .fetch_pub_info() .await? @@ -188,7 +234,7 @@ impl IsExternalStorage for ArweaveStorage { let res = serde_json::from_value::(response_body)?; log::debug!("Uploaded to Arweave: {:#?}", res); - log::info!("Uploaded at {}", self.base_url.join(&res.id)?); + log::info!("Uploaded at {}", self.upload_base_url.join(&res.id)?); // the key is in base64 format, we want to convert that to hexadecimals Ok(ArweaveKey { arweave: res.id }) @@ -219,10 +265,12 @@ mod tests { #[tokio::test] #[ignore = "run manually"] async fn test_download_data() -> Result<()> { + dotenvy::dotenv().unwrap(); + // https://gateway.irys.xyz/Zg6CZYfxXCWYnCuKEpnZCYfy7ghit1_v4-BCe53iWuA let tx_id = "Zg6CZYfxXCWYnCuKEpnZCYfy7ghit1_v4-BCe53iWuA".to_string(); let key = ArweaveKey { arweave: tx_id }; - let arweave = ArweaveStorage::default(); + let arweave = ArweaveStorage::new_from_env()?; let result = arweave.get(key).await?; let val = serde_json::from_slice::(&result)?; @@ -234,7 +282,9 @@ mod tests { #[tokio::test] #[ignore = "run manually with Arweave wallet"] async fn test_upload_and_download_data() -> Result<()> { - let arweave = ArweaveStorage::default(); + dotenvy::dotenv().unwrap(); + + let arweave = ArweaveStorage::new_from_env()?; let input = b"Hi there Im a test data".to_vec(); // put data diff --git a/tests/oracle_test.rs b/tests/oracle_test.rs index c6050d6..770e43e 100644 --- a/tests/oracle_test.rs +++ b/tests/oracle_test.rs @@ -81,10 +81,16 @@ async fn test_oracle_string_input() -> Result<()> { let task_id = event.taskId; assert_eq!(event.statusBefore, TaskStatus::None as u8); assert_eq!(event.statusAfter, TaskStatus::PendingGeneration as u8); - let generation_receipt = - handle_request(&generator, &[OracleKind::Generator], &workflows, event) - .await? - .unwrap(); + let generation_receipt = handle_request( + &generator, + &[OracleKind::Generator], + &workflows, + TaskStatus::PendingGeneration, + event.taskId, + event.protocol, + ) + .await? + .unwrap(); // handle validation by reading the latest event let tasks = node @@ -98,10 +104,16 @@ async fn test_oracle_string_input() -> Result<()> { assert_eq!(event.taskId, task_id); assert_eq!(event.statusBefore, TaskStatus::PendingGeneration as u8); assert_eq!(event.statusAfter, TaskStatus::PendingValidation as u8); - let validation_receipt = - handle_request(&validator, &[OracleKind::Validator], &workflows, event) - .await? - .unwrap(); + let validation_receipt = handle_request( + &validator, + &[OracleKind::Validator], + &workflows, + TaskStatus::PendingValidation, + event.taskId, + event.protocol, + ) + .await? + .unwrap(); let tasks = node .get_tasks_in_range( diff --git a/tests/swan_test.rs b/tests/swan_test.rs index b464d90..f12a707 100644 --- a/tests/swan_test.rs +++ b/tests/swan_test.rs @@ -101,10 +101,16 @@ async fn test_swan() -> Result<()> { let task_id = event.taskId; assert_eq!(event.statusBefore, TaskStatus::None as u8); assert_eq!(event.statusAfter, TaskStatus::PendingGeneration as u8); - let generation_receipt = - handle_request(&generator, &[OracleKind::Generator], &workflows, event) - .await? - .unwrap(); + let generation_receipt = handle_request( + &generator, + &[OracleKind::Generator], + &workflows, + TaskStatus::PendingGeneration, + event.taskId, + event.protocol, + ) + .await? + .unwrap(); // handle validation by reading the latest event let tasks = node @@ -118,10 +124,16 @@ async fn test_swan() -> Result<()> { assert_eq!(event.taskId, task_id); assert_eq!(event.statusBefore, TaskStatus::PendingGeneration as u8); assert_eq!(event.statusAfter, TaskStatus::PendingValidation as u8); - let validation_receipt = - handle_request(&validator, &[OracleKind::Validator], &workflows, event) - .await? - .unwrap(); + let validation_receipt = handle_request( + &validator, + &[OracleKind::Validator], + &workflows, + TaskStatus::PendingValidation, + event.taskId, + event.protocol, + ) + .await? + .unwrap(); let tasks = node .get_tasks_in_range(