From e64ffcad705b49fa3f7d41741907219c8f8c77a8 Mon Sep 17 00:00:00 2001 From: Jonathan LEI Date: Sun, 3 Dec 2023 08:22:50 +0000 Subject: [PATCH 1/5] feat(starknet): block trigger filter --- chain/starknet/src/adapter.rs | 68 ++++++++++++++++++++++++++++++----- chain/starknet/src/chain.rs | 18 ++++++++-- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/chain/starknet/src/adapter.rs b/chain/starknet/src/adapter.rs index e04df8e979c..883eb3ccfac 100644 --- a/chain/starknet/src/adapter.rs +++ b/chain/starknet/src/adapter.rs @@ -1,4 +1,7 @@ -use graph::blockchain::{EmptyNodeCapabilities, TriggerFilter as TriggerFilterTrait}; +use graph::{ + blockchain::{EmptyNodeCapabilities, TriggerFilter as TriggerFilterTrait}, + components::store::BlockNumber, +}; use crate::{ data_source::{DataSource, DataSourceTemplate}, @@ -6,22 +9,69 @@ use crate::{ }; #[derive(Default, Clone)] -pub struct TriggerFilter; +pub struct TriggerFilter { + pub(crate) block: StarknetBlockFilter, +} + +#[derive(Default, Clone)] +pub struct StarknetBlockFilter { + pub block_ranges: Vec, +} + +#[derive(Clone, PartialEq, Eq)] +pub struct BlockRange { + pub start_block: BlockNumber, + pub end_block: Option, +} impl TriggerFilterTrait for TriggerFilter { - #[allow(unused)] - fn extend_with_template(&mut self, data_source: impl Iterator) { - todo!() - } + fn extend_with_template(&mut self, _data_source: impl Iterator) {} - #[allow(unused)] - fn extend<'a>(&mut self, data_sources: impl Iterator + Clone) {} + fn extend<'a>(&mut self, data_sources: impl Iterator + Clone) { + self.block + .extend(StarknetBlockFilter::from_data_sources(data_sources)); + } fn node_capabilities(&self) -> EmptyNodeCapabilities { - todo!() + Default::default() } fn to_firehose_filter(self) -> Vec { todo!() } } + +impl StarknetBlockFilter { + pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { + iter.into_iter() + .filter(|data_source| data_source.mapping.block_handler.is_some()) + .fold(Self::default(), |mut filter_opt, data_source| { + filter_opt.extend(Self { + block_ranges: vec![BlockRange { + start_block: data_source.source.start_block, + end_block: data_source.source.end_block, + }], + }); + filter_opt + }) + } + + pub fn extend(&mut self, other: StarknetBlockFilter) { + if other.is_empty() { + return; + } + + let StarknetBlockFilter { block_ranges } = other; + + // TODO: merge overlapping block ranges + for new_range in block_ranges.into_iter() { + if !self.block_ranges.contains(&new_range) { + self.block_ranges.push(new_range); + } + } + } + + pub fn is_empty(&self) -> bool { + self.block_ranges.is_empty() + } +} diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index 060a502d80d..29483abd122 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -381,6 +381,7 @@ impl TriggersAdapterTrait for TriggersAdapter { block: codec::Block, filter: &crate::adapter::TriggerFilter, ) -> Result, Error> { + let block_height = block.height as BlockNumber; let shared_block = Arc::new(block.clone()); let mut triggers: Vec<_> = shared_block @@ -402,9 +403,20 @@ impl TriggersAdapterTrait for TriggersAdapter { }) .collect(); - triggers.push(StarknetTrigger::Block(StarknetBlockTrigger { - block: shared_block, - })); + if filter.block.block_ranges.iter().any(|range| { + if block_height >= range.start_block { + match range.end_block { + Some(end_block) => block_height < end_block, + None => true, + } + } else { + false + } + }) { + triggers.push(StarknetTrigger::Block(StarknetBlockTrigger { + block: shared_block, + })); + } Ok(BlockWithTriggers::new(block, triggers, logger)) } From 561fc1f3f5a2e45c0c194143e7d903b27e4d571e Mon Sep 17 00:00:00 2001 From: Jonathan LEI Date: Mon, 4 Dec 2023 20:53:03 +0000 Subject: [PATCH 2/5] feat(starknet): event trigger filter --- chain/starknet/src/adapter.rs | 93 +++++++++++++++++++++++++++++++++++ chain/starknet/src/chain.rs | 44 ++++++++++++++--- chain/starknet/src/felt.rs | 19 ++++++- 3 files changed, 147 insertions(+), 9 deletions(-) diff --git a/chain/starknet/src/adapter.rs b/chain/starknet/src/adapter.rs index 883eb3ccfac..1567dc3538f 100644 --- a/chain/starknet/src/adapter.rs +++ b/chain/starknet/src/adapter.rs @@ -1,3 +1,5 @@ +use std::collections::{hash_map::Entry, HashMap}; + use graph::{ blockchain::{EmptyNodeCapabilities, TriggerFilter as TriggerFilterTrait}, components::store::BlockNumber, @@ -5,14 +7,23 @@ use graph::{ use crate::{ data_source::{DataSource, DataSourceTemplate}, + felt::Felt, Chain, }; +type TopicWithRanges = HashMap>; + #[derive(Default, Clone)] pub struct TriggerFilter { + pub(crate) event: StarknetEventFilter, pub(crate) block: StarknetBlockFilter, } +#[derive(Default, Clone)] +pub struct StarknetEventFilter { + pub contract_addresses: HashMap, +} + #[derive(Default, Clone)] pub struct StarknetBlockFilter { pub block_ranges: Vec, @@ -28,6 +39,8 @@ impl TriggerFilterTrait for TriggerFilter { fn extend_with_template(&mut self, _data_source: impl Iterator) {} fn extend<'a>(&mut self, data_sources: impl Iterator + Clone) { + self.event + .extend(StarknetEventFilter::from_data_sources(data_sources.clone())); self.block .extend(StarknetBlockFilter::from_data_sources(data_sources)); } @@ -41,6 +54,86 @@ impl TriggerFilterTrait for TriggerFilter { } } +impl StarknetEventFilter { + pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { + iter.into_iter() + // Using `filter_map` instead of `filter` to avoid having to unwrap source address in + // `fold` below. + .filter_map(|data_source| { + if data_source.mapping.event_handlers.is_empty() { + None + } else { + data_source + .source + .address + .as_ref() + .map(|source_address| (data_source, source_address.to_owned())) + } + }) + .fold( + Self::default(), + |mut filter_opt, (data_source, source_address)| { + filter_opt.extend(Self { + contract_addresses: [( + source_address, + data_source + .mapping + .event_handlers + .iter() + .map(|event_handler| { + ( + event_handler.event_selector.clone(), + vec![BlockRange { + start_block: data_source.source.start_block, + end_block: data_source.source.end_block, + }], + ) + }) + .collect(), + )] + .into_iter() + .collect(), + }); + filter_opt + }, + ) + } + + pub fn extend(&mut self, other: StarknetEventFilter) { + if other.is_empty() { + return; + } + + let StarknetEventFilter { contract_addresses } = other; + + for (address, topic_with_ranges) in contract_addresses.into_iter() { + match self.contract_addresses.entry(address) { + Entry::Occupied(entry) => { + let entry = entry.into_mut(); + for (topic, mut block_ranges) in topic_with_ranges.into_iter() { + match entry.entry(topic) { + Entry::Occupied(topic_entry) => { + // TODO: merge overlapping block ranges + topic_entry.into_mut().append(&mut block_ranges); + } + Entry::Vacant(topic_entry) => { + topic_entry.insert(block_ranges); + } + } + } + } + Entry::Vacant(entry) => { + entry.insert(topic_with_ranges); + } + } + } + } + + pub fn is_empty(&self) -> bool { + self.contract_addresses.is_empty() + } +} + impl StarknetBlockFilter { pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { iter.into_iter() diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index 29483abd122..1c0cdb650fb 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -34,6 +34,7 @@ use crate::{ data_source::{ DataSource, DataSourceTemplate, UnresolvedDataSource, UnresolvedDataSourceTemplate, }, + felt::Felt, trigger::{StarknetBlockTrigger, StarknetEventTrigger, StarknetTrigger}, }; @@ -374,7 +375,6 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since not used by FirehoseBlockStream") } - #[allow(unused)] async fn triggers_in_block( &self, logger: &Logger, @@ -392,12 +392,42 @@ impl TriggersAdapterTrait for TriggersAdapter { transaction .events .iter() - .map(|event| { - StarknetTrigger::Event(StarknetEventTrigger { - event: Arc::new(event.clone()), - block: shared_block.clone(), - transaction: transaction.clone(), - }) + .filter_map(|event| { + let from_addr: Felt = event.from_addr.as_slice().try_into().ok()?; + + match filter.event.contract_addresses.get(&from_addr) { + Some(entry) => { + let event_topic: Felt = + event.keys.first()?.as_slice().try_into().ok()?; + + match entry.get(&event_topic) { + Some(block_ranges) => { + let block_matched = block_ranges.iter().any(|range| { + if block_height >= range.start_block { + match range.end_block { + Some(end_block) => block_height < end_block, + None => true, + } + } else { + false + } + }); + + if block_matched { + Some(StarknetTrigger::Event(StarknetEventTrigger { + event: Arc::new(event.clone()), + block: shared_block.clone(), + transaction: transaction.clone(), + })) + } else { + None + } + } + None => None, + } + } + None => None, + } }) .collect() }) diff --git a/chain/starknet/src/felt.rs b/chain/starknet/src/felt.rs index 7c0e6b6496d..654120b4791 100644 --- a/chain/starknet/src/felt.rs +++ b/chain/starknet/src/felt.rs @@ -3,12 +3,12 @@ use std::{ str::FromStr, }; -use graph::anyhow; +use graph::anyhow::{self, anyhow}; use serde::{de::Visitor, Deserialize}; /// Represents the primitive `FieldElement` type used in Starknet. Each `FieldElement` is 252-bit /// in size. -#[derive(Clone, PartialEq, Eq)] +#[derive(Hash, Clone, PartialEq, Eq)] pub struct Felt([u8; 32]); struct FeltVisitor; @@ -25,6 +25,21 @@ impl From<[u8; 32]> for Felt { } } +impl TryFrom<&[u8]> for Felt { + type Error = anyhow::Error; + + fn try_from(value: &[u8]) -> Result { + if value.len() > 32 { + Err(anyhow!("slice too long")) + } else { + let mut buffer = [0u8; 32]; + buffer[(32 - value.len())..].copy_from_slice(value); + + Ok(buffer.into()) + } + } +} + impl AsRef<[u8]> for Felt { fn as_ref(&self) -> &[u8] { &self.0 From b378704da44fe097cf66775df7c620b986de6397 Mon Sep 17 00:00:00 2001 From: Jonathan LEI Date: Mon, 8 Apr 2024 07:39:55 +0800 Subject: [PATCH 3/5] feat(starknet): firehose filter --- chain/starknet/src/adapter.rs | 51 ++++++++++++++++++- chain/starknet/src/felt.rs | 6 +++ graph/build.rs | 1 + graph/proto/starknet/transforms.proto | 36 +++++++++++++ graph/src/firehose/codec.rs | 5 ++ .../firehose/zklend.starknet.transform.v1.rs | 43 ++++++++++++++++ 6 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 graph/proto/starknet/transforms.proto create mode 100644 graph/src/firehose/zklend.starknet.transform.v1.rs diff --git a/chain/starknet/src/adapter.rs b/chain/starknet/src/adapter.rs index 1567dc3538f..f9e2c3265f1 100644 --- a/chain/starknet/src/adapter.rs +++ b/chain/starknet/src/adapter.rs @@ -3,7 +3,20 @@ use std::collections::{hash_map::Entry, HashMap}; use graph::{ blockchain::{EmptyNodeCapabilities, TriggerFilter as TriggerFilterTrait}, components::store::BlockNumber, + firehose::{ + BlockHeaderOnly as FirehoseFilterBlockHeaderOnly, BlockRange as FirehoseFilterBlockRange, + ContractEventFilter as FirehoseFilterContractEventFilter, + TopicWithRanges as FirehoseFilterTopicWithRanges, + TransactionEventFilter as FirehoseFilterTransactionEventFilter, + }, }; +use prost::Message; +use prost_types::Any; + +const BLOCK_HEADER_ONLY_TYPE_URL: &str = + "type.googleapis.com/zklend.starknet.transform.v1.BlockHeaderOnly"; +const TRANSACTION_EVENT_FILTER_TYPE_URL: &str = + "type.googleapis.com/zklend.starknet.transform.v1.TransactionEventFilter"; use crate::{ data_source::{DataSource, DataSourceTemplate}, @@ -50,7 +63,43 @@ impl TriggerFilterTrait for TriggerFilter { } fn to_firehose_filter(self) -> Vec { - todo!() + // An empty event filter list means that the subgraph is not interested in events at all. + // So we can stream just header-only blocks. + if self.event.is_empty() { + return vec![Any { + type_url: BLOCK_HEADER_ONLY_TYPE_URL.into(), + value: FirehoseFilterBlockHeaderOnly {}.encode_to_vec(), + }]; + } + + let event_filters = self + .event + .contract_addresses + .iter() + .map( + |(contract_address, topic_with_ranges)| FirehoseFilterContractEventFilter { + contract_address: contract_address.into(), + topics: topic_with_ranges + .iter() + .map(|(topic, ranges)| FirehoseFilterTopicWithRanges { + topic: topic.into(), + block_ranges: ranges + .iter() + .map(|range| FirehoseFilterBlockRange { + start_block: range.start_block as u64, + end_block: range.end_block.unwrap_or_default() as u64, + }) + .collect(), + }) + .collect(), + }, + ) + .collect(); + + vec![Any { + type_url: TRANSACTION_EVENT_FILTER_TYPE_URL.into(), + value: FirehoseFilterTransactionEventFilter { event_filters }.encode_to_vec(), + }] } } diff --git a/chain/starknet/src/felt.rs b/chain/starknet/src/felt.rs index 654120b4791..e54a47d2fe0 100644 --- a/chain/starknet/src/felt.rs +++ b/chain/starknet/src/felt.rs @@ -63,6 +63,12 @@ impl FromStr for Felt { } } +impl From<&Felt> for Vec { + fn from(value: &Felt) -> Self { + value.0.to_vec() + } +} + impl<'de> Deserialize<'de> for Felt { fn deserialize(deserializer: D) -> Result where diff --git a/graph/build.rs b/graph/build.rs index 3cc00c0dc07..c0cf7d1f3a0 100644 --- a/graph/build.rs +++ b/graph/build.rs @@ -8,6 +8,7 @@ fn main() { "proto/ethereum/transforms.proto", "proto/near/transforms.proto", "proto/cosmos/transforms.proto", + "proto/starknet/transforms.proto", ], &["proto"], ) diff --git a/graph/proto/starknet/transforms.proto b/graph/proto/starknet/transforms.proto new file mode 100644 index 00000000000..104d43eb22e --- /dev/null +++ b/graph/proto/starknet/transforms.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package zklend.starknet.transform.v1; + +option go_package = "github.com/starknet-graph/firehose-starknet/types/pb/zklend/starknet/type/v1;pbtransform"; + +// Stream block headers only. The `transactions` field is always empty. +message BlockHeaderOnly {} + +// Stream every single block, but each block will only contain transactions that match with `event_filters`. +// A TransactionEventFilter message with an empty `event_filters` is invalid. Do not send any filter instead +// if you wish to receive full blocks. +message TransactionEventFilter { + repeated ContractEventFilter event_filters = 1; +} + +// Only include transactions which emit at least one event that *BOTH* +// * is emitted by `contract_address` +// * matches with at least one topic in `topics` +message ContractEventFilter { + bytes contract_address = 1; + repeated TopicWithRanges topics = 2; +} + +// Matches events whose `keys[0]` equals `topic`, *AND* in any of the `block_ranges`. +message TopicWithRanges { + bytes topic = 1; + repeated BlockRange block_ranges = 2; +} + +// A range of blocks. Both `start_block` and `end_block` are inclusive. When `end_block` is `0`, it means that any +// block height >= `start_block` is matched. +message BlockRange { + uint64 start_block = 1; + uint64 end_block = 2; +} diff --git a/graph/src/firehose/codec.rs b/graph/src/firehose/codec.rs index 5537dba153b..62a46fddc9f 100644 --- a/graph/src/firehose/codec.rs +++ b/graph/src/firehose/codec.rs @@ -14,7 +14,12 @@ mod pbnear; #[path = "sf.cosmos.transform.v1.rs"] mod pbcosmos; +#[rustfmt::skip] +#[path = "zklend.starknet.transform.v1.rs"] +mod pbstarknet; + pub use pbcosmos::*; pub use pbethereum::*; pub use pbfirehose::*; pub use pbnear::*; +pub use pbstarknet::*; diff --git a/graph/src/firehose/zklend.starknet.transform.v1.rs b/graph/src/firehose/zklend.starknet.transform.v1.rs new file mode 100644 index 00000000000..bd2b3b14e4b --- /dev/null +++ b/graph/src/firehose/zklend.starknet.transform.v1.rs @@ -0,0 +1,43 @@ +/// Stream block headers only. The `transactions` field is always empty. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockHeaderOnly {} +/// Stream every single block, but each block will only contain transactions that match with `event_filters`. +/// A TransactionEventFilter message with an empty `event_filters` is invalid. Do not send any filter instead +/// if you wish to receive full blocks. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TransactionEventFilter { + #[prost(message, repeated, tag = "1")] + pub event_filters: ::prost::alloc::vec::Vec, +} +/// Only include transactions which emit at least one event that *BOTH* +/// * is emitted by `contract_address` +/// * matches with at least one topic in `topics` +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ContractEventFilter { + #[prost(bytes = "vec", tag = "1")] + pub contract_address: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub topics: ::prost::alloc::vec::Vec, +} +/// Matches events whose `keys\[0\]` equals `topic`, *AND* in any of the `block_ranges`. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TopicWithRanges { + #[prost(bytes = "vec", tag = "1")] + pub topic: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub block_ranges: ::prost::alloc::vec::Vec, +} +/// A range of blocks. Both `start_block` and `end_block` are inclusive. When `end_block` is `0`, it means that any +/// block height >= `start_block` is matched. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockRange { + #[prost(uint64, tag = "1")] + pub start_block: u64, + #[prost(uint64, tag = "2")] + pub end_block: u64, +} From 62021b7f1b7d2c8eea729692f77585a3daaf9f15 Mon Sep 17 00:00:00 2001 From: Jonathan LEI Date: Wed, 10 Apr 2024 03:17:47 +0800 Subject: [PATCH 4/5] refactor: encapsulate starknet trigger matching --- chain/starknet/src/adapter.rs | 145 ++++++++++++++---- chain/starknet/src/chain.rs | 74 ++------- chain/starknet/src/felt.rs | 6 +- graph/proto/starknet/transforms.proto | 4 +- .../firehose/zklend.starknet.transform.v1.rs | 4 +- 5 files changed, 139 insertions(+), 94 deletions(-) diff --git a/chain/starknet/src/adapter.rs b/chain/starknet/src/adapter.rs index f9e2c3265f1..8a252fe2c33 100644 --- a/chain/starknet/src/adapter.rs +++ b/chain/starknet/src/adapter.rs @@ -1,6 +1,7 @@ use std::collections::{hash_map::Entry, HashMap}; use graph::{ + anyhow::Error, blockchain::{EmptyNodeCapabilities, TriggerFilter as TriggerFilterTrait}, components::store::BlockNumber, firehose::{ @@ -19,33 +20,73 @@ const TRANSACTION_EVENT_FILTER_TYPE_URL: &str = "type.googleapis.com/zklend.starknet.transform.v1.TransactionEventFilter"; use crate::{ + codec::Event, data_source::{DataSource, DataSourceTemplate}, felt::Felt, Chain, }; -type TopicWithRanges = HashMap>; +/// Starknet contract address, represented by the primitive [Felt] type. +type Address = Felt; +/// Starknet event signature, encoded as the first element of any event's `keys` array. +type EventSignature = Felt; + +/// A hashmap for quick lookup on whether an event matches with the filter. If the event's signature +/// exists, further comparison needs to be made against the block ranges. +type EventSignatureWithRanges = HashMap>; + +/// Contains event and block filters. The two types of filters function independently: event +/// filters are only applied to event triggers; and block filters are only applied to block +/// triggers. #[derive(Default, Clone)] pub struct TriggerFilter { - pub(crate) event: StarknetEventFilter, - pub(crate) block: StarknetBlockFilter, + event: EventFilter, + block: BlockFilter, } +/// An event trigger is matched if and only if *ALL* conditions are met for the event: +/// +/// - it's emitted from one of the `contract_addresses` keys +/// - its event signature matches with one of the keys in [EventSignatureWithRanges] +/// - it's emitted in a block within one of the block ranges for that matched signature #[derive(Default, Clone)] -pub struct StarknetEventFilter { - pub contract_addresses: HashMap, +struct EventFilter { + contract_addresses: HashMap, } +/// A block trigger is matched if and only if its height falls into any one of the defined +/// `block_ranges`. +/// +/// Note that this filter is only used to match block triggers interally inside `graph-node`, and +/// is never sent to upstream Firehose providers, as we always need the block headers for marking +/// chain head. #[derive(Default, Clone)] -pub struct StarknetBlockFilter { - pub block_ranges: Vec, +struct BlockFilter { + block_ranges: Vec, } +/// A range of blocks defined by starting and (optional) ending height. +/// +/// `start_block` is inclusive. `end_block` (if defined) is exclusive. #[derive(Clone, PartialEq, Eq)] -pub struct BlockRange { - pub start_block: BlockNumber, - pub end_block: Option, +struct BlockRange { + start_block: BlockNumber, + end_block: Option, +} + +impl TriggerFilter { + pub fn is_block_matched(&self, block_height: BlockNumber) -> bool { + self.block.is_matched(block_height) + } + + pub fn is_event_matched( + &self, + event: &Event, + block_height: BlockNumber, + ) -> Result { + self.event.is_matched(event, block_height) + } } impl TriggerFilterTrait for TriggerFilter { @@ -53,9 +94,9 @@ impl TriggerFilterTrait for TriggerFilter { fn extend<'a>(&mut self, data_sources: impl Iterator + Clone) { self.event - .extend(StarknetEventFilter::from_data_sources(data_sources.clone())); + .extend(EventFilter::from_data_sources(data_sources.clone())); self.block - .extend(StarknetBlockFilter::from_data_sources(data_sources)); + .extend(BlockFilter::from_data_sources(data_sources)); } fn node_capabilities(&self) -> EmptyNodeCapabilities { @@ -77,12 +118,12 @@ impl TriggerFilterTrait for TriggerFilter { .contract_addresses .iter() .map( - |(contract_address, topic_with_ranges)| FirehoseFilterContractEventFilter { + |(contract_address, sig_with_ranges)| FirehoseFilterContractEventFilter { contract_address: contract_address.into(), - topics: topic_with_ranges + topics: sig_with_ranges .iter() - .map(|(topic, ranges)| FirehoseFilterTopicWithRanges { - topic: topic.into(), + .map(|(sig, ranges)| FirehoseFilterTopicWithRanges { + topic: sig.into(), block_ranges: ranges .iter() .map(|range| FirehoseFilterBlockRange { @@ -103,7 +144,7 @@ impl TriggerFilterTrait for TriggerFilter { } } -impl StarknetEventFilter { +impl EventFilter { pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { iter.into_iter() // Using `filter_map` instead of `filter` to avoid having to unwrap source address in @@ -148,31 +189,31 @@ impl StarknetEventFilter { ) } - pub fn extend(&mut self, other: StarknetEventFilter) { + pub fn extend(&mut self, other: EventFilter) { if other.is_empty() { return; } - let StarknetEventFilter { contract_addresses } = other; + let EventFilter { contract_addresses } = other; - for (address, topic_with_ranges) in contract_addresses.into_iter() { + for (address, sig_with_ranges) in contract_addresses.into_iter() { match self.contract_addresses.entry(address) { Entry::Occupied(entry) => { let entry = entry.into_mut(); - for (topic, mut block_ranges) in topic_with_ranges.into_iter() { - match entry.entry(topic) { - Entry::Occupied(topic_entry) => { + for (sig, mut block_ranges) in sig_with_ranges.into_iter() { + match entry.entry(sig) { + Entry::Occupied(sig_entry) => { // TODO: merge overlapping block ranges - topic_entry.into_mut().append(&mut block_ranges); + sig_entry.into_mut().append(&mut block_ranges); } - Entry::Vacant(topic_entry) => { - topic_entry.insert(block_ranges); + Entry::Vacant(sig_entry) => { + sig_entry.insert(block_ranges); } } } } Entry::Vacant(entry) => { - entry.insert(topic_with_ranges); + entry.insert(sig_with_ranges); } } } @@ -181,9 +222,40 @@ impl StarknetEventFilter { pub fn is_empty(&self) -> bool { self.contract_addresses.is_empty() } + + pub fn is_matched(&self, event: &Event, block_height: BlockNumber) -> Result { + let from_addr: Felt = event.from_addr.as_slice().try_into()?; + + Ok(match self.contract_addresses.get(&from_addr) { + Some(entry) => { + let event_sig = match event.keys.first() { + Some(sig) => sig, + // Non-standard events with an empty `keys` array never match. + None => return Ok(false), + }; + let event_sig: Felt = event_sig.as_slice().try_into()?; + + match entry.get(&event_sig) { + Some(block_ranges) => block_ranges.iter().any(|range| { + if block_height >= range.start_block { + match range.end_block { + // `end_block` is exclusive + Some(end_block) => block_height < end_block, + None => true, + } + } else { + false + } + }), + None => false, + } + } + None => false, + }) + } } -impl StarknetBlockFilter { +impl BlockFilter { pub fn from_data_sources<'a>(iter: impl IntoIterator) -> Self { iter.into_iter() .filter(|data_source| data_source.mapping.block_handler.is_some()) @@ -198,12 +270,12 @@ impl StarknetBlockFilter { }) } - pub fn extend(&mut self, other: StarknetBlockFilter) { + pub fn extend(&mut self, other: BlockFilter) { if other.is_empty() { return; } - let StarknetBlockFilter { block_ranges } = other; + let BlockFilter { block_ranges } = other; // TODO: merge overlapping block ranges for new_range in block_ranges.into_iter() { @@ -216,4 +288,17 @@ impl StarknetBlockFilter { pub fn is_empty(&self) -> bool { self.block_ranges.is_empty() } + + pub fn is_matched(&self, block_height: BlockNumber) -> bool { + self.block_ranges.iter().any(|range| { + if block_height >= range.start_block { + match range.end_block { + Some(end_block) => block_height < end_block, + None => true, + } + } else { + false + } + }) + } } diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index 1c0cdb650fb..4e4088c61c3 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -34,7 +34,6 @@ use crate::{ data_source::{ DataSource, DataSourceTemplate, UnresolvedDataSource, UnresolvedDataSourceTemplate, }, - felt::Felt, trigger::{StarknetBlockTrigger, StarknetEventTrigger, StarknetTrigger}, }; @@ -384,65 +383,24 @@ impl TriggersAdapterTrait for TriggersAdapter { let block_height = block.height as BlockNumber; let shared_block = Arc::new(block.clone()); - let mut triggers: Vec<_> = shared_block - .transactions - .iter() - .flat_map(|transaction| -> Vec { - let transaction = Arc::new(transaction.clone()); - transaction - .events - .iter() - .filter_map(|event| { - let from_addr: Felt = event.from_addr.as_slice().try_into().ok()?; - - match filter.event.contract_addresses.get(&from_addr) { - Some(entry) => { - let event_topic: Felt = - event.keys.first()?.as_slice().try_into().ok()?; - - match entry.get(&event_topic) { - Some(block_ranges) => { - let block_matched = block_ranges.iter().any(|range| { - if block_height >= range.start_block { - match range.end_block { - Some(end_block) => block_height < end_block, - None => true, - } - } else { - false - } - }); - - if block_matched { - Some(StarknetTrigger::Event(StarknetEventTrigger { - event: Arc::new(event.clone()), - block: shared_block.clone(), - transaction: transaction.clone(), - })) - } else { - None - } - } - None => None, - } - } - None => None, - } - }) - .collect() - }) - .collect(); - - if filter.block.block_ranges.iter().any(|range| { - if block_height >= range.start_block { - match range.end_block { - Some(end_block) => block_height < end_block, - None => true, + let mut triggers = vec![]; + + // Using for loops instead of iterators to make error handling easier (using ?). + for transaction in shared_block.transactions.iter() { + let transaction = Arc::new(transaction.clone()); + + for event in transaction.events.iter() { + if filter.is_event_matched(event, block_height)? { + triggers.push(StarknetTrigger::Event(StarknetEventTrigger { + event: Arc::new(event.clone()), + block: shared_block.clone(), + transaction: transaction.clone(), + })); } - } else { - false } - }) { + } + + if filter.is_block_matched(block_height) { triggers.push(StarknetTrigger::Block(StarknetBlockTrigger { block: shared_block, })); diff --git a/chain/starknet/src/felt.rs b/chain/starknet/src/felt.rs index e54a47d2fe0..5b3aeccafb0 100644 --- a/chain/starknet/src/felt.rs +++ b/chain/starknet/src/felt.rs @@ -6,8 +6,10 @@ use std::{ use graph::anyhow::{self, anyhow}; use serde::{de::Visitor, Deserialize}; -/// Represents the primitive `FieldElement` type used in Starknet. Each `FieldElement` is 252-bit -/// in size. +/// Represents the universal primitive `FieldElement` type used in Starknet. Each `FieldElement` is +/// 252-bit in size. All data structures on Starknet inherently consist of `FieldElement`, despite +/// higher-level abstraction. Contract addresses, transaction calldata, event keys (similar to +/// topics in Ethereum) and data etc. are all `FieldElement`. #[derive(Hash, Clone, PartialEq, Eq)] pub struct Felt([u8; 32]); diff --git a/graph/proto/starknet/transforms.proto b/graph/proto/starknet/transforms.proto index 104d43eb22e..092802ba17f 100644 --- a/graph/proto/starknet/transforms.proto +++ b/graph/proto/starknet/transforms.proto @@ -28,8 +28,8 @@ message TopicWithRanges { repeated BlockRange block_ranges = 2; } -// A range of blocks. Both `start_block` and `end_block` are inclusive. When `end_block` is `0`, it means that any -// block height >= `start_block` is matched. +// A range of blocks. `start_block` is inclusive, and `end_block` is exclusive. When `end_block` is `0`, it means +// that any block height >= `start_block` is matched. message BlockRange { uint64 start_block = 1; uint64 end_block = 2; diff --git a/graph/src/firehose/zklend.starknet.transform.v1.rs b/graph/src/firehose/zklend.starknet.transform.v1.rs index bd2b3b14e4b..9beae526b7f 100644 --- a/graph/src/firehose/zklend.starknet.transform.v1.rs +++ b/graph/src/firehose/zklend.starknet.transform.v1.rs @@ -31,8 +31,8 @@ pub struct TopicWithRanges { #[prost(message, repeated, tag = "2")] pub block_ranges: ::prost::alloc::vec::Vec, } -/// A range of blocks. Both `start_block` and `end_block` are inclusive. When `end_block` is `0`, it means that any -/// block height >= `start_block` is matched. +/// A range of blocks. `start_block` is inclusive, and `end_block` is exclusive. When `end_block` is `0`, it means +/// that any block height >= `start_block` is matched. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct BlockRange { From 314f0bebf9ba66e51de8ea62186d2a1bc37b6f84 Mon Sep 17 00:00:00 2001 From: Jonathan LEI Date: Wed, 10 Apr 2024 04:11:37 +0800 Subject: [PATCH 5/5] test: starknet trigger filter tests --- chain/starknet/src/adapter.rs | 199 ++++++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) diff --git a/chain/starknet/src/adapter.rs b/chain/starknet/src/adapter.rs index 8a252fe2c33..f21456046e4 100644 --- a/chain/starknet/src/adapter.rs +++ b/chain/starknet/src/adapter.rs @@ -302,3 +302,202 @@ impl BlockFilter { }) } } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use super::*; + + #[test] + fn starknet_trigger_empty_filter_to_firehose() { + let filter = TriggerFilter::default(); + + // Produces a "header-only" Firehose filter to not stream any transaction. + assert_eq!( + filter.to_firehose_filter(), + vec![Any { + type_url: BLOCK_HEADER_ONLY_TYPE_URL.into(), + value: FirehoseFilterBlockHeaderOnly {}.encode_to_vec(), + }] + ); + } + + #[test] + fn starknet_trigger_filter_no_events_to_firehose() { + let filter = create_block_only_filter(); + + // [BlockFilter] is discarded as we would always stream block headers. + assert_eq!( + filter.to_firehose_filter(), + vec![Any { + type_url: BLOCK_HEADER_ONLY_TYPE_URL.into(), + value: FirehoseFilterBlockHeaderOnly {}.encode_to_vec(), + }] + ); + } + + #[test] + fn starknet_trigger_event_filter_to_firehose() { + let filter = create_event_only_filter(); + + // [BlockFilter] is discarded as we would always stream block headers. + assert_eq!( + filter.to_firehose_filter(), + vec![Any { + type_url: TRANSACTION_EVENT_FILTER_TYPE_URL.into(), + value: FirehoseFilterTransactionEventFilter { + event_filters: vec![FirehoseFilterContractEventFilter { + contract_address: Address::from_str("0x1234").unwrap().as_ref().into(), + topics: vec![FirehoseFilterTopicWithRanges { + topic: EventSignature::from_str("0x8888").unwrap().as_ref().into(), + block_ranges: vec![FirehoseFilterBlockRange { + start_block: 100, + end_block: 200 + }] + }] + }] + } + .encode_to_vec(), + }] + ); + } + + #[test] + fn starknet_trigger_block_filter_matched() { + let filter = create_block_only_filter(); + + assert!(filter.is_block_matched(100)); + assert!(filter.is_block_matched(199)); + } + + #[test] + fn starknet_trigger_block_filter_not_matched() { + let filter = create_block_only_filter(); + + assert_eq!(filter.is_block_matched(99), false); + + // `end_block` is exclusive + assert_eq!(filter.is_block_matched(200), false); + } + + #[test] + fn starknet_trigger_block_filter_open_ended() { + let filter = TriggerFilter { + event: EventFilter::default(), + block: BlockFilter { + block_ranges: vec![BlockRange { + start_block: 100, + end_block: None, + }], + }, + }; + + assert_eq!(filter.is_block_matched(99), false); + assert_eq!(filter.is_block_matched(100), true); + assert_eq!(filter.is_block_matched(1000), true); + } + + #[test] + fn starknet_trigger_event_filter_matched() { + let filter = create_event_only_filter(); + + assert_eq!( + filter + .is_event_matched( + &Event { + from_addr: Address::from_str("0x1234").unwrap().as_ref().into(), + keys: vec![EventSignature::from_str("0x8888").unwrap().as_ref().into()], + data: vec![] + }, + 100 + ) + .unwrap(), + true + ); + } + + #[test] + fn starknet_trigger_event_filter_not_matched() { + let filter = create_event_only_filter(); + + // Address mismatch + assert_eq!( + filter + .is_event_matched( + &Event { + from_addr: Address::from_str("0x4321").unwrap().as_ref().into(), + keys: vec![EventSignature::from_str("0x8888").unwrap().as_ref().into()], + data: vec![] + }, + 100 + ) + .unwrap(), + false + ); + + // Missing event keys (non-standard events) + assert_eq!( + filter + .is_event_matched( + &Event { + from_addr: Address::from_str("0x1234").unwrap().as_ref().into(), + keys: vec![], + data: vec![] + }, + 100 + ) + .unwrap(), + false + ); + + // Block out of range + assert_eq!( + filter + .is_event_matched( + &Event { + from_addr: Address::from_str("0x4321").unwrap().as_ref().into(), + keys: vec![EventSignature::from_str("0x8888").unwrap().as_ref().into()], + data: vec![] + }, + 200 + ) + .unwrap(), + false + ); + } + + fn create_block_only_filter() -> TriggerFilter { + TriggerFilter { + event: EventFilter::default(), + block: BlockFilter { + block_ranges: vec![BlockRange { + start_block: 100, + end_block: Some(200), + }], + }, + } + } + + fn create_event_only_filter() -> TriggerFilter { + TriggerFilter { + event: EventFilter { + contract_addresses: [( + Address::from_str("0x1234").unwrap(), + [( + EventSignature::from_str("0x8888").unwrap(), + vec![BlockRange { + start_block: 100, + end_block: Some(200), + }], + )] + .into_iter() + .collect(), + )] + .into_iter() + .collect(), + }, + block: BlockFilter::default(), + } + } +}