Skip to content
Permalink
Browse files

Track delayed salts

  • Loading branch information...
ArtemGr committed May 19, 2019
1 parent 3da0968 commit 815e470ae69b2812529f761f26f5d655fe0b31cd
Showing with 63 additions and 27 deletions.
  1. +2 −1 mm2src/common/common.rs
  2. +31 −3 mm2src/peers/http_fallback.rs
  3. +30 −23 mm2src/peers/peers.rs
@@ -482,7 +482,8 @@ E: fmt::Display + Send + 'static {
/// Another option to consider is https://github.com/alexcrichton/futures-timer.
/// P.S. The older `0.1` version of the `tokio::timer` might work NP, it works in other parts of our code.
/// The new version, on the other hand, requires the Tokio runtime (https://tokio.rs/blog/2018-03-timers/).
/// TODO: Use futures-timer instead.
/// P.S. We could try using the `futures-timer` crate instead, but note that it is currently under-maintained,
/// https://github.com/rustasync/futures-timer/issues/9#issuecomment-400802515.
pub struct Timeout<R> {
fut: Box<Future<Item=R, Error=String>>,
started: f64,
@@ -2,7 +2,7 @@ use base64;
use crdts::{CvRDT, CmRDT, Map, Orswot};
use futures::{self, Future};
use gstuff::{netstring, now_float};
use hashbrown::hash_map::{Entry, HashMap};
use hashbrown::hash_map::{Entry, HashMap, RawEntryMut};
use hyper::{Request, Body};
use hyper::rt::{Stream};
use hyper::service::Service;
@@ -288,9 +288,37 @@ log! ("transmit] TBD, time to use the HTTP fallback...");
Ok(())
}

/// Invoked when a delayed retrieval is detected by the peers loop.
///
/// * `salt` - The subject salt (checksum of the `subject` passed to `fn recv`).
pub fn hf_delayed_get (pctx: &super::PeersContext, salt: &Vec<u8>) {
let mut delayed_salts = match pctx.delayed_salts.lock() {
Ok (set) => set,
Err (err) => {log! ("Can't lock `delayed_salts`: " (err)); return}
};
if let RawEntryMut::Vacant (ve) = delayed_salts.raw_entry_mut().from_key (salt) {
ve.insert (salt.clone(), ());
}
}

/// Manage HTTP fallback retrievals.
/// Invoked from the peers loop whenever there are delayed gets.
pub fn hf_poll (our_public_key: &bits256) -> Result<(), String> {
/// Invoked periodically from the peers loop.
pub fn hf_poll (pctx: &Arc<super::PeersContext>) -> Result<(), String> {
{
let delayed_salts = try_s! (pctx.delayed_salts.lock());
if delayed_salts.is_empty() {return Ok(())}
}

Ok(())
}

/// Invoked when the client terminates a retrieval attempt.
///
/// * `salt` - The subject salt (checksum of the `subject` passed to `fn recv`).
pub fn hf_drop_get (pctx: &super::PeersContext, salt: &Vec<u8>) {
let mut delayed_salts = match pctx.delayed_salts.lock() {
Ok (set) => set,
Err (err) => {log! ("Can't lock `delayed_salts`: " (err)); return}
};
delayed_salts.remove (salt);
}
@@ -20,7 +20,7 @@
pub mod peers_tests;

pub mod http_fallback;
use crate::http_fallback::{hf_poll, hf_transmit, HttpFallbackTargetTrack};
use crate::http_fallback::{hf_delayed_get, hf_drop_get, hf_poll, hf_transmit, HttpFallbackTargetTrack};

use byteorder::{BigEndian, WriteBytesExt, ReadBytesExt};
use common::{binprint, bits256, is_a_test_drill, slice_to_malloc, RaiiRm};
@@ -33,7 +33,7 @@ use either::Either;
use futures::{future, Async, Future, Poll};
use futures::task::Task;
use gstuff::{now_float, slurp};
use hashbrown::hash_map::{DefaultHashBuilder, Entry, HashMap, OccupiedEntry};
use hashbrown::hash_map::{DefaultHashBuilder, Entry, HashMap, OccupiedEntry, RawEntryMut};
use itertools::Itertools;
use libc::{c_char, c_void};
use rand::{thread_rng, Rng, RngCore};
@@ -253,7 +253,6 @@ pub struct PeersContext {
cmd_tx: channel::Sender<LtCommand>,
/// Should only be used by the `peers_thread`.
cmd_rx: channel::Receiver<LtCommand>,
// TODO: Remove the outdated `recently_fetched` entries after a while.
/// Salt -> last-modified, value.
recently_fetched: Mutex<HashMap<Vec<u8>, (f64, Vec<u8>)>>,
/// Of retransmission subsystem.
@@ -263,7 +262,10 @@ pub struct PeersContext {
/// The number of data chunks delivered directly via the DHT pings.
/// (direct_pings - discovery pings - pongs - invalid).
direct_chunks: AtomicU64,
http_fallback_maps: Mutex<HashMap<bits256, HttpFallbackTargetTrack>>
/// Recent attempts at reaching targets via HTTP fallback. seed -> track
http_fallback_maps: Mutex<HashMap<bits256, HttpFallbackTargetTrack>>,
/// Subject salts we're trying to get for longer than their `fallback` timeout.
delayed_salts: Mutex<HashMap<Vec<u8>, ()>>
}

impl PeersContext {
@@ -280,7 +282,8 @@ impl PeersContext {
trans_meta: Mutex::new (TransMeta::default()),
direct_pings: AtomicU64::new (0),
direct_chunks: AtomicU64::new (0),
http_fallback_maps: Mutex::new (HashMap::new())
http_fallback_maps: Mutex::new (HashMap::new()),
delayed_salts: Mutex::new (HashMap::new())
})
})))
}
@@ -582,12 +585,6 @@ fn transmit (dugout: &mut dugout_t, ctx: &MmArc) -> Result<(), String> {
}
}

