diff --git a/Cargo.lock b/Cargo.lock index 3f103bd9ee7..db82bbcb2a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -223,6 +223,15 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64-url" +version = "1.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" +dependencies = [ + "base64", +] + [[package]] name = "bigdecimal" version = "0.1.2" @@ -1502,6 +1511,25 @@ dependencies = [ "web3", ] +[[package]] +name = "graph-chain-arweave" +version = "0.26.0" +dependencies = [ + "base64-url", + "diesel", + "graph", + "graph-core", + "graph-runtime-derive", + "graph-runtime-wasm", + "graph-store-postgres", + "pretty_assertions 0.7.2", + "prost", + "prost-types", + "serde", + "test-store", + "tonic-build", +] + [[package]] name = "graph-chain-ethereum" version = "0.26.0" @@ -1581,6 +1609,7 @@ dependencies = [ "futures 0.1.31", "futures 0.3.16", "graph", + "graph-chain-arweave", "graph-chain-ethereum", "graph-chain-near", "graph-chain-tendermint", @@ -1641,6 +1670,7 @@ dependencies = [ "futures 0.3.16", "git-testament", "graph", + "graph-chain-arweave", "graph-chain-ethereum", "graph-chain-near", "graph-chain-tendermint", @@ -1737,6 +1767,7 @@ dependencies = [ "either", "futures 0.3.16", "graph", + "graph-chain-arweave", "graph-chain-ethereum", "graph-chain-near", "graph-chain-tendermint", diff --git a/chain/arweave/.gitignore b/chain/arweave/.gitignore new file mode 100644 index 00000000000..97442b5f148 --- /dev/null +++ b/chain/arweave/.gitignore @@ -0,0 +1 @@ +google.protobuf.rs \ No newline at end of file diff --git a/chain/arweave/Cargo.toml b/chain/arweave/Cargo.toml new file mode 100644 index 00000000000..25e6b0d9ede --- /dev/null +++ b/chain/arweave/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "graph-chain-arweave" +version = "0.26.0" +edition = "2021" + +[build-dependencies] +tonic-build = { version = "0.7.1", features = ["prost"]} + +[dependencies] +base64-url = "1.4.13" +graph = { path = "../../graph" } +prost = "0.10.1" +prost-types = "0.10.1" +serde = "1.0" + +graph-runtime-wasm = { path = "../../runtime/wasm" } +graph-runtime-derive = { path = "../../runtime/derive" } + +[dev-dependencies] +diesel = { version = "1.4.7", features = ["postgres", "serde_json", "numeric", "r2d2"] } +graph-core = { path = "../../core" } +graph-store-postgres = { path = "../../store/postgres" } +pretty_assertions = "0.7.2" +test-store = { path = "../../store/test-store" } diff --git a/chain/arweave/build.rs b/chain/arweave/build.rs new file mode 100644 index 00000000000..e2ede2acef2 --- /dev/null +++ b/chain/arweave/build.rs @@ -0,0 +1,7 @@ +fn main() { + println!("cargo:rerun-if-changed=proto"); + tonic_build::configure() + .out_dir("src/protobuf") + .compile(&["proto/type.proto"], &["proto"]) + .expect("Failed to compile Firehose Arweave proto(s)"); +} diff --git a/chain/arweave/proto/type.proto b/chain/arweave/proto/type.proto new file mode 100644 index 00000000000..b3a41a4a56a --- /dev/null +++ b/chain/arweave/proto/type.proto @@ -0,0 +1,108 @@ +syntax = "proto3"; + +package sf.arweave.type.v1; + +option go_package = "github.com/ChainSafe/firehose-arweave/pb/sf/arweave/type/v1;pbcodec"; + +message BigInt { + bytes bytes = 1; +} + +message Block { + // Firehose block version (unrelated to Arweave block version) + uint32 ver = 1; + // The block identifier + bytes indep_hash = 
2; + // The nonce chosen to solve the mining problem + bytes nonce = 3; + // `indep_hash` of the previous block in the weave + bytes previous_block = 4; + // POSIX time of block discovery + uint64 timestamp = 5; + // POSIX time of the last difficulty retarget + uint64 last_retarget = 6; + // Mining difficulty; the number `hash` must be greater than. + BigInt diff = 7; + // How many blocks have passed since the genesis block + uint64 height = 8; + // Mining solution hash of the block; must satisfy the mining difficulty + bytes hash = 9; + // Merkle root of the tree of Merkle roots of block's transactions' data. + bytes tx_root = 10; + // Transactions contained within this block + repeated Transaction txs = 11; + // The root hash of the Merkle Patricia Tree containing + // all wallet (account) balances and the identifiers + // of the last transactions posted by them; if any. + bytes wallet_list = 12; + // (string or) Address of the account to receive the block rewards. Can also be unclaimed which is encoded as a null byte + bytes reward_addr = 13; + // Tags that a block producer can add to a block + repeated Tag tags = 14; + // Size of reward pool + BigInt reward_pool = 15; + // Size of the weave in bytes + BigInt weave_size = 16; + // Size of this block in bytes + BigInt block_size = 17; + // Required after the version 1.8 fork. Zero otherwise. + // The sum of the average number of hashes computed + // by the network to produce the past blocks including this one. + BigInt cumulative_diff = 18; + // Required after the version 1.8 fork. Null byte otherwise. + // The Merkle root of the block index - the list of {`indep_hash`; `weave_size`; `tx_root`} triplets + bytes hash_list_merkle = 20; + // The proof of access; Used after v2.4 only; set as defaults otherwise + ProofOfAccess poa = 21; +} + +// A succinct proof of access to a recall byte found in a TX +message ProofOfAccess { + // The recall byte option chosen; global offset of index byte + string option = 1; + // The path through the Merkle tree of transactions' `data_root`s; + // from the `data_root` being proven to the corresponding `tx_root` + bytes tx_path = 2; + // The path through the Merkle tree of identifiers of chunks of the + // corresponding transaction; from the chunk being proven to the + // corresponding `data_root`. + bytes data_path = 3; + // The data chunk. + bytes chunk = 4; +} + +message Transaction { + // 1 or 2 for v1 or v2 transactions. More allowable in the future + uint32 format = 1; + // The transaction identifier. + bytes id = 2; + // Either the identifier of the previous transaction from the same + // wallet or the identifier of one of the last ?MAX_TX_ANCHOR_DEPTH blocks. + bytes last_tx = 3; + // The public key the transaction is signed with. + bytes owner = 4; + // A list of arbitrary key-value pairs + repeated Tag tags = 5; + // The address of the recipient; if any. The SHA2-256 hash of the public key. + bytes target = 6; + // The amount of Winstons to send to the recipient; if any. + BigInt quantity = 7; + // The data to upload; if any. For v2 transactions; the field is optional + // - a fee is charged based on the `data_size` field; + // data may be uploaded any time later in chunks. + bytes data = 8; + // Size in bytes of the transaction data. + BigInt data_size = 9; + // The Merkle root of the Merkle tree of data chunks. + bytes data_root = 10; + // The signature. + bytes signature = 11; + // The fee in Winstons. 
+  BigInt reward = 12;
+}
+
+
+message Tag {
+  bytes name = 1;
+  bytes value = 2;
+}
diff --git a/chain/arweave/src/adapter.rs b/chain/arweave/src/adapter.rs
new file mode 100644
index 00000000000..fbf53eb8c9b
--- /dev/null
+++ b/chain/arweave/src/adapter.rs
@@ -0,0 +1,92 @@
+use crate::capabilities::NodeCapabilities;
+use crate::{data_source::DataSource, Chain};
+use graph::blockchain as bc;
+use graph::prelude::*;
+use std::collections::HashSet;
+
+#[derive(Clone, Debug, Default)]
+pub struct TriggerFilter {
+    pub(crate) block_filter: ArweaveBlockFilter,
+    pub(crate) transaction_filter: ArweaveTransactionFilter,
+}
+
+impl bc::TriggerFilter<Chain> for TriggerFilter {
+    fn extend<'a>(&mut self, data_sources: impl Iterator<Item = &'a DataSource> + Clone) {
+        let TriggerFilter {
+            block_filter,
+            transaction_filter,
+        } = self;
+
+        block_filter.extend(ArweaveBlockFilter::from_data_sources(data_sources.clone()));
+        transaction_filter.extend(ArweaveTransactionFilter::from_data_sources(data_sources));
+    }
+
+    fn node_capabilities(&self) -> NodeCapabilities {
+        NodeCapabilities {}
+    }
+
+    fn extend_with_template(
+        &mut self,
+        _data_source: impl Iterator<Item = <Chain as bc::Blockchain>::DataSourceTemplate>,
+    ) {
+    }
+
+    fn to_firehose_filter(self) -> Vec<prost_types::Any> {
+        vec![]
+    }
+}
+
+/// ArweaveTransactionFilter matches transactions whose `owner` is one of the owners configured in the data sources.
+/// see docs: https://thegraph.com/docs/en/supported-networks/arweave/
+#[derive(Clone, Debug, Default)]
+pub(crate) struct ArweaveTransactionFilter {
+    owners: HashSet<Vec<u8>>,
+}
+
+impl ArweaveTransactionFilter {
+    pub fn matches(&self, owner: &[u8]) -> bool {
+        self.owners.contains(owner)
+    }
+
+    pub fn from_data_sources<'a>(iter: impl IntoIterator<Item = &'a DataSource>) -> Self {
+        let owners: Vec<Vec<u8>> = iter
+            .into_iter()
+            .filter(|data_source| {
+                data_source.source.owner.is_some()
+                    && !data_source.mapping.transaction_handlers.is_empty()
+            })
+            .map(|ds| {
+                base64_url::decode(&ds.source.owner.clone().unwrap_or_default()).unwrap_or_default()
+            })
+            .collect();
+
+        Self {
+            owners: HashSet::from_iter(owners),
+        }
+    }
+
+    pub fn extend(&mut self, other: ArweaveTransactionFilter) {
+        self.owners.extend(other.owners);
+    }
+}
+
+/// ArweaveBlockFilter will match every block regardless of source being set.
+/// see docs: https://thegraph.com/docs/en/supported-networks/arweave/ +#[derive(Clone, Debug, Default)] +pub(crate) struct ArweaveBlockFilter { + pub trigger_every_block: bool, +} + +impl ArweaveBlockFilter { + pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { + Self { + trigger_every_block: iter + .into_iter() + .any(|data_source| !data_source.mapping.block_handlers.is_empty()), + } + } + + pub fn extend(&mut self, other: ArweaveBlockFilter) { + self.trigger_every_block = self.trigger_every_block || other.trigger_every_block; + } +} diff --git a/chain/arweave/src/capabilities.rs b/chain/arweave/src/capabilities.rs new file mode 100644 index 00000000000..27c7622aeb5 --- /dev/null +++ b/chain/arweave/src/capabilities.rs @@ -0,0 +1,37 @@ +use graph::{anyhow::Error, impl_slog_value}; +use std::cmp::{Ordering, PartialOrd}; +use std::fmt; +use std::str::FromStr; + +use crate::data_source::DataSource; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct NodeCapabilities {} + +impl PartialOrd for NodeCapabilities { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + +impl FromStr for NodeCapabilities { + type Err = Error; + + fn from_str(_s: &str) -> Result { + Ok(NodeCapabilities {}) + } +} + +impl fmt::Display for NodeCapabilities { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("arweave") + } +} + +impl_slog_value!(NodeCapabilities, "{}"); + +impl graph::blockchain::NodeCapabilities for NodeCapabilities { + fn from_data_sources(_data_sources: &[DataSource]) -> Self { + NodeCapabilities {} + } +} diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs new file mode 100644 index 00000000000..01fefd05042 --- /dev/null +++ b/chain/arweave/src/chain.rs @@ -0,0 +1,334 @@ +use graph::blockchain::{Block, BlockchainKind}; +use graph::cheap_clone::CheapClone; +use graph::data::subgraph::UnifiedMappingApiVersion; +use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints}; +use graph::prelude::{MetricsRegistry, TryFutureExt}; +use graph::{ + anyhow, + blockchain::{ + block_stream::{ + BlockStreamEvent, BlockWithTriggers, FirehoseError, + FirehoseMapper as FirehoseMapperTrait, TriggersAdapter as TriggersAdapterTrait, + }, + firehose_block_stream::FirehoseBlockStream, + BlockHash, BlockPtr, Blockchain, IngestorError, RuntimeAdapter as RuntimeAdapterTrait, + }, + components::store::DeploymentLocator, + firehose::{self as firehose, ForkStep}, + prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory}, +}; +use prost::Message; +use std::sync::Arc; + +use crate::adapter::TriggerFilter; +use crate::capabilities::NodeCapabilities; +use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate}; +use crate::runtime::RuntimeAdapter; +use crate::trigger::{self, ArweaveTrigger}; +use crate::{ + codec, + data_source::{DataSource, UnresolvedDataSource}, +}; +use graph::blockchain::block_stream::BlockStream; + +pub struct Chain { + logger_factory: LoggerFactory, + name: String, + firehose_endpoints: Arc, + chain_store: Arc, + metrics_registry: Arc, +} + +impl std::fmt::Debug for Chain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "chain: arweave") + } +} + +impl Chain { + pub fn new( + logger_factory: LoggerFactory, + name: String, + chain_store: Arc, + firehose_endpoints: FirehoseEndpoints, + metrics_registry: Arc, + ) -> Self { + Chain { + logger_factory, + name, + firehose_endpoints: Arc::new(firehose_endpoints), + chain_store, + metrics_registry, + } + } +} + 
+#[async_trait] +impl Blockchain for Chain { + const KIND: BlockchainKind = BlockchainKind::Arweave; + + type Block = codec::Block; + + type DataSource = DataSource; + + type UnresolvedDataSource = UnresolvedDataSource; + + type DataSourceTemplate = DataSourceTemplate; + + type UnresolvedDataSourceTemplate = UnresolvedDataSourceTemplate; + + type TriggerData = crate::trigger::ArweaveTrigger; + + type MappingTrigger = crate::trigger::ArweaveTrigger; + + type TriggerFilter = crate::adapter::TriggerFilter; + + type NodeCapabilities = crate::capabilities::NodeCapabilities; + + fn triggers_adapter( + &self, + _loc: &DeploymentLocator, + _capabilities: &Self::NodeCapabilities, + _unified_api_version: UnifiedMappingApiVersion, + ) -> Result>, Error> { + let adapter = TriggersAdapter {}; + Ok(Arc::new(adapter)) + } + + async fn new_firehose_block_stream( + &self, + deployment: DeploymentLocator, + block_cursor: Option, + start_blocks: Vec, + subgraph_current_block: Option, + filter: Arc, + unified_api_version: UnifiedMappingApiVersion, + ) -> Result>, Error> { + let adapter = self + .triggers_adapter(&deployment, &NodeCapabilities {}, unified_api_version) + .expect(&format!("no adapter for network {}", self.name,)); + + let firehose_endpoint = match self.firehose_endpoints.random() { + Some(e) => e.clone(), + None => return Err(anyhow::format_err!("no firehose endpoint available")), + }; + + let logger = self + .logger_factory + .subgraph_logger(&deployment) + .new(o!("component" => "FirehoseBlockStream")); + + let firehose_mapper = Arc::new(FirehoseMapper { + endpoint: firehose_endpoint.cheap_clone(), + }); + + Ok(Box::new(FirehoseBlockStream::new( + deployment.hash, + firehose_endpoint, + subgraph_current_block, + block_cursor, + firehose_mapper, + adapter, + filter, + start_blocks, + logger, + self.metrics_registry.clone(), + ))) + } + + async fn new_polling_block_stream( + &self, + _deployment: DeploymentLocator, + _start_blocks: Vec, + _subgraph_current_block: Option, + _filter: Arc, + _unified_api_version: UnifiedMappingApiVersion, + ) -> Result>, Error> { + panic!("Arweave does not support polling block stream") + } + + fn chain_store(&self) -> Arc { + self.chain_store.clone() + } + + async fn block_pointer_from_number( + &self, + logger: &Logger, + number: BlockNumber, + ) -> Result { + let firehose_endpoint = match self.firehose_endpoints.random() { + Some(e) => e.clone(), + None => return Err(anyhow::format_err!("no firehose endpoint available").into()), + }; + + firehose_endpoint + .block_ptr_for_number::(logger, number) + .map_err(Into::into) + .await + } + + fn runtime_adapter(&self) -> Arc> { + Arc::new(RuntimeAdapter {}) + } + + fn is_firehose_supported(&self) -> bool { + true + } +} + +pub struct TriggersAdapter {} + +#[async_trait] +impl TriggersAdapterTrait for TriggersAdapter { + async fn scan_triggers( + &self, + _from: BlockNumber, + _to: BlockNumber, + _filter: &TriggerFilter, + ) -> Result>, Error> { + panic!("Should never be called since not used by FirehoseBlockStream") + } + + async fn triggers_in_block( + &self, + _logger: &Logger, + block: codec::Block, + filter: &TriggerFilter, + ) -> Result, Error> { + // TODO: Find the best place to introduce an `Arc` and avoid this clone. 
+        let shared_block = Arc::new(block.clone());
+
+        let TriggerFilter {
+            block_filter,
+            transaction_filter,
+        } = filter;
+
+        let txs = block
+            .clone()
+            .txs
+            .into_iter()
+            .filter(|tx| transaction_filter.matches(&tx.owner))
+            .map(|tx| trigger::TransactionWithBlockPtr {
+                tx: Arc::new(tx.clone()),
+                block: shared_block.clone(),
+            })
+            .collect::<Vec<_>>();
+
+        let mut trigger_data: Vec<_> = txs
+            .into_iter()
+            .map(|tx| ArweaveTrigger::Transaction(Arc::new(tx)))
+            .collect();
+
+        if block_filter.trigger_every_block {
+            trigger_data.push(ArweaveTrigger::Block(shared_block.cheap_clone()));
+        }
+
+        Ok(BlockWithTriggers::new(block, trigger_data))
+    }
+
+    async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
+        panic!("Should never be called since not used by FirehoseBlockStream")
+    }
+
+    async fn ancestor_block(
+        &self,
+        _ptr: BlockPtr,
+        _offset: BlockNumber,
+    ) -> Result<Option<codec::Block>, Error> {
+        panic!("Should never be called since FirehoseBlockStream cannot resolve it")
+    }
+
+    /// Panics if `block` is genesis.
+    /// But that's ok since this is only called when reverting `block`.
+    async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
+        // FIXME (Arweave): Might not be necessary for Arweave support for now
+        Ok(Some(BlockPtr {
+            hash: BlockHash::from(vec![0xff; 48]),
+            number: block.number.saturating_sub(1),
+        }))
+    }
+}
+
+pub struct FirehoseMapper {
+    endpoint: Arc<FirehoseEndpoint>,
+}
+
+#[async_trait]
+impl FirehoseMapperTrait<Chain> for FirehoseMapper {
+    async fn to_block_stream_event(
+        &self,
+        logger: &Logger,
+        response: &firehose::Response,
+        adapter: &Arc<dyn TriggersAdapterTrait<Chain>>,
+        filter: &TriggerFilter,
+    ) -> Result<BlockStreamEvent<Chain>, FirehoseError> {
+        let step = ForkStep::from_i32(response.step).unwrap_or_else(|| {
+            panic!(
+                "unknown step i32 value {}, maybe you forgot to update & regenerate the protobuf definitions?",
+                response.step
+            )
+        });
+
+        let any_block = response
+            .block
+            .as_ref()
+            .expect("block payload information should always be present");
+
+        // Right now, this is done in all cases but in reality, with how the BlockStreamEvent::Revert
+        // is defined right now, only the block hash and block number are necessary. However, this information
+        // is not part of the actual bstream::BlockResponseV2 payload. As such, we need to decode the full
+        // block, which is wasteful.
+        //
+        // Consider adding basic information about the block in the bstream::BlockResponseV2, or maybe
+        // define a slimmed-down struct that would decode only a few fields and ignore all the rest.
+ let block = codec::Block::decode(any_block.value.as_ref())?; + + use ForkStep::*; + match step { + StepNew => Ok(BlockStreamEvent::ProcessBlock( + adapter.triggers_in_block(logger, block, filter).await?, + Some(response.cursor.clone()), + )), + + StepUndo => { + let parent_ptr = block + .parent_ptr() + .expect("Genesis block should never be reverted"); + + Ok(BlockStreamEvent::Revert( + parent_ptr, + Some(response.cursor.clone()), + )) + } + + StepIrreversible => { + panic!("irreversible step is not handled and should not be requested in the Firehose request") + } + + StepUnknown => { + panic!("unknown step should not happen in the Firehose response") + } + } + } + + async fn block_ptr_for_number( + &self, + logger: &Logger, + number: BlockNumber, + ) -> Result { + self.endpoint + .block_ptr_for_number::(logger, number) + .await + } + + // # FIXME + // + // the final block of arweave is itself in the current implementation + async fn final_block_ptr_for( + &self, + _logger: &Logger, + block: &codec::Block, + ) -> Result { + Ok(block.ptr()) + } +} diff --git a/chain/arweave/src/codec.rs b/chain/arweave/src/codec.rs new file mode 100644 index 00000000000..09da7fee1b0 --- /dev/null +++ b/chain/arweave/src/codec.rs @@ -0,0 +1,37 @@ +#[rustfmt::skip] +#[path = "protobuf/sf.arweave.r#type.v1.rs"] +mod pbcodec; + +use graph::{blockchain::Block as BlockchainBlock, blockchain::BlockPtr, prelude::BlockNumber}; + +pub use pbcodec::*; + +impl BlockchainBlock for Block { + fn number(&self) -> i32 { + BlockNumber::try_from(self.height).unwrap() + } + + fn ptr(&self) -> BlockPtr { + BlockPtr { + hash: self.indep_hash.clone().into(), + number: self.number(), + } + } + + fn parent_ptr(&self) -> Option { + if self.height == 0 { + return None; + } + + Some(BlockPtr { + hash: self.previous_block.clone().into(), + number: self.number().saturating_sub(1), + }) + } +} + +impl AsRef<[u8]> for BigInt { + fn as_ref(&self) -> &[u8] { + self.bytes.as_ref() + } +} diff --git a/chain/arweave/src/data_source.rs b/chain/arweave/src/data_source.rs new file mode 100644 index 00000000000..e6e3314c498 --- /dev/null +++ b/chain/arweave/src/data_source.rs @@ -0,0 +1,370 @@ +use graph::blockchain::{Block, TriggerWithHandler}; +use graph::components::store::StoredDynamicDataSource; +use graph::data::subgraph::DataSourceContext; +use graph::prelude::SubgraphManifestValidationError; +use graph::{ + anyhow::{anyhow, Error}, + blockchain::{self, Blockchain}, + prelude::{ + async_trait, info, BlockNumber, CheapClone, DataSourceTemplateInfo, Deserialize, Link, + LinkResolver, Logger, + }, + semver, +}; +use std::collections::BTreeMap; +use std::{convert::TryFrom, sync::Arc}; + +use crate::chain::Chain; +use crate::trigger::ArweaveTrigger; + +pub const ARWEAVE_KIND: &str = "arweave"; + +/// Runtime representation of a data source. +#[derive(Clone, Debug)] +pub struct DataSource { + pub kind: String, + pub network: Option, + pub name: String, + pub(crate) source: Source, + pub mapping: Mapping, + pub context: Arc>, + pub creation_block: Option, +} + +impl blockchain::DataSource for DataSource { + // FIXME + // + // need to decode the base64url encoding? 
+ fn address(&self) -> Option<&[u8]> { + self.source.owner.as_ref().map(String::as_bytes) + } + + fn start_block(&self) -> BlockNumber { + self.source.start_block + } + + fn match_and_decode( + &self, + trigger: &::TriggerData, + block: &Arc<::Block>, + _logger: &Logger, + ) -> Result>, Error> { + if self.source.start_block > block.number() { + return Ok(None); + } + + let handler = match trigger { + // A block trigger matches if a block handler is present. + ArweaveTrigger::Block(_) => match self.handler_for_block() { + Some(handler) => &handler.handler, + None => return Ok(None), + }, + // A transaction trigger matches if a transaction handler is present. + ArweaveTrigger::Transaction(_) => match self.handler_for_transaction() { + Some(handler) => &handler.handler, + None => return Ok(None), + }, + }; + + Ok(Some(TriggerWithHandler::new( + trigger.cheap_clone(), + handler.to_owned(), + ))) + } + + fn name(&self) -> &str { + &self.name + } + + fn kind(&self) -> &str { + &self.kind + } + + fn network(&self) -> Option<&str> { + self.network.as_ref().map(|s| s.as_str()) + } + + fn context(&self) -> Arc> { + self.context.cheap_clone() + } + + fn creation_block(&self) -> Option { + self.creation_block + } + + fn is_duplicate_of(&self, other: &Self) -> bool { + let DataSource { + kind, + network, + name, + source, + mapping, + context, + + // The creation block is ignored for detection duplicate data sources. + // Contract ABI equality is implicit in `source` and `mapping.abis` equality. + creation_block: _, + } = self; + + // mapping_request_sender, host_metrics, and (most of) host_exports are operational structs + // used at runtime but not needed to define uniqueness; each runtime host should be for a + // unique data source. + kind == &other.kind + && network == &other.network + && name == &other.name + && source == &other.source + && mapping.block_handlers == other.mapping.block_handlers + && context == &other.context + } + + fn as_stored_dynamic_data_source(&self) -> StoredDynamicDataSource { + // FIXME (Arweave): Implement me! 
+ todo!() + } + + fn from_stored_dynamic_data_source( + _templates: &BTreeMap<&str, &DataSourceTemplate>, + _stored: StoredDynamicDataSource, + ) -> Result { + // FIXME (Arweave): Implement me correctly + todo!() + } + + fn validate(&self) -> Vec { + let mut errors = Vec::new(); + + if self.kind != ARWEAVE_KIND { + errors.push(anyhow!( + "data source has invalid `kind`, expected {} but found {}", + ARWEAVE_KIND, + self.kind + )) + } + + // Validate that there is a `source` address if there are transaction handlers + let no_source_address = self.address().is_none(); + let has_transaction_handlers = !self.mapping.transaction_handlers.is_empty(); + if no_source_address && has_transaction_handlers { + errors.push(SubgraphManifestValidationError::SourceAddressRequired.into()); + }; + + // Validate that there are no more than one of both block handlers and transaction handlers + if self.mapping.block_handlers.len() > 1 { + errors.push(anyhow!("data source has duplicated block handlers")); + } + if self.mapping.transaction_handlers.len() > 1 { + errors.push(anyhow!("data source has duplicated transaction handlers")); + } + + errors + } + + fn api_version(&self) -> semver::Version { + self.mapping.api_version.clone() + } + + fn runtime(&self) -> &[u8] { + self.mapping.runtime.as_ref() + } +} + +impl DataSource { + fn from_manifest( + kind: String, + network: Option, + name: String, + source: Source, + mapping: Mapping, + context: Option, + ) -> Result { + // Data sources in the manifest are created "before genesis" so they have no creation block. + let creation_block = None; + + Ok(DataSource { + kind, + network, + name, + source, + mapping, + context: Arc::new(context), + creation_block, + }) + } + + fn handler_for_block(&self) -> Option<&MappingBlockHandler> { + self.mapping.block_handlers.first() + } + + fn handler_for_transaction(&self) -> Option<&TransactionHandler> { + self.mapping.transaction_handlers.first() + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Deserialize)] +pub struct UnresolvedDataSource { + pub kind: String, + pub network: Option, + pub name: String, + pub(crate) source: Source, + pub mapping: UnresolvedMapping, + pub context: Option, +} + +#[async_trait] +impl blockchain::UnresolvedDataSource for UnresolvedDataSource { + async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + ) -> Result { + let UnresolvedDataSource { + kind, + network, + name, + source, + mapping, + context, + } = self; + + info!(logger, "Resolve data source"; "name" => &name, "source_address" => format_args!("{:?}", base64_url::encode(&source.owner.clone().unwrap_or_default())), "source_start_block" => source.start_block); + + let mapping = mapping.resolve(resolver, logger).await?; + + DataSource::from_manifest(kind, network, name, source, mapping, context) + } +} + +/// # TODO +/// +/// add templates for arweave subgraphs +impl TryFrom> for DataSource { + type Error = Error; + + fn try_from(_info: DataSourceTemplateInfo) -> Result { + Err(anyhow!("Arweave subgraphs do not support templates")) + } +} + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +pub struct BaseDataSourceTemplate { + pub kind: String, + pub network: Option, + pub name: String, + pub mapping: M, +} + +pub type UnresolvedDataSourceTemplate = BaseDataSourceTemplate; +pub type DataSourceTemplate = BaseDataSourceTemplate; + +#[async_trait] +impl blockchain::UnresolvedDataSourceTemplate for UnresolvedDataSourceTemplate { + async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + ) -> Result { + let 
UnresolvedDataSourceTemplate { + kind, + network, + name, + mapping, + } = self; + + info!(logger, "Resolve data source template"; "name" => &name); + + Ok(DataSourceTemplate { + kind, + network, + name, + mapping: mapping.resolve(resolver, logger).await?, + }) + } +} + +impl blockchain::DataSourceTemplate for DataSourceTemplate { + fn name(&self) -> &str { + &self.name + } + + fn api_version(&self) -> semver::Version { + self.mapping.api_version.clone() + } + + fn runtime(&self) -> &[u8] { + self.mapping.runtime.as_ref() + } +} + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnresolvedMapping { + pub api_version: String, + pub language: String, + pub entities: Vec, + #[serde(default)] + pub block_handlers: Vec, + #[serde(default)] + pub transaction_handlers: Vec, + pub file: Link, +} + +impl UnresolvedMapping { + pub async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + ) -> Result { + let UnresolvedMapping { + api_version, + language, + entities, + block_handlers, + transaction_handlers, + file: link, + } = self; + + let api_version = semver::Version::parse(&api_version)?; + + info!(logger, "Resolve mapping"; "link" => &link.link); + let module_bytes = resolver.cat(logger, &link).await?; + + Ok(Mapping { + api_version, + language, + entities, + block_handlers, + transaction_handlers, + runtime: Arc::new(module_bytes), + link, + }) + } +} + +#[derive(Clone, Debug)] +pub struct Mapping { + pub api_version: semver::Version, + pub language: String, + pub entities: Vec, + pub block_handlers: Vec, + pub transaction_handlers: Vec, + pub runtime: Arc>, + pub link: Link, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub struct MappingBlockHandler { + pub handler: String, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub struct TransactionHandler { + pub handler: String, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub(crate) struct Source { + // A data source that does not have an owner can only have block handlers. 
+ pub(crate) owner: Option, + #[serde(rename = "startBlock", default)] + pub(crate) start_block: BlockNumber, +} diff --git a/chain/arweave/src/lib.rs b/chain/arweave/src/lib.rs new file mode 100644 index 00000000000..a497e77bf9d --- /dev/null +++ b/chain/arweave/src/lib.rs @@ -0,0 +1,10 @@ +mod adapter; +mod capabilities; +mod chain; +mod codec; +mod data_source; +mod runtime; +mod trigger; + +pub use crate::chain::Chain; +pub use codec::Block; diff --git a/chain/arweave/src/protobuf/sf.arweave.r#type.v1.rs b/chain/arweave/src/protobuf/sf.arweave.r#type.v1.rs new file mode 100644 index 00000000000..98a1359305a --- /dev/null +++ b/chain/arweave/src/protobuf/sf.arweave.r#type.v1.rs @@ -0,0 +1,141 @@ +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BigInt { + #[prost(bytes="vec", tag="1")] + pub bytes: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Block { + /// Firehose block version (unrelated to Arweave block version) + #[prost(uint32, tag="1")] + pub ver: u32, + /// The block identifier + #[prost(bytes="vec", tag="2")] + pub indep_hash: ::prost::alloc::vec::Vec, + /// The nonce chosen to solve the mining problem + #[prost(bytes="vec", tag="3")] + pub nonce: ::prost::alloc::vec::Vec, + /// `indep_hash` of the previous block in the weave + #[prost(bytes="vec", tag="4")] + pub previous_block: ::prost::alloc::vec::Vec, + /// POSIX time of block discovery + #[prost(uint64, tag="5")] + pub timestamp: u64, + /// POSIX time of the last difficulty retarget + #[prost(uint64, tag="6")] + pub last_retarget: u64, + /// Mining difficulty; the number `hash` must be greater than. + #[prost(message, optional, tag="7")] + pub diff: ::core::option::Option, + /// How many blocks have passed since the genesis block + #[prost(uint64, tag="8")] + pub height: u64, + /// Mining solution hash of the block; must satisfy the mining difficulty + #[prost(bytes="vec", tag="9")] + pub hash: ::prost::alloc::vec::Vec, + /// Merkle root of the tree of Merkle roots of block's transactions' data. + #[prost(bytes="vec", tag="10")] + pub tx_root: ::prost::alloc::vec::Vec, + /// Transactions contained within this block + #[prost(message, repeated, tag="11")] + pub txs: ::prost::alloc::vec::Vec, + /// The root hash of the Merkle Patricia Tree containing + /// all wallet (account) balances and the identifiers + /// of the last transactions posted by them; if any. + #[prost(bytes="vec", tag="12")] + pub wallet_list: ::prost::alloc::vec::Vec, + /// (string or) Address of the account to receive the block rewards. Can also be unclaimed which is encoded as a null byte + #[prost(bytes="vec", tag="13")] + pub reward_addr: ::prost::alloc::vec::Vec, + /// Tags that a block producer can add to a block + #[prost(message, repeated, tag="14")] + pub tags: ::prost::alloc::vec::Vec, + /// Size of reward pool + #[prost(message, optional, tag="15")] + pub reward_pool: ::core::option::Option, + /// Size of the weave in bytes + #[prost(message, optional, tag="16")] + pub weave_size: ::core::option::Option, + /// Size of this block in bytes + #[prost(message, optional, tag="17")] + pub block_size: ::core::option::Option, + /// Required after the version 1.8 fork. Zero otherwise. + /// The sum of the average number of hashes computed + /// by the network to produce the past blocks including this one. + #[prost(message, optional, tag="18")] + pub cumulative_diff: ::core::option::Option, + /// Required after the version 1.8 fork. Null byte otherwise. 
+ /// The Merkle root of the block index - the list of {`indep_hash`; `weave_size`; `tx_root`} triplets + #[prost(bytes="vec", tag="20")] + pub hash_list_merkle: ::prost::alloc::vec::Vec, + /// The proof of access; Used after v2.4 only; set as defaults otherwise + #[prost(message, optional, tag="21")] + pub poa: ::core::option::Option, +} +/// A succinct proof of access to a recall byte found in a TX +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ProofOfAccess { + /// The recall byte option chosen; global offset of index byte + #[prost(string, tag="1")] + pub option: ::prost::alloc::string::String, + /// The path through the Merkle tree of transactions' `data_root`s; + /// from the `data_root` being proven to the corresponding `tx_root` + #[prost(bytes="vec", tag="2")] + pub tx_path: ::prost::alloc::vec::Vec, + /// The path through the Merkle tree of identifiers of chunks of the + /// corresponding transaction; from the chunk being proven to the + /// corresponding `data_root`. + #[prost(bytes="vec", tag="3")] + pub data_path: ::prost::alloc::vec::Vec, + /// The data chunk. + #[prost(bytes="vec", tag="4")] + pub chunk: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Transaction { + /// 1 or 2 for v1 or v2 transactions. More allowable in the future + #[prost(uint32, tag="1")] + pub format: u32, + /// The transaction identifier. + #[prost(bytes="vec", tag="2")] + pub id: ::prost::alloc::vec::Vec, + /// Either the identifier of the previous transaction from the same + /// wallet or the identifier of one of the last ?MAX_TX_ANCHOR_DEPTH blocks. + #[prost(bytes="vec", tag="3")] + pub last_tx: ::prost::alloc::vec::Vec, + /// The public key the transaction is signed with. + #[prost(bytes="vec", tag="4")] + pub owner: ::prost::alloc::vec::Vec, + /// A list of arbitrary key-value pairs + #[prost(message, repeated, tag="5")] + pub tags: ::prost::alloc::vec::Vec, + /// The address of the recipient; if any. The SHA2-256 hash of the public key. + #[prost(bytes="vec", tag="6")] + pub target: ::prost::alloc::vec::Vec, + /// The amount of Winstons to send to the recipient; if any. + #[prost(message, optional, tag="7")] + pub quantity: ::core::option::Option, + /// The data to upload; if any. For v2 transactions; the field is optional + /// - a fee is charged based on the `data_size` field; + /// data may be uploaded any time later in chunks. + #[prost(bytes="vec", tag="8")] + pub data: ::prost::alloc::vec::Vec, + /// Size in bytes of the transaction data. + #[prost(message, optional, tag="9")] + pub data_size: ::core::option::Option, + /// The Merkle root of the Merkle tree of data chunks. + #[prost(bytes="vec", tag="10")] + pub data_root: ::prost::alloc::vec::Vec, + /// The signature. + #[prost(bytes="vec", tag="11")] + pub signature: ::prost::alloc::vec::Vec, + /// The fee in Winstons. 
+ #[prost(message, optional, tag="12")] + pub reward: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Tag { + #[prost(bytes="vec", tag="1")] + pub name: ::prost::alloc::vec::Vec, + #[prost(bytes="vec", tag="2")] + pub value: ::prost::alloc::vec::Vec, +} diff --git a/chain/arweave/src/runtime/abi.rs b/chain/arweave/src/runtime/abi.rs new file mode 100644 index 00000000000..4f3d4005dc8 --- /dev/null +++ b/chain/arweave/src/runtime/abi.rs @@ -0,0 +1,191 @@ +use crate::codec; +use crate::trigger::TransactionWithBlockPtr; +use graph::runtime::gas::GasCounter; +use graph::runtime::{asc_new, AscHeap, AscPtr, DeterministicHostError, ToAscObj}; +use graph_runtime_wasm::asc_abi::class::{Array, Uint8Array}; + +pub(crate) use super::generated::*; + +impl ToAscObj for codec::Tag { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + Ok(AscTag { + name: asc_new(heap, self.name.as_slice(), gas)?, + value: asc_new(heap, self.value.as_slice(), gas)?, + }) + } +} + +impl ToAscObj for Vec> { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + let content = self + .into_iter() + .map(|x| asc_new(heap, x.as_slice(), gas)) + .collect::>, _>>()?; + Ok(AscTransactionArray(Array::new(&*content, heap, gas)?)) + } +} + +impl ToAscObj for Vec { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + let content = self + .iter() + .map(|x| asc_new(heap, x, gas)) + .collect::, _>>()?; + Ok(AscTagArray(Array::new(&*content, heap, gas)?)) + } +} + +impl ToAscObj for codec::ProofOfAccess { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + Ok(AscProofOfAccess { + option: asc_new(heap, &self.option, gas)?, + tx_path: asc_new(heap, self.tx_path.as_slice(), gas)?, + data_path: asc_new(heap, self.data_path.as_slice(), gas)?, + chunk: asc_new(heap, self.chunk.as_slice(), gas)?, + }) + } +} + +impl ToAscObj for codec::Transaction { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + Ok(AscTransaction { + format: self.format, + id: asc_new(heap, self.id.as_slice(), gas)?, + last_tx: asc_new(heap, self.last_tx.as_slice(), gas)?, + owner: asc_new(heap, self.owner.as_slice(), gas)?, + tags: asc_new(heap, &self.tags, gas)?, + target: asc_new(heap, self.target.as_slice(), gas)?, + quantity: asc_new( + heap, + self.quantity + .as_ref() + .map(|b| b.as_ref()) + .unwrap_or_default(), + gas, + )?, + data: asc_new(heap, self.data.as_slice(), gas)?, + data_size: asc_new( + heap, + self.data_size + .as_ref() + .map(|b| b.as_ref()) + .unwrap_or_default(), + gas, + )?, + data_root: asc_new(heap, self.data_root.as_slice(), gas)?, + signature: asc_new(heap, self.signature.as_slice(), gas)?, + reward: asc_new( + heap, + self.reward.as_ref().map(|b| b.as_ref()).unwrap_or_default(), + gas, + )?, + }) + } +} + +impl ToAscObj for codec::Block { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + Ok(AscBlock { + indep_hash: asc_new(heap, self.indep_hash.as_slice(), gas)?, + nonce: asc_new(heap, self.nonce.as_slice(), gas)?, + previous_block: asc_new(heap, self.previous_block.as_slice(), gas)?, + timestamp: self.timestamp, + last_retarget: self.last_retarget, + diff: asc_new( + heap, + self.diff.as_ref().map(|b| b.as_ref()).unwrap_or_default(), + gas, + )?, + height: self.height, + hash: asc_new(heap, self.hash.as_slice(), gas)?, + tx_root: asc_new(heap, self.tx_root.as_slice(), gas)?, + txs: asc_new( + heap, + &self + .txs + .iter() + .map(|tx| 
tx.id.clone().into()) + .collect::>>(), + gas, + )?, + wallet_list: asc_new(heap, self.wallet_list.as_slice(), gas)?, + reward_addr: asc_new(heap, self.reward_addr.as_slice(), gas)?, + tags: asc_new(heap, &self.tags, gas)?, + reward_pool: asc_new( + heap, + self.reward_pool + .as_ref() + .map(|b| b.as_ref()) + .unwrap_or_default(), + gas, + )?, + weave_size: asc_new( + heap, + self.weave_size + .as_ref() + .map(|b| b.as_ref()) + .unwrap_or_default(), + gas, + )?, + block_size: asc_new( + heap, + self.block_size + .as_ref() + .map(|b| b.as_ref()) + .unwrap_or_default(), + gas, + )?, + cumulative_diff: asc_new( + heap, + self.cumulative_diff + .as_ref() + .map(|b| b.as_ref()) + .unwrap_or_default(), + gas, + )?, + hash_list_merkle: asc_new(heap, self.hash_list_merkle.as_slice(), gas)?, + poa: self + .poa + .as_ref() + .map(|poa| asc_new(heap, poa, gas)) + .unwrap_or(Ok(AscPtr::null()))?, + }) + } +} + +impl ToAscObj for TransactionWithBlockPtr { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result { + Ok(AscTransactionWithBlockPtr { + tx: asc_new(heap, &self.tx.as_ref(), gas)?, + block: asc_new(heap, self.block.as_ref(), gas)?, + }) + } +} diff --git a/chain/arweave/src/runtime/generated.rs b/chain/arweave/src/runtime/generated.rs new file mode 100644 index 00000000000..e8a10fdb158 --- /dev/null +++ b/chain/arweave/src/runtime/generated.rs @@ -0,0 +1,128 @@ +use graph::runtime::{AscIndexId, AscPtr, AscType, DeterministicHostError, IndexForAscTypeId}; +use graph::semver::Version; +use graph_runtime_derive::AscType; +use graph_runtime_wasm::asc_abi::class::{Array, AscString, Uint8Array}; + +#[repr(C)] +#[derive(AscType, Default)] +pub struct AscBlock { + pub timestamp: u64, + pub last_retarget: u64, + pub height: u64, + pub indep_hash: AscPtr, + pub nonce: AscPtr, + pub previous_block: AscPtr, + pub diff: AscPtr, + pub hash: AscPtr, + pub tx_root: AscPtr, + pub txs: AscPtr, + pub wallet_list: AscPtr, + pub reward_addr: AscPtr, + pub tags: AscPtr, + pub reward_pool: AscPtr, + pub weave_size: AscPtr, + pub block_size: AscPtr, + pub cumulative_diff: AscPtr, + pub hash_list_merkle: AscPtr, + pub poa: AscPtr, +} + +impl AscIndexId for AscBlock { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArweaveBlock; +} + +#[repr(C)] +#[derive(AscType)] +pub struct AscProofOfAccess { + pub option: AscPtr, + pub tx_path: AscPtr, + pub data_path: AscPtr, + pub chunk: AscPtr, +} + +impl AscIndexId for AscProofOfAccess { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArweaveProofOfAccess; +} + +#[repr(C)] +#[derive(AscType)] +pub struct AscTransaction { + pub format: u32, + pub id: AscPtr, + pub last_tx: AscPtr, + pub owner: AscPtr, + pub tags: AscPtr, + pub target: AscPtr, + pub quantity: AscPtr, + pub data: AscPtr, + pub data_size: AscPtr, + pub data_root: AscPtr, + pub signature: AscPtr, + pub reward: AscPtr, +} + +impl AscIndexId for AscTransaction { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArweaveTransaction; +} + +#[repr(C)] +#[derive(AscType)] +pub struct AscTag { + pub name: AscPtr, + pub value: AscPtr, +} + +impl AscIndexId for AscTag { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArweaveTag; +} + +#[repr(C)] +pub struct AscTransactionArray(pub(crate) Array>); + +impl AscType for AscTransactionArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + 
Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl AscIndexId for AscTransactionArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArweaveTransactionArray; +} + +#[repr(C)] +pub struct AscTagArray(pub(crate) Array>); + +impl AscType for AscTagArray { + fn to_asc_bytes(&self) -> Result, DeterministicHostError> { + self.0.to_asc_bytes() + } + + fn from_asc_bytes( + asc_obj: &[u8], + api_version: &Version, + ) -> Result { + Ok(Self(Array::from_asc_bytes(asc_obj, api_version)?)) + } +} + +impl AscIndexId for AscTagArray { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArweaveTagArray; +} + +#[repr(C)] +#[derive(AscType)] +pub struct AscTransactionWithBlockPtr { + pub tx: AscPtr, + pub block: AscPtr, +} + +impl AscIndexId for AscTransactionWithBlockPtr { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArweaveTransactionWithBlockPtr; +} diff --git a/chain/arweave/src/runtime/mod.rs b/chain/arweave/src/runtime/mod.rs new file mode 100644 index 00000000000..f44391caffd --- /dev/null +++ b/chain/arweave/src/runtime/mod.rs @@ -0,0 +1,6 @@ +pub use runtime_adapter::RuntimeAdapter; + +pub mod abi; +pub mod runtime_adapter; + +mod generated; diff --git a/chain/arweave/src/runtime/runtime_adapter.rs b/chain/arweave/src/runtime/runtime_adapter.rs new file mode 100644 index 00000000000..c5fa9e15059 --- /dev/null +++ b/chain/arweave/src/runtime/runtime_adapter.rs @@ -0,0 +1,11 @@ +use crate::{data_source::DataSource, Chain}; +use blockchain::HostFn; +use graph::{anyhow::Error, blockchain}; + +pub struct RuntimeAdapter {} + +impl blockchain::RuntimeAdapter for RuntimeAdapter { + fn host_fns(&self, _ds: &DataSource) -> Result, Error> { + Ok(vec![]) + } +} diff --git a/chain/arweave/src/trigger.rs b/chain/arweave/src/trigger.rs new file mode 100644 index 00000000000..3963795b666 --- /dev/null +++ b/chain/arweave/src/trigger.rs @@ -0,0 +1,137 @@ +use graph::blockchain; +use graph::blockchain::Block; +use graph::blockchain::TriggerData; +use graph::cheap_clone::CheapClone; +use graph::prelude::web3::types::H256; +use graph::prelude::BlockNumber; +use graph::runtime::asc_new; +use graph::runtime::gas::GasCounter; +use graph::runtime::AscHeap; +use graph::runtime::AscPtr; +use graph::runtime::DeterministicHostError; +use std::{cmp::Ordering, sync::Arc}; + +use crate::codec; + +// Logging the block is too verbose, so this strips the block from the trigger for Debug. 
+impl std::fmt::Debug for ArweaveTrigger {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        #[derive(Debug)]
+        pub enum MappingTriggerWithoutBlock {
+            Block,
+            Transaction(Arc<codec::Transaction>),
+        }
+
+        let trigger_without_block = match self {
+            ArweaveTrigger::Block(_) => MappingTriggerWithoutBlock::Block,
+            ArweaveTrigger::Transaction(tx) => {
+                MappingTriggerWithoutBlock::Transaction(tx.tx.clone())
+            }
+        };
+
+        write!(f, "{:?}", trigger_without_block)
+    }
+}
+
+impl blockchain::MappingTrigger for ArweaveTrigger {
+    fn to_asc_ptr<H: AscHeap + ?Sized>(
+        self,
+        heap: &mut H,
+        gas: &GasCounter,
+    ) -> Result<AscPtr<()>, DeterministicHostError> {
+        Ok(match self {
+            ArweaveTrigger::Block(block) => asc_new(heap, block.as_ref(), gas)?.erase(),
+            ArweaveTrigger::Transaction(tx) => asc_new(heap, tx.as_ref(), gas)?.erase(),
+        })
+    }
+}
+
+#[derive(Clone)]
+pub enum ArweaveTrigger {
+    Block(Arc<codec::Block>),
+    Transaction(Arc<TransactionWithBlockPtr>),
+}
+
+impl CheapClone for ArweaveTrigger {
+    fn cheap_clone(&self) -> ArweaveTrigger {
+        match self {
+            ArweaveTrigger::Block(block) => ArweaveTrigger::Block(block.cheap_clone()),
+            ArweaveTrigger::Transaction(tx) => ArweaveTrigger::Transaction(tx.cheap_clone()),
+        }
+    }
+}
+
+impl PartialEq for ArweaveTrigger {
+    fn eq(&self, other: &Self) -> bool {
+        match (self, other) {
+            (Self::Block(a_ptr), Self::Block(b_ptr)) => a_ptr == b_ptr,
+            (Self::Transaction(a_tx), Self::Transaction(b_tx)) => a_tx.tx.id == b_tx.tx.id,
+            _ => false,
+        }
+    }
+}
+
+impl Eq for ArweaveTrigger {}
+
+impl ArweaveTrigger {
+    pub fn block_number(&self) -> BlockNumber {
+        match self {
+            ArweaveTrigger::Block(block) => block.number(),
+            ArweaveTrigger::Transaction(tx) => tx.block.number(),
+        }
+    }
+
+    pub fn block_hash(&self) -> H256 {
+        match self {
+            ArweaveTrigger::Block(block) => block.ptr().hash_as_h256(),
+            ArweaveTrigger::Transaction(tx) => tx.block.ptr().hash_as_h256(),
+        }
+    }
+}
+
+impl Ord for ArweaveTrigger {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self, other) {
+            // Keep the order when comparing two block triggers
+            (Self::Block(..), Self::Block(..)) => Ordering::Equal,
+
+            // Block triggers always come last
+            (Self::Block(..), _) => Ordering::Greater,
+            (_, Self::Block(..)) => Ordering::Less,
+
+            // Transactions have no intrinsic ordering information, so we keep the order in
+            // which they are included in the `txs` field of `Block`.
+            (Self::Transaction(..), Self::Transaction(..)) => Ordering::Equal,
+        }
+    }
+}
+
+impl PartialOrd for ArweaveTrigger {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl TriggerData for ArweaveTrigger {
+    fn error_context(&self) -> std::string::String {
+        match self {
+            ArweaveTrigger::Block(..) => {
+                format!("Block #{} ({})", self.block_number(), self.block_hash())
+            }
+            ArweaveTrigger::Transaction(tx) => {
+                format!(
+                    "Tx #{}, block #{}({})",
+                    base64_url::encode(&tx.tx.id),
+                    self.block_number(),
+                    self.block_hash()
+                )
+            }
+        }
+    }
+}
+
+pub struct TransactionWithBlockPtr {
+    // REVIEW: Do we want to actually also have those two below behind an `Arc` wrapper?
+    pub tx: Arc<codec::Transaction>,
+    pub block: Arc<codec::Block>,
+}
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 5a72818cf21..652e4883570 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -13,6 +13,7 @@ futures = { version="0.3.4", features=["compat"] }
 graph = { path = "../graph" }
 # This dependency is temporary.
The multiblockchain refactoring is not # finished as long as this dependency exists +graph-chain-arweave = { path = "../chain/arweave" } graph-chain-ethereum = { path = "../chain/ethereum" } graph-chain-near = { path = "../chain/near" } graph-chain-tendermint = { path = "../chain/tendermint" } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 52076494940..57b85ea47dc 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -39,6 +39,13 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< let subgraph_start_future = async move { match BlockchainKind::from_manifest(&manifest)? { + BlockchainKind::Arweave => { + instance_manager + .start_subgraph_inner::( + logger, loc, manifest, stop_block, + ) + .await + } BlockchainKind::Ethereum => { instance_manager .start_subgraph_inner::( diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index c17b6d000c7..09fc9a6331b 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -296,6 +296,23 @@ where })?; match kind { + BlockchainKind::Arweave => { + create_subgraph_version::( + &logger, + self.store.clone(), + self.chains.cheap_clone(), + name.clone(), + hash.cheap_clone(), + start_block, + raw, + node_id, + debug_fork, + self.version_switching_mode, + &self.resolver, + ) + .await? + } + BlockchainKind::Ethereum => { create_subgraph_version::( &logger, diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 547ddb722c6..305851ae82c 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -312,6 +312,9 @@ pub trait NodeCapabilities { #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] pub enum BlockchainKind { + /// Arweave chains that are compatible. + Arweave, + /// Ethereum itself or chains that are compatible. Ethereum, @@ -325,6 +328,7 @@ pub enum BlockchainKind { impl fmt::Display for BlockchainKind { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let value = match self { + BlockchainKind::Arweave => "arweave", BlockchainKind::Ethereum => "ethereum", BlockchainKind::Near => "near", BlockchainKind::Tendermint => "tendermint", @@ -338,6 +342,7 @@ impl FromStr for BlockchainKind { fn from_str(s: &str) -> Result { match s { + "arweave" => Ok(BlockchainKind::Arweave), "ethereum" => Ok(BlockchainKind::Ethereum), "near" => Ok(BlockchainKind::Near), "tendermint" => Ok(BlockchainKind::Tendermint), diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index 25869ed409e..2a30f000035 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -120,8 +120,11 @@ impl BlockPtr { self.number } + // FIXME: + // + // workaround for arweave pub fn hash_as_h256(&self) -> H256 { - H256::from_slice(self.hash_slice()) + H256::from_slice(&self.hash_slice()[..32]) } pub fn hash_slice(&self) -> &[u8] { diff --git a/graph/src/runtime/mod.rs b/graph/src/runtime/mod.rs index 644488072da..cf45ef5a96b 100644 --- a/graph/src/runtime/mod.rs +++ b/graph/src/runtime/mod.rs @@ -320,7 +320,22 @@ pub enum IndexForAscTypeId { // ... 
// LastTendermintType = 2499, - // Reserved discriminant space for a future blockchain type IDs: [2,500, 3,499] + // Arweave types + ArweaveBlock = 2500, + ArweaveProofOfAccess = 2501, + ArweaveTag = 2502, + ArweaveTagArray = 2503, + ArweaveTransaction = 2504, + ArweaveTransactionArray = 2505, + ArweaveTransactionWithBlockPtr = 2506, + // Continue to add more Arweave type IDs here. + // e.g.: + // NextArweaveType = 2507, + // AnotherArweaveType = 2508, + // ... + // LastArweaveType = 3499, + + // Reserved discriminant space for a future blockchain type IDs: [3,500, 4,499] // // Generated with the following shell script: // @@ -331,7 +346,7 @@ pub enum IndexForAscTypeId { // INSTRUCTIONS: // 1. Replace the IDENTIFIER_PREFIX and the SRC_FILE placeholders according to the blockchain // name and implementation before running this script. - // 2. Replace `2500` part with the first number of that blockchain's reserved discriminant space. + // 2. Replace `3500` part with the first number of that blockchain's reserved discriminant space. // 3. Insert the output right before the end of this block. } diff --git a/node/Cargo.toml b/node/Cargo.toml index 849b1cb0763..5da20cb16aa 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -23,6 +23,7 @@ url = "2.2.1" crossbeam-channel = "0.5.4" graph = { path = "../graph" } graph-core = { path = "../core" } +graph-chain-arweave = { path = "../chain/arweave" } graph-chain-ethereum = { path = "../chain/ethereum" } graph-chain-near = { path = "../chain/near" } graph-chain-tendermint = { path = "../chain/tendermint" } diff --git a/node/src/main.rs b/node/src/main.rs index df0b909bd6a..f4c474ca929 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -13,6 +13,7 @@ use graph::log::logger; use graph::prelude::{IndexNodeServer as _, JsonRpcServer as _, *}; use graph::prometheus::Registry; use graph::url::Url; +use graph_chain_arweave::{self as arweave, Block as ArweaveBlock}; use graph_chain_ethereum as ethereum; use graph_chain_near::{self as near, HeaderOnlyBlock as NearFirehoseHeaderOnlyBlock}; use graph_chain_tendermint::{self as tendermint, EventList as TendermintFirehoseEventList}; @@ -243,6 +244,14 @@ async fn main() { // `blockchain_map`. 
let mut blockchain_map = BlockchainMap::new(); + let (arweave_networks, arweave_idents) = connect_firehose_networks::( + &logger, + firehose_networks_by_kind + .remove(&BlockchainKind::Arweave) + .unwrap_or_else(|| FirehoseNetworks::new()), + ) + .await; + let (eth_networks, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await; @@ -266,12 +275,22 @@ async fn main() { let network_identifiers = ethereum_idents .into_iter() + .chain(arweave_idents) .chain(near_idents) .chain(tendermint_idents) .collect(); let network_store = store_builder.network_store(network_identifiers); + let arweave_chains = arweave_networks_as_chains( + &mut blockchain_map, + &logger, + &arweave_networks, + network_store.as_ref(), + &logger_factory, + metrics_registry.clone(), + ); + let ethereum_chains = ethereum_networks_as_chains( &mut blockchain_map, &logger, @@ -345,11 +364,18 @@ async fn main() { ); } + start_firehose_block_ingestor::<_, ArweaveBlock>( + &logger, + &network_store, + arweave_chains, + ); + start_firehose_block_ingestor::<_, NearFirehoseHeaderOnlyBlock>( &logger, &network_store, near_chains, ); + start_firehose_block_ingestor::<_, TendermintFirehoseEventList>( &logger, &network_store, @@ -533,6 +559,55 @@ async fn main() { futures::future::pending::<()>().await; } +/// Return the hashmap of Arweave chains and also add them to `blockchain_map`. +fn arweave_networks_as_chains( + blockchain_map: &mut BlockchainMap, + logger: &Logger, + firehose_networks: &FirehoseNetworks, + store: &Store, + logger_factory: &LoggerFactory, + metrics_registry: Arc, +) -> HashMap> { + let chains: Vec<_> = firehose_networks + .networks + .iter() + .filter_map(|(chain_id, endpoints)| { + store + .block_store() + .chain_store(chain_id) + .map(|chain_store| (chain_id, chain_store, endpoints)) + .or_else(|| { + error!( + logger, + "No store configured for Arweave chain {}; ignoring this chain", chain_id + ); + None + }) + }) + .map(|(chain_id, chain_store, endpoints)| { + ( + chain_id.clone(), + FirehoseChain { + chain: Arc::new(arweave::Chain::new( + logger_factory.clone(), + chain_id.clone(), + chain_store, + endpoints.clone(), + metrics_registry.clone(), + )), + firehose_endpoints: endpoints.clone(), + }, + ) + }) + .collect(); + + for (chain_id, firehose_chain) in chains.iter() { + blockchain_map.insert::(chain_id.clone(), firehose_chain.chain.clone()) + } + + HashMap::from_iter(chains) +} + /// Return the hashmap of ethereum chains and also add them to `blockchain_map`. fn ethereum_networks_as_chains( blockchain_map: &mut BlockchainMap, diff --git a/server/index-node/Cargo.toml b/server/index-node/Cargo.toml index 70fe604bada..1df356130be 100644 --- a/server/index-node/Cargo.toml +++ b/server/index-node/Cargo.toml @@ -9,6 +9,7 @@ either = "1.6.1" futures = "0.3.4" graph = { path = "../../graph" } graph-graphql = { path = "../../graphql" } +graph-chain-arweave = { path = "../../chain/arweave" } graph-chain-ethereum = { path = "../../chain/ethereum" } graph-chain-near = { path = "../../chain/near" } graph-chain-tendermint = { path = "../../chain/tendermint" } diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 35ba6a1cb7f..bf05b8d66c1 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -510,6 +510,24 @@ impl IndexNodeResolver { ) .await? 
} + + BlockchainKind::Arweave => { + let unvalidated_subgraph_manifest = + UnvalidatedSubgraphManifest::::resolve( + deployment_hash, + raw, + &self.link_resolver, + &self.logger, + ENV_VARS.max_spec_version.clone(), + ) + .await?; + + validate_and_extract_features( + &self.store.subgraph_store(), + unvalidated_subgraph_manifest, + ) + .await? + } } }; diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index de0a8502743..0329d7e334d 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1496,9 +1496,16 @@ impl ChainStoreTrait for ChainStore { .map(|rows| { rows.first() .map(|(hash_opt, number_opt)| match (hash_opt, number_opt) { - (Some(hash), Some(number)) => { - Some((hash.parse().unwrap(), *number).into()) - } + (Some(hash), Some(number)) => Some( + ( + // FIXME: + // + // workaround for arweave + H256::from_slice(&hex::decode(hash).unwrap()[..32]), + *number, + ) + .into(), + ), (None, None) => None, _ => unreachable!(), }) diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index e928a9ecc0c..fe08357716f 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -158,7 +158,10 @@ fn graft( match graft { (None, None, None) => Ok(None), (Some(subgraph), Some(hash), Some(block)) => { - let hash = H256::from_slice(hash.as_slice()); + // FIXME: + // + // workaround for arweave + let hash = H256::from_slice(&hash.as_slice()[..32]); let block = block.to_u64().expect("block numbers fit into a u64"); let subgraph = DeploymentHash::new(subgraph.clone()).map_err(|_| { StoreError::Unknown(anyhow!( diff --git a/store/postgres/src/detail.rs b/store/postgres/src/detail.rs index aa6e4811da2..c360114c5ce 100644 --- a/store/postgres/src/detail.rs +++ b/store/postgres/src/detail.rs @@ -116,7 +116,10 @@ impl TryFrom for SubgraphError { block_range, } = value; let block_number = crate::block_range::first_block_in_range(&block_range); - let block_hash = block_hash.map(|hash| H256::from_slice(hash.as_slice())); + // FIXME: + // + // workaround for arweave + let block_hash = block_hash.map(|hash| H256::from_slice(&hash.as_slice()[..32])); // In existing databases, we have errors that have a `block_range` of // `UNVERSIONED_RANGE`, which leads to `None` as the block number, but // has a hash. Conversely, it is also possible for an error to not have a @@ -146,7 +149,7 @@ pub(crate) fn block( ) -> Result, StoreError> { match (&hash, &number) { (Some(hash), Some(number)) => { - let hash = H256::from_slice(hash.as_slice()); + let hash = H256::from_slice(&hash.as_slice()[0..32]); let number = number.to_u64().ok_or_else(|| { constraint_violation!( "the block number {} for {} in {} is not representable as a u64",