Skip to content
This repository has been archived by the owner on Mar 14, 2023. It is now read-only.

Commit

Permalink
RPC Example: Use SyncSender to reduce memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlinjin committed Dec 15, 2022
1 parent 113e5f2 commit a8f2223
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 17 deletions.
26 changes: 12 additions & 14 deletions bdk_rpc_example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Sender},
mpsc::{sync_channel, SyncSender},
Arc,
},
time::Duration,
Expand All @@ -20,6 +20,8 @@ use bdk_keychain::KeychainScan;
use bitcoincore_rpc::Auth;
use rpc::{Client, RpcData, RpcError};

const CHANNEL_BOUND: usize = 1000;

#[derive(Args, Debug, Clone)]
struct RpcArgs {
/// RPC URL
Expand Down Expand Up @@ -66,8 +68,10 @@ enum RpcCommands {
}

fn main() -> anyhow::Result<()> {
println!("Loading wallet from db...");
let (args, keymap, mut keychain_tracker, mut db) =
bdk_cli::init::<RpcArgs, RpcCommands, TxHeight>()?;
println!("Wallet loaded.");

let client = {
let rpc_url = args.chain_args.url.clone();
Expand Down Expand Up @@ -95,23 +99,17 @@ fn main() -> anyhow::Result<()> {
stop_gap,
live,
} => {
let (chan, recv) = channel::<RpcData>();
let (chan, recv) = sync_channel::<RpcData>(CHANNEL_BOUND);
let sigterm_flag = start_ctrlc_handler(chan.clone());
let local_cps = keychain_tracker.chain().checkpoints().clone();

// emit blocks thread
let join_handle = std::thread::spawn(move || {
if live {
loop {
client.emit_blocks(&chan, &local_cps, fallback_height)?;
if await_flag(&sigterm_flag, Duration::from_secs(10)) {
break;
}
}
} else {
client.emit_blocks(&chan, &local_cps, fallback_height)?;
let join_handle = std::thread::spawn(move || loop {
client.emit_blocks(&chan, &local_cps, fallback_height)?;
if live && !await_flag(&sigterm_flag, Duration::from_secs(10)) {
continue;
}
chan.send(RpcData::Stop(true)).map_err(RpcError::Send)
return chan.send(RpcData::Stop(true)).map_err(RpcError::Send);
});

let mut tip = 0;
Expand Down Expand Up @@ -208,7 +206,7 @@ fn main() -> anyhow::Result<()> {
Ok(())
}

fn start_ctrlc_handler(chan: Sender<RpcData>) -> Arc<AtomicBool> {
fn start_ctrlc_handler(chan: SyncSender<RpcData>) -> Arc<AtomicBool> {
let flag = Arc::new(AtomicBool::new(false));
let cloned_flag = flag.clone();

Expand Down
12 changes: 9 additions & 3 deletions bdk_rpc_example/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::BTreeMap,
sync::mpsc::{SendError, Sender},
sync::mpsc::{SendError, SyncSender},
};

use bdk_core::bitcoin::{Block, BlockHash, Transaction};
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Client {

pub fn emit_blocks(
&self,
chan: &Sender<RpcData>,
chan: &SyncSender<RpcData>,
local_cps: &BTreeMap<u32, BlockHash>,
fallback_height: u64,
) -> Result<(), RpcError> {
Expand Down Expand Up @@ -92,6 +92,7 @@ impl Client {
must_include = Some(res.height as u32); // NOT in main chain
} else {
last_agreement = Some(res);
break;
}
}
Err(err) => {
Expand All @@ -108,6 +109,11 @@ impl Client {
};
}

println!(
"point of agreement: height={}",
last_agreement.as_ref().map(|r| r.height).unwrap_or(0)
);

// determine first block
let mut block = match last_agreement {
Some(res) => match res.nextblockhash {
Expand Down Expand Up @@ -152,7 +158,7 @@ impl Client {
}
}

pub fn emit_mempool(&self, chan: &Sender<RpcData>) -> Result<(), RpcError> {
pub fn emit_mempool(&self, chan: &SyncSender<RpcData>) -> Result<(), RpcError> {
for txids in self.client.get_raw_mempool()?.chunks(MEMPOOL_CHUNK_SIZE) {
let txs = txids
.iter()
Expand Down

0 comments on commit a8f2223

Please sign in to comment.