Skip to content

Commit

Permalink
http: Implement the synced-tip option to catch up with missed events (
Browse files Browse the repository at this point in the history
  • Loading branch information
shesek committed May 23, 2020
1 parent dda560b commit 9d619f7
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 24 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

## Unreleased

- Fix: Update the confirmation status of send-only (no change) transactions
- HTTP: Implement the `synced-tip` option to catch up with missed events (#6)

- Unite the `History` event into `Txo{Funded,Spent}`

- Fix: Update the confirmation status of send-only (no change) transactions

## 0.1.0 - 2020-05-20

First release!
36 changes: 35 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -991,9 +991,43 @@ data:{"category":"TxoSpent","params":["a3bc61a974b113223c336c866bc656cd23481d146
data:{"category":"TxoSpent","params":["a3bc61a974b113223c336c866bc656cd23481d1466e063e46930a5983e70c20d:1","97e9cc06a9a9d95a7ff26a9e5fdf9e1836792a3337c0ff718c88e012feb217bd","bb94b1547397cd89441edd74d0581913d8bb3005d070fa6f9744af44f654c25a:0",117]}
```

</details>

#### Catching up with missed events & reorg detection

To catch up with historical events that your app missed while being down, you can specify the `synced-tip` query string parameter with the `<block-height>:<block-hash>` of the latest block known to be processed.

If the `synced-tip` is still part of the best chain, this will return all historical `Transaction`, `TxoFunded` and `TxoSpent` events that occurred after `block-height` (exclusive, ordered with oldest first, unconfirmed included at the end), followed by a *single* `ChainTip` event with the currently synced tip, followed by a stream of real-time events.

If the `synced-tip` is no longer part of the best chain, an error will be returned indicating that a reorg took place.
One way to recover from reorgs is to delete all entries that occurred in the last `N` blocks before the orphaned `synced-tip` and re-sync them (where `N` is large enough such that reorgs deeper than it are unlikely).

You can specify `synced-tip` with just the height to skip reorg detection (for example, `0` to get all events since the genesis block).

<details><summary>Expand...</summary><p></p>
Example:

```
# Start by syncing everything from the beginning
$ curl localhost:3060/stream?synced-tip=0
data:{"category":"TxoFunded","params":["ac42d918b45351835bf9448bbd0c2f8e9ddad56a8bd118fe93919cc74bd0c487:1","48138c88b8cb17544ac2450c4bd147106a9f773d6cf2b7f31a5a9dde75a8387a",399999856,114]}
data:{"category":"ChainTip","params":[120,"5cc1fb1153f8eb12d445d0db06e96bbb39c45b8ed22d4f0de718aa6b0ef00cd1"]}
# Oops, we got disconnected! Let's try again with the last `ChainTip` we heard of
$ curl localhost:3060/stream?synced-tip=120:5cc1fb1153f8eb12d445d0db06e96bbb39c45b8ed22d4f0de718aa6b0ef00cd1
data:{"category":"TxoSpent","params":["a3bc61a974b113223c336c866bc656cd23481d1466e063e46930a5983e70c20d:1","97e9cc06a9a9d95a7ff26a9e5fdf9e1836792a3337c0ff718c88e012feb217bd","bb94b1547397cd89441edd74d0581913d8bb3005d070fa6f9744af44f654c25a:0",122]}
data:{"category":"ChainTip","params":[130,"57d17db78d5017c89e86e863a7397c02027f09327222feb72cdfe8372644c589"]}
# Disconnected again, this time while a reorg happened
$ curl localhost:3060/stream?synced-tip=130:57d17db78d5017c89e86e863a7397c02027f09327222feb72cdfe8372644c589
< HTTP/1.1 410 Gone
Reorg detected at height 130
# Re-sync from height 110 (N=20)
$ curl localhost:3060/stream?synced-tip=110
```

</details>

### Miscellaneous

Expand Down
2 changes: 1 addition & 1 deletion src/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ impl SubscriptionManager {
for change in &changelog {
let msg = match change {
IndexChange::ChainTip(blockid) if subscriber.blocks => {
Message::ChainTip(blockid.clone())
Message::ChainTip(*blockid)
}
IndexChange::TxoFunded(_, scripthash, ..)
| IndexChange::TxoSpent(_, scripthash, ..)
Expand Down
84 changes: 68 additions & 16 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use std::net;
use std::sync::{mpsc, Arc, Mutex};

use async_std::task;
use tokio::stream::{Stream, StreamExt};
use serde::{Deserialize, Deserializer};
use tokio::stream::{self, Stream, StreamExt};
use tokio::sync::mpsc as tmpsc;
use warp::http::{header, StatusCode};
use warp::sse::ServerSentEvent;
use warp::{reply, Filter, Reply};

use bitcoin::util::bip32::Fingerprint;
use bitcoin::{Address, BlockHash, OutPoint, Txid};
use bitcoin_hashes::hex::FromHex;

use crate::error::{fmt_error_chain, Error, OptionExt};
use crate::types::{BlockId, ScriptHash};
Expand Down Expand Up @@ -295,10 +297,14 @@ async fn run(
.and(warp::path!("stream"))
.and(warp::query::<ChangelogFilter>())
.and(listeners.clone())
.map(|filter: ChangelogFilter, listeners: Listeners| {
let stream = make_connection_sse_stream(listeners, filter);
warp::sse::reply(warp::sse::keep_alive().stream(stream))
});
.and(query.clone())
.map(
|filter: ChangelogFilter, listeners: Listeners, query: Arc<Query>| {
let stream = make_sse_stream(filter, listeners, &query)?;
Ok(warp::sse::reply(warp::sse::keep_alive().stream(stream)))
},
)
.map(handle_error);

// GET /hd/:fingerprint/:index/stream
// GET /scripthash/:scripthash/stream
Expand All @@ -308,13 +314,18 @@ async fn run(
.and(warp::path!("stream"))
.and(warp::query::<ChangelogFilter>())
.and(listeners.clone())
.and(query.clone())
.map(
|scripthash: ScriptHash, mut filter: ChangelogFilter, listeners: Listeners| {
|scripthash: ScriptHash,
mut filter: ChangelogFilter,
listeners: Listeners,
query: Arc<Query>| {
filter.scripthash = Some(scripthash);
let stream = make_connection_sse_stream(listeners, filter);
warp::sse::reply(warp::sse::keep_alive().stream(stream))
let stream = make_sse_stream(filter, listeners, &query)?;
Ok(warp::sse::reply(warp::sse::keep_alive().stream(stream)))
},
);
)
.map(handle_error);

// GET /block/tip
let block_tip_handler = warp::get()
Expand Down Expand Up @@ -347,7 +358,7 @@ async fn run(
})
.map(handle_error);

// GET /block/:height
// GET /block/:block_height
let block_height_handler = warp::get()
.and(warp::path!("block" / u32))
.and(query.clone())
Expand Down Expand Up @@ -497,18 +508,41 @@ struct Listener {
filter: ChangelogFilter,
}

fn make_connection_sse_stream(
listeners: Listeners,
// Create a stream of real-time changelog events matching `filter`, optionally also including
// historical events occuring after `synced-tip`
fn make_sse_stream(
filter: ChangelogFilter,
) -> impl Stream<Item = Result<impl ServerSentEvent, warp::Error>> {
listeners: Listeners,
query: &Query,
) -> Result<impl Stream<Item = Result<impl ServerSentEvent, warp::Error>>, Error> {
debug!("subscribing sse client with {:?}", filter);

let (tx, rx) = tmpsc::unbounded_channel();
listeners.lock().unwrap().push(Listener { tx, filter });
rx.map(|change: IndexChange| Ok(warp::sse::json(change)))
listeners.lock().unwrap().push(Listener {
tx,
filter: filter.clone(),
});

// fetch historical changelog since the requested start point (if requesed)
let changelog = match &filter.synced_tip {
Some(synced_tip) => query.get_changelog_after(*synced_tip)?,
None => vec![],
}
.into_iter()
.filter(move |change| filter.matches(change));
// TODO don't produce unwanted events to begin with instead of filtering them

Ok(stream::iter(changelog)
.chain(rx)
.map(|change: IndexChange| Ok(warp::sse::json(change))))
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
struct ChangelogFilter {
#[serde(default, deserialize_with = "parse_synced_tip")]
synced_tip: Option<BlockId>,

scripthash: Option<ScriptHash>,
outpoint: Option<OutPoint>,
category: Option<String>,
Expand Down Expand Up @@ -547,6 +581,24 @@ impl ChangelogFilter {
}
}

pub fn parse_synced_tip<'de, D>(deserializer: D) -> std::result::Result<Option<BlockId>, D::Error>
where
D: Deserializer<'de>,
{
fn parse(s: &str) -> Result<BlockId, Error> {
let mut parts = s.splitn(2, ':');
let height: u32 = parts.next().req()?.parse()?;
Ok(match parts.next() {
Some(block_hash) => BlockId(height, BlockHash::from_hex(block_hash)?),
None => BlockId(height, BlockHash::default()),
})
}

let s = String::deserialize(deserializer)?;
let blockid = parse(&s).map_err(|err| serde::de::Error::custom(err.to_string()))?;
Ok(Some(blockid))
}

#[derive(Deserialize, Debug)]
struct UtxoOptions {
#[serde(default)]
Expand Down
22 changes: 20 additions & 2 deletions src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ impl Indexer {
let (synced_tip, mut changelog) = self._sync()?;
self.watcher.do_imports(&self.rpc, /*rescan=*/ false)?;

if self.tip.as_ref() != Some(&synced_tip) {
if self.tip != Some(synced_tip) {
info!("synced up to height {}", synced_tip.0);
changelog.push(IndexChange::ChainTip(synced_tip.clone()));
changelog.push(IndexChange::ChainTip(synced_tip));
self.tip = Some(synced_tip);
}

Expand Down Expand Up @@ -295,6 +295,24 @@ impl Indexer {

Ok(())
}

/// Get historical events that happened at or after `min_block_height`, including unconfirmed,
/// ordered with oldest first.
///
/// Includes the `Transaction`, `TxoFunded` and `TxoSpent` events, and a *single* `ChainTip`
/// event with the currently synced tip as the last entry (when bwt is synced).
pub fn get_changelog_since(&self, min_block_height: u32) -> Vec<IndexChange> {
    self.store
        .get_history_since(min_block_height)
        .into_iter()
        // flat_map: each history entry expands into multiple index-change events
        .flat_map(|txhist| {
            // every history entry must have a matching tx entry in the store
            let tx_entry = self
                .store
                .get_tx_entry(&txhist.txid)
                .expect("history entry without tx entry");
            IndexChange::from_tx(&txhist.txid, tx_entry)
        })
        // `BlockId` is `Copy`; `Option` implements `IntoIterator`, so it chains directly.
        // `self.tip` is `None` until the initial sync completes, omitting the `ChainTip` event.
        .chain(self.tip.map(IndexChange::ChainTip))
        .collect()
}
}

#[derive(Clone, Serialize, Debug)]
Expand Down
23 changes: 22 additions & 1 deletion src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use bitcoincore_rpc::{json as rpcjson, Client as RpcClient, RpcApi};

use crate::error::{OptionExt, Result};
use crate::hd::{HDWallet, KeyOrigin};
use crate::indexer::Indexer;
use crate::indexer::{IndexChange, Indexer};
use crate::store::{FundingInfo, HistoryEntry, ScriptInfo, SpendingInfo, TxEntry};
use crate::types::{BlockId, ScriptHash, TxStatus};
use crate::util::make_fee_histogram;
Expand Down Expand Up @@ -236,6 +236,27 @@ impl Query {
entries.into_iter().map(f).collect()
}

/// Get historical events that occurred after the `synced_tip` block, including unconfirmed,
/// ordered with oldest first.
///
/// Verifies that `synced_tip` is still part of the best chain and fails with a
/// "Reorg detected" error otherwise. Passing the default (all-zero) `BlockHash`
/// skips this validation.
pub fn get_changelog_after(&self, synced_tip: BlockId) -> Result<Vec<IndexChange>> {
    let BlockId(synced_height, synced_blockhash) = synced_tip;

    // A default hash means the caller opted out of reorg detection
    if synced_blockhash != BlockHash::default() {
        ensure!(
            self.get_block_hash(synced_height)? == synced_blockhash,
            "Reorg detected at height {}",
            synced_height,
        );
    }

    // Events strictly after the synced tip, i.e. from the following block onwards
    Ok(self
        .indexer
        .read()
        .unwrap()
        .get_changelog_since(synced_height + 1))
}

//
// Outputs
//
Expand Down
3 changes: 1 addition & 2 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use serde::Serialize;

use bitcoin::{Address, BlockHash, Txid};
use bitcoin_hashes::{sha256, Hash};

pub use bitcoincore_rpc::json::ImportMultiRescanSince as RescanSince;

hash_newtype!(
Expand All @@ -30,7 +29,7 @@ impl From<Address> for ScriptHash {
#[cfg(feature = "electrum")]
hash_newtype!(StatusHash, sha256::Hash, 32, doc = "The status hash.");

#[derive(Serialize, Debug, PartialEq, Clone)]
/// A block reference, as a (height, hash) pair.
#[derive(Serialize, Debug, PartialEq, Clone, Copy)]
pub struct BlockId(pub u32, pub BlockHash);

#[derive(Debug, Copy, Clone)]
Expand Down

0 comments on commit 9d619f7

Please sign in to comment.