diff --git a/chain/starknet/src/adapter.rs b/chain/starknet/src/adapter.rs index e04df8e979c..f21456046e4 100644 --- a/chain/starknet/src/adapter.rs +++ b/chain/starknet/src/adapter.rs @@ -1,27 +1,503 @@ -use graph::blockchain::{EmptyNodeCapabilities, TriggerFilter as TriggerFilterTrait}; +use std::collections::{hash_map::Entry, HashMap}; + +use graph::{ + anyhow::Error, + blockchain::{EmptyNodeCapabilities, TriggerFilter as TriggerFilterTrait}, + components::store::BlockNumber, + firehose::{ + BlockHeaderOnly as FirehoseFilterBlockHeaderOnly, BlockRange as FirehoseFilterBlockRange, + ContractEventFilter as FirehoseFilterContractEventFilter, + TopicWithRanges as FirehoseFilterTopicWithRanges, + TransactionEventFilter as FirehoseFilterTransactionEventFilter, + }, +}; +use prost::Message; +use prost_types::Any; + +const BLOCK_HEADER_ONLY_TYPE_URL: &str = + "type.googleapis.com/zklend.starknet.transform.v1.BlockHeaderOnly"; +const TRANSACTION_EVENT_FILTER_TYPE_URL: &str = + "type.googleapis.com/zklend.starknet.transform.v1.TransactionEventFilter"; use crate::{ + codec::Event, data_source::{DataSource, DataSourceTemplate}, + felt::Felt, Chain, }; +/// Starknet contract address, represented by the primitive [Felt] type. +type Address = Felt; + +/// Starknet event signature, encoded as the first element of any event's `keys` array. +type EventSignature = Felt; + +/// A hashmap for quick lookup on whether an event matches with the filter. If the event's signature +/// exists, further comparison needs to be made against the block ranges. +type EventSignatureWithRanges = HashMap>; + +/// Contains event and block filters. The two types of filters function independently: event +/// filters are only applied to event triggers; and block filters are only applied to block +/// triggers. #[derive(Default, Clone)] -pub struct TriggerFilter; +pub struct TriggerFilter { + event: EventFilter, + block: BlockFilter, +} -impl TriggerFilterTrait for TriggerFilter { - #[allow(unused)] - fn extend_with_template(&mut self, data_source: impl Iterator) { - todo!() +/// An event trigger is matched if and only if *ALL* conditions are met for the event: +/// +/// - it's emitted from one of the `contract_addresses` keys +/// - its event signature matches with one of the keys in [EventSignatureWithRanges] +/// - it's emitted in a block within one of the block ranges for that matched signature +#[derive(Default, Clone)] +struct EventFilter { + contract_addresses: HashMap, +} + +/// A block trigger is matched if and only if its height falls into any one of the defined +/// `block_ranges`. +/// +/// Note that this filter is only used to match block triggers interally inside `graph-node`, and +/// is never sent to upstream Firehose providers, as we always need the block headers for marking +/// chain head. +#[derive(Default, Clone)] +struct BlockFilter { + block_ranges: Vec, +} + +/// A range of blocks defined by starting and (optional) ending height. +/// +/// `start_block` is inclusive. `end_block` (if defined) is exclusive. +#[derive(Clone, PartialEq, Eq)] +struct BlockRange { + start_block: BlockNumber, + end_block: Option, +} + +impl TriggerFilter { + pub fn is_block_matched(&self, block_height: BlockNumber) -> bool { + self.block.is_matched(block_height) } - #[allow(unused)] - fn extend<'a>(&mut self, data_sources: impl Iterator + Clone) {} + pub fn is_event_matched( + &self, + event: &Event, + block_height: BlockNumber, + ) -> Result { + self.event.is_matched(event, block_height) + } +} + +impl TriggerFilterTrait for TriggerFilter { + fn extend_with_template(&mut self, _data_source: impl Iterator) {} + + fn extend<'a>(&mut self, data_sources: impl Iterator + Clone) { + self.event + .extend(EventFilter::from_data_sources(data_sources.clone())); + self.block + .extend(BlockFilter::from_data_sources(data_sources)); + } fn node_capabilities(&self) -> EmptyNodeCapabilities { - todo!() + Default::default() } fn to_firehose_filter(self) -> Vec { - todo!() + // An empty event filter list means that the subgraph is not interested in events at all. + // So we can stream just header-only blocks. + if self.event.is_empty() { + return vec![Any { + type_url: BLOCK_HEADER_ONLY_TYPE_URL.into(), + value: FirehoseFilterBlockHeaderOnly {}.encode_to_vec(), + }]; + } + + let event_filters = self + .event + .contract_addresses + .iter() + .map( + |(contract_address, sig_with_ranges)| FirehoseFilterContractEventFilter { + contract_address: contract_address.into(), + topics: sig_with_ranges + .iter() + .map(|(sig, ranges)| FirehoseFilterTopicWithRanges { + topic: sig.into(), + block_ranges: ranges + .iter() + .map(|range| FirehoseFilterBlockRange { + start_block: range.start_block as u64, + end_block: range.end_block.unwrap_or_default() as u64, + }) + .collect(), + }) + .collect(), + }, + ) + .collect(); + + vec![Any { + type_url: TRANSACTION_EVENT_FILTER_TYPE_URL.into(), + value: FirehoseFilterTransactionEventFilter { event_filters }.encode_to_vec(), + }] + } +} + +impl EventFilter { + pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { + iter.into_iter() + // Using `filter_map` instead of `filter` to avoid having to unwrap source address in + // `fold` below. + .filter_map(|data_source| { + if data_source.mapping.event_handlers.is_empty() { + None + } else { + data_source + .source + .address + .as_ref() + .map(|source_address| (data_source, source_address.to_owned())) + } + }) + .fold( + Self::default(), + |mut filter_opt, (data_source, source_address)| { + filter_opt.extend(Self { + contract_addresses: [( + source_address, + data_source + .mapping + .event_handlers + .iter() + .map(|event_handler| { + ( + event_handler.event_selector.clone(), + vec![BlockRange { + start_block: data_source.source.start_block, + end_block: data_source.source.end_block, + }], + ) + }) + .collect(), + )] + .into_iter() + .collect(), + }); + filter_opt + }, + ) + } + + pub fn extend(&mut self, other: EventFilter) { + if other.is_empty() { + return; + } + + let EventFilter { contract_addresses } = other; + + for (address, sig_with_ranges) in contract_addresses.into_iter() { + match self.contract_addresses.entry(address) { + Entry::Occupied(entry) => { + let entry = entry.into_mut(); + for (sig, mut block_ranges) in sig_with_ranges.into_iter() { + match entry.entry(sig) { + Entry::Occupied(sig_entry) => { + // TODO: merge overlapping block ranges + sig_entry.into_mut().append(&mut block_ranges); + } + Entry::Vacant(sig_entry) => { + sig_entry.insert(block_ranges); + } + } + } + } + Entry::Vacant(entry) => { + entry.insert(sig_with_ranges); + } + } + } + } + + pub fn is_empty(&self) -> bool { + self.contract_addresses.is_empty() + } + + pub fn is_matched(&self, event: &Event, block_height: BlockNumber) -> Result { + let from_addr: Felt = event.from_addr.as_slice().try_into()?; + + Ok(match self.contract_addresses.get(&from_addr) { + Some(entry) => { + let event_sig = match event.keys.first() { + Some(sig) => sig, + // Non-standard events with an empty `keys` array never match. + None => return Ok(false), + }; + let event_sig: Felt = event_sig.as_slice().try_into()?; + + match entry.get(&event_sig) { + Some(block_ranges) => block_ranges.iter().any(|range| { + if block_height >= range.start_block { + match range.end_block { + // `end_block` is exclusive + Some(end_block) => block_height < end_block, + None => true, + } + } else { + false + } + }), + None => false, + } + } + None => false, + }) + } +} + +impl BlockFilter { + pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { + iter.into_iter() + .filter(|data_source| data_source.mapping.block_handler.is_some()) + .fold(Self::default(), |mut filter_opt, data_source| { + filter_opt.extend(Self { + block_ranges: vec![BlockRange { + start_block: data_source.source.start_block, + end_block: data_source.source.end_block, + }], + }); + filter_opt + }) + } + + pub fn extend(&mut self, other: BlockFilter) { + if other.is_empty() { + return; + } + + let BlockFilter { block_ranges } = other; + + // TODO: merge overlapping block ranges + for new_range in block_ranges.into_iter() { + if !self.block_ranges.contains(&new_range) { + self.block_ranges.push(new_range); + } + } + } + + pub fn is_empty(&self) -> bool { + self.block_ranges.is_empty() + } + + pub fn is_matched(&self, block_height: BlockNumber) -> bool { + self.block_ranges.iter().any(|range| { + if block_height >= range.start_block { + match range.end_block { + Some(end_block) => block_height < end_block, + None => true, + } + } else { + false + } + }) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use super::*; + + #[test] + fn starknet_trigger_empty_filter_to_firehose() { + let filter = TriggerFilter::default(); + + // Produces a "header-only" Firehose filter to not stream any transaction. + assert_eq!( + filter.to_firehose_filter(), + vec![Any { + type_url: BLOCK_HEADER_ONLY_TYPE_URL.into(), + value: FirehoseFilterBlockHeaderOnly {}.encode_to_vec(), + }] + ); + } + + #[test] + fn starknet_trigger_filter_no_events_to_firehose() { + let filter = create_block_only_filter(); + + // [BlockFilter] is discarded as we would always stream block headers. + assert_eq!( + filter.to_firehose_filter(), + vec![Any { + type_url: BLOCK_HEADER_ONLY_TYPE_URL.into(), + value: FirehoseFilterBlockHeaderOnly {}.encode_to_vec(), + }] + ); + } + + #[test] + fn starknet_trigger_event_filter_to_firehose() { + let filter = create_event_only_filter(); + + // [BlockFilter] is discarded as we would always stream block headers. + assert_eq!( + filter.to_firehose_filter(), + vec![Any { + type_url: TRANSACTION_EVENT_FILTER_TYPE_URL.into(), + value: FirehoseFilterTransactionEventFilter { + event_filters: vec![FirehoseFilterContractEventFilter { + contract_address: Address::from_str("0x1234").unwrap().as_ref().into(), + topics: vec![FirehoseFilterTopicWithRanges { + topic: EventSignature::from_str("0x8888").unwrap().as_ref().into(), + block_ranges: vec![FirehoseFilterBlockRange { + start_block: 100, + end_block: 200 + }] + }] + }] + } + .encode_to_vec(), + }] + ); + } + + #[test] + fn starknet_trigger_block_filter_matched() { + let filter = create_block_only_filter(); + + assert!(filter.is_block_matched(100)); + assert!(filter.is_block_matched(199)); + } + + #[test] + fn starknet_trigger_block_filter_not_matched() { + let filter = create_block_only_filter(); + + assert_eq!(filter.is_block_matched(99), false); + + // `end_block` is exclusive + assert_eq!(filter.is_block_matched(200), false); + } + + #[test] + fn starknet_trigger_block_filter_open_ended() { + let filter = TriggerFilter { + event: EventFilter::default(), + block: BlockFilter { + block_ranges: vec![BlockRange { + start_block: 100, + end_block: None, + }], + }, + }; + + assert_eq!(filter.is_block_matched(99), false); + assert_eq!(filter.is_block_matched(100), true); + assert_eq!(filter.is_block_matched(1000), true); + } + + #[test] + fn starknet_trigger_event_filter_matched() { + let filter = create_event_only_filter(); + + assert_eq!( + filter + .is_event_matched( + &Event { + from_addr: Address::from_str("0x1234").unwrap().as_ref().into(), + keys: vec![EventSignature::from_str("0x8888").unwrap().as_ref().into()], + data: vec![] + }, + 100 + ) + .unwrap(), + true + ); + } + + #[test] + fn starknet_trigger_event_filter_not_matched() { + let filter = create_event_only_filter(); + + // Address mismatch + assert_eq!( + filter + .is_event_matched( + &Event { + from_addr: Address::from_str("0x4321").unwrap().as_ref().into(), + keys: vec![EventSignature::from_str("0x8888").unwrap().as_ref().into()], + data: vec![] + }, + 100 + ) + .unwrap(), + false + ); + + // Missing event keys (non-standard events) + assert_eq!( + filter + .is_event_matched( + &Event { + from_addr: Address::from_str("0x1234").unwrap().as_ref().into(), + keys: vec![], + data: vec![] + }, + 100 + ) + .unwrap(), + false + ); + + // Block out of range + assert_eq!( + filter + .is_event_matched( + &Event { + from_addr: Address::from_str("0x4321").unwrap().as_ref().into(), + keys: vec![EventSignature::from_str("0x8888").unwrap().as_ref().into()], + data: vec![] + }, + 200 + ) + .unwrap(), + false + ); + } + + fn create_block_only_filter() -> TriggerFilter { + TriggerFilter { + event: EventFilter::default(), + block: BlockFilter { + block_ranges: vec![BlockRange { + start_block: 100, + end_block: Some(200), + }], + }, + } + } + + fn create_event_only_filter() -> TriggerFilter { + TriggerFilter { + event: EventFilter { + contract_addresses: [( + Address::from_str("0x1234").unwrap(), + [( + EventSignature::from_str("0x8888").unwrap(), + vec![BlockRange { + start_block: 100, + end_block: Some(200), + }], + )] + .into_iter() + .collect(), + )] + .into_iter() + .collect(), + }, + block: BlockFilter::default(), + } } } diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index 060a502d80d..4e4088c61c3 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -374,37 +374,37 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since not used by FirehoseBlockStream") } - #[allow(unused)] async fn triggers_in_block( &self, logger: &Logger, block: codec::Block, filter: &crate::adapter::TriggerFilter, ) -> Result, Error> { + let block_height = block.height as BlockNumber; let shared_block = Arc::new(block.clone()); - let mut triggers: Vec<_> = shared_block - .transactions - .iter() - .flat_map(|transaction| -> Vec { - let transaction = Arc::new(transaction.clone()); - transaction - .events - .iter() - .map(|event| { - StarknetTrigger::Event(StarknetEventTrigger { - event: Arc::new(event.clone()), - block: shared_block.clone(), - transaction: transaction.clone(), - }) - }) - .collect() - }) - .collect(); - - triggers.push(StarknetTrigger::Block(StarknetBlockTrigger { - block: shared_block, - })); + let mut triggers = vec![]; + + // Using for loops instead of iterators to make error handling easier (using ?). + for transaction in shared_block.transactions.iter() { + let transaction = Arc::new(transaction.clone()); + + for event in transaction.events.iter() { + if filter.is_event_matched(event, block_height)? { + triggers.push(StarknetTrigger::Event(StarknetEventTrigger { + event: Arc::new(event.clone()), + block: shared_block.clone(), + transaction: transaction.clone(), + })); + } + } + } + + if filter.is_block_matched(block_height) { + triggers.push(StarknetTrigger::Block(StarknetBlockTrigger { + block: shared_block, + })); + } Ok(BlockWithTriggers::new(block, triggers, logger)) } diff --git a/chain/starknet/src/felt.rs b/chain/starknet/src/felt.rs index 7c0e6b6496d..5b3aeccafb0 100644 --- a/chain/starknet/src/felt.rs +++ b/chain/starknet/src/felt.rs @@ -3,12 +3,14 @@ use std::{ str::FromStr, }; -use graph::anyhow; +use graph::anyhow::{self, anyhow}; use serde::{de::Visitor, Deserialize}; -/// Represents the primitive `FieldElement` type used in Starknet. Each `FieldElement` is 252-bit -/// in size. -#[derive(Clone, PartialEq, Eq)] +/// Represents the universal primitive `FieldElement` type used in Starknet. Each `FieldElement` is +/// 252-bit in size. All data structures on Starknet inherently consist of `FieldElement`, despite +/// higher-level abstraction. Contract addresses, transaction calldata, event keys (similar to +/// topics in Ethereum) and data etc. are all `FieldElement`. +#[derive(Hash, Clone, PartialEq, Eq)] pub struct Felt([u8; 32]); struct FeltVisitor; @@ -25,6 +27,21 @@ impl From<[u8; 32]> for Felt { } } +impl TryFrom<&[u8]> for Felt { + type Error = anyhow::Error; + + fn try_from(value: &[u8]) -> Result { + if value.len() > 32 { + Err(anyhow!("slice too long")) + } else { + let mut buffer = [0u8; 32]; + buffer[(32 - value.len())..].copy_from_slice(value); + + Ok(buffer.into()) + } + } +} + impl AsRef<[u8]> for Felt { fn as_ref(&self) -> &[u8] { &self.0 @@ -48,6 +65,12 @@ impl FromStr for Felt { } } +impl From<&Felt> for Vec { + fn from(value: &Felt) -> Self { + value.0.to_vec() + } +} + impl<'de> Deserialize<'de> for Felt { fn deserialize(deserializer: D) -> Result where diff --git a/graph/build.rs b/graph/build.rs index 3cc00c0dc07..c0cf7d1f3a0 100644 --- a/graph/build.rs +++ b/graph/build.rs @@ -8,6 +8,7 @@ fn main() { "proto/ethereum/transforms.proto", "proto/near/transforms.proto", "proto/cosmos/transforms.proto", + "proto/starknet/transforms.proto", ], &["proto"], ) diff --git a/graph/proto/starknet/transforms.proto b/graph/proto/starknet/transforms.proto new file mode 100644 index 00000000000..092802ba17f --- /dev/null +++ b/graph/proto/starknet/transforms.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package zklend.starknet.transform.v1; + +option go_package = "github.com/starknet-graph/firehose-starknet/types/pb/zklend/starknet/type/v1;pbtransform"; + +// Stream block headers only. The `transactions` field is always empty. +message BlockHeaderOnly {} + +// Stream every single block, but each block will only contain transactions that match with `event_filters`. +// A TransactionEventFilter message with an empty `event_filters` is invalid. Do not send any filter instead +// if you wish to receive full blocks. +message TransactionEventFilter { + repeated ContractEventFilter event_filters = 1; +} + +// Only include transactions which emit at least one event that *BOTH* +// * is emitted by `contract_address` +// * matches with at least one topic in `topics` +message ContractEventFilter { + bytes contract_address = 1; + repeated TopicWithRanges topics = 2; +} + +// Matches events whose `keys[0]` equals `topic`, *AND* in any of the `block_ranges`. +message TopicWithRanges { + bytes topic = 1; + repeated BlockRange block_ranges = 2; +} + +// A range of blocks. `start_block` is inclusive, and `end_block` is exclusive. When `end_block` is `0`, it means +// that any block height >= `start_block` is matched. +message BlockRange { + uint64 start_block = 1; + uint64 end_block = 2; +} diff --git a/graph/src/firehose/codec.rs b/graph/src/firehose/codec.rs index 5537dba153b..62a46fddc9f 100644 --- a/graph/src/firehose/codec.rs +++ b/graph/src/firehose/codec.rs @@ -14,7 +14,12 @@ mod pbnear; #[path = "sf.cosmos.transform.v1.rs"] mod pbcosmos; +#[rustfmt::skip] +#[path = "zklend.starknet.transform.v1.rs"] +mod pbstarknet; + pub use pbcosmos::*; pub use pbethereum::*; pub use pbfirehose::*; pub use pbnear::*; +pub use pbstarknet::*; diff --git a/graph/src/firehose/zklend.starknet.transform.v1.rs b/graph/src/firehose/zklend.starknet.transform.v1.rs new file mode 100644 index 00000000000..9beae526b7f --- /dev/null +++ b/graph/src/firehose/zklend.starknet.transform.v1.rs @@ -0,0 +1,43 @@ +/// Stream block headers only. The `transactions` field is always empty. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockHeaderOnly {} +/// Stream every single block, but each block will only contain transactions that match with `event_filters`. +/// A TransactionEventFilter message with an empty `event_filters` is invalid. Do not send any filter instead +/// if you wish to receive full blocks. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TransactionEventFilter { + #[prost(message, repeated, tag = "1")] + pub event_filters: ::prost::alloc::vec::Vec, +} +/// Only include transactions which emit at least one event that *BOTH* +/// * is emitted by `contract_address` +/// * matches with at least one topic in `topics` +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ContractEventFilter { + #[prost(bytes = "vec", tag = "1")] + pub contract_address: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub topics: ::prost::alloc::vec::Vec, +} +/// Matches events whose `keys\[0\]` equals `topic`, *AND* in any of the `block_ranges`. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TopicWithRanges { + #[prost(bytes = "vec", tag = "1")] + pub topic: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub block_ranges: ::prost::alloc::vec::Vec, +} +/// A range of blocks. `start_block` is inclusive, and `end_block` is exclusive. When `end_block` is `0`, it means +/// that any block height >= `start_block` is matched. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockRange { + #[prost(uint64, tag = "1")] + pub start_block: u64, + #[prost(uint64, tag = "2")] + pub end_block: u64, +}