Add a rust-based backup over-REST block downloader
TheBlueMatt committed Oct 26, 2019
1 parent 3b2c1ca commit 67bef7d
Showing 5 changed files with 343 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/Makefile.am
@@ -374,7 +374,8 @@ libbitcoin_wallet_tool_a_SOURCES = \
LIBBITCOIN_RUSTY_SRCS= \
rusty/src/lib.rs \
rusty/src/bridge.rs \
rusty/src/dns_headers.rs
rusty/src/dns_headers.rs \
rusty/src/rest_downloader.rs

# We force overflow checks on so that any integer overflows result in a panic
# instead of, eg, attempting to alloc 4GB of memory (though we will catch such
5 changes: 5 additions & 0 deletions src/init.cpp
@@ -191,6 +191,7 @@ void Shutdown(InitInterfaces& interfaces)

#if ENABLE_RUSTY
rust_block_fetch::stop_fetch_dns_headers();
rust_block_fetch::stop_fetch_rest_blocks();
#endif

StopHTTPRPC();
@@ -377,6 +378,7 @@ void SetupServerArgs()
gArgs.AddArg("-blocksonly", strprintf("Whether to reject transactions from network peers. Transactions from the wallet, RPC and relay whitelisted inbound peers are not affected. (default: %u)", DEFAULT_BLOCKSONLY), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
#if ENABLE_RUSTY
gArgs.AddArg("-headersfetchdns=<domain>", "A domain name from which to fetch headers. eg bitcoinheaders.net", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
gArgs.AddArg("-blockfetchrest=<uri>", "A REST endpoint from which to fetch blocks. Acts as a redundant backup for P2P connectivity. eg http://cloudflare.deanonymizingseed.com/rest/", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
#endif
gArgs.AddArg("-conf=<file>", strprintf("Specify configuration file. Relative paths will be prefixed by datadir location. (default: %s)", BITCOIN_CONF_FILENAME), ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
gArgs.AddArg("-datadir=<dir>", "Specify data directory", ArgsManager::ALLOW_ANY, OptionsCategory::OPTIONS);
@@ -1840,6 +1842,9 @@ bool AppInitMain(InitInterfaces& interfaces)
for (const std::string& domain : gArgs.GetArgs("-headersfetchdns")) {
rust_block_fetch::init_fetch_dns_headers(domain.c_str());
}
for (const std::string& uri : gArgs.GetArgs("-blockfetchrest")) {
rust_block_fetch::init_fetch_rest_blocks(uri.c_str());
}
#endif

return true;
1 change: 1 addition & 0 deletions src/rusty/src/lib.rs
@@ -1,6 +1,7 @@
mod bridge;
use bridge::*;
mod dns_headers;
mod rest_downloader;

use std::alloc::{GlobalAlloc, Layout, System};
use std::ptr;
333 changes: 333 additions & 0 deletions src/rusty/src/rest_downloader.rs
@@ -0,0 +1,333 @@
use std::cmp;
use std::net::{TcpStream, ToSocketAddrs};
use std::io::{Read, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::panic::catch_unwind;

use crate::bridge::*;
use crate::await_ibd_complete_or_stalled;

use std::ffi::CStr;
use std::os::raw::c_char;

/// Splits an HTTP URI into its component parts: (is_ssl, hostname, port number, and HTTP path)
fn split_uri<'a>(uri: &'a str) -> Option<(bool, &'a str, u16, &'a str)> {
let mut uri_iter = uri.splitn(2, ":");
let ssl = match uri_iter.next() {
Some("http") => false,
Some("https") => true,
_ => return None,
};
let mut host_path = match uri_iter.next() {
Some(r) => r,
None => return None,
};
host_path = host_path.trim_start_matches("/");
let mut host_path_iter = host_path.splitn(2, "/");
let (host_port_len, host, port) = match host_path_iter.next() {
Some(r) if !r.is_empty() => {
let is_v6_explicit = r.starts_with("[");
let mut iter = if is_v6_explicit {
r[1..].splitn(2, "]")
} else {
r.splitn(2, ":")
};
(r.len(), match iter.next() {
Some(host) => host,
None => return None,
}, match iter.next() {
Some(port) if !is_v6_explicit || !port.is_empty() => match if is_v6_explicit {
if port.as_bytes()[0] != ':' as u8 { return None; }
&port[1..]
} else { port }
.parse::<u16>() {
Ok(p) => p,
Err(_) => return None,
},
_ => if ssl { 443 } else { 80 },
})
},
_ => return None,
};
let path = &host_path[host_port_len..];

Some((ssl, host, port, path))
}

#[cfg(test)]
#[test]
fn test_split_uri() {
assert_eq!(split_uri("http://example.com:8080/path"), Some((false, "example.com", 8080, "/path")));
assert_eq!(split_uri("http:example.com:8080/path/b"), Some((false, "example.com", 8080, "/path/b")));
assert_eq!(split_uri("https://0.0.0.0/"), Some((true, "0.0.0.0", 443, "/")));
assert_eq!(split_uri("http:[0:bad::43]:80/"), Some((false, "0:bad::43", 80, "/")));
assert_eq!(split_uri("http:[::]"), Some((false, "::", 80, "")));
assert_eq!(split_uri("http://"), None);
assert_eq!(split_uri("http://example.com:70000/"), None);
assert_eq!(split_uri("ftp://example.com:80/"), None);
assert_eq!(split_uri("http://example.com"), Some((false, "example.com", 80, "")));
}

fn read_http_resp(socket: &mut TcpStream, max_resp: usize) -> Option<Vec<u8>> {
let mut resp = Vec::new();
let mut bytes_read = 0;
macro_rules! read_socket { () => { {
if unsafe { rusty_ShutdownRequested() } { return None; }
match socket.read(&mut resp[bytes_read..]) {
Ok(0) => return None,
Ok(b) => b,
Err(_) => return None,
}
} } }

let mut actual_len = 0;
let mut ok_found = false;
let mut chunked = false;
// We expect the HTTP headers to fit in 8KB, and use resp as a temporary buffer for headers
// until we know our real length.
resp.extend_from_slice(&[0; 8192]);
'read_headers: loop {
if bytes_read >= 8192 { return None; }
bytes_read += read_socket!();
for line in resp[..bytes_read].split(|c| *c == '\n' as u8 || *c == '\r' as u8) {
let content_header = b"Content-Length: ";
if line.len() > content_header.len() && line[..content_header.len()].eq_ignore_ascii_case(content_header) {
actual_len = match match std::str::from_utf8(&line[content_header.len()..]){
Ok(s) => s, Err(_) => return None,
}.parse() {
Ok(len) => len, Err(_) => return None,
};
}
let http_resp_1 = b"HTTP/1.1 200 ";
let http_resp_0 = b"HTTP/1.0 200 ";
if line.len() > http_resp_1.len() && (line[..http_resp_1.len()].eq_ignore_ascii_case(http_resp_1) ||
line[..http_resp_0.len()].eq_ignore_ascii_case(http_resp_0)) {
ok_found = true;
}
let transfer_encoding = b"Transfer-Encoding: ";
if line.len() > transfer_encoding.len() && line[..transfer_encoding.len()].eq_ignore_ascii_case(transfer_encoding) {
match &*String::from_utf8_lossy(&line[transfer_encoding.len()..]).to_ascii_lowercase() {
"chunked" => chunked = true,
_ => return None, // Unsupported
}
}
}
for (idx, window) in resp[..bytes_read].windows(4).enumerate() {
if window[0..2] == *b"\n\n" || window[0..2] == *b"\r\r" {
resp = resp.split_off(idx + 2);
resp.resize(bytes_read - idx - 2, 0);
break 'read_headers;
} else if window[0..4] == *b"\r\n\r\n" {
resp = resp.split_off(idx + 4);
resp.resize(bytes_read - idx - 4, 0);
break 'read_headers;
}
}
}
if !ok_found || (!chunked && (actual_len == 0 || actual_len > max_resp)) { return None; } // Sorry, not implemented
bytes_read = resp.len();
if !chunked {
resp.resize(actual_len, 0);
while bytes_read < actual_len {
bytes_read += read_socket!();
}
Some(resp)
} else {
actual_len = 0;
let mut chunk_remaining = 0;
'read_bytes: loop {
if chunk_remaining == 0 {
let mut bytes_skipped = 0;
let mut finished_read = false;
let mut lineiter = resp[actual_len..bytes_read].split(|c| *c == '\n' as u8 || *c == '\r' as u8).peekable();
loop {
let line = match lineiter.next() { Some(line) => line, None => break };
if lineiter.peek().is_none() { // We haven't yet read to the end of this line
if line.len() > 8 {
// No reason to ever have a chunk length line longer than 4 chars
return None;
}
break;
}
bytes_skipped += line.len() + 1;
if line.len() == 0 { continue; } // Probably between the \r and \n
match usize::from_str_radix(&match std::str::from_utf8(line) {
Ok(s) => s, Err(_) => return None,
}, 16) {
Ok(chunklen) => {
if chunklen == 0 { finished_read = true; }
chunk_remaining = chunklen;
match lineiter.next() {
Some(l) if l.is_empty() => {
// Drop \r after \n
bytes_skipped += 1;
if actual_len + bytes_skipped > bytes_read {
// Go back and get more bytes so we can skip trailing \n
chunk_remaining = 0;
}
},
Some(_) => {},
None => {
// Go back and get more bytes so we can skip trailing \n
chunk_remaining = 0;
},
}
break;
},
Err(_) => return None,
}
}
if chunk_remaining != 0 {
bytes_read -= bytes_skipped;
resp.drain(actual_len..actual_len + bytes_skipped);
if actual_len + chunk_remaining > max_resp { return None; }
let already_in_chunk = cmp::min(bytes_read - actual_len, chunk_remaining);
actual_len += already_in_chunk;
chunk_remaining -= already_in_chunk;
continue 'read_bytes;
} else {
if finished_read {
// Note that we may leave some extra \r\ns to be read, but that's OK,
// we'll ignore them when parsing headers for the next request.
resp.resize(actual_len, 0);
return Some(resp);
} else {
// Need to read more bytes to figure out chunk length
}
}
}
resp.resize(bytes_read + cmp::max(10, chunk_remaining), 0);
let avail = read_socket!();
bytes_read += avail;
if chunk_remaining != 0 {
let chunk_read = cmp::min(chunk_remaining, avail);
chunk_remaining -= chunk_read;
actual_len += chunk_read;
}
}
}
}
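
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of this commit: a minimal, self-contained
// decoder for the chunked framing read_http_resp() accepts above. Each chunk
// is a hex length line terminated by CRLF, followed by that many raw bytes and
// another CRLF, with a zero-length chunk marking the end of the body. The
// helper name decode_chunked is hypothetical.
// ---------------------------------------------------------------------------
fn decode_chunked(mut body: &[u8]) -> Option<Vec<u8>> {
    let mut out = Vec::new();
    loop {
        // Locate the end of the chunk-size line.
        let line_end = body.windows(2).position(|w| w == &b"\r\n"[..])?;
        let len = usize::from_str_radix(std::str::from_utf8(&body[..line_end]).ok()?, 16).ok()?;
        body = &body[line_end + 2..];
        if len == 0 { return Some(out); } // terminating zero-length chunk
        if body.len() < len + 2 { return None; } // truncated input
        out.extend_from_slice(&body[..len]);
        body = &body[len + 2..]; // skip the chunk data and its trailing CRLF
    }
}

#[cfg(test)]
#[test]
fn test_decode_chunked() {
    assert_eq!(decode_chunked(b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n"), Some(b"Wikipedia".to_vec()));
}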

static THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);
#[no_mangle]
pub extern "C" fn init_fetch_rest_blocks(uri: *const c_char) -> bool {
let uri_str: String = match unsafe { CStr::from_ptr(uri) }.to_str() {
Ok(r) => r.to_string(),
Err(_) => return false,
};
// Sadly only non-SSL is supported for now
if let Some((false, _, _, _)) = split_uri(&uri_str) { } else { return false; }
std::thread::spawn(move || {
// Always catch panics so that even if we have some bug in our parser we don't take the
// rest of Bitcoin Core down with us:
THREAD_COUNT.fetch_add(1, Ordering::AcqRel);
let _ = catch_unwind(move || {
await_ibd_complete_or_stalled();
let (ssl, host, port, path) = split_uri(&uri_str).unwrap();
let mut provider_state = BlockProviderState::new_with_current_best(BlockIndex::tip());
'reconnect: while unsafe { !rusty_ShutdownRequested() } {
std::thread::sleep(Duration::from_secs(1));
if unsafe { rusty_ShutdownRequested() } { return; }

let mut stream;
macro_rules! reconnect {
() => { {
stream = match TcpStream::connect_timeout(&match (host, port).to_socket_addrs() {
Ok(mut sockaddrs) => match sockaddrs.next() { Some(sockaddr) => sockaddr, None => continue 'reconnect },
Err(_) => continue 'reconnect,
}, Duration::from_secs(1)) {
Ok(stream) => stream,
Err(_) => continue 'reconnect,
};
stream.set_write_timeout(Some(Duration::from_secs(1))).expect("Host kernel is uselessly old?");
stream.set_read_timeout(Some(Duration::from_secs(10))).expect("Host kernel is uselessly old?");
if ssl {
unimplemented!();
}
} }
}
reconnect!();

'header_sync: while unsafe { !rusty_ShutdownRequested() } {
let req = format!("GET {}/headers/2000/{}.bin HTTP/1.1\nHost: {}\nConnection: keep-alive\n\n", path, provider_state.get_current_best().hash_hex(), host);
match stream.write(req.as_bytes()) {
Ok(len) if len == req.len() => {},
_ => continue 'reconnect,
}
let headers = match read_http_resp(&mut stream, 80*2000) {
Some(h) => h,
None => continue 'reconnect,
};
if headers.len() == 80 {
// We got exactly the header we requested, ie it is *also* the tip for the
// remote node, go on to block fetching!
break 'header_sync;
} else if headers.len() == 0 {
let genesis_tip = BlockIndex::genesis();
if genesis_tip == provider_state.get_current_best() { // Maybe they're on a different network entirely?
continue 'reconnect;
}
// We are on a fork, but there's nothing in REST (currently) that lets us walk
// back until we find the fork point, so we just start downloading again from
// genesis, 2000 headers at a time.
provider_state.set_current_best(genesis_tip);
continue 'header_sync;
}
match connect_headers_flat_bytes(&headers) {
Some(new_best) => {
if new_best == provider_state.get_current_best() {
// We tried to connect > 1 header, but didn't move forward, reconnect
// and try again
continue 'reconnect;
}
provider_state.set_current_best(new_best);
continue 'header_sync;
},
None => {
// We consider their response bogus, reconnect and try again
continue 'reconnect;
},
}
}

// We think we're caught up with their header chain!
// Wait a little bit to give the regular (more efficient) P2P logic a chance to
// download blocks, then check if we should download anything.
for _ in 0..30 {
std::thread::sleep(Duration::from_secs(1));
if unsafe { rusty_ShutdownRequested() } { return; }
}

reconnect!();

'block_fetch: while unsafe { !rusty_ShutdownRequested() } {
match provider_state.get_next_block_to_download(true) {
Some(to_fetch) => {
let req = format!("GET {}/block/{}.bin HTTP/1.1\nHost: {}\nConnection: keep-alive\n\n", path, to_fetch.hash_hex(), host);
match stream.write(req.as_bytes()) {
Ok(len) if len == req.len() => {},
_ => continue 'reconnect,
}
let block = match read_http_resp(&mut stream, 4 * 1000 * 1000) {
Some(b) => b,
None => continue 'reconnect,
};
connect_block(&block, Some(to_fetch));
},
None => continue 'reconnect,
}
}
}
});
THREAD_COUNT.fetch_sub(1, Ordering::AcqRel);
});
true
}

#[no_mangle]
pub extern "C" fn stop_fetch_rest_blocks() {
while THREAD_COUNT.load(Ordering::Acquire) != 0 {
std::thread::sleep(Duration::from_millis(10));
}
}
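
For context, the shutdown coordination above is a small pattern of its own: each downloader thread bumps an atomic counter while it runs, and stop_fetch_rest_blocks() simply spins until that counter drains back to zero, so Bitcoin Core's C++ shutdown never races a still-running Rust thread. A condensed, standalone sketch of that pattern follows; the names and the SHUTDOWN flag (standing in for the rusty_ShutdownRequested() bridge call) are hypothetical, not part of the commit.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

static WORKERS: AtomicUsize = AtomicUsize::new(0);
static SHUTDOWN: AtomicBool = AtomicBool::new(false);

fn spawn_worker() {
    // Register before spawning so a concurrent stop_workers() always waits for this thread.
    WORKERS.fetch_add(1, Ordering::AcqRel);
    std::thread::spawn(|| {
        while !SHUTDOWN.load(Ordering::Acquire) {
            // ... poll the REST endpoint, sleeping between requests ...
            std::thread::sleep(Duration::from_millis(50));
        }
        WORKERS.fetch_sub(1, Ordering::AcqRel);
    });
}

fn stop_workers() {
    SHUTDOWN.store(true, Ordering::Release);
    // Spin (with a short sleep) until every worker has unregistered itself.
    while WORKERS.load(Ordering::Acquire) != 0 {
        std::thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    spawn_worker();
    stop_workers();
}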
2 changes: 2 additions & 0 deletions src/rusty/src/rust_bridge.h
@@ -17,6 +17,8 @@ extern "C" {

bool init_fetch_dns_headers(const char *domain);
bool stop_fetch_dns_headers();
bool init_fetch_rest_blocks(const char *uri);
bool stop_fetch_rest_blocks();

} // extern "C"

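These declarations pair with the #[no_mangle] extern "C" functions defined in rest_downloader.rs, which is what lets init.cpp call straight into the Rust module. A minimal sketch of that bridging pattern, using a hypothetical entry point example_entry that is not part of the commit:

use std::ffi::CStr;
use std::os::raw::c_char;

// Pairs with a C++-side declaration of the form: bool example_entry(const char *arg);
#[no_mangle]
pub extern "C" fn example_entry(arg: *const c_char) -> bool {
    // Copy the C string into an owned Rust String, rejecting invalid UTF-8 rather than
    // panicking across the FFI boundary.
    let arg_str: String = match unsafe { CStr::from_ptr(arg) }.to_str() {
        Ok(s) => s.to_string(),
        Err(_) => return false,
    };
    !arg_str.is_empty()
}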
