Skip to content

Commit

Permalink
feat: introduce chain-listener [fixes NET-694 NET-673 NET-674 NET-677 N…
Browse files Browse the repository at this point in the history
  • Loading branch information
justprosh committed Jan 3, 2024
1 parent 017f076 commit 3ff44b5
Show file tree
Hide file tree
Showing 40 changed files with 1,064 additions and 139 deletions.
285 changes: 240 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ members = [
"spell-storage",
"particle-execution",
"crates/system-services",
"crates/chain-listener",
"crates/hex-utils",
"crates/chain-data"
]
exclude = [
"nox/tests/tetraplets",
Expand Down Expand Up @@ -90,6 +93,9 @@ particle-execution = { path = "particle-execution" }
system-services = { path = "crates/system-services" }
health = { path = "crates/health" }
subnet-resolver = { path = "crates/subnet-resolver" }
hex-utils = { path = "crates/hex-utils"}
chain-data = { path = "crates/chain-data"}
chain-listener = { path = "crates/chain-listener"}

# spell
fluence-spell-dtos = "=0.6.4"
Expand Down Expand Up @@ -144,9 +150,13 @@ cid = "0.11.0"
libipld = "0.16.0"
axum = "0.7.2"
reqwest = "0.11.22"
faster-hex = "0.9.0"
once_cell = "1.19.0"
tempfile = "3.8.1"
hex = "0.4.3"
ethabi = "18.0.0"
jsonrpsee = "0.21.0"
blake3 = "1.5.0"
rand = "0.8.4"

# Enable a small amount of optimization in debug mode
[profile.dev]
Expand Down
5 changes: 0 additions & 5 deletions crates/air-interpreter-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,4 @@ edition = "2021"

[dependencies]
air-interpreter-wasm = { workspace = true }

eyre = { workspace = true }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = { workspace = true }
blake3 = "1.5.0"
faster-hex = { workspace = true }
15 changes: 15 additions & 0 deletions crates/chain-data/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "chain-data"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# ABI encoding/decoding of event data and signatures
ethabi = { workspace = true }
# Log DTO (de)serialization
serde = { workspace = true }
# PeerId parsing for compute-peer fields
libp2p-identity = { workspace = true, features = ["peerid"] }
# hex decoding helpers (accepts optional "0x" prefix)
hex-utils = { workspace = true }
hex = { workspace = true }
log = { workspace = true }
thiserror = { workspace = true }
51 changes: 51 additions & 0 deletions crates/chain-data/src/chain_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use crate::error::ChainDataError;
use ethabi::{ParamType, Token};
use hex_utils::decode_hex;

#[derive(Debug, Clone, PartialEq)]
/// Kind of the field in a Chain Event signature
pub enum EventField {
    /// If field is indexed, it's passed among topics
    Indexed(ParamType),
    /// If field is not indexed, it's passed in log.data
    NotIndexed(ParamType),
}

impl EventField {
    /// Unwrap the underlying ABI parameter type, regardless of indexing kind.
    pub fn param_type(self) -> ParamType {
        match self {
            EventField::Indexed(ty) | EventField::NotIndexed(ty) => ty,
        }
    }
}

pub trait ChainData {
fn event_name() -> &'static str;
fn signature() -> Vec<EventField>;
fn parse(data_tokens: &mut impl Iterator<Item = Token>) -> Result<Self, ChainDataError>
where
Self: Sized;

fn topic() -> String {
let sig: Vec<_> = Self::signature()
.into_iter()
.map(|t| t.param_type())
.collect();
let hash = ethabi::long_signature(Self::event_name(), &sig);
format!("0x{}", hex::encode(hash.as_bytes()))
}
}

/// Constructor for an event DTO from parsed chain data plus the hex block
/// number the log was observed in.
pub trait ChainEvent<ChainData> {
    fn new(block_number: String, data: ChainData) -> Self;
}

/// Parse data from chain into ABI tokens. Accepts data with and without "0x" prefix.
/// Empty input is rejected explicitly rather than decoded to zero tokens.
pub fn parse_chain_data(data: &str, signature: &[ParamType]) -> Result<Vec<Token>, ChainDataError> {
    if data.is_empty() {
        return Err(ChainDataError::Empty);
    }
    let bytes = decode_hex(data).map_err(ChainDataError::DecodeHex)?;
    let tokens = ethabi::decode(signature, &bytes)?;
    Ok(tokens)
}
24 changes: 24 additions & 0 deletions crates/chain-data/src/data_tokens.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use crate::ChainDataError;
use crate::ChainDataError::{InvalidParsedToken, MissingParsedToken};
use ethabi::Token;

// Take the next token from the stream and parse it with `f`, which may reject it.
// Distinguishes "stream exhausted" (MissingParsedToken) from "wrong token kind"
// (InvalidParsedToken); `name` labels the field in the error.
pub fn next_opt<T>(
    data_tokens: &mut impl Iterator<Item = Token>,
    name: &'static str,
    f: impl Fn(Token) -> Option<T>,
) -> Result<T, ChainDataError> {
    match data_tokens.next() {
        None => Err(MissingParsedToken(name)),
        Some(token) => f(token).ok_or(InvalidParsedToken(name)),
    }
}

// Take next token and parse it with `f`
pub fn next<T>(
data_tokens: &mut impl Iterator<Item = Token>,
name: &'static str,
f: impl Fn(Token) -> T,
) -> Result<T, ChainDataError> {
next_opt(data_tokens, name, |t| Some(f(t)))
}
16 changes: 16 additions & 0 deletions crates/chain-data/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use thiserror::Error;
/// Errors produced while decoding hex chain data into ABI tokens.
#[derive(Debug, Error)]
pub enum ChainDataError {
    #[error("empty data, nothing to parse")]
    Empty,
    // Token stream ended before the named field was reached
    #[error("missing token for field '{0}'")]
    MissingParsedToken(&'static str),
    // A token was present but the field-specific parser rejected it
    #[error("invalid token for field '{0}'")]
    InvalidParsedToken(&'static str),
    #[error("invalid compute peer id: '{0}'")]
    InvalidComputePeerId(#[from] libp2p_identity::ParseError),
    // #[source] (not #[from]): hex errors are wrapped explicitly at call sites
    #[error("data is not a valid hex: '{0}'")]
    DecodeHex(#[source] hex::FromHexError),
    #[error(transparent)]
    EthError(#[from] ethabi::Error),
}
14 changes: 14 additions & 0 deletions crates/chain-data/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Nightly-only feature: `try { ... }` blocks are used by `parse_log` in log.rs
#![feature(try_blocks)]
//! chain-data: helpers for decoding EVM chain data — event signatures,
//! log parsing into typed DTOs, ABI token utilities, a byte-backed U256,
//! and PeerId <-> hex conversions.
mod chain_data;
mod data_tokens;
mod error;
mod log;
mod u256;
mod utils;

// Public crate surface re-exported from the private modules
pub use chain_data::{ChainData, ChainEvent, EventField};
pub use data_tokens::{next, next_opt};
pub use error::ChainDataError;
pub use log::{parse_log, Log, LogParseError};
pub use u256::U256;
pub use utils::{parse_peer_id, peer_id_to_hex};
133 changes: 133 additions & 0 deletions crates/chain-data/src/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use libp2p_identity::ParseError;
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::chain_data::EventField::{Indexed, NotIndexed};
use crate::chain_data::{parse_chain_data, ChainData, ChainEvent, EventField};
use crate::error::ChainDataError;
use crate::log::LogParseError::{MissingToken, MissingTopic};

/// A single event log entry as returned by an RPC node (camelCase JSON).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Log {
    // Log arguments: hex-encoded non-indexed event data
    pub data: String,
    // The block number in hex (with 0x prefix) that contains this log
    pub block_number: String,
    // true when the log was removed, due to a chain reorganization. false if it's a valid log.
    #[serde(default)]
    pub removed: bool,
    // topics[0] is the event signature hash; subsequent topics carry indexed fields
    pub topics: Vec<String>,
}

#[derive(Debug, Error)]
pub enum LogParseError {
#[error(transparent)]
EthError(#[from] ethabi::Error),
#[error(transparent)]
DecodeHex(#[from] hex::FromHexError),
#[error("parsed data doesn't correspond to the expected signature: {0:?}")]
SignatureMismatch(Vec<EventField>),
#[error(
"incorrect log signature: not found token for field #{position} of type ${event_field:?}"
)]
MissingToken {
position: usize,
event_field: EventField,
},
#[error("incorrect log signature: not found topic for indexed field #{position} of type ${event_field:?}")]
MissingTopic {
position: usize,
event_field: EventField,
},
#[error("missing token for field '{0}'")]
MissingParsedToken(&'static str),
#[error("invalid token for field '{0}'")]
InvalidParsedToken(&'static str),
#[error("invalid compute peer id: '{0}'")]
InvalidComputePeerId(#[from] ParseError),
#[error(transparent)]
ChainData(#[from] ChainDataError),
#[error("no tokens after deserialization")]
NoTokens,
}

/// Parse Event Log to specified DTO
///
/// Logs consist of data fields, much like ADT. Fields can be indexed and not indexed.
///
/// Data for indexed fields is encoded in 'log.topics', starting from 1st topic, i.e. 0th is skipped
/// Data for non indexed fields is encoded in 'log.data'.
///
/// Indexed and non indexed data fields can be interleaved.
/// That forces a certain parsing scheme, which is implemented below.
///
/// On failure the error is also logged (target "connector") before being returned.
pub fn parse_log<U: ChainData, T: ChainEvent<U>>(log: Log) -> Result<T, LogParseError> {
    log::debug!("Parse log from block {:?}", log.block_number);
    // try-block (nightly) lets `?` accumulate into `result` so the error can be
    // logged once below without an early return
    let result: Result<_, LogParseError> = try {
        // event log signature, i.e. data field types
        let signature = U::signature();
        // gather data types for non indexed ("indexless") fields
        let indexless = signature
            .clone()
            .into_iter()
            .filter_map(|t| match t {
                NotIndexed(t) => Some(t),
                Indexed(_) => None,
            })
            .collect::<Vec<_>>();
        // parse all non indexed fields to tokens
        let indexless = parse_chain_data(&log.data, &indexless)?;

        // iterate through data field types (signature), and take
        // data `Token` from either 'indexless' or 'topics'
        let mut indexless = indexless.into_iter();
        // skip first topic, because it contains actual topic, and not indexed data field
        let mut topics = log.topics.into_iter().skip(1);
        // accumulate tokens here
        let mut tokens = vec![];
        for (position, event_field) in signature.into_iter().enumerate() {
            match event_field {
                NotIndexed(_) => {
                    // take next token for non indexed data field
                    let token = indexless.next().ok_or(MissingToken {
                        position,
                        event_field,
                    })?;
                    tokens.push(token);
                }
                ef @ Indexed(_) => {
                    let topic = topics.next().ok_or(MissingTopic {
                        position,
                        event_field: ef.clone(),
                    })?;
                    // parse indexed field to token one by one
                    // (each topic is a single 32-byte word, hence a single-type signature)
                    let parsed = parse_chain_data(&topic, &[ef.clone().param_type()])?;
                    debug_assert!(parsed.len() == 1, "parse of an indexed event fields yielded several tokens, expected a single one");
                    let token = parsed.into_iter().next().ok_or(MissingToken {
                        position,
                        event_field: ef,
                    })?;
                    tokens.push(token)
                }
            }
        }

        // an event with zero parsed fields is treated as a parse failure
        if tokens.is_empty() {
            return Err(LogParseError::NoTokens);
        }

        let block_number = log.block_number.clone();
        let log = U::parse(&mut tokens.into_iter())?;
        T::new(block_number, log)
    };

    if let Err(e) = result.as_ref() {
        log::warn!(target: "connector",
            "Cannot parse deal log from block {}: {:?}",
            log.block_number,
            e.to_string()
        );
    }

    result
}
36 changes: 36 additions & 0 deletions crates/chain-data/src/u256.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use ethabi::Token;
use serde::{Deserialize, Serialize};

/// 256-bit unsigned integer stored as raw bytes, interpreted little-endian
/// by `to_eth`/`from_eth`.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct U256 {
    // 32 little-endian bytes; length not enforced by the type — NOTE(review):
    // consider [u8; 32] to make the invariant structural
    bytes: Vec<u8>,
}

impl U256 {
    /// Wrap 32 little-endian bytes.
    pub fn from_bytes(bs: &[u8; 32]) -> Self {
        Self {
            bytes: bs.to_vec(),
        }
    }

    /// Convert to the ethabi representation (bytes read as little-endian).
    pub fn to_eth(&self) -> ethabi::ethereum_types::U256 {
        ethabi::ethereum_types::U256::from_little_endian(&self.bytes)
    }

    /// Truncate to the lowest 64 bits (first 8 little-endian bytes).
    /// Panics if fewer than 8 bytes are stored.
    pub fn to_u64_trunc(&self) -> u64 {
        let mut low = [0u8; 8];
        low.copy_from_slice(&self.bytes[0..8]);
        u64::from_le_bytes(low)
    }

    /// Build from an ethabi U256 by serializing its 64-bit limbs little-endian.
    pub fn from_eth(num: ethabi::ethereum_types::U256) -> U256 {
        let mut bytes = Vec::with_capacity(32);
        for limb in num.0.iter() {
            bytes.extend_from_slice(&limb.to_le_bytes());
        }
        U256 { bytes }
    }

    /// Extract a uint token into a U256; `None` when the token is not a uint.
    pub fn from_token(token: Token) -> Option<Self> {
        token.into_uint().map(Self::from_eth)
    }
}
32 changes: 32 additions & 0 deletions crates/chain-data/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use libp2p_identity::{ParseError, PeerId};

/// Static prefix of the PeerId. Protobuf encoding + multihash::identity + length and so on.
/// NOTE(review): presumably specific to identity-multihash (ed25519-style) PeerIds — confirm
/// that all peer ids handled here fit this fixed prefix.
pub(crate) const PEER_ID_PREFIX: &[u8] = &[0, 36, 8, 1, 18, 32];

/// Reconstruct a PeerId from prefix-stripped bytes (as stored on chain) by
/// re-attaching the static protobuf/multihash prefix.
pub fn parse_peer_id(bytes: Vec<u8>) -> Result<PeerId, ParseError> {
    let mut full = Vec::with_capacity(PEER_ID_PREFIX.len() + bytes.len());
    full.extend_from_slice(PEER_ID_PREFIX);
    full.extend_from_slice(&bytes);
    PeerId::from_bytes(&full)
}

/// Hex-encode a PeerId without its static prefix, left-padded with zeros to
/// 64 hex chars and prefixed with "0x" (inverse of `parse_peer_id`).
pub fn peer_id_to_hex(peer_id: PeerId) -> String {
    let bytes = peer_id.to_bytes();
    let tail = &bytes[PEER_ID_PREFIX.len()..];
    format!("0x{:0>64}", hex::encode(tail))
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;
    #[test]
    // Round-trip: chain-side hex -> PeerId (parse_peer_id) and back (peer_id_to_hex)
    fn peer_id_test() {
        let hex = "0x246cd65bc58db104674f76c9b1340eb16881d9ef90e33d4b1086ebd334f4002d".to_string();
        let peer_id =
            PeerId::from_str("12D3KooWCGZ6t8by5ag5YMQW4k3HoPLaKdN5rB9DhAmDUeG8dj1N").unwrap();
        assert_eq!(
            peer_id,
            parse_peer_id(hex::decode(&hex[2..]).unwrap()).unwrap()
        );
        assert_eq!(hex, peer_id_to_hex(peer_id));
    }
}
Loading

0 comments on commit 3ff44b5

Please sign in to comment.