This repository has been archived by the owner on Mar 14, 2023. It is now read-only.

Simplify rpc block emission logic
Not tested yet and might not be correct!
LLFourn committed Dec 20, 2022
1 parent 4411e11 commit 6674080
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 143 deletions.
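At a high level, this commit turns block emission into a fixed-order message protocol over a bounded channel: Start, then batches of Blocks, then BlocksSynced, then Mempool batches, then MempoolSynced. The sketch below is not part of the commit; it uses simplified stand-in types (a unit Block, string txids) purely to show the message ordering a consumer can rely on.

    use std::sync::mpsc::sync_channel;
    use std::thread;

    // Simplified stand-ins for the commit's types: `Block` is a unit struct and
    // mempool entries are plain strings, so the sketch compiles on its own.
    #[derive(Debug)]
    struct Block;

    #[derive(Debug)]
    enum RpcData {
        Start { starting_tip: u32, target_tip: u32 },
        Blocks { first_height: u32, blocks: Vec<Block> },
        Mempool(Vec<String>),
        BlocksSynced,
        MempoolSynced,
    }

    fn main() {
        // CHANNEL_BOUND is 10 in the example binary: the emitter thread blocks
        // once it gets 10 messages ahead of the consumer, bounding memory use.
        let (tx, rx) = sync_channel::<RpcData>(10);

        let emitter = thread::spawn(move || {
            tx.send(RpcData::Start { starting_tip: 100, target_tip: 102 }).unwrap();
            tx.send(RpcData::Blocks { first_height: 100, blocks: vec![Block, Block, Block] }).unwrap();
            tx.send(RpcData::BlocksSynced).unwrap();
            tx.send(RpcData::Mempool(vec!["txid_placeholder".to_string()])).unwrap();
            tx.send(RpcData::MempoolSynced).unwrap();
        });

        // Messages arrive strictly in the order above; MempoolSynced marks the
        // end of one full sync round.
        for msg in rx {
            println!("{:?}", msg);
        }
        emitter.join().unwrap();
    }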
47 changes: 26 additions & 21 deletions bdk_rpc_example/src/main.rs
@@ -21,7 +21,6 @@ use rpc::{Client, RpcData, RpcError};
 
 const CHANNEL_BOUND: usize = 10;
 const LIVE_POLL_DUR_SECS: u64 = 15;
-const FALLBACK_CP_LIMIT: usize = 100;
 
 #[derive(Args, Debug, Clone)]
 struct RpcArgs {
@@ -107,11 +106,9 @@ fn main() -> anyhow::Result<()> {
 
     // emit blocks thread
     let thread_flag = sigterm_flag.clone();
-    let cp_limit = keychain_tracker
-        .checkpoint_limit()
-        .unwrap_or(FALLBACK_CP_LIMIT);
     let join_handle = std::thread::spawn(move || loop {
-        client.emit_blocks(&chan, &mut local_cps, cp_limit, fallback_height)?;
+        client.emit_blocks(chan.clone(), &mut local_cps, fallback_height, 50)?;
+        client.emit_mempool(chan.clone(), 1000)?;
         if live && !await_flag(&thread_flag, Duration::from_secs(LIVE_POLL_DUR_SECS)) {
             continue;
         }
@@ -131,40 +128,45 @@ fn main() -> anyhow::Result<()> {
 
         let txs = match data {
             RpcData::Start {
-                local_tip,
+                starting_tip,
                 target_tip,
             } => {
                 tip = target_tip;
                 println!(
                     "sync start: current_tip={}, target_tip={}",
-                    local_tip, target_tip
+                    starting_tip, target_tip
                 );
                 continue;
             }
-            RpcData::Synced => {
+            RpcData::BlocksSynced => {
+                continue;
+            }
+            RpcData::MempoolSynced => {
                 let balance = keychain_tracker
                     .full_utxos()
                     .map(|(_, utxo)| utxo.txout.value)
                     .sum::<u64>();
                 let duration = SystemTime::now().duration_since(start_time)?.as_secs();
                 println!(
-                    "sync finished: duration={}s, tip={}, balance={}sats",
+                    "block sync finished: duration={}s, tip={}, balance={}sats",
                     duration, tip, balance
                 );
                 continue;
             }
-            RpcData::Blocks { last_cp, blocks } => {
-                let checkpoints = blocks
-                    .iter()
-                    .map(|(h, b)| (*h, b.block_hash()))
-                    .chain(last_cp);
-                for (height, hash) in checkpoints {
-                    update.insert_checkpoint(BlockId { height, hash })?;
-                }
-
+            RpcData::Blocks {
+                first_height,
+                blocks,
+            } => {
                 blocks
                     .into_iter()
-                    .flat_map(|(height, block)| {
+                    .enumerate()
+                    .flat_map(|(i, block)| {
+                        let height = first_height + i as u32;
+                        if height != 0 {
+                            update.insert_checkpoint(BlockId { height: height - 1, hash: block.header.prev_blockhash }).expect("rpc emitted a block whose prev_blockhash wasn't the previous one it emitted");
+                        }
+                        update.insert_checkpoint(BlockId { height, hash: block.block_hash() }).expect("unreachable since height is increasing");
+
                         block
                             .txdata
                             .into_iter()
@@ -180,15 +182,18 @@ fn main() -> anyhow::Result<()> {
 
         let old_indexes = keychain_tracker.txout_index.derivation_indices();
 
-        for (height, tx) in txs {
+        for (_, tx) in &txs {
             keychain_tracker
                 .txout_index
                 .derive_until_unused_gap(lookahead);
+            keychain_tracker.txout_index.scan(tx);
+        }
+
+        for (height, tx) in txs {
             if keychain_tracker.txout_index.is_relevant(&tx) {
                 println!("* adding tx to update: {} @ {}", tx.txid(), height);
                 update.insert_tx(tx.clone(), Some(height))?;
             }
-            keychain_tracker.txout_index.scan(&tx);
         }
 
         keychain_tracker
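The consumer-side change above splits transaction processing into two passes: the first pass derives scripts up to the lookahead gap and scans every transaction into the keychain index; only then does the second pass ask which transactions are relevant and insert them into the update. A toy sketch of that shape (stand-in types, not bdk's actual API) shows why relevance checks should only run once the index has seen everything:

    use std::collections::HashSet;

    // Toy stand-ins, not bdk's API: the index just remembers which scripts it
    // has seen that belong to "our wallet".
    struct Tx {
        txid: u32,
        script: &'static str,
    }

    #[derive(Default)]
    struct ToyIndex {
        ours: HashSet<&'static str>,
    }

    impl ToyIndex {
        // Rough analogue of derive_until_unused_gap + scan: learn scripts.
        fn scan(&mut self, tx: &Tx) {
            if tx.script.starts_with("wallet/") {
                self.ours.insert(tx.script);
            }
        }
        fn is_relevant(&self, tx: &Tx) -> bool {
            self.ours.contains(tx.script)
        }
    }

    fn main() {
        let txs = vec![
            (101_u32, Tx { txid: 1, script: "wallet/0" }),
            (101, Tx { txid: 2, script: "elsewhere/9" }),
            (102, Tx { txid: 3, script: "wallet/1" }),
        ];

        let mut index = ToyIndex::default();

        // Pass 1: scan everything so the index is fully populated before any
        // relevance decision is made.
        for (_, tx) in &txs {
            index.scan(tx);
        }

        // Pass 2: with the index complete, keep only the relevant txs.
        for (height, tx) in &txs {
            if index.is_relevant(tx) {
                println!("* adding tx to update: {} @ {}", tx.txid, height);
            }
        }
    }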
176 changes: 54 additions & 122 deletions bdk_rpc_example/src/rpc.rs
@@ -1,32 +1,31 @@
 use std::{
     collections::BTreeMap,
+    mem,
     sync::mpsc::{SendError, SyncSender},
 };
 
 use bdk_core::bitcoin::{Block, BlockHash, Transaction};
 use bitcoincore_rpc::{Auth, Client as RpcClient, RpcApi};
 
-/// Minimum number of transactions to batch together for each emission.
-const TX_EMIT_THRESHOLD: usize = 75_000;
-
 pub enum RpcData {
     Start {
-        local_tip: u32,
+        starting_tip: u32,
         target_tip: u32,
     },
     Blocks {
-        last_cp: Option<(u32, BlockHash)>,
-        blocks: BTreeMap<u32, Block>,
+        // the height of the first block in the vector
+        first_height: u32,
+        blocks: Vec<Block>,
     },
     Mempool(Vec<Transaction>),
-    Synced,
+    BlocksSynced,
+    MempoolSynced,
 }
 
 #[derive(Debug)]
 pub enum RpcError {
     Rpc(bitcoincore_rpc::Error),
     Send(SendError<RpcData>),
-    Reorg(u32),
 }
 
 impl From<bitcoincore_rpc::Error> for RpcError {
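The new Blocks variant drops both the per-block heights and the last_cp checkpoint: the blocks are a contiguous Vec, so blocks[i] sits at height first_height + i, and contiguity can be asserted from each block's prev_blockhash (as the main.rs consumer above does). A minimal sketch of the recovery arithmetic, with string stand-ins for hashes:

    fn main() {
        // Stand-in for RpcData::Blocks { first_height, blocks }: heights are
        // implicit in vector position.
        let first_height: u32 = 100;
        let blocks = vec!["hash_a", "hash_b", "hash_c"];

        for (i, hash) in blocks.iter().enumerate() {
            let height = first_height + i as u32;
            // A real consumer would also check the block's prev_blockhash
            // against the previously emitted block before trusting `height`.
            println!("checkpoint: height={} hash={}", height, hash);
        }
    }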
@@ -51,153 +50,86 @@ impl std::error::Error for RpcError {}
 
 pub struct Client {
     client: RpcClient,
-    tx_emit_threshold: usize,
 }
 
 impl Client {
     pub fn new(url: &str, auth: Auth) -> Result<Self, RpcError> {
         let client = RpcClient::new(url, auth)?;
-        Ok(Client {
-            client,
-            tx_emit_threshold: TX_EMIT_THRESHOLD,
-        })
+        Ok(Client { client })
     }
 
     pub fn emit_blocks(
         &self,
-        chan: &SyncSender<RpcData>,
+        chan: SyncSender<RpcData>,
         local_cps: &mut BTreeMap<u32, BlockHash>,
-        cp_limit: usize,
         fallback_height: u32,
+        block_batch_size: usize,
     ) -> Result<(), RpcError> {
         let tip = self.client.get_block_count()? as u32;
 
-        let local_tip = local_cps
-            .iter()
-            .next_back()
-            .map(|(&height, _)| height as u32)
-            .unwrap_or(fallback_height);
-
-        chan.send(RpcData::Start {
-            target_tip: tip as _,
-            local_tip: local_tip as _,
-        })?;
-
-        // nothing to do if local tip is higher than node
-        if local_tip > tip {
-            return Ok(());
-        }
-
-        let mut last_agreement = None;
-        let mut must_include = None;
+        let mut start_emitting_at = fallback_height;
 
         for (height, hash) in local_cps.iter().rev() {
-            match self.client.get_block_info(hash) {
-                Ok(res) => {
-                    if res.confirmations < 0 {
-                        must_include = Some(res.height as u32); // NOT in main chain
-                    } else {
-                        last_agreement = Some(res);
-                        break;
-                    }
-                }
-                Err(err) => {
-                    use bitcoincore_rpc::jsonrpc;
-                    match err {
-                        bitcoincore_rpc::Error::JsonRpc(jsonrpc::Error::Rpc(rpc_err))
-                            if rpc_err.code == -5 =>
-                        {
-                            must_include = Some(*height); // NOT in main chain
-                        }
-                        err => return Err(err.into()),
-                    }
-                }
-            };
-        }
-
-        match &last_agreement {
-            Some(res) => {
-                println!("agreement @ height={}", res.height);
-                local_cps.split_off(&((res.height + 1) as _));
+            if self.client.get_block_stats(*height as u64)?.block_hash == *hash {
+                start_emitting_at = height + 1;
             }
-            None => {
-                println!("no agreement, fallback_height={}", fallback_height);
-            }
-        };
-
-        // batch of blocks to emit
-        let mut to_emit = BTreeMap::<u32, Block>::new();
-
-        // determine first block and last checkpoint that should be included (if any)
-        let mut block = match last_agreement {
-            Some(res) => match res.nextblockhash {
-                Some(block_hash) => self.client.get_block_info(&block_hash)?,
-                // no next block after agreement point, checkout mempool
-                None => return self.emit_mempool(chan),
-            },
-            None => {
-                let block_hash = self.client.get_block_hash(fallback_height as _)?;
-                self.client.get_block_info(&block_hash)?
-            }
-        };
-        let mut has_next = true;
+        }
+
+        chan.send(RpcData::Start {
+            target_tip: tip,
+            starting_tip: start_emitting_at,
+        })?;
 
-        while has_next {
-            if block.confirmations < 0 {
-                return Err(RpcError::Reorg(block.height as _));
+        let mut curr_hash = self
+            .client
+            .get_block_stats(start_emitting_at as u64)?
+            .block_hash;
+        let mut curr_height = start_emitting_at;
+        let mut first_height = curr_height;
+        let mut block_buffer = Vec::with_capacity(block_batch_size);
+
+        loop {
+            let block_info = self.client.get_block_info(&curr_hash)?;
+            let block = self.client.get_block(&curr_hash)?;
+            block_buffer.push(block);
+
+            if block_buffer.len() == block_batch_size || block_info.nextblockhash.is_none() {
+                let emitted_blocks =
+                    mem::replace(&mut block_buffer, Vec::with_capacity(block_batch_size));
+                chan.send(RpcData::Blocks {
+                    first_height,
+                    blocks: emitted_blocks,
+                })?;
+                first_height = curr_height + 1;
             }
 
-            let _displaced = to_emit.insert(block.height as _, self.client.get_block(&block.hash)?);
-            debug_assert_eq!(_displaced, None);
-
-            match block.nextblockhash {
-                Some(next_hash) => block = self.client.get_block_info(&next_hash)?,
-                None => has_next = false,
+            curr_hash = match block_info.nextblockhash {
+                Some(nextblockhash) => nextblockhash,
+                None => break,
             };
 
-            if !has_next
-                || must_include.as_ref() <= to_emit.keys().next_back()
-                    && to_emit.iter().map(|(_, b)| b.txdata.len()).sum::<usize>()
-                        >= self.tx_emit_threshold
-            {
-                let last_cp = local_cps.iter().next_back().map(|(h, b)| (*h, *b));
-                let blocks = to_emit.split_off(&0);
-
-                // update local checkpoints
-                for (height, block) in &blocks {
-                    local_cps.insert(*height, block.block_hash());
-                }
-
-                // prune local checkpoints
-                if let Some(&last_height) = local_cps.keys().nth_back(cp_limit) {
-                    let mut split = local_cps.split_off(&(last_height + 1));
-                    core::mem::swap(local_cps, &mut split);
-                }
-
-                chan.send(RpcData::Blocks { last_cp, blocks })?;
-            }
+            curr_height += 1;
         }
 
-        self.emit_mempool(chan)
+        chan.send(RpcData::BlocksSynced)?;
+
+        Ok(())
     }
 
-    pub fn emit_mempool(&self, chan: &SyncSender<RpcData>) -> Result<(), RpcError> {
-        // TODO: I'm not sure if things are returned in order
-        // TODO: Include mempool sequence!
-        for txids in self
-            .client
-            .get_raw_mempool()?
-            .chunks(self.tx_emit_threshold)
-        {
+    pub fn emit_mempool(
+        &self,
+        chan: SyncSender<RpcData>,
+        tx_batch_size: usize,
+    ) -> Result<(), RpcError> {
+        for txids in self.client.get_raw_mempool()?.chunks(tx_batch_size) {
             let txs = txids
                 .iter()
                 .map(|txid| self.client.get_raw_transaction(txid, None))
                 .collect::<Result<Vec<_>, _>>()?;
             chan.send(RpcData::Mempool(txs))?;
         }
 
-        chan.send(RpcData::Synced)?;
+        chan.send(RpcData::MempoolSynced)?;
 
         Ok(())
     }
 }
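The batching inside the new emit_blocks relies on mem::replace to flush: the full buffer is moved into the channel message and a fresh, pre-sized Vec takes its place, so no blocks are cloned. A self-contained sketch of that flush pattern, with u32 stand-ins for blocks:

    use std::mem;
    use std::sync::mpsc::sync_channel;

    fn main() {
        let batch_size = 3;
        let (tx, rx) = sync_channel::<Vec<u32>>(10);

        let mut buffer: Vec<u32> = Vec::with_capacity(batch_size);
        for height in 0u32..8 {
            buffer.push(height);
            let is_tip = height == 7; // analogue of nextblockhash.is_none()
            // Flush on a full buffer, or unconditionally at the tip so the
            // final partial batch is never lost.
            if buffer.len() == batch_size || is_tip {
                let batch = mem::replace(&mut buffer, Vec::with_capacity(batch_size));
                tx.send(batch).unwrap();
            }
        }
        drop(tx); // hang up so the receive loop below terminates

        for batch in rx {
            println!("emitted batch: {:?}", batch); // [0,1,2], [3,4,5], [6,7]
        }
    }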
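For reference, the calling side in main.rs boils down to the wiring below (a condensed, hypothetical sketch: the node URL and credentials are placeholders, and error plumbing is simplified to anyhow as in the example binary):

    use std::collections::BTreeMap;
    use std::sync::mpsc::sync_channel;

    use bitcoincore_rpc::Auth;
    // plus the commit's own items: use rpc::{Client, RpcData, RpcError};

    fn run() -> anyhow::Result<()> {
        let client = Client::new(
            "http://127.0.0.1:8332", // placeholder node URL
            Auth::UserPass("user".into(), "pass".into()),
        )?;
        let (chan, recv) = sync_channel::<RpcData>(10);
        let mut local_cps = BTreeMap::new();

        // Same order as the example binary: blocks first, then the mempool.
        let emitter = std::thread::spawn(move || -> Result<(), RpcError> {
            client.emit_blocks(chan.clone(), &mut local_cps, 0, 50)?;
            client.emit_mempool(chan, 1000)?;
            Ok(())
        });

        for data in recv {
            // apply `data` to the tracker here, as in main.rs above
            if matches!(data, RpcData::MempoolSynced) {
                break;
            }
        }
        emitter.join().expect("emitter thread panicked")?;
        Ok(())
    }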
