Skip to content

Commit

Permalink
Sequential relayer's commits (#1942)
Browse files Browse the repository at this point in the history
Updated the relayer to commit heights sequentially. This means that if we don't
have any DA events for a height `X`, we will still insert an empty array of
events for that height. In most cases, almost every block on DA will contain at
least one event, so the overhead will be small and will only affect the start of
the network.

### Before requesting review
- [x] I have reviewed the code myself

---------

Co-authored-by: Brandon Kite <brandonkite92@gmail.com>
  • Loading branch information
xgreenx and Voxelot committed Jun 11, 2024
1 parent 997c02b commit 0692cfc
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 38 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Changed
- [#1942](https://github.com/FuelLabs/fuel-core/pull/1942): Sequential relayer's commits.

### Fixed
- [#1950](https://github.com/FuelLabs/fuel-core/pull/1950): Fix cursor `BlockHeight` encoding in `SortedTXCursor`

Expand Down
4 changes: 1 addition & 3 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,7 @@ where
.advance_height()
.ok_or(DatabaseError::FailedToAdvanceHeight)?;

// TODO: After https://github.com/FuelLabs/fuel-core/issues/451
// we can replace `next_expected_height > new_height` with `next_expected_height != new_height`.
if next_expected_height > new_height {
if next_expected_height != new_height {
return Err(DatabaseError::HeightsAreNotLinked {
prev_height: prev_height.as_u64(),
new_height: new_height.as_u64(),
Expand Down
56 changes: 33 additions & 23 deletions crates/services/relayer/src/service/get_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ use fuel_core_types::{
services::relayer::Event,
};
use futures::TryStreamExt;
use std::collections::BTreeMap;
use std::collections::HashMap;

#[cfg(test)]
mod test;

/// A batch of logs downloaded from the DA layer, together with the
/// contiguous range of DA block heights the batch covers.
pub struct DownloadedLogs {
    /// First DA block height (inclusive) covered by this batch.
    pub start_height: u64,
    /// Last DA block height (inclusive) covered by this batch.
    pub last_height: u64,
    /// The logs fetched for the `start_height..=last_height` range.
    /// May be empty for some (or all) heights in the range.
    pub logs: Vec<Log>,
}

/// Download the logs from the DA layer.
pub(crate) fn download_logs<'a, P>(
eth_sync_gap: &state::EthSyncGap,
contracts: Vec<H160>,
eth_node: &'a P,
page_size: u64,
) -> impl futures::Stream<Item = Result<(u64, Vec<Log>), ProviderError>> + 'a
) -> impl futures::Stream<Item = Result<DownloadedLogs, ProviderError>> + 'a
where
P: Middleware<Error = ProviderError> + 'static,
{
Expand All @@ -41,16 +47,23 @@ where
page.latest()
);

let oldest_block = page.oldest();
let latest_block = page.latest();

// Reduce the page.
let page = page.reduce();

// Get the logs and return the reduced page.
eth_node
.get_logs(&filter)
.await
.map(|logs| Some(((latest_block, logs), page)))
eth_node.get_logs(&filter).await.map(|logs| {
Some((
DownloadedLogs {
start_height: oldest_block,
last_height: latest_block,
logs,
},
page,
))
})
}
}
}
Expand All @@ -62,12 +75,16 @@ where
pub(crate) async fn write_logs<D, S>(database: &mut D, logs: S) -> anyhow::Result<()>
where
D: RelayerDb,
S: futures::Stream<Item = Result<(u64, Vec<Log>), ProviderError>>,
S: futures::Stream<Item = Result<DownloadedLogs, ProviderError>>,
{
tokio::pin!(logs);
while let Some((last_height, events)) = logs.try_next().await? {
let last_height = last_height.into();
let mut ordered_events = BTreeMap::<DaBlockHeight, Vec<Event>>::new();
while let Some(DownloadedLogs {
start_height,
last_height,
logs: events,
}) = logs.try_next().await?
{
let mut unordered_events = HashMap::<DaBlockHeight, Vec<Event>>::new();
let sorted_events = sort_events_by_log_index(events)?;
let fuel_events = sorted_events.into_iter().filter_map(|event| {
match EthEventLog::try_from(&event) {
Expand All @@ -90,21 +107,14 @@ where
for event in fuel_events {
let event = event?;
let height = event.da_height();
ordered_events.entry(height).or_default().push(event);
}

let mut inserted_last_height = false;
for (height, events) in ordered_events {
database.insert_events(&height, &events)?;
if height == last_height {
inserted_last_height = true;
}
unordered_events.entry(height).or_default().push(event);
}

// TODO: For https://github.com/FuelLabs/fuel-core/issues/451 we need to write each height
// (not only the last height), even if it's empty.
if !inserted_last_height {
database.insert_events(&last_height, &[])?;
let empty_events = Vec::new();
for height in start_height..=last_height {
let height: DaBlockHeight = height.into();
let events = unordered_events.get(&height).unwrap_or(&empty_events);
database.insert_events(&height, events)?;
}
}
Ok(())
Expand Down
27 changes: 17 additions & 10 deletions crates/services/relayer/src/service/get_logs/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async fn can_paginate_logs(input: Input) -> Expected {
&eth_node,
DEFAULT_LOG_PAGE_SIZE,
)
.map_ok(|(_, l)| l)
.map_ok(|logs| logs.logs)
.try_concat()
.await
.unwrap();
Expand All @@ -148,32 +148,39 @@ async fn can_paginate_logs(input: Input) -> Expected {
}

#[test_case(vec![
Ok((1, messages_n(1, 0)))
Ok((1, 1, messages_n(1, 0)))
] => 1 ; "Can add single"
)]
#[test_case(vec![
Ok((3, messages_n(3, 0))),
Ok((4, messages_n(1, 4)))
Ok((3, 3, messages_n(3, 0))),
Ok((4, 4, messages_n(1, 4)))
] => 4 ; "Can add two"
)]
#[test_case(vec![
Ok((3, messages_n(3, 0))),
Ok((4, vec![]))
Ok((3, 3, messages_n(3, 0))),
Ok((4, 4, vec![]))
] => 4 ; "Can add empty"
)]
#[test_case(vec![
Ok((7, messages_n(3, 0))),
Ok((19, messages_n(1, 4))),
Ok((1, 7, messages_n(3, 0))),
Ok((8, 19, messages_n(1, 4))),
Err(ProviderError::CustomError("".to_string()))
] => 19 ; "Still adds height when error"
)]
#[tokio::test]
#[allow(clippy::type_complexity)]
async fn test_da_height_updates(
stream: Vec<Result<(u64, Vec<Log>), ProviderError>>,
stream: Vec<Result<(u64, u64, Vec<Log>), ProviderError>>,
) -> u64 {
let mut mock_db = crate::mock_db::MockDb::default();

let logs = futures::stream::iter(stream);
let logs = futures::stream::iter(stream).map(|result| {
result.map(|(start_height, last_height, logs)| DownloadedLogs {
start_height,
last_height,
logs,
})
});

let _ = write_logs(&mut mock_db, logs).await;

Expand Down
2 changes: 1 addition & 1 deletion crates/services/relayer/src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn can_download_logs() {
&eth_node,
DEFAULT_LOG_PAGE_SIZE,
)
.map_ok(|(_, l)| l)
.map_ok(|logs| logs.logs)
.try_concat()
.await
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion tests/test-helpers/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use fuel_core::{
use fuel_core_client::client::FuelClient;
use fuel_core_poa::Trigger;
use fuel_core_types::{
blockchain::header::LATEST_STATE_TRANSITION_VERSION,
fuel_asm::op,
fuel_tx::{
field::Inputs,
Expand Down Expand Up @@ -210,7 +211,7 @@ impl TestSetupBuilder {

let latest_block = self.starting_block.map(|starting_block| LastBlockConfig {
block_height: starting_block,
state_transition_version: 0,
state_transition_version: LATEST_STATE_TRANSITION_VERSION - 1,
..Default::default()
});

Expand Down

0 comments on commit 0692cfc

Please sign in to comment.