Skip to content

Commit

Permalink
start on fuel-core-services
Browse files Browse the repository at this point in the history
  • Loading branch information
freesig committed Jan 10, 2023
1 parent a551442 commit 7b101ae
Show file tree
Hide file tree
Showing 14 changed files with 457 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
.vscode
.cov
lcov.info
version-compatibility/Cargo.lock
version-compatibility/Cargo.lock
benches/benches-outputs/Cargo.lock
11 changes: 8 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions crates/services/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
[package]
name = "fuel-core-services"
version = "0.15.1"
authors = ["Fuel Labs <contact@fuel.sh>"]
description = "The common code for fuel core services."
edition = "2021"
homepage = "https://fuel.network/"
keywords = ["blockchain", "fuel", "consensus", "bft"]
keywords = ["bft", "blockchain", "consensus", "fuel"]
license = "BUSL-1.1"
name = "fuel-core-services"
repository = "https://github.com/FuelLabs/fuel-core"
description = "The common code for fuel core services."
version = "0.15.1"

[dependencies]
anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
parking_lot = "0.12"
tokio = { version = "1.21", features = ["full"] }
tracing = "0.1"

Expand Down
12 changes: 12 additions & 0 deletions crates/services/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,25 @@ pub mod stream {
core::pin::Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;
}

/// The source of some network data.
pub struct SourcePeer<T> {
/// The source of the data.
pub peer_id: PeerId,
/// The data.
pub data: T,
}

/// Placeholder for a peer id
pub struct PeerId;

pub use service::{
EmptyShared,
RunnableService,
RunnableTask,
Service,
ServiceRunner,
Shared,
SharedMutex,
State,
StateWatcher,
};
17 changes: 17 additions & 0 deletions crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub type Shared<T> = std::sync::Arc<T>;
/// The alias for `State` watcher to re-export `watch::Receiver`.
pub type StateWatcher = watch::Receiver<State>;

/// A mutex that can safely be in async contexts and avoids deadlocks.
#[derive(Debug, Clone)]
pub struct SharedMutex<T>(Shared<parking_lot::Mutex<T>>);

/// Used if services have no asynchronously shared data
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmptyShared;
Expand Down Expand Up @@ -334,6 +338,19 @@ where
})
}

impl<T> SharedMutex<T> {
/// Creates a new `SharedMutex` with the given value.
pub fn new(t: T) -> Self {
Self(Shared::new(parking_lot::Mutex::new(t)))
}

/// Apply a function to the inner value and return a value.
pub fn apply<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
let mut t = self.0.lock();
f(&mut t)
}
}

// TODO: Add tests
#[cfg(test)]
mod tests {
Expand Down
17 changes: 13 additions & 4 deletions crates/services/sync/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
[package]
name = "fuel-core-sync"
version = "0.15.1"
authors = ["Fuel Labs <contact@fuel.sh>"]
description = "Fuel Synchronizer"
edition = "2021"
homepage = "https://fuel.network/"
keywords = ["blockchain", "fuel", "fuel-vm"]
license = "BUSL-1.1"
name = "fuel-core-sync"
repository = "https://github.com/FuelLabs/fuel-core"
description = "Fuel Synchronizer"
version = "0.15.1"

[dependencies]
anyhow = "1.0"
async-trait = "0.1.60"
fuel-core-services = { path = "../" }
fuel-core-types = { path = "../../types", version = "0.15.1" }
parking_lot = "0.12"
futures = "0.3.25"
tokio = { version = "1.21", features = ["full"] }

[dev-dependencies]
fuel-core-types = { path = "../../types", features = [
"test-helpers",
] }
mockall = "0.11.3"
test-case = "2.2.2"
183 changes: 183 additions & 0 deletions crates/services/sync/src/import.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use std::{
ops::RangeInclusive,
sync::Arc,
};

use fuel_core_services::{
SharedMutex,
SourcePeer,
};
use fuel_core_types::{
blockchain::{
block::Block,
consensus::Sealed,
primitives::{
BlockHeight,
BlockId,
},
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
services::executor::ExecutionBlock,
};
use futures::{
stream,
stream::StreamExt,
Stream,
};
use std::future::Future;
use tokio::sync::Notify;

use crate::{
ports::{
Executor,
PeerToPeer,
},
State,
};

#[cfg(test)]
mod tests;

#[derive(Clone, Copy, Debug)]
pub struct Params {
pub max_get_header_requests: usize,
pub max_get_txns_requests: usize,
}

async fn import(
state: SharedMutex<State>,
notify: Arc<Notify>,
params: Params,
p2p: Arc<impl PeerToPeer + 'static>,
executor: Arc<impl Executor + 'static>,
) {
loop {
if let Some(range) = state.apply(|s| {
match (s.in_flight_height.as_mut(), s.best_seen_height.as_mut()) {
(Some(in_flight), Some(best)) if !in_flight.is_empty() => {
in_flight.end = *best;
Some(in_flight.start..=*best)
}
(Some(in_flight), None) if !in_flight.is_empty() => {
Some(in_flight.start..=in_flight.end)
}
(Some(in_flight), Some(best)) if *best > in_flight.end => {
in_flight.start = in_flight.end + 1u32.into();
in_flight.end = *best;
Some(in_flight.start..=in_flight.end)
}
(None, Some(best)) => {
s.in_flight_height = Some(0u32.into()..*best);
Some(0u32.into()..=*best)
}
_ => None,
}
}) {
let range = (**range.start())..=(**range.end());
get_header_range_buffered(range, params, p2p.clone())
.map(|header| {
let SourcePeer {
peer_id,
data:
Sealed {
entity: header,
consensus,
},
} = header;
let id = header.id();
let block_id = SourcePeer { peer_id, data: id };
let p2p = p2p.clone();
async move {
p2p.get_transactions(block_id).await.unwrap().and_then(
|transactions| {
Some(SealedBlock {
entity: Block::try_from_executed(
header,
transactions,
)?,
consensus,
})
},
)
}
})
.buffered(params.max_get_txns_requests)
.scan((), |_, block| futures::future::ready(block))
.for_each(|block| {
let state = state.clone();
let height = *block.entity.header().height();
let executor = executor.clone();
async move {
match executor.execute_and_commit(block).await {
Ok(_) => {
state.apply(|s| {
s.in_flight_height.as_mut().unwrap().start = height;
});
}
Err(_) => todo!(),
}
}
})
.await;
}
notify.notified().await;
}
}

fn get_header_range_buffered(
range: RangeInclusive<u32>,
params: Params,
p2p: Arc<impl PeerToPeer + 'static>,
) -> impl Stream<Item = SourcePeer<SealedBlockHeader>> {
get_header_range(range, p2p)
.buffered(params.max_get_header_requests)
.scan((), |_, h| futures::future::ready(h))
}

fn get_header_range(
range: RangeInclusive<u32>,
p2p: Arc<impl PeerToPeer + 'static>,
) -> impl Stream<Item = impl Future<Output = Option<SourcePeer<SealedBlockHeader>>>> {
stream::iter(range).map(move |height| {
let p2p = p2p.clone();
async move { p2p.get_sealed_block_header(height.into()).await.unwrap() }
})
}

fn get_txns_buffered(
block_ids: impl Iterator<Item = SourcePeer<BlockId>>,
params: Params,
p2p: Arc<impl PeerToPeer + 'static>,
) -> impl Stream<Item = Vec<Transaction>> {
stream::iter(block_ids)
.map({
move |block_id| {
let p2p = p2p.clone();
async move { p2p.get_transactions(block_id).await.unwrap() }
}
})
.buffered(params.max_get_txns_requests)
.scan((), |_, h| futures::future::ready(h))
}

fn range_iterator(
range: std::ops::RangeInclusive<u64>,
chunk_size: u64,
) -> impl Iterator<Item = std::ops::RangeInclusive<u64>> {
let start = *range.start();
let end = *range.end();
let mut current_start = start;
let mut current_end = std::cmp::min(current_start + chunk_size - 1, end);
std::iter::from_fn(move || {
if current_start > end {
None
} else {
let result = current_start..=current_end;
current_start = current_end + 1;
current_end = std::cmp::min(current_start + chunk_size - 1, end);
Some(result)
}
})
}
Loading

0 comments on commit 7b101ae

Please sign in to comment.