Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ RUST_LOG=none,dria_oracle=info
# example: ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
SECRET_KEY=your-secret-key

# Coordinator address (optional)
COORDINATOR_ADDRESS=

## Arweave configurations
# path to wallet, only required if your BYTE_LIMIT is enough that
# you may do an Arweave upload to store a large value on-chain
Expand Down
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "dria-oracle"
description = "Dria Knowledge Network Oracle Node"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
license = "Apache-2.0"
readme = "README.md"
Expand All @@ -19,7 +19,6 @@ tokio = { version = "1.39.2", features = [
"signal",
] }
tokio-util = "0.7.13"
lazy_static = "1.5.0"

# workflows
dkn-workflows = { git = "https://github.com/firstbatchxyz/dkn-compute-node" }
Expand Down
31 changes: 24 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,31 @@ cargo install --git https://github.com/firstbatchxyz/dria-oracle-node

Create an `.env` file by copying `.env.example`. You have to fill the following variables:

- Get an RPC URL from a provider such as Alchemy or Infura, and set it as `RPC_URL`.
- Provide an Ethereum wallet secret koy to `SECRET_KEY`, make sure it has funds to pay for gas and tokens.
- Get an RPC URL from a provider such as [Alchemy](https://www.alchemy.com/) or [Infura](https://www.infura.io/), and set it as `RPC_URL`.
- Provide an Ethereum wallet secret key to `SECRET_KEY`, make sure it has funds to pay for gas and tokens.

Optionally, you can save gas costs using Arweave:
> [!NOTE]
>
> The contract addresses are determined with respect to the chain connected via RPC URL, but you can override it via `COORDINATOR_ADDRESS` environment variable.
> In any case, you should not need to do this.

### Arweave

You can save gas costs using [Arweave](https://arweave.org/):

- Provide an Arweave wallet so that you can use Arweave for large results. Alternatively, dont provide a wallet but instead set `ARWEAVE_BYTE_LIMIT` to a very large value. TODO: this should be done automatically if wallet does not exist
- Provide an Arweave wallet via `ARWEAVE_WALLET_PATH` variable so that you can use Arweave for large results. You can create one [here](https://arweave.app/).
- You can set `ARWEAVE_BYTE_LIMIT` to determine the byte length threshold, beyond which values are uploaded to Arweave. It defaults to 1024, so any data less than that many bytes will be written as-is.

If you omit Arweave, it will only use the client for downloading things from Arweave, but will never upload.

### LLM Providers

As for the LLM providers:

- If you are using Ollama, make sure it is running and the host & port are correct.
- If you are using OpenAI, make sure you provide the `OPENAI_API_KEY`.
- If you are using Gemini, make sure you provide the `GEMINI_API_KEY`.
- If you are using OpenRouter, make sure you provide the `OPENROUTER_API_KEY`.
- If you are using OpenAI, provide the `OPENAI_API_KEY`.
- If you are using Gemini, provide the `GEMINI_API_KEY`.
- If you are using OpenRouter, provide the `OPENROUTER_API_KEY`.

## Usage

Expand All @@ -52,6 +64,11 @@ The CLI provides several methods to interact with the oracle contracts.
- [Viewing Tasks](#viewing-tasks)
- [Balance & Rewards](#balance--rewards)

> [!TIP]
>
> By default logs will be `info` level, but you can add a `DEBUG=1` env variable and it will use `debug` level instead.
> You can set `RUST_LOG` variable yourself as well.

### Registration

To serve oracle requests, you **MUST** first register as your desired oracle type, i.e. `generator` or `validator`. These are handled by the registration commands `register` and `unregister` which accepts multiple arguments to register at once. You can then see your registrations with `registrations` command.
Expand Down
38 changes: 26 additions & 12 deletions misc/arweave.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,46 @@
/**
* A helper script to print the content of an Arweave transaction, where transaction id is hex-encoded.
* This means that the input is a 64-char hexadecimal.
* A helper script to print the content of an Arweave transaction.
*
* Usage:
*
* bun run ./misc/arweave.js 0x30613233613135613236663864663332366165306137663863633636343437336238373463353966333964623436366665316337313531393634623734393231
* ```sh
* # calldata as-is
* bun run ./misc/arweave.js 0x7b2261727765617665223a224d49555775656361634b417a62755442335a6a57613463784e6461774d71435a704550694f71675a625a63227d
*
* Tip:
* # as an object (with escaped quotes)
* bun run ./misc/arweave.js "{\"arweave\":\"MIUWuecacKAzbuTB3ZjWa4cxNdawMqCZpEPiOqgZbZc\"}"
*
* Can be piped to `pbcopy` on macOS to copy the output to clipboard.
* # base64 txid
* bun run ./misc/arweave.js MIUWuecacKAzbuTB3ZjWa4cxNdawMqCZpEPiOqgZbZc
* ```
*
* Can be piped to `pbcopy` on macOS to copy the output to clipboard.
*/

// parse input
let input = process.argv[2];
if (!input) {
console.error("No input provided.");
return;
}

// get rid of 0x
let arweaveTxId;
if (input.startsWith("0x")) {
input = input.slice(2);
// if it starts with 0x, we assume its all hex
arweaveTxId = JSON.parse(
Buffer.from(input.slice(2), "hex").toString()
).arweave;
} else if (input.startsWith("{")) {
// if it starts with {, we assume its a JSON string
console.log("input", input);
arweaveTxId = JSON.parse(input).arweave;
} else {
// otherwise, we assume its a base64 txid
arweaveTxId = input;
}
const inputDecoded = Buffer.from(input, "hex").toString();
const obj = JSON.parse(inputDecoded);

// construct the URL
// download the actual response from Arweave
const url = `https://arweave.net/${obj.arweave}`;
console.log(url);
const url = `https://arweave.net/${arweaveTxId}`;
const res = await fetch(url);

console.log(await res.text());
88 changes: 69 additions & 19 deletions src/cli/commands/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
use alloy::{
eips::BlockNumberOrTag,
primitives::{utils::format_ether, U256},
rpc::types::Log,
};
use dkn_workflows::{DriaWorkflowsConfig, Model, ModelProvider};
use eyre::{eyre, Context, Result};
Expand Down Expand Up @@ -45,6 +44,7 @@ impl DriaOracle {
}
}
}

log::info!(
"Running as: {}",
kinds
Expand Down Expand Up @@ -122,8 +122,12 @@ impl DriaOracle {
next = event_stream.next() => {
match next {
Some(Ok((event, log))) => {
self.handle_event_log(event, log, &kinds, &model_config)
.await
log::debug!(
"Handling task {} (tx: {})",
event.taskId,
log.transaction_hash.unwrap_or_default()
);
self.handle_event_log(event, &kinds, &model_config).await
}
Some(Err(e)) => log::error!("Could not handle event: {}", e),
None => {
Expand All @@ -141,19 +145,16 @@ impl DriaOracle {
async fn handle_event_log(
&self,
event: StatusUpdate,
log: Log,
kinds: &[OracleKind],
model_config: &DriaWorkflowsConfig,
workflows: &DriaWorkflowsConfig,
) {
let task_id = event.taskId;
log::debug!(
"Handling task {} (tx: {})",
task_id,
log.transaction_hash.unwrap_or_default()
);
let Ok(status) = TaskStatus::try_from(event.statusAfter) else {
log::error!("Could not parse task status: {}", event.statusAfter);
return;
};

// handle request
match handle_request(self, kinds, model_config, event).await {
match handle_request(self, kinds, workflows, status, event.taskId, event.protocol).await {
Ok(Some(receipt)) => {
log::info!(
"Task {} processed successfully. (tx: {})",
Expand All @@ -171,7 +172,7 @@ impl DriaOracle {
async fn handle_previous_tasks(
&self,
from_block: BlockNumberOrTag,
model_config: &DriaWorkflowsConfig,
workflows: &DriaWorkflowsConfig,
kinds: &[OracleKind],
) -> Result<()> {
log::info!(
Expand All @@ -183,19 +184,30 @@ impl DriaOracle {
.await?;

for (event, log) in prev_tasks {
let status_before = TaskStatus::try_from(event.statusBefore)?;
let status_after = TaskStatus::try_from(event.statusAfter)?;
let task_id = event.taskId;
log::info!(
"Previous task: {} ({} -> {})",
task_id,
TaskStatus::try_from(event.statusBefore).unwrap_or_default(),
TaskStatus::try_from(event.statusAfter).unwrap_or_default()
status_before,
status_after
);
log::debug!(
"Handling task {} (tx: {})",
task_id,
log.transaction_hash.unwrap_or_default()
);
match handle_request(self, kinds, model_config, event).await {
match handle_request(
self,
kinds,
workflows,
status_after,
event.taskId,
event.protocol,
)
.await
{
Ok(Some(receipt)) => {
log::info!(
"Task {} processed successfully. (tx: {})",
Expand All @@ -212,6 +224,7 @@ impl DriaOracle {

Ok(())
}

pub(in crate::cli) async fn view_task_events(
&self,
from_block: impl Into<BlockNumberOrTag> + Clone,
Expand All @@ -233,12 +246,13 @@ impl DriaOracle {

let task_events = self.get_tasks_in_range(from_block, to_block).await?;

for (event, _) in task_events {
for (event, log) in task_events {
log::info!(
"Task: {} ({} -> {})",
"Task {} changed from {} to {} at block {}",
event.taskId,
TaskStatus::try_from(event.statusBefore).unwrap_or_default(),
TaskStatus::try_from(event.statusAfter).unwrap_or_default()
TaskStatus::try_from(event.statusAfter).unwrap_or_default(),
log.block_number.unwrap_or_default()
);
}

Expand Down Expand Up @@ -290,6 +304,42 @@ impl DriaOracle {
Ok(())
}

pub(in crate::cli) async fn process_task(
&self,
workflows: &DriaWorkflowsConfig,
kinds: &[OracleKind],
task_id: U256,
) -> Result<()> {
log::info!("Processing task {}.", task_id);
let request = self.get_task_request(task_id).await?;

log::info!(
"Request Information:\nRequester: {}\nStatus: {}\nInput: {}\nModels: {}",
request.requester,
TaskStatus::try_from(request.status)?,
bytes_to_string(&request.input)?,
bytes_to_string(&request.models)?
);

// TODO: !!!
let status = TaskStatus::try_from(request.status)?;
match handle_request(self, kinds, workflows, status, task_id, request.protocol).await {
Ok(Some(receipt)) => {
log::info!(
"Task {} processed successfully. (tx: {})",
task_id,
receipt.transaction_hash
)
}
Ok(None) => {
log::info!("Task {} ignored.", task_id)
}
Err(e) => log::error!("Could not process task: {:?}", e),
}

Ok(())
}

pub async fn request_task(
&self,
input: &str,
Expand Down
14 changes: 13 additions & 1 deletion src/cli/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,19 @@ pub enum Commands {
models: Vec<Model>,
},
/// View status of a given task.
View { task_id: U256 },
View {
#[arg(help = "Task id.", required = true)]
task_id: U256,
},
/// Process a single task.
Process {
#[arg(help = "Task id.", required = true)]
task_id: U256,
#[arg(help = "The oracle kinds to handle the task as.", required = false)]
kinds: Vec<OracleKind>,
#[arg(short, long = "model", help = "The models to use for this task.", required = true, value_parser = parse_model)]
models: Vec<Model>,
},
/// View tasks between specific blocks.
Tasks {
#[arg(long, help = "Starting block number, defaults to 'earliest'.", value_parser = parse_block_number_or_tag)]
Expand Down
17 changes: 16 additions & 1 deletion src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
mod commands;
use std::time::Duration;

use commands::Commands;

mod parsers;
use dkn_workflows::DriaWorkflowsConfig;
use parsers::*;

use crate::{DriaOracle, DriaOracleConfig};
Expand All @@ -25,6 +28,9 @@ struct Cli {
/// Ethereum wallet's secret (private) key.
#[arg(short, long, env = "SECRET_KEY", value_parser = parse_secret_key)]
secret_key: B256,

#[arg(short, long, env = "TX_TIMEOUT_SECS", default_value = "30")]
tx_timeout: Option<u64>,
}

/// Main CLI entry point.
Expand All @@ -39,7 +45,8 @@ pub async fn cli() -> Result<()> {

// create node
let config = DriaOracleConfig::new(&secret_key, rpc_url)
.wrap_err("could not create oracle configuration")?;
.wrap_err("could not create oracle configuration")?
.with_tx_timeout(Duration::from_secs(30)); // timeout is 30secs by default
let node = DriaOracle::new(config)
.await
.wrap_err("could not create oracle node")?;
Expand Down Expand Up @@ -89,6 +96,14 @@ pub async fn cli() -> Result<()> {
}
}
Commands::View { task_id } => node.view_task(task_id).await?,
Commands::Process {
task_id,
kinds,
models,
} => {
node.process_task(&DriaWorkflowsConfig::new(models), &kinds, task_id)
.await?
}
Commands::Tasks { from, to } => {
node.view_task_events(
from.unwrap_or(BlockNumberOrTag::Earliest),
Expand Down
2 changes: 1 addition & 1 deletion src/compute/generation/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ mod tests {
// cargo test --package dria-oracle --lib --all-features -- compute::generation::execute::tests::test_raw_workflow --exact --show-output --ignored
dotenvy::dotenv().unwrap();

let contract_result = hex_literal::hex!("7b2261727765617665223a2239397a4252676c4c663443696b35676c57444f667542463736456e417a4a6344303431545a614c6d6f6934227d");
let contract_result = hex_literal::hex!("7b2261727765617665223a224d49555775656361634b417a62755442335a6a57613463784e6461774d71435a704550694f71675a625a63227d");
let request = GenerationRequest::try_parse_bytes(&contract_result.into())
.await
.unwrap();
Expand Down
Loading