Skip to content

Commit

Permalink
start on fuel-core-services
Browse files Browse the repository at this point in the history
  • Loading branch information
freesig committed Jan 5, 2023
1 parent 81d9ef8 commit b121512
Show file tree
Hide file tree
Showing 13 changed files with 455 additions and 21 deletions.
11 changes: 8 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions crates/services/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
[package]
name = "fuel-core-services"
version = "0.15.1"
authors = ["Fuel Labs <contact@fuel.sh>"]
description = "The common code for fuel core services."
edition = "2021"
homepage = "https://fuel.network/"
keywords = ["blockchain", "fuel", "consensus", "bft"]
keywords = ["bft", "blockchain", "consensus", "fuel"]
license = "BUSL-1.1"
name = "fuel-core-services"
repository = "https://github.com/FuelLabs/fuel-core"
description = "The common code for fuel core services."
version = "0.15.1"

[dependencies]
anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
parking_lot = "0.12"
tokio = { version = "1.21", features = ["full"] }
tracing = "0.1"

Expand Down
12 changes: 12 additions & 0 deletions crates/services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,23 @@ pub mod stream {
core::pin::Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;
}

/// The source of some network data.
pub struct SourcePeer<T> {
/// The source of the data.
pub peer_id: PeerId,
/// The data.
pub data: T,
}

/// Placeholder for a peer id
pub struct PeerId;

pub use service::{
EmptyShared,
RunnableService,
Service,
ServiceRunner,
Shared,
SharedMutex,
State,
};
17 changes: 17 additions & 0 deletions crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use tokio::{
/// Alias for Arc<T>
pub type Shared<T> = std::sync::Arc<T>;

/// A mutex that can safely be in async contexts and avoids deadlocks.
#[derive(Debug, Clone)]
pub struct SharedMutex<T>(Shared<parking_lot::Mutex<T>>);

/// Used if services have no asynchronously shared data
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmptyShared;
Expand Down Expand Up @@ -279,6 +283,19 @@ where
})
}

impl<T> SharedMutex<T> {
/// Creates a new `SharedMutex` with the given value.
pub fn new(t: T) -> Self {
Self(Shared::new(parking_lot::Mutex::new(t)))
}

/// Apply a function to the inner value and return a value.
pub fn apply<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
let mut t = self.0.lock();
f(&mut t)
}
}