// Remove old entries from `http_fallback_maps`.
if let Ok (mut http_fallback_maps) = pctx.http_fallback_maps.lock() {
let now = now_float();
http_fallback_maps.retain (|_seed, track| now - track.last_store < 600.);
}

Ok(())
}

@@ -984,8 +981,11 @@ fn incoming_ping (cbctx: &mut CbCtx, pkt: &[u8], ip: &[u8], port: u16) -> Result
crc = update (crc, &IEEE_TABLE, &subject_salt);
if incoming_checksum != crc {return ERR! ("bad ping chunk")}

let (_, gets) = cbctx.gets.raw_entry_mut().from_key (subject_salt)
.or_insert (subject_salt.to_vec(), GetsEntry::default());
let mut gets_en = cbctx.gets.raw_entry_mut().from_key (subject_salt);
let (_, gets) = match gets_en {
RawEntryMut::Occupied (ref mut oe) => oe.get_key_value_mut(),
RawEntryMut::Vacant (ve) => ve.insert (subject_salt.to_vec(), GetsEntry::default())
};
if chunk_idx == 1 {
let number_of_chunks = try_s! (NonZeroU8::new (payload.remove (0)) .ok_or ("zero chunks"));
gets.number_of_chunks = Some (number_of_chunks)
@@ -1225,7 +1225,8 @@ fn peers_thread (ctx: MmArc, _netid: u16, our_public_key: bits256, preferred_por
assert_eq! (seed, our_public_key);
get_pieces_scheduler (&seed, salt, frid, task, fallback, &mut dugout, &mut gets, &*pctx)},
Ok (LtCommand::DropGet {salt, frid}) => {
if let Some (en) = gets.get_mut (&salt) {en.drop_get (frid)}},
if let Some (en) = gets.get_mut (&salt) {en.drop_get (frid)}
hf_drop_get (&pctx, &salt)},
Ok (LtCommand::Ping {endpoint}) => pingʹ (&ctx, &our_public_key, &endpoint, None),
Err (channel::RecvTimeoutError::Timeout) => {},
Err (channel::RecvTimeoutError::Disconnected) => break
@@ -1236,25 +1237,31 @@ fn peers_thread (ctx: MmArc, _netid: u16, our_public_key: bits256, preferred_por
if let Err (err) = transmit (&mut dugout, &ctx) {log! ("send_pings error: " (err))}

// Cloning the keys allows `get_pieces_scheduler_en` to remove the `gets_oe` entry.
let gets_keys: Vec<_> = gets.keys().map (|k| k.clone()) .collect();
let get_salts: Vec<_> = gets.keys().map (|k| k.clone()) .collect();
let now = now_float();
let mut delayed_gets = false;
for key in gets_keys {
let gets_oe = if let Entry::Occupied (oe) = gets.entry (key) {oe} else {panic!()};
for salt in get_salts {
let gets_oe = if let Entry::Occupied (oe) = gets.entry (salt) {oe} else {panic!()};

if let Some (created_at) = gets_oe.get().created_at {
if let Some (fallback) = gets_oe.get().fallback {
if now - created_at > fallback.get() as f64 {
delayed_gets = true
hf_delayed_get (&pctx, gets_oe.key())
} } }

get_pieces_scheduler_en (&our_public_key, &mut dugout, gets_oe, &*pctx);
}

if delayed_gets {
if let Err (err) = hf_poll (&our_public_key) {
log! ("hf_poll error: " (err))
} }
if let Err (err) = hf_poll (&pctx) {
log! ("hf_poll error: " (err))
}

// Remove old entries from `http_fallback_maps`.
if let Ok (mut http_fallback_maps) = pctx.http_fallback_maps.lock() {
http_fallback_maps.retain (|_seed, track| now - track.last_store < 600.)
}
if let Ok (mut recently_fetched) = pctx.recently_fetched.lock() {
recently_fetched.retain (|_, (lm, _)| now - *lm < 600.)
}

let after_boot_sec = 20.; // In order not to loose some potentially good but not yet checked nodes from a previous state.
if bootstrapped != 0. && now - bootstrapped > after_boot_sec && now - last_state_save > 600. {

0 comments on commit 815e470

Please sign in to comment.
You can’t perform that action at this time.