From 9d619f76a973edde12dc2355a307ff0a89f3a4ff Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Sat, 23 May 2020 09:17:56 +0300 Subject: [PATCH] http: Implement the `synced-tip` option to catch up with missed events (#6) --- CHANGELOG.md | 4 ++- README.md | 36 ++++++++++++++++++++- src/electrum.rs | 2 +- src/http.rs | 84 +++++++++++++++++++++++++++++++++++++++---------- src/indexer.rs | 22 +++++++++++-- src/query.rs | 23 +++++++++++++- src/types.rs | 3 +- 7 files changed, 150 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c552cf5..9b87b04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,12 @@ ## Unreleased -- Fix: Update the confirmation status of send-only (no change) transactions +- HTTP: Implement the `synced-tip` option to catch up with missed events (#6) - Unite the `History` event into `Txo{Funded,Spent}` +- Fix: Update the confirmation status of send-only (no change) transactions + ## 0.1.0 - 2020-05-20 First release! diff --git a/README.md b/README.md index 3ca522f..e0ec09b 100644 --- a/README.md +++ b/README.md @@ -991,9 +991,43 @@ data:{"category":"TxoSpent","params":["a3bc61a974b113223c336c866bc656cd23481d146 data:{"category":"TxoSpent","params":["a3bc61a974b113223c336c866bc656cd23481d1466e063e46930a5983e70c20d:1","97e9cc06a9a9d95a7ff26a9e5fdf9e1836792a3337c0ff718c88e012feb217bd","bb94b1547397cd89441edd74d0581913d8bb3005d070fa6f9744af44f654c25a:0",117]} ``` - +#### Catching up with missed events & re-org detection + +To catch-up with historical events that your app missed while being down, you can specify the `synced-tip` query string parameter with the `:` of the latest block known to be processed. + +If the `synced-tip` is still part of the best chain, this will return all historical `Transaction`, `TxoFunded` and `TxoSpent` events that occurred after `block-height` (exclusive, ordered with oldest first, unconfirmed included at the end), followed by a *single* `ChainTip` event with the currently synced tip, followed by a stream of real-time events. + +If the `synced-tip` is no longer part of the best chain, an error will be returned indicating that a reorg took place. +One way to recover from reorgs is to delete all entries that occurred in the last `N` blocks before the orphaned `synced-tip` and re-sync them (where `N` is large enough such that reorgs deeper than it are unlikely). + +You can specify `synced-tip` with just the height to skip reorg detection (for example, `0` to get all events since the genesis block). + +
Expand...

+Example: + +``` +# Start by syncing everything from the beginning +$ curl localhost:3060/stream?synced-tip=0 +data:{"category":"TxoFunded","params":["ac42d918b45351835bf9448bbd0c2f8e9ddad56a8bd118fe93919cc74bd0c487:1","48138c88b8cb17544ac2450c4bd147106a9f773d6cf2b7f31a5a9dde75a8387a",399999856,114]} +data:{"category":"ChainTip","params":[120,"5cc1fb1153f8eb12d445d0db06e96bbb39c45b8ed22d4f0de718aa6b0ef00cd1"]} + +# Oops, we got disconnected! Let's try again with the last `ChainTip` we heard of +$ curl localhost:3060/stream?synced-tip=120:5cc1fb1153f8eb12d445d0db06e96bbb39c45b8ed22d4f0de718aa6b0ef00cd1 +data:{"category":"TxoSpent","params":["a3bc61a974b113223c336c866bc656cd23481d1466e063e46930a5983e70c20d:1","97e9cc06a9a9d95a7ff26a9e5fdf9e1836792a3337c0ff718c88e012feb217bd","bb94b1547397cd89441edd74d0581913d8bb3005d070fa6f9744af44f654c25a:0",122]} +data:{"category":"ChainTip","params":[130,"57d17db78d5017c89e86e863a7397c02027f09327222feb72cdfe8372644c589"]} + +# Disconnected again, this time while a reorg happened +$ curl localhost:3060/stream?synced-tip=130:57d17db78d5017c89e86e863a7397c02027f09327222feb72cdfe8372644c589 +< HTTP/1.1 410 Gone +Reorg detected at height 130 + +# Re-sync from height 110 (N=20) +$ curl localhost:3060/stream?synced-tip=110 +``` + +
### Miscellaneous diff --git a/src/electrum.rs b/src/electrum.rs index 6518177..2e501f1 100644 --- a/src/electrum.rs +++ b/src/electrum.rs @@ -638,7 +638,7 @@ impl SubscriptionManager { for change in &changelog { let msg = match change { IndexChange::ChainTip(blockid) if subscriber.blocks => { - Message::ChainTip(blockid.clone()) + Message::ChainTip(*blockid) } IndexChange::TxoFunded(_, scripthash, ..) | IndexChange::TxoSpent(_, scripthash, ..) diff --git a/src/http.rs b/src/http.rs index b815b6f..2dae089 100644 --- a/src/http.rs +++ b/src/http.rs @@ -2,7 +2,8 @@ use std::net; use std::sync::{mpsc, Arc, Mutex}; use async_std::task; -use tokio::stream::{Stream, StreamExt}; +use serde::{Deserialize, Deserializer}; +use tokio::stream::{self, Stream, StreamExt}; use tokio::sync::mpsc as tmpsc; use warp::http::{header, StatusCode}; use warp::sse::ServerSentEvent; @@ -10,6 +11,7 @@ use warp::{reply, Filter, Reply}; use bitcoin::util::bip32::Fingerprint; use bitcoin::{Address, BlockHash, OutPoint, Txid}; +use bitcoin_hashes::hex::FromHex; use crate::error::{fmt_error_chain, Error, OptionExt}; use crate::types::{BlockId, ScriptHash}; @@ -295,10 +297,14 @@ async fn run( .and(warp::path!("stream")) .and(warp::query::()) .and(listeners.clone()) - .map(|filter: ChangelogFilter, listeners: Listeners| { - let stream = make_connection_sse_stream(listeners, filter); - warp::sse::reply(warp::sse::keep_alive().stream(stream)) - }); + .and(query.clone()) + .map( + |filter: ChangelogFilter, listeners: Listeners, query: Arc| { + let stream = make_sse_stream(filter, listeners, &query)?; + Ok(warp::sse::reply(warp::sse::keep_alive().stream(stream))) + }, + ) + .map(handle_error); // GET /hd/:fingerprint/:index/stream // GET /scripthash/:scripthash/stream @@ -308,13 +314,18 @@ async fn run( .and(warp::path!("stream")) .and(warp::query::()) .and(listeners.clone()) + .and(query.clone()) .map( - |scripthash: ScriptHash, mut filter: ChangelogFilter, listeners: Listeners| { + |scripthash: ScriptHash, + mut filter: ChangelogFilter, + listeners: Listeners, + query: Arc| { filter.scripthash = Some(scripthash); - let stream = make_connection_sse_stream(listeners, filter); - warp::sse::reply(warp::sse::keep_alive().stream(stream)) + let stream = make_sse_stream(filter, listeners, &query)?; + Ok(warp::sse::reply(warp::sse::keep_alive().stream(stream))) }, - ); + ) + .map(handle_error); // GET /block/tip let block_tip_handler = warp::get() @@ -347,7 +358,7 @@ async fn run( }) .map(handle_error); - // GET /block/:height + // GET /block/:block_height let block_height_handler = warp::get() .and(warp::path!("block" / u32)) .and(query.clone()) @@ -497,18 +508,41 @@ struct Listener { filter: ChangelogFilter, } -fn make_connection_sse_stream( - listeners: Listeners, +// Create a stream of real-time changelog events matching `filter`, optionally also including +// historical events occuring after `synced-tip` +fn make_sse_stream( filter: ChangelogFilter, -) -> impl Stream> { + listeners: Listeners, + query: &Query, +) -> Result>, Error> { debug!("subscribing sse client with {:?}", filter); + let (tx, rx) = tmpsc::unbounded_channel(); - listeners.lock().unwrap().push(Listener { tx, filter }); - rx.map(|change: IndexChange| Ok(warp::sse::json(change))) + listeners.lock().unwrap().push(Listener { + tx, + filter: filter.clone(), + }); + + // fetch historical changelog since the requested start point (if requesed) + let changelog = match &filter.synced_tip { + Some(synced_tip) => query.get_changelog_after(*synced_tip)?, + None => vec![], + } + .into_iter() + .filter(move |change| filter.matches(change)); + // TODO don't produce unwanted events to begin with instead of filtering them + + Ok(stream::iter(changelog) + .chain(rx) + .map(|change: IndexChange| Ok(warp::sse::json(change)))) } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] struct ChangelogFilter { + #[serde(default, deserialize_with = "parse_synced_tip")] + synced_tip: Option, + scripthash: Option, outpoint: Option, category: Option, @@ -547,6 +581,24 @@ impl ChangelogFilter { } } +pub fn parse_synced_tip<'de, D>(deserializer: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + fn parse(s: &str) -> Result { + let mut parts = s.splitn(2, ':'); + let height: u32 = parts.next().req()?.parse()?; + Ok(match parts.next() { + Some(block_hash) => BlockId(height, BlockHash::from_hex(block_hash)?), + None => BlockId(height, BlockHash::default()), + }) + } + + let s = String::deserialize(deserializer)?; + let blockid = parse(&s).map_err(|err| serde::de::Error::custom(err.to_string()))?; + Ok(Some(blockid)) +} + #[derive(Deserialize, Debug)] struct UtxoOptions { #[serde(default)] diff --git a/src/indexer.rs b/src/indexer.rs index 466c379..7b40d51 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -69,9 +69,9 @@ impl Indexer { let (synced_tip, mut changelog) = self._sync()?; self.watcher.do_imports(&self.rpc, /*rescan=*/ false)?; - if self.tip.as_ref() != Some(&synced_tip) { + if self.tip != Some(synced_tip) { info!("synced up to height {}", synced_tip.0); - changelog.push(IndexChange::ChainTip(synced_tip.clone())); + changelog.push(IndexChange::ChainTip(synced_tip)); self.tip = Some(synced_tip); } @@ -295,6 +295,24 @@ impl Indexer { Ok(()) } + + /// Get historical events that happened at or after `min_block_height`, including unconfirmed, + /// ordered with oldest first. + /// + /// Includes the `Transaction`, `TxoFunded` and `TxoSpent` events, and a *single* `ChainTip` + /// event with the currently synced tip as the last entry (when bwt is synced). + pub fn get_changelog_since(&self, min_block_height: u32) -> Vec { + self.store + .get_history_since(min_block_height) + .into_iter() + .map(|txhist| { + let tx_entry = self.store.get_tx_entry(&txhist.txid).unwrap(); + IndexChange::from_tx(&txhist.txid, tx_entry) + }) + .flatten() + .chain(self.tip.clone().map(IndexChange::ChainTip).into_iter()) + .collect() + } } #[derive(Clone, Serialize, Debug)] diff --git a/src/query.rs b/src/query.rs index c1c8b80..f1e5062 100644 --- a/src/query.rs +++ b/src/query.rs @@ -11,7 +11,7 @@ use bitcoincore_rpc::{json as rpcjson, Client as RpcClient, RpcApi}; use crate::error::{OptionExt, Result}; use crate::hd::{HDWallet, KeyOrigin}; -use crate::indexer::Indexer; +use crate::indexer::{IndexChange, Indexer}; use crate::store::{FundingInfo, HistoryEntry, ScriptInfo, SpendingInfo, TxEntry}; use crate::types::{BlockId, ScriptHash, TxStatus}; use crate::util::make_fee_histogram; @@ -236,6 +236,27 @@ impl Query { entries.into_iter().map(f).collect() } + /// Get historical events that occurred after the `synced_tip` block, including unconfirmed, + /// ordered with oldest first. + /// + /// Verifies that the `synced_tip` is still part of the best chain and returns an error if not. + /// Using the default BlockHash disables this validation. + pub fn get_changelog_after(&self, synced_tip: BlockId) -> Result> { + let BlockId(synced_height, synced_blockhash) = synced_tip; + + if synced_blockhash != BlockHash::default() { + let current_blockhash = self.get_block_hash(synced_height)?; + ensure!( + synced_blockhash == current_blockhash, + "Reorg detected at height {}", + synced_height, + ); + } + + let indexer = self.indexer.read().unwrap(); + Ok(indexer.get_changelog_since(synced_height + 1)) + } + // // Outputs // diff --git a/src/types.rs b/src/types.rs index 8de624d..6a8b78a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,7 +4,6 @@ use serde::Serialize; use bitcoin::{Address, BlockHash, Txid}; use bitcoin_hashes::{sha256, Hash}; - pub use bitcoincore_rpc::json::ImportMultiRescanSince as RescanSince; hash_newtype!( @@ -30,7 +29,7 @@ impl From
for ScriptHash { #[cfg(feature = "electrum")] hash_newtype!(StatusHash, sha256::Hash, 32, doc = "The status hash."); -#[derive(Serialize, Debug, PartialEq, Clone)] +#[derive(Serialize, Debug, PartialEq, Clone, Copy)] pub struct BlockId(pub u32, pub BlockHash); #[derive(Debug, Copy, Clone)]