Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce chain-listener [fixes NET-694 NET-673 NET-674 NET-677 NET-685] #1972

Merged
merged 9 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 240 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ members = [
"spell-storage",
"particle-execution",
"crates/system-services",
"crates/chain-listener",
"crates/hex-utils",
"crates/chain-data"
]
exclude = [
"nox/tests/tetraplets",
Expand Down Expand Up @@ -90,6 +93,9 @@ particle-execution = { path = "particle-execution" }
system-services = { path = "crates/system-services" }
health = { path = "crates/health" }
subnet-resolver = { path = "crates/subnet-resolver" }
hex-utils = { path = "crates/hex-utils"}
chain-data = { path = "crates/chain-data"}
chain-listener = { path = "crates/chain-listener"}

# spell
fluence-spell-dtos = "=0.6.4"
Expand Down Expand Up @@ -144,9 +150,13 @@ cid = "0.11.0"
libipld = "0.16.0"
axum = "0.7.2"
reqwest = "0.11.22"
faster-hex = "0.9.0"
once_cell = "1.19.0"
tempfile = "3.8.1"
hex = "0.4.3"
ethabi = "18.0.0"
jsonrpsee = "0.21.0"
blake3 = "1.5.0"
rand = "0.8.4"

# Enable a small amount of optimization in debug mode
[profile.dev]
Expand Down
5 changes: 0 additions & 5 deletions crates/air-interpreter-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,4 @@ edition = "2021"

[dependencies]
air-interpreter-wasm = { workspace = true }

eyre = { workspace = true }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = { workspace = true }
blake3 = "1.5.0"
faster-hex = { workspace = true }
15 changes: 15 additions & 0 deletions crates/chain-data/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "chain-data"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ethabi = { workspace = true }
serde = { workspace = true }
libp2p-identity = { workspace = true, features = ["peerid"] }
hex-utils = { workspace = true }
hex = { workspace = true }
log = { workspace = true }
thiserror = { workspace = true }
51 changes: 51 additions & 0 deletions crates/chain-data/src/chain_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use crate::error::ChainDataError;
use ethabi::{ParamType, Token};
use hex_utils::decode_hex;

#[derive(Debug, Clone, PartialEq)]
/// Kind of the field in Chain Event
pub enum EventField {
/// If field is indexed, it's passed among topics
Indexed(ParamType),
/// If field is not indexed, it's passed in log.data
NotIndexed(ParamType),
}

impl EventField {
pub fn param_type(self) -> ParamType {
match self {
EventField::Indexed(t) => t,
EventField::NotIndexed(t) => t,
}
}
}

pub trait ChainData {
fn event_name() -> &'static str;
fn signature() -> Vec<EventField>;
fn parse(data_tokens: &mut impl Iterator<Item = Token>) -> Result<Self, ChainDataError>
where
Self: Sized;

fn topic() -> String {
let sig: Vec<_> = Self::signature()
.into_iter()
.map(|t| t.param_type())
.collect();
let hash = ethabi::long_signature(Self::event_name(), &sig);
format!("0x{}", hex::encode(hash.as_bytes()))
}
}

pub trait ChainEvent<ChainData> {
fn new(block_number: String, data: ChainData) -> Self;
}

/// Parse data from chain. Accepts data with and without "0x" prefix.
pub fn parse_chain_data(data: &str, signature: &[ParamType]) -> Result<Vec<Token>, ChainDataError> {
if data.is_empty() {
return Err(ChainDataError::Empty);
}
let data = decode_hex(data).map_err(ChainDataError::DecodeHex)?;
Ok(ethabi::decode(signature, &data)?)
}
24 changes: 24 additions & 0 deletions crates/chain-data/src/data_tokens.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use crate::ChainDataError;
use crate::ChainDataError::{InvalidParsedToken, MissingParsedToken};
use ethabi::Token;

// Take next token and parse it with `f`
pub fn next_opt<T>(
data_tokens: &mut impl Iterator<Item = Token>,
name: &'static str,
f: impl Fn(Token) -> Option<T>,
) -> Result<T, ChainDataError> {
let next = data_tokens.next().ok_or(MissingParsedToken(name))?;
let parsed = f(next).ok_or(InvalidParsedToken(name))?;

Ok(parsed)
}

// Take next token and parse it with `f`
pub fn next<T>(
data_tokens: &mut impl Iterator<Item = Token>,
name: &'static str,
f: impl Fn(Token) -> T,
) -> Result<T, ChainDataError> {
next_opt(data_tokens, name, |t| Some(f(t)))
}
16 changes: 16 additions & 0 deletions crates/chain-data/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ChainDataError {
#[error("empty data, nothing to parse")]
Empty,
#[error("missing token for field '{0}'")]
MissingParsedToken(&'static str),
#[error("invalid token for field '{0}'")]
InvalidParsedToken(&'static str),
#[error("invalid compute peer id: '{0}'")]
InvalidComputePeerId(#[from] libp2p_identity::ParseError),
#[error("data is not a valid hex: '{0}'")]
DecodeHex(#[source] hex::FromHexError),
#[error(transparent)]
EthError(#[from] ethabi::Error),
}
14 changes: 14 additions & 0 deletions crates/chain-data/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#![feature(try_blocks)]
mod chain_data;
mod data_tokens;
mod error;
mod log;
mod u256;
mod utils;

pub use chain_data::{ChainData, ChainEvent, EventField};
pub use data_tokens::{next, next_opt};
pub use error::ChainDataError;
pub use log::{parse_log, Log, LogParseError};
pub use u256::U256;
pub use utils::{parse_peer_id, peer_id_to_hex};
133 changes: 133 additions & 0 deletions crates/chain-data/src/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use libp2p_identity::ParseError;
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::chain_data::EventField::{Indexed, NotIndexed};
use crate::chain_data::{parse_chain_data, ChainData, ChainEvent, EventField};
use crate::error::ChainDataError;
use crate::log::LogParseError::{MissingToken, MissingTopic};

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Log {
// Log arguments
pub data: String,
// The block number in hex (with 0x prefix) that contains this log
pub block_number: String,
justprosh marked this conversation as resolved.
Show resolved Hide resolved
// true when the log was removed, due to a chain reorganization. false if its a valid log.
#[serde(default)]
pub removed: bool,
pub topics: Vec<String>,
}

#[derive(Debug, Error)]
pub enum LogParseError {
#[error(transparent)]
EthError(#[from] ethabi::Error),
#[error(transparent)]
DecodeHex(#[from] hex::FromHexError),
#[error("parsed data doesn't correspond to the expected signature: {0:?}")]
SignatureMismatch(Vec<EventField>),
#[error(
"incorrect log signature: not found token for field #{position} of type ${event_field:?}"
)]
MissingToken {
position: usize,
event_field: EventField,
},
#[error("incorrect log signature: not found topic for indexed field #{position} of type ${event_field:?}")]
MissingTopic {
position: usize,
event_field: EventField,
},
#[error("missing token for field '{0}'")]
MissingParsedToken(&'static str),
#[error("invalid token for field '{0}'")]
InvalidParsedToken(&'static str),
#[error("invalid compute peer id: '{0}'")]
InvalidComputePeerId(#[from] ParseError),
#[error(transparent)]
ChainData(#[from] ChainDataError),
#[error("no tokens after deserialization")]
NoTokens,
}

/// Parse Event Log to specified DTO
///
/// Logs consist of data fields, much like ADT. Fields can indexed and not indexed.
///
/// Data for indexed fields is encoded in 'log.topics', starting from 1th topic, i.e. 0th is skipped
/// Data for non indexed fields is encoded in 'log.data'.
///
/// Indexed and non indexed data fields can be interleaved.
/// That forces a certain parsing scheme, which is implemented below.
pub fn parse_log<U: ChainData, T: ChainEvent<U>>(log: Log) -> Result<T, LogParseError> {
log::debug!("Parse log from block {:?}", log.block_number);
let result: Result<_, LogParseError> = try {
// event log signature, i.e. data field types
let signature = U::signature();
// gather data types for non indexed ("indexless") fields
let indexless = signature
.clone()
.into_iter()
.filter_map(|t| match t {
NotIndexed(t) => Some(t),
Indexed(_) => None,
})
.collect::<Vec<_>>();
// parse all non indexed fields to tokens
let indexless = parse_chain_data(&log.data, &indexless)?;

// iterate through data field types (signature), and take
// data `Token` from either 'indexless' or 'topics'
let mut indexless = indexless.into_iter();
// skip first topic, because it contains actual topic, and not indexed data field
let mut topics = log.topics.into_iter().skip(1);
// accumulate tokens here
let mut tokens = vec![];
for (position, event_field) in signature.into_iter().enumerate() {
match event_field {
NotIndexed(_) => {
// take next token for non indexed data field
let token = indexless.next().ok_or(MissingToken {
position,
event_field,
})?;
tokens.push(token);
}
ef @ Indexed(_) => {
let topic = topics.next().ok_or(MissingTopic {
position,
event_field: ef.clone(),
})?;
// parse indexed field to token one by one
let parsed = parse_chain_data(&topic, &[ef.clone().param_type()])?;
debug_assert!(parsed.len() == 1, "parse of an indexed event fields yielded several tokens, expected a single one");
let token = parsed.into_iter().next().ok_or(MissingToken {
position,
event_field: ef,
})?;
tokens.push(token)
}
}
}

if tokens.is_empty() {
return Err(LogParseError::NoTokens);
}

let block_number = log.block_number.clone();
let log = U::parse(&mut tokens.into_iter())?;
T::new(block_number, log)
};

if let Err(e) = result.as_ref() {
log::warn!(target: "connector",
"Cannot parse deal log from block {}: {:?}",
log.block_number,
e.to_string()
);
}

result
}
36 changes: 36 additions & 0 deletions crates/chain-data/src/u256.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use ethabi::Token;
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct U256 {
bytes: Vec<u8>,
}

impl U256 {
pub fn from_bytes(bs: &[u8; 32]) -> Self {
U256 { bytes: bs.to_vec() }
}

pub fn to_eth(&self) -> ethabi::ethereum_types::U256 {
ethabi::ethereum_types::U256::from_little_endian(&self.bytes)
}

pub fn to_u64_trunc(&self) -> u64 {
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&self.bytes[0..8]);
u64::from_le_bytes(bytes)
}

pub fn from_eth(num: ethabi::ethereum_types::U256) -> U256 {
let bytes = num
.0
.iter()
.flat_map(|x| x.to_le_bytes())
.collect::<Vec<_>>();
U256 { bytes }
}

pub fn from_token(token: Token) -> Option<Self> {
token.into_uint().map(Self::from_eth)
}
}
32 changes: 32 additions & 0 deletions crates/chain-data/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use libp2p_identity::{ParseError, PeerId};

/// Static prefix of the PeerId. Protobuf encoding + multihash::identity + length and so on.
pub(crate) const PEER_ID_PREFIX: &[u8] = &[0, 36, 8, 1, 18, 32];

pub fn parse_peer_id(bytes: Vec<u8>) -> Result<PeerId, ParseError> {
let peer_id = [PEER_ID_PREFIX, &bytes].concat();

PeerId::from_bytes(&peer_id)
}

pub fn peer_id_to_hex(peer_id: PeerId) -> String {
let peer_id = peer_id.to_bytes();
format!("0x{:0>64}", hex::encode(&peer_id[PEER_ID_PREFIX.len()..]))
}

#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn peer_id_test() {
let hex = "0x246cd65bc58db104674f76c9b1340eb16881d9ef90e33d4b1086ebd334f4002d".to_string();
let peer_id =
PeerId::from_str("12D3KooWCGZ6t8by5ag5YMQW4k3HoPLaKdN5rB9DhAmDUeG8dj1N").unwrap();
assert_eq!(
peer_id,
parse_peer_id(hex::decode(&hex[2..]).unwrap()).unwrap()
);
assert_eq!(hex, peer_id_to_hex(peer_id));
}
}
Loading
Loading