Skip to content

Commit

Permalink
[p2p] Reduce API surface
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudhead committed Nov 19, 2023
1 parent af1e9fc commit 2cb33ba
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 121 deletions.
Empty file removed client/src/spv/tests.rs
Empty file.
15 changes: 7 additions & 8 deletions p2p/src/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> StateMa
self.pingmgr.received_event(e.clone(), &self.tree);
self.invmgr.received_event(e.clone(), &self.tree);
self.syncmgr.received_event(e.clone(), &mut self.tree);
self.addrmgr.received_event(e.clone(), &self.tree);
self.addrmgr.received_event(e.clone());
self.peermgr.received_event(e, &self.tree);
}

Expand Down Expand Up @@ -799,7 +799,6 @@ impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> traits:
}

fn attempted(&mut self, addr: &net::SocketAddr) {
self.addrmgr.peer_attempted(addr);
self.peermgr.peer_attempted(addr);
}

Expand All @@ -826,12 +825,12 @@ impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> traits:
fn timer_expired(&mut self) {
trace!("Received wake");

self.invmgr.received_wake(&self.tree);
self.syncmgr.received_wake(&self.tree);
self.pingmgr.received_wake();
self.addrmgr.received_wake();
self.peermgr.received_wake(&mut self.addrmgr);
self.cbfmgr.received_wake(&self.tree);
self.invmgr.timer_expired(&self.tree);
self.syncmgr.timer_expired(&self.tree);
self.pingmgr.timer_expired();
self.addrmgr.timer_expired();
self.peermgr.timer_expired(&mut self.addrmgr);
self.cbfmgr.timer_expired(&self.tree);

#[cfg(not(test))]
let local_time = self.clock.local_time();
Expand Down
71 changes: 37 additions & 34 deletions p2p/src/fsm/addrmgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,8 @@ impl<P: Store, C: Clock> AddressManager<P, C> {
self.idle();
}

/// Return an iterator over randomly sampled addresses.
pub fn iter(&mut self, services: ServiceFlags) -> impl Iterator<Item = (Address, Source)> + '_ {
Iter(move || self.sample(services))
}

/// Get addresses from peers.
pub fn get_addresses(&mut self) {
for peer in &self.sources {
self.outbox.get_addr(*peer);
}
}

