Skip to content

Commit

Permalink
Poll the fallback server
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtemGr committed May 20, 2019
1 parent 815e470 commit c2d70fe
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 43 deletions.
2 changes: 1 addition & 1 deletion mm2src/common/common.rs
Expand Up @@ -558,7 +558,7 @@ lazy_static! {
};
}

type SlurpFut = Box<Future<Item=(StatusCode, HeaderMap, Vec<u8>), Error=String> + Send + 'static>;
pub type SlurpFut = Box<Future<Item=(StatusCode, HeaderMap, Vec<u8>), Error=String> + Send + 'static>;

/// Executes a Hyper request, returning the response status, headers and body.
pub fn slurp_req (request: Request<Body>) -> SlurpFut {
Expand Down
11 changes: 4 additions & 7 deletions mm2src/lp_native_dex.rs
Expand Up @@ -1707,8 +1707,8 @@ pub fn lp_init (mypullport: u16, mypubport: u16, conf: Json, c_conf: CJSON, ctx_
let i_am_seed = ctx.conf["i_am_seed"].as_bool().unwrap_or(false);
let netid = ctx.netid();

let mut http_fallback_shutdown = None;
let mut http_fallback_port = None;
// Keeps HTTP fallback server alive until `lp_init` exits.
let mut _hf_shutdown;

let myipaddr: IpAddr = if Path::new ("myipaddr") .exists() {
match fs::File::open ("myipaddr") {
Expand Down Expand Up @@ -1774,14 +1774,11 @@ pub fn lp_init (mypullport: u16, mypubport: u16, conf: Json, c_conf: CJSON, ctx_
// If the bind fails then emit a user-visible warning and fall back to 0.0.0.0.
let tags: &[&TagParam] = &[&"myipaddr"];
match test_ip (&ctx, ip) {
Ok ((hf_shutdown, hf_port)) => {
Ok ((hf_shutdown, _hf_port)) => {
ctx.log.log ("🙂", tags, &fomat! (
"We've detected an external IP " (ip) " and we can bind on it (port " (mypubport) ")"
", so probably a dedicated IP."));
if i_am_seed {
http_fallback_shutdown = Some (hf_shutdown);
http_fallback_port = Some (hf_port);
}
if i_am_seed {_hf_shutdown = hf_shutdown}
break ip
},
Err (err) => log! ("IP " (ip) " doesn't check: " (err))
Expand Down
73 changes: 60 additions & 13 deletions mm2src/peers/http_fallback.rs
@@ -1,6 +1,7 @@
use base64;
use byteorder::{BigEndian, WriteBytesExt};
use crdts::{CvRDT, CmRDT, Map, Orswot};
use futures::{self, Future};
use futures::{future, self, Async, Future};
use gstuff::{netstring, now_float};
use hashbrown::hash_map::{Entry, HashMap, RawEntryMut};
use hyper::{Request, Body};
Expand All @@ -11,6 +12,7 @@ use serde_json::{self as json, Value as Json};
use std::io::Write;
use std::net::{SocketAddr};
use std::sync::{Arc, Mutex};
use std::sync::atomic::Ordering;
use std::str::from_utf8_unchecked;
use tokio_core::net::TcpListener;

Expand Down Expand Up @@ -128,11 +130,11 @@ pub fn rep_keys (rep_map: &RepStrMap) -> Result<Vec<String>, String> {
} else {Ok (Vec::new())}
}

fn fallback_url (addr: &SocketAddr, method: &str) -> String {
fn fallback_url (hf_addr: &SocketAddr, method: &str) -> String {
fomat! (
"http" if addr.port() == 443 {'s'} "://"
(addr.ip())
if addr.port() != 80 && addr.port() != 443 {':' (addr.port())}
"http" if hf_addr.port() == 443 {'s'} "://"
(hf_addr.ip())
if hf_addr.port() != 80 && hf_addr.port() != 443 {':' (hf_addr.port())}
"/fallback/" (method)
)
}
Expand All @@ -143,10 +145,10 @@ fn fallback_url (addr: &SocketAddr, method: &str) -> String {
/// The port should be 80 or 443 as this should help the server to function
/// even with the most restrictive internet operators.
pub fn fetch_map (addr: &SocketAddr, id: Vec<u8>) -> Box<Future<Item=RepStrMap, Error=String> + Send> {
let url = fallback_url (addr, "fetch_map");
let hf_url = fallback_url (addr, "fetch_map");
let request = try_fus! (Request::builder()
.method("POST")
.uri (url)
.uri (hf_url)
.body (Body::from (id)));
let f = slurp_req (request);
let f = f.and_then (|(status, _headers, body)| -> Result<RepStrMap, String> {
Expand Down Expand Up @@ -198,7 +200,7 @@ pub struct HttpFallbackTargetTrack {
}

/// Plugged into `fn transmit` to send the chunks via HTTP fallback when necessary.
pub fn hf_transmit (pctx: &super::PeersContext, hf_addr: Option<SocketAddr>, our_public_key: &bits256,
pub fn hf_transmit (pctx: &super::PeersContext, hf_addr: &Option<SocketAddr>, our_public_key: &bits256,
seed: &bits256, package: &mut super::Package) -> Result<(), String> {
let hf_addr = match hf_addr {Some (a) => a, None => return Ok(())};

Expand All @@ -211,7 +213,7 @@ pub fn hf_transmit (pctx: &super::PeersContext, hf_addr: Option<SocketAddr>, our
if payload.chunk.is_none() {continue}
deliver_to_seed.insert (salt, (payload, meta));
}
let mut http_fallback_maps = try_s! (pctx.http_fallback_maps.lock());
let mut http_fallback_maps = try_s! (pctx.hf_maps.lock());
let mut trackⁱ = http_fallback_maps.entry (*seed);
let track = match trackⁱ {
Entry::Occupied (ref mut oe) => oe.get_mut(),
Expand Down Expand Up @@ -292,7 +294,7 @@ log! ("transmit] TBD, time to use the HTTP fallback...");
///
/// * `salt` - The subject salt (checksum of the `subject` passed to `fn recv`).
pub fn hf_delayed_get (pctx: &super::PeersContext, salt: &Vec<u8>) {
let mut delayed_salts = match pctx.delayed_salts.lock() {
let mut delayed_salts = match pctx.hf_delayed_salts.lock() {
Ok (set) => set,
Err (err) => {log! ("Can't lock `delayed_salts`: " (err)); return}
};
Expand All @@ -303,20 +305,65 @@ pub fn hf_delayed_get (pctx: &super::PeersContext, salt: &Vec<u8>) {

/// Manage HTTP fallback retrievals.
/// Invoked periodically from the peers loop.
pub fn hf_poll (pctx: &Arc<super::PeersContext>) -> Result<(), String> {
pub fn hf_poll (pctx: &Arc<super::PeersContext>, hf_addr: &Option<SocketAddr>) -> Result<(), String> {
let hf_addr = match hf_addr {Some (ref a) => a, None => return Ok(())};

{
let delayed_salts = try_s! (pctx.delayed_salts.lock());
let delayed_salts = try_s! (pctx.hf_delayed_salts.lock());
if delayed_salts.is_empty() {return Ok(())}
}

let mut hf_pollₒ = try_s! (pctx.hf_poll.lock());
// NB: Futures can only be polled from other futures, see https://stackoverflow.com/a/41813881.
let skip = try_s! (future::lazy (|| -> Result<bool, String> {
if let Some (ref mut hf_poll) = *hf_pollₒ {
match hf_poll.poll() {
Err (err) => {
log! ("hf_poll error: " (err));
// Try again next iteration.
*hf_pollₒ = None;
return Ok (true)
},
Ok (Async::NotReady) => {
// Retrieval already in progress.
return Ok (true)
},
Ok (Async::Ready (r)) => {
// TODO: Process the retrieved maps.
//pintln! ("hf_poll] got " [=r]);
// Keep polling on next iteration.
*hf_pollₒ = None;
return Ok (true)
}
}
}
Ok (false)
}) .wait());
if skip {return Ok(())}

//pintln! ("hf_poll] polling, at id " (pctx.hf_last_poll_id.load (Ordering::Relaxed)));

let mut hf_id_prefix = Vec::with_capacity (1 + 4 + 32 + 1);
hf_id_prefix.push (1); // Version of the query protocol.
try_s! (hf_id_prefix.write_u64::<BigEndian> (pctx.hf_last_poll_id.load (Ordering::Relaxed)));
hf_id_prefix.extend_from_slice (&unsafe {try_s! (pctx.our_public_key.lock()) .bytes} [..]);
hf_id_prefix.push (b'<');

let hf_url = fallback_url (hf_addr, "fetch_maps_by_prefix");
let request = try_s! (Request::builder()
.method("POST")
.uri (hf_url)
.body (Body::from (hf_id_prefix)));
*hf_pollₒ = Some (slurp_req (request));

Ok(())
}

/// Invoked when the client terminates a retrieval attempt.
///
/// * `salt` - The subject salt (checksum of the `subject` passed to `fn recv`).
pub fn hf_drop_get (pctx: &super::PeersContext, salt: &Vec<u8>) {
let mut delayed_salts = match pctx.delayed_salts.lock() {
let mut delayed_salts = match pctx.hf_delayed_salts.lock() {
Ok (set) => set,
Err (err) => {log! ("Can't lock `delayed_salts`: " (err)); return}
};
Expand Down
50 changes: 28 additions & 22 deletions mm2src/peers/peers.rs
Expand Up @@ -23,7 +23,7 @@ pub mod http_fallback;
use crate::http_fallback::{hf_delayed_get, hf_drop_get, hf_poll, hf_transmit, HttpFallbackTargetTrack};

use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
use common::{binprint, bits256, is_a_test_drill, slice_to_malloc, RaiiRm};
use common::{binprint, bits256, is_a_test_drill, slice_to_malloc, RaiiRm, SlurpFut};
use common::log::TagParam;
use common::mm_ctx::{from_ctx, MmArc};
use crc::crc32::{update, IEEE_TABLE};
Expand Down Expand Up @@ -263,9 +263,13 @@ pub struct PeersContext {
/// (direct_pings - discovery pings - pongs - invalid).
direct_chunks: AtomicU64,
/// Recent attempts at reaching targets via HTTP fallback. seed -> track
http_fallback_maps: Mutex<HashMap<bits256, HttpFallbackTargetTrack>>,
hf_maps: Mutex<HashMap<bits256, HttpFallbackTargetTrack>>,
/// Subject salts we're trying to get for longer than their `fallback` timeout.
delayed_salts: Mutex<HashMap<Vec<u8>, ()>>
hf_delayed_salts: Mutex<HashMap<Vec<u8>, ()>>,
/// Long polling from HTTP fallback server.
hf_poll: Mutex<Option<SlurpFut>>,
/// The version of the HTTP fallback response that we already have.
hf_last_poll_id: AtomicU64
}

impl PeersContext {
Expand All @@ -282,8 +286,10 @@ impl PeersContext {
trans_meta: Mutex::new (TransMeta::default()),
direct_pings: AtomicU64::new (0),
direct_chunks: AtomicU64::new (0),
http_fallback_maps: Mutex::new (HashMap::new()),
delayed_salts: Mutex::new (HashMap::new())
hf_maps: Mutex::new (HashMap::new()),
hf_delayed_salts: Mutex::new (HashMap::new()),
hf_poll: Mutex::new (None),
hf_last_poll_id: AtomicU64::new (0)
})
})))
}
Expand Down Expand Up @@ -464,22 +470,12 @@ extern fn put_callback (arg: *mut c_void, arg2: u64, have: *const u8, havelen: i
}

/// Invoked periodically from the `peers_thread` in order to manage and retransmit the outgoing ping packets.
fn transmit (dugout: &mut dugout_t, ctx: &MmArc) -> Result<(), String> {
fn transmit (dugout: &mut dugout_t, ctx: &MmArc, hf_addr: &Option<SocketAddr>) -> Result<(), String> {
// AG: Instead of the current random RATELIM skipping
// we might consider *ordering* the packets according to the endpoint freshness, etc.
// For `dht_put` the ordering might take into account the presense or absence of the corresponding `dht_put` callback
// (we want to repeat a `dht_put` more often, relative to others, if we haven't heard from it).

let hf_addr = loop {
let ip = {
let seeds = try_s! (ctx.seeds.lock());
if seeds.is_empty() {break None}
seeds[0].clone()
};
let port = ctx.conf["http-fallback-port"].as_u64().unwrap_or (80) as u16;
break Some (SocketAddr::new (ip, port))
};

let pctx = try_s! (PeersContext::from_ctx (ctx));
let our_public_key = try_s! (pctx.our_public_key.lock()) .clone();
let mut trans = try_s! (pctx.trans_meta.lock());
Expand Down Expand Up @@ -1050,6 +1046,16 @@ fn peers_thread (ctx: MmArc, _netid: u16, our_public_key: bits256, preferred_por

let mut gets = Gets::default();

let hf_addr = loop {
let ip = {
let seeds = unwrap! (ctx.seeds.lock());
if seeds.is_empty() {break None}
seeds[0].clone()
};
let port = ctx.conf["http-fallback-port"].as_u64().unwrap_or (80) as u16;
break Some (SocketAddr::new (ip, port))
};

loop {
extern fn cb (_dugout: *mut dugout_t, cbctx: *mut c_void, alert: *mut Alert) {
//let dugout: &mut dugout_t = unsafe {&mut *dugout};
Expand Down Expand Up @@ -1234,7 +1240,7 @@ fn peers_thread (ctx: MmArc, _netid: u16, our_public_key: bits256, preferred_por

if ctx.is_stopping() {break}

if let Err (err) = transmit (&mut dugout, &ctx) {log! ("send_pings error: " (err))}
if let Err (err) = transmit (&mut dugout, &ctx, &hf_addr) {log! ("transmit error: " (err))}

// Cloning the keys allows `get_pieces_scheduler_en` to remove the `gets_oe` entry.
let get_salts: Vec<_> = gets.keys().map (|k| k.clone()) .collect();
Expand All @@ -1251,13 +1257,13 @@ fn peers_thread (ctx: MmArc, _netid: u16, our_public_key: bits256, preferred_por
get_pieces_scheduler_en (&our_public_key, &mut dugout, gets_oe, &*pctx);
}

if let Err (err) = hf_poll (&pctx) {
if let Err (err) = hf_poll (&pctx, &hf_addr) {
log! ("hf_poll error: " (err))
}

// Remove old entries from `http_fallback_maps`.
if let Ok (mut http_fallback_maps) = pctx.http_fallback_maps.lock() {
http_fallback_maps.retain (|_seed, track| now - track.last_store < 600.)
// Remove old entries from `hf_maps`.
if let Ok (mut hf_maps) = pctx.hf_maps.lock() {
hf_maps.retain (|_seed, track| now - track.last_store < 600.)
}
if let Ok (mut recently_fetched) = pctx.recently_fetched.lock() {
recently_fetched.retain (|_, (lm, _)| now - *lm < 600.)
Expand Down Expand Up @@ -1319,7 +1325,7 @@ pub fn initialize (ctx: &MmArc, netid: u16, our_public_key: bits256, preferred_p
let pctx = try_s! (PeersContext::from_ctx (&ctx));
*try_s! (pctx.our_public_key.lock()) = our_public_key;
*try_s! (pctx.peers_thread.lock()) =
Some (try_s! (thread::Builder::new().name ("dht".into()) .spawn ({
Some (try_s! (thread::Builder::new().name ("peers".into()) .spawn ({
let ctx = ctx.clone();
move || peers_thread (ctx, netid, our_public_key, preferred_port, read_only, delay_dht)
})));
Expand Down

0 comments on commit c2d70fe

Please sign in to comment.