// TODO: Add tests
#[cfg(test)]
mod tests {
Expand Down
17 changes: 13 additions & 4 deletions crates/services/sync/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
[package]
name = "fuel-core-sync"
version = "0.15.1"
authors = ["Fuel Labs <contact@fuel.sh>"]
description = "Fuel Synchronizer"
edition = "2021"
homepage = "https://fuel.network/"
keywords = ["blockchain", "fuel", "fuel-vm"]
license = "BUSL-1.1"
name = "fuel-core-sync"
repository = "https://github.com/FuelLabs/fuel-core"
description = "Fuel Synchronizer"
version = "0.15.1"

[dependencies]
anyhow = "1.0"
async-trait = "0.1.60"
fuel-core-services = { path = "../" }
fuel-core-types = { path = "../../types", version = "0.15.1" }
parking_lot = "0.12"
futures = "0.3.25"
tokio = { version = "1.21", features = ["full"] }

[dev-dependencies]
fuel-core-types = { path = "../../types", features = [
"test-helpers",
] }
mockall = "0.11.3"
test-case = "2.2.2"
183 changes: 183 additions & 0 deletions crates/services/sync/src/import.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use std::{
ops::RangeInclusive,
sync::Arc,
};

use fuel_core_services::{
SharedMutex,
SourcePeer,
};
use fuel_core_types::{
blockchain::{
block::Block,
consensus::Sealed,
primitives::{
BlockHeight,
BlockId,
},
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
services::executor::ExecutionBlock,
};
use futures::{
stream,
stream::StreamExt,
Stream,
};
use std::future::Future;
use tokio::sync::Notify;

use crate::{
ports::{
Executor,
PeerToPeer,
},
State,
};

#[cfg(test)]
mod tests;

#[derive(Clone, Copy, Debug)]
pub struct Params {
pub max_get_header_requests: usize,
pub max_get_txns_requests: usize,
}

async fn import(
state: SharedMutex<State>,
notify: Arc<Notify>,
params: Params,
p2p: Arc<impl PeerToPeer + 'static>,
executor: Arc<impl Executor + 'static>,
) {
loop {
if let Some(range) = state.apply(|s| {
match (s.in_flight_height.as_mut(), s.best_seen_height.as_mut()) {
(Some(in_flight), Some(best)) if !in_flight.is_empty() => {
in_flight.end = *best;
Some(in_flight.start..=*best)
}
(Some(in_flight), None) if !in_flight.is_empty() => {
Some(in_flight.start..=in_flight.end)
}
(Some(in_flight), Some(best)) if *best > in_flight.end => {
in_flight.start = in_flight.end + 1u32.into();
in_flight.end = *best;
Some(in_flight.start..=in_flight.end)
}
(None, Some(best)) => {
s.in_flight_height = Some(0u32.into()..*best);
Some(0u32.into()..=*best)
}
_ => None,
}
}) {
let range = (**range.start())..=(**range.end());
get_header_range_buffered(range, params, p2p.clone())
.map(|header| {
let SourcePeer {
peer_id,
data:
Sealed {
entity: header,
consensus,
},
} = header;
let id = header.id();
let block_id = SourcePeer { peer_id, data: id };
let p2p = p2p.clone();
async move {
p2p.get_transactions(block_id).await.unwrap().and_then(
|transactions| {
Some(SealedBlock {
entity: Block::try_from_executed(
header,
transactions,
)?,
consensus,
})
},
)
}
})
.buffered(params.max_get_txns_requests)
.scan((), |_, block| futures::future::ready(block))
.for_each(|block| {
let state = state.clone();
let height = *block.entity.header().height();
let executor = executor.clone();
async move {
match executor.execute_and_commit(block).await {
Ok(_) => {
state.apply(|s| {
s.in_flight_height.as_mut().unwrap().start = height;
});
}
Err(_) => todo!(),
}
}
})
.await;
}
notify.notified().await;
}
}

fn get_header_range_buffered(
range: RangeInclusive<u32>,
params: Params,
p2p: Arc<impl PeerToPeer + 'static>,
) -> impl Stream<Item = SourcePeer<SealedBlockHeader>> {
get_header_range(range, p2p)
.buffered(params.max_get_header_requests)
.scan((), |_, h| futures::future::ready(h))
}

fn get_header_range(
range: RangeInclusive<u32>,
p2p: Arc<impl PeerToPeer + 'static>,
) -> impl Stream<Item = impl Future<Output = Option<SourcePeer<SealedBlockHeader>>>> {
stream::iter(range).map(move |height| {
let p2p = p2p.clone();
async move { p2p.get_sealed_block_header(height.into()).await.unwrap() }
})
}

fn get_txns_buffered(
block_ids: impl Iterator<Item = SourcePeer<BlockId>>,
params: Params,
p2p: Arc<impl PeerToPeer + 'static>,
) -> impl Stream<Item = Vec<Transaction>> {
stream::iter(block_ids)
.map({
move |block_id| {
let p2p = p2p.clone();
async move { p2p.get_transactions(block_id).await.unwrap() }
}
})
.buffered(params.max_get_txns_requests)
.scan((), |_, h| futures::future::ready(h))
}

fn range_iterator(
range: std::ops::RangeInclusive<u64>,
chunk_size: u64,
) -> impl Iterator<Item = std::ops::RangeInclusive<u64>> {
let start = *range.start();
let end = *range.end();
let mut current_start = start;
let mut current_end = std::cmp::min(current_start + chunk_size - 1, end);
std::iter::from_fn(move || {
if current_start > end {
None
} else {
let result = current_start..=current_end;
current_start = current_end + 1;
current_end = std::cmp::min(current_start + chunk_size - 1, end);
Some(result)
}
})
}
62 changes: 62 additions & 0 deletions crates/services/sync/src/import/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::time::Duration;

use fuel_core_services::PeerId;
use fuel_core_types::{
blockchain::{
consensus::Consensus,
header::BlockHeader,
},
services::executor::ExecutionResult,
};

use crate::ports::{
MockExecutor,
MockPeerToPeer,
};

use super::*;

#[tokio::test]
async fn test_import() {
let state = SharedMutex::new(State {
best_seen_height: Some(5u32.into()),
..Default::default()
});
let notify = Arc::new(Notify::new());
let params = Params {
max_get_header_requests: 10,
max_get_txns_requests: 10,
};
let mut p2p = MockPeerToPeer::default();
p2p.expect_get_sealed_block_header().returning(|_| {
let header = BlockHeader::default();
let consensus = Consensus::default();
let sealed = Sealed {
entity: header,
consensus,
};
let peer_id = PeerId {};
let source_peer = SourcePeer {
peer_id,
data: sealed,
};
Ok(Some(source_peer))
});
p2p.expect_get_transactions()
.returning(|_| Ok(Some(vec![])));
let p2p = Arc::new(p2p);
let mut executor = MockExecutor::default();
executor.expect_execute_and_commit().returning(|_| {
Ok(ExecutionResult {
block: Block::default(),
skipped_transactions: vec![],
tx_status: vec![],
})
});
let executor = Arc::new(executor);
let _ = tokio::time::timeout(
Duration::from_secs(2),
import(state, notify, params, p2p, executor),
)
.await;
}
Loading

0 comments on commit b121512

Please sign in to comment.