/// Event received.
pub fn received_event<T>(&mut self, event: Event, _tree: &T) {
pub fn received_event(&mut self, event: Event) {
match event {
Event::PeerConnected { addr, .. } => {
self.peer_connected(&addr);
Expand All @@ -151,6 +139,9 @@ impl<P: Store, C: Clock> AddressManager<P, C> {
}
self.peer_negotiated(&addr, services, link);
}
Event::PeerConnecting { addr, .. } => {
self.peer_attempted(&addr);
}
Event::PeerDisconnected { addr, reason } => {
self.peer_disconnected(&addr, reason);
}
Expand All @@ -174,8 +165,30 @@ impl<P: Store, C: Clock> AddressManager<P, C> {
}
}

/// Called when a tick is received.
///
/// Performs the address manager's periodic maintenance: requests fresh
/// addresses from our peer sources when the address book is exhausted
/// (rate-limited to one request per `REQUEST_TIMEOUT`), and runs `idle`
/// housekeeping once every `IDLE_TIMEOUT`.
pub fn timer_expired(&mut self) {
let local_time = self.clock.local_time();

// If we're already using all the addresses we have available, we should fetch more.
// Rate-limited: only fires if at least `REQUEST_TIMEOUT` has elapsed since the
// last request (`last_request` is `None` on the first run, so this fires then).
if local_time - self.last_request.unwrap_or_default() >= REQUEST_TIMEOUT
&& self.is_exhausted()
{
// Let subscribers know we've run out of fresh addresses to hand out.
self.outbox.event(Event::AddressBookExhausted);

self.get_addresses();
self.last_request = Some(local_time);
// Re-arm the timer so exhaustion is re-checked once the timeout elapses.
self.outbox.set_timer(REQUEST_TIMEOUT);
}

// Periodic idle housekeeping (e.g. flushing peers to the store — see `idle`).
if local_time - self.last_idle.unwrap_or_default() >= IDLE_TIMEOUT {
self.idle();
}
}

////////////////////////////////////////////////////////////////////////////

/// Called when we receive a `getaddr` message.
pub fn received_getaddr(&mut self, from: &net::SocketAddr) {
fn received_getaddr(&mut self, from: &net::SocketAddr) {
// TODO: We should only respond with peers who were last active within
// the last 3 hours.
let mut addrs = Vec::new();
Expand All @@ -194,28 +207,20 @@ impl<P: Store, C: Clock> AddressManager<P, C> {
self.outbox.addr(*from, addrs);
}

/// Called when a tick is received.
pub fn received_wake(&mut self) {
let local_time = self.clock.local_time();

// If we're already using all the addresses we have available, we should fetch more.
if local_time - self.last_request.unwrap_or_default() >= REQUEST_TIMEOUT
&& self.is_exhausted()
{
self.outbox.event(Event::AddressBookExhausted);

self.get_addresses();
self.last_request = Some(local_time);
self.outbox.set_timer(REQUEST_TIMEOUT);
}
/// Return an iterator over randomly sampled addresses.
fn iter(&mut self, services: ServiceFlags) -> impl Iterator<Item = (Address, Source)> + '_ {
Iter(move || self.sample(services))
}

if local_time - self.last_idle.unwrap_or_default() >= IDLE_TIMEOUT {
self.idle();
/// Get addresses from peers.
fn get_addresses(&mut self) {
for peer in &self.sources {
self.outbox.get_addr(*peer);
}
}

/// Called when a peer connection is attempted.
pub fn peer_attempted(&mut self, addr: &net::SocketAddr) {
fn peer_attempted(&mut self, addr: &net::SocketAddr) {
let time = self.clock.local_time();
// We're only interested in connection attempts for addresses we keep track of.
if let Some(ka) = self.peers.get_mut(&addr.ip()) {
Expand Down Expand Up @@ -281,8 +286,6 @@ impl<P: Store, C: Clock> AddressManager<P, C> {
}
}

////////////////////////////////////////////////////////////////////////////

fn idle(&mut self) {
// If it's been a while, save addresses to store.
if let Err(err) = self.peers.flush() {
Expand Down Expand Up @@ -1079,7 +1082,7 @@ mod tests {

// Make sure we can re-sample the same addresses for the purpose of this example.
clock.elapse(LocalDuration::from_mins(60));
addrmgr.received_wake();
addrmgr.timer_expired();

if adversary_addrs.contains(&addr) {
adversary += 1;
Expand Down
122 changes: 61 additions & 61 deletions p2p/src/fsm/cbfmgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl<F: Filters, C: Clock> FilterManager<F, C> {
}

/// A tick was received.
pub fn received_wake<T: BlockReader>(&mut self, tree: &T) {
pub fn timer_expired<T: BlockReader>(&mut self, tree: &T) {
self.idle(tree);

let timeout = self.config.request_timeout;
Expand Down Expand Up @@ -358,60 +358,6 @@ impl<F: Filters, C: Clock> FilterManager<F, C> {
}
}

/// Rollback filters to the given height.
///
/// Called when the block header chain is re-organized, so that the filter
/// header chain and any in-progress rescan state stay consistent with the
/// new chain tip.
///
/// # Errors
///
/// Returns a [`filter::Error`] if rolling back the filter header chain fails.
pub fn rollback(&mut self, height: Height) -> Result<(), filter::Error> {
// It's possible that a rollback doesn't affect the filter chain, if the filter headers
// haven't caught up to the block headers when the re-org happens.
if height >= self.filters.height() {
return Ok(());
}

// Purge stale block filters.
self.rescan.rollback(height);
// Rollback filter header chain.
self.filters.rollback(height)?;

// Nb. Inflight filter header requests for heights that were rolled back will be ignored
// when received.
//
// TODO: Inflight filter requests need to be re-issued.

if self.rescan.active {
// Reset "current" scanning height.
//
// We start re-scanning from either the start, or the current height, whichever
// is greater, while ensuring that we only reset backwards, ie. we never skip
// heights.
//
// For example, given we are currently at 7, if we rolled back to height 4, and our
// start is at 5, we restart from 5.
//
// If we rolled back to height 4 and our start is at 3, we restart at 4, because
// we don't need to scan blocks before our start height.
//
// If we rolled back to height 9 from height 11, we wouldn't want to re-scan any
// blocks, since we haven't yet gotten to that height.
//
let start = self.rescan.start;
let current = self.rescan.current;

// Only move `current` backwards, and never before the rescan start height.
if current > height + 1 {
self.rescan.current = Height::max(height + 1, start);
}

log::debug!(
target: "p2p",
"Rollback from {} to {}, start = {}, height = {}",
current,
self.rescan.current,
start,
height
);
}

Ok(())
}

/// Add scripts to the list of scripts to watch.
pub fn watch(&mut self, scripts: Vec<Script>) {
self.rescan.watch.extend(scripts);
Expand Down Expand Up @@ -532,8 +478,64 @@ impl<F: Filters, C: Clock> FilterManager<F, C> {
Ok(())
}

// PRIVATE METHODS /////////////////////////////////////////////////////////

/// Rollback filters to the given height.
///
/// Invoked internally when the block header chain is re-organized, keeping
/// the filter header chain and any in-progress rescan state consistent with
/// the new chain tip.
///
/// Returns a [`filter::Error`] if rolling back the filter header chain fails.
fn rollback(&mut self, height: Height) -> Result<(), filter::Error> {
// It's possible that a rollback doesn't affect the filter chain, if the filter headers
// haven't caught up to the block headers when the re-org happens.
if height >= self.filters.height() {
return Ok(());
}

// Purge stale block filters.
self.rescan.rollback(height);
// Rollback filter header chain.
self.filters.rollback(height)?;

// Nb. Inflight filter header requests for heights that were rolled back will be ignored
// when received.
//
// TODO: Inflight filter requests need to be re-issued.

if self.rescan.active {
// Reset "current" scanning height.
//
// We start re-scanning from either the start, or the current height, whichever
// is greater, while ensuring that we only reset backwards, ie. we never skip
// heights.
//
// For example, given we are currently at 7, if we rolled back to height 4, and our
// start is at 5, we restart from 5.
//
// If we rolled back to height 4 and our start is at 3, we restart at 4, because
// we don't need to scan blocks before our start height.
//
// If we rolled back to height 9 from height 11, we wouldn't want to re-scan any
// blocks, since we haven't yet gotten to that height.
//
let start = self.rescan.start;
let current = self.rescan.current;

// Only move `current` backwards, and never before the rescan start height.
if current > height + 1 {
self.rescan.current = Height::max(height + 1, start);
}

log::debug!(
target: "p2p",
"Rollback from {} to {}, start = {}, height = {}",
current,
self.rescan.current,
start,
height
);
}

Ok(())
}

/// Called when a new peer was negotiated.
pub fn peer_negotiated<T: BlockReader>(
fn peer_negotiated<T: BlockReader>(
&mut self,
addr: PeerId,
height: Height,
Expand Down Expand Up @@ -562,7 +564,7 @@ impl<F: Filters, C: Clock> FilterManager<F, C> {
}

/// Attempt to sync the filter header chain.
pub fn sync<T: BlockReader>(&mut self, tree: &T) {
fn sync<T: BlockReader>(&mut self, tree: &T) {
let filter_height = self.filters.height();
let block_height = tree.height();

Expand Down Expand Up @@ -597,8 +599,6 @@ impl<F: Filters, C: Clock> FilterManager<F, C> {
}
}

// PRIVATE METHODS /////////////////////////////////////////////////////////

/// Remove transaction from list of transactions being watch.
fn unwatch_transaction(&mut self, txid: &Txid) -> bool {
self.rescan.transactions.remove(txid).is_some()
Expand Down Expand Up @@ -722,7 +722,7 @@ impl<F: Filters, C: Clock> FilterManager<F, C> {
}

/// Handle a `getcfheaders` message from a peer.
pub fn received_getcfheaders<T: BlockReader>(
fn received_getcfheaders<T: BlockReader>(
&mut self,
from: &PeerId,
msg: GetCFHeaders,
Expand Down Expand Up @@ -1414,7 +1414,7 @@ mod tests {

// We still have filters we are waiting for, but let's let some time pass.
cbfmgr.clock.elapse(DEFAULT_REQUEST_TIMEOUT);
cbfmgr.received_wake(&tree);
cbfmgr.timer_expired(&tree);

// We expect a new request to be sent from the new starting height.
let (stop_hash, _) = tree.tip();
Expand Down
Loading

0 comments on commit 2cb33ba

Please sign in to comment.