Skip to content

Commit

Permalink
[client] Remove client events module
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudhead committed Nov 19, 2023
1 parent 2e111bd commit af1e9fc
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 332 deletions.
11 changes: 2 additions & 9 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ pub use crate::event::Loading;
pub use crate::handle;
pub use crate::service::Service;

use crate::event::Mapper;
use crate::peer;
use nakamoto_net::{Reactor, Waker};

Expand Down Expand Up @@ -216,10 +215,7 @@ impl<R: Reactor> Client<R> {
p.emit((filter, block, height));
}
});
let (publisher, subscriber) = event::broadcast({
let mut mapper = Mapper::default();
move |e, p| mapper.process(e, p)
});
let (publisher, subscriber) = event::broadcast(|e, p| p.emit(e));

let publisher = Publisher::default()
.register(event_pub)
Expand Down Expand Up @@ -703,10 +699,7 @@ impl<W: Waker> handle::Handle for Handle<W> {
None => event::wait(
&events,
|e| match e {
Event::BlockHeadersImported {
result: ImportResult::TipChanged { height, hash, .. },
..
} if height == h => Some(hash),
Event::BlockHeadersImported { height, hash, .. } if height == h => Some(hash),
_ => None,
},
self.timeout,
Expand Down
228 changes: 6 additions & 222 deletions client/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
//! Client events.
#![allow(clippy::manual_range_contains)]
use std::collections::HashSet;
use std::fmt;

use nakamoto_common::block::{Block, BlockHash, Height};
use nakamoto_net::event::Emitter;
use nakamoto_p2p::fsm;
use nakamoto_p2p::fsm::Event;
use nakamoto_common::block::Height;

/// Event emitted by the client during the "loading" phase.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -35,149 +31,18 @@ impl fmt::Display for Loading {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::BlockHeaderLoaded { height } => {
write!(fmt, "block header #{} loaded", height)
write!(fmt, "Block header #{} loaded", height)
}
Self::FilterHeaderLoaded { height } => {
write!(fmt, "filter header #{} loaded", height)
write!(fmt, "Filter header #{} loaded", height)
}
Self::FilterHeaderVerified { height } => {
write!(fmt, "filter header #{} verified", height)
write!(fmt, "Filter header #{} verified", height)
}
}
}
}

/// Event mapper for client events.
/// Consumes raw state machine events and emits [`Event`].
pub(crate) struct Mapper {
/// Best height known.
tip: Height,
/// The height up to which we've processed filters and matching blocks.
sync_height: Height,
/// The height up to which we've processed filters.
/// This is usually going to be greater than `sync_height`.
filter_height: Height,
/// The height up to which we've processed matching blocks.
/// This is always going to be lesser or equal to `filter_height`.
block_height: Height,
/// Filter heights that have been matched, and for which we are awaiting a block to process.
pending: HashSet<Height>,
}

impl Default for Mapper {
/// Create a new client event mapper.
fn default() -> Self {
let tip = 0;
let sync_height = 0;
let filter_height = 0;
let block_height = 0;
let pending = HashSet::new();

Self {
tip,
sync_height,
filter_height,
block_height,
pending,
}
}
}

impl Mapper {
/// Process protocol event and map it to client event(s).
pub fn process(&mut self, event: fsm::Event, emitter: &Emitter<Event>) {
match event {
Event::BlockHeadersSynced { height, .. } => {
self.tip = height;
emitter.emit(event);
}
Event::BlockProcessed {
block,
height,
fees,
} => {
let hash = self.process_block(block, height);

if let Some(fees) = fees {
emitter.emit(Event::FeeEstimated {
block: hash,
height,
fees,
});
}
}
Event::FilterRescanStarted { start, .. } => {
self.pending.clear();

self.filter_height = start;
self.sync_height = start;
self.block_height = start;
}
Event::FilterProcessed {
height,
matched,
valid: true,
..
} => {
debug_assert!(height >= self.filter_height);

if matched {
log::debug!("Filter matched for block #{}", height);
self.pending.insert(height);
}
self.filter_height = height;
emitter.emit(event);
}
other => emitter.emit(other),
}
assert!(
self.block_height <= self.filter_height,
"Filters are processed before blocks"
);
assert!(
self.sync_height <= self.filter_height,
"Filters are processed before we are done"
);

// If we have no blocks left to process, we are synced to the height of the last
// processed filter. Otherwise, we're synced up to the last processed block.
let height = if self.pending.is_empty() {
self.filter_height
} else {
self.block_height
};

// Ensure we only broadcast sync events when the sync height has changed.
if height > self.sync_height {
self.sync_height = height;

emitter.emit(Event::Synced {
height,
tip: self.tip,
});
}
}

// PRIVATE METHODS /////////////////////////////////////////////////////////

// TODO: Instead of receiving the block, fetch it if matched.
fn process_block(&mut self, block: Block, height: Height) -> BlockHash {
let hash = block.block_hash();

if !self.pending.remove(&height) {
// Received unexpected block.
return hash;
}

log::debug!("Received block {} at height {}", hash, height);
debug_assert!(height >= self.block_height);

self.block_height = height;

hash
}
}

#[cfg(test)]
mod test {
//! Properties of the [`client::Client`] we'd like to test.
Expand Down Expand Up @@ -220,16 +85,13 @@ mod test {
//!
use std::io;

use quickcheck::TestResult;
use quickcheck_macros::quickcheck;

use nakamoto_common::block::time::Clock as _;
use nakamoto_common::network::Network;
use nakamoto_net::{Disconnect, Link, LocalTime, StateMachine as _};
use nakamoto_p2p::fsm;
use nakamoto_p2p::Event;
use nakamoto_test::assert_matches;
use nakamoto_test::block::gen;

use super::Event;
use super::*;

use crate::handle::Handle as _;
Expand Down Expand Up @@ -412,82 +274,4 @@ mod test {
if addr == remote && height == 42 && user_agent == "?"
);
}

#[quickcheck]
fn prop_client_side_filtering(birth: Height, height: Height, seed: u64) -> TestResult {
if height < 1 || height > 24 || birth >= height {
return TestResult::discard();
}

let mut rng = fastrand::Rng::with_seed(seed);
let network = Network::Regtest;
let genesis = network.genesis_block();
let chain = gen::blockchain(genesis, height, &mut rng);
let mut mock = mock::Client::new(network);
let mut client = mock.handle();

client.tip = (height, chain[height as usize].header, Default::default());

let mut spent = 0;
let (watch, heights, balance) = gen::watchlist_rng(birth, chain.iter(), &mut rng);

log::debug!(
"-- Test case with birth = {} and height = {}",
birth,
height
);
let subscriber = client.events();

mock.subscriber.broadcast(fsm::Event::BlockHeadersSynced {
hash: chain.last().block_hash(),
height,
});

for h in birth..=height {
let matched = heights.contains(&h);
let block = chain[h as usize].clone();

mock.subscriber.broadcast(fsm::Event::FilterProcessed {
block: block.block_hash(),
height: h,
matched,
cached: false,
valid: true,
});

if matched {
mock.subscriber
.broadcast(fsm::Event::BlockMatched { block, height: h });
}
}

for event in subscriber.try_iter() {
match event {
Event::BlockMatched { block, .. } => {
for t in &block.txdata {
for output in &t.output {
if watch.contains(&output.script_pubkey) {
spent += output.value;
}
}
}
}
Event::Synced {
height: sync_height,
tip,
} => {
assert_eq!(height, tip);

if sync_height == tip {
break;
}
}
_ => {}
}
}
assert_eq!(balance, spent);
client.shutdown().unwrap();

TestResult::passed()
}
}
4 changes: 1 addition & 3 deletions client/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use nakamoto_p2p::fsm::Peer;
use nakamoto_p2p::fsm::StateMachine;

use crate::client::{chan, Event, Loading};
use crate::event::Mapper;
use crate::handle::{self, Handle};

pub struct Client {
Expand Down Expand Up @@ -103,8 +102,7 @@ impl Default for Client {
let (blocks, blocks_) = chan::unbounded();
let (filters, filters_) = chan::unbounded();
let (commands_, commands) = chan::unbounded();
let mut mapper = Mapper::default();
let (subscriber, subscriber_) = event::broadcast(move |e, p| mapper.process(e, p));
let (subscriber, subscriber_) = event::broadcast(|e, p| p.emit(e));
let loading = event::Emitter::default();
let network = Network::default();
let protocol = {
Expand Down
Loading

0 comments on commit af1e9fc

Please sign in to comment.