Skip to content
This repository has been archived by the owner on Jan 17, 2020. It is now read-only.

Commit

Permalink
Better story about in flight messages and rate limit (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
flxo authored and Ravi Teja committed Feb 28, 2019
1 parent f585876 commit 5660643
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 160 deletions.
207 changes: 81 additions & 126 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
use crate::client::{
mqttstate::MqttState,
network::stream::NetworkStream,
prepend::{Prepend, StreamExt},
Command, Notification, Request, UserHandle,
mqttstate::MqttState, network::stream::NetworkStream, prepend::Prepend, Command, Notification, Request, UserHandle,
};
use crate::codec::MqttCodec;
use crate::error::{ConnectError, NetworkError};
use crate::mqttoptions::{ConnectionMethod, MqttOptions, Proxy, ReconnectOptions};
use crossbeam_channel::{self, Sender};
use futures::{
future::{self, Either},
stream::{self, SplitStream},
future,
stream::{self, poll_fn, SplitStream},
sync::mpsc::{self, Receiver},
Future, Sink, Stream,
Async, Future, Poll, Sink, Stream,
};
use mqtt311::Packet;
use std::{cell::RefCell, rc::Rc, thread, time::{Duration, Instant}};
use tokio::runtime::current_thread::Runtime;
use std::{cell::RefCell, rc::Rc, thread, time::Duration};
use tokio::codec::Framed;
use tokio::timer::{timeout, Delay, Timeout};

// NOTES: Don't use `wait` in eventloop thread even if you
// are ok with blocking code. It might cause deadlocks
// https://github.com/tokio-rs/tokio-core/issues/182
use tokio::prelude::StreamExt;
use tokio::runtime::current_thread::Runtime;
use tokio::timer::{timeout, Timeout};

pub struct Connection {
mqtt_state: Rc<RefCell<MqttState>>,
Expand Down Expand Up @@ -81,12 +75,12 @@ impl Connection {
// NOTE: We need to use same reactor across threads because io resources (framed) will
// bind to reactor lazily.
// You'll face `reactor gone` error if `framed` is used again with a new reactor
fn mqtt_eventloop(&mut self, mut request_rx: Receiver<Request>, mut command_rx: Receiver<Command>) {
let mut prepended_request_stream = self.prepend_stream(request_rx.by_ref());
fn mqtt_eventloop(&mut self, request_rx: Receiver<Request>, mut command_rx: Receiver<Command>) {
let network_request_stream = request_rx.map_err(|_| NetworkError::Blah);
let mut network_request_stream = network_request_stream.prependable();
let mut command_stream = self.command_stream(command_rx.by_ref());

'reconnection: loop {

let mqtt_connect_future = self.mqtt_connect();
let timeout = Duration::from_secs(30);
let (runtime, framed) = match self.connect_timeout(mqtt_connect_future, timeout) {
Expand All @@ -98,16 +92,16 @@ impl Connection {
let (network_sink, network_stream) = framed.split();
let network_sink = network_sink.sink_map_err(NetworkError::Io);
let network_reply_stream = self.network_reply_stream(network_stream);
let prepended_request_stream = &mut prepended_request_stream;
let command_stream = &mut command_stream;

// merge previous session's unacked data into current stream
self.merge_network_request_stream(prepended_request_stream);
let network_request_stream = self.request_stream(prepended_request_stream);
let network_request_stream = &mut network_request_stream;
// Insert previous session. If this is the first connect, the buffer in
// network_request_stream is empty.
network_request_stream.insert(self.mqtt_state.borrow_mut().handle_reconnection());
let network_request_stream = self.limit_in_flight_request_stream(network_request_stream);
let network_request_stream = self.request_stream(network_request_stream);

let delayed_request_stream = self.delayed_request_stream(network_request_stream);
let mqtt_future = self.mqtt_future(command_stream,
delayed_request_stream,
let mqtt_future = self.mqtt_future(&mut command_stream,
network_request_stream,
network_reply_stream,
network_sink);

Expand All @@ -121,7 +115,7 @@ impl Connection {
if self.should_reconnect_again() { continue 'reconnection } else { break 'reconnection }
}
}

/// Makes a blocking mqtt connection and returns the framed codec and the reactor
fn connect_timeout(
&mut self,
Expand Down Expand Up @@ -226,17 +220,15 @@ impl Connection {
.and_then(move |packet| future::ok(packet.into()));
let network_stream = network_reply_stream.select(network_request_stream);

if self.is_network_enabled {
Either::A(command_stream
.select(network_stream)
.forward(network_sink)
.map(|(_selct, _splitsink)| ()))
let stream = if self.is_network_enabled {
EitherStream::A(command_stream.select(network_stream))
} else {
Either::B(command_stream.forward(network_sink).map(|(_selct, _splitsink)| ()))
}
}

EitherStream::B(command_stream)
};

let throttled_stream = self.rate_limited_network_stream(stream);
throttled_stream.forward(network_sink).map(|(_, _)| ())
}

fn handle_connection_success(&mut self) {
self.connection_count += 1;
Expand Down Expand Up @@ -329,30 +321,6 @@ impl Connection {
network_stream.chain(stream::once(Err(NetworkError::NetworkStreamClosed)))
}

fn merge_network_request_stream(&mut self, previous_request_stream: &mut Prepend<impl RequestStream>) {
let mqtt_state = self.mqtt_state.clone();
let last_session_publishes = mqtt_state.borrow_mut().handle_reconnection();
previous_request_stream.merge_session(last_session_publishes);
}

/// Handles all incoming user and session requests and creates a stream of packets to send
/// on network
/// All the remaining packets in the last session (when cleansession = false) will be prepended
/// to user request stream to ensure that they are handled first. This cleanly handles last
session strays (even if a disconnect happens while sending last session data) because we always
/// get back this stream from reactor after disconnection.
fn prepend_stream<'a>(&mut self, requests: &'a mut mpsc::Receiver<Request>) -> Prepend<impl RequestStream + 'a> {
let request_stream = requests
.map_err(|e| {
error!("User request error = {:?}", e);
NetworkError::Blah
});

let mqtt_state = self.mqtt_state.clone();
let last_session_publishes = mqtt_state.borrow_mut().handle_reconnection();
request_stream.prepend(last_session_publishes)
}

/// Handles all incoming user and session requests and creates a stream of packets to send
/// on network
/// All the remaining packets in the last session (when cleansession = false) will be prepended
Expand All @@ -367,6 +335,9 @@ impl Connection {
error!("User request error = {:?}", e);
NetworkError::Blah
})
.inspect(|request| {
debug!("{}", request_info(request));
})
.and_then(move |userrequest| {
let mut mqtt_state = mqtt_state.borrow_mut();
validate_userrequest(userrequest, &mut mqtt_state)
Expand All @@ -375,37 +346,40 @@ impl Connection {
let mqtt_state = self.mqtt_state.clone();
request_stream.and_then(move |packet: Packet| {
let mut mqtt_state = mqtt_state.borrow_mut();
let o = mqtt_state.handle_outgoing_mqtt_packet(packet);
future::result(o)
mqtt_state.handle_outgoing_mqtt_packet(packet)
})
}

fn delayed_request_stream<'a>(&self, stream: impl Stream<Item = Request, Error = NetworkError> + 'a)-> impl Stream<Item = Request, Error = NetworkError> + 'a {
let outgoing_ratedelay = self
.mqttoptions
.outgoing_ratelimit()
.map(|rate| Duration::from_millis(1000/rate));
let (limit, queuedelay) = self
.mqttoptions
.outgoing_queuelimit();

// Apply the outgoing queue limit (max in-flight messages): when the queue is full,
// answer the stream poll with `Async::NotReady` instead of yielding the next request.
fn limit_in_flight_request_stream(&self, requests: impl RequestStream) -> impl RequestStream {
let mqtt_state = self.mqtt_state.clone();

stream.and_then(move |request| {
let request = request;
let mqtt_state = mqtt_state.borrow();
let len = mqtt_state.publish_queue_len();
debug!("Outgoing request = {:?}", request_info(&request));

// set rate limiting if the option is set
if let Some(ratedelay) = outgoing_ratedelay {
Either::A(throttled_request(ratedelay, queuedelay, len, limit, request))
let in_flight = self.mqttoptions.in_flight();
let mut stream = requests.peekable();
poll_fn(move || -> Poll<Option<Request>, NetworkError> {
if mqtt_state.borrow().publish_queue_len() >= in_flight {
match stream.peek() {
Err(_) => stream.poll(),
_ => Ok(Async::NotReady)
}
} else {
Either::B(nonthrottled_request(queuedelay, len, limit, request))
stream.poll()
}
})
}

// Apply rate limit if configured
fn rate_limited_network_stream(&mut self, request: impl PacketStream) -> impl PacketStream {
if let Some(rate) = self.mqttoptions.outgoing_ratelimit() {
let duration = Duration::from_nanos(1_000_000_000 / rate);
let throttled = request.throttle(duration)
.map_err(|_| NetworkError::ThrottleError);
EitherStream::A(throttled)
} else {
EitherStream::B(request)
}
}

fn command_stream<'a>(&mut self, commands: &'a mut mpsc::Receiver<Command>) -> impl PacketStream + 'a {
// process user commands and raise appropriate error to the event loop
commands
Expand All @@ -417,46 +391,6 @@ impl Connection {
}
}

/// Resolves the given request into a future after a delay.
///
/// When `current_queue_size` exceeds `queue_limit`, the request is held back
/// for `queuelimit_delay` (back-pressure on a full in-flight queue); otherwise
/// it is delayed by `throttle_delay` (the configured outgoing rate limit).
/// Timer errors from `Delay` are converted into `NetworkError`.
fn throttled_request(
    throttle_delay: Duration,
    queuelimit_delay: Duration,
    current_queue_size: usize,
    queue_limit: usize,
    request: Request)
    -> impl Future<Item = Request, Error = NetworkError> {

    if current_queue_size > queue_limit {
        debug!("queue len = {}, limit = {}", current_queue_size, queue_limit);
        // Queue is over the limit: wait the queue-limit delay before
        // releasing the request downstream.
        let out = Delay::new(Instant::now() + queuelimit_delay)
            .map_err(|e| e.into())
            .map(|_| request);
        Either::A(out)
    } else {
        // Normal path: apply only the rate-limit delay.
        let out = Delay::new(Instant::now() + throttle_delay)
            .map_err(|e| e.into())
            .map(|_| request);
        Either::B(out)
    }
}

/// Resolves the given request into a future, delaying only for back-pressure.
///
/// When `current_queue_size` exceeds `queue_limit`, the request is held back
/// for `queuelimit_delay`; otherwise it is passed through immediately
/// (no outgoing rate limit is configured on this path).
/// Timer errors from `Delay` are converted into `NetworkError`.
fn nonthrottled_request(
    queuelimit_delay: Duration,
    current_queue_size: usize,
    queue_limit: usize,
    request: Request)
    -> impl Future<Item = Request, Error = NetworkError> {

    if current_queue_size > queue_limit {
        debug!("queue len = {}, limit = {}", current_queue_size, queue_limit);
        // Back-pressure: hold the request for the queue-limit delay.
        let out = Delay::new(Instant::now() + queuelimit_delay)
            .map_err(|e| e.into())
            .map(|_| request);
        Either::A(out)
    } else {
        // Queue has room: emit the request without any delay.
        Either::B(future::ok(request))
    }
}

fn handle_stream_timeout_error(
error: timeout::Error<NetworkError>,
mqtt_state: &mut MqttState)
Expand All @@ -465,12 +399,9 @@ fn handle_stream_timeout_error(
let out = mqtt_state.handle_outgoing_mqtt_packet(Packet::Pingreq);
future::err(error).or_else(move |e| {
if e.is_elapsed() {
match out {
Ok(packet) => future::ok(packet),
Err(e) => future::err(e),
}
out
} else {
future::err(e.into_inner().unwrap())
Err(e.into_inner().unwrap())
}
})
}
Expand Down Expand Up @@ -590,6 +521,30 @@ impl<T> RequestFuture for T where T: Future<Item = Request, Error = NetworkError
trait FramedFuture: Future<Item = MqttFramed, Error = ConnectError> {}
impl<T> FramedFuture for T where T: Future<Item = MqttFramed, Error = ConnectError> {}

// TODO: Remove if impl Either for Stream is backported to futures 0.1
// See https://github.com/rust-lang-nursery/futures-rs/issues/614
/// Stream counterpart of `futures::future::Either`: a value that is
/// statically one of two underlying stream types sharing the same
/// `Item`/`Error`, letting a function return either of two differently
/// typed streams without boxing.
#[derive(Debug)]
pub enum EitherStream<A, B> {
    /// First branch of the type
    A(A),
    /// Second branch of the type
    B(B),
}

impl<A, B> Stream for EitherStream<A, B>
    where A: Stream,
          B: Stream<Item = A::Item, Error = A::Error>
{
    type Item = A::Item;
    type Error = A::Error;

    // Delegates `poll` to whichever branch this value currently holds.
    fn poll(&mut self) -> Poll<Option<A::Item>, A::Error> {
        match *self {
            EitherStream::A(ref mut a) => a.poll(),
            EitherStream::B(ref mut b) => b.poll(),
        }
    }
}

// fn print_last_session_state(
// prepend: &mut Prepend<impl RequestStream>,
Expand Down
42 changes: 21 additions & 21 deletions src/client/prepend.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,59 @@
use futures::{Async, Poll, Stream};
use std::collections::VecDeque;
use std::iter::IntoIterator;

pub trait StreamExt: Stream {
fn prepend(self, first: VecDeque<Self::Item>) -> Prepend<Self>
pub trait Prepend: Stream {
fn prependable(self) -> Prependable<Self>
where
Self: Sized,
{
new(self, first)
new(self)
}
}

impl<T: ?Sized> StreamExt for T where T: Stream {}
impl<T: ?Sized> Prepend for T where T: Stream {}

/// An adapter for chaining the output of two streams.
///
/// The resulting stream produces items from first stream and then
/// from second stream.
#[must_use = "streams do nothing unless polled"]
pub struct Prepend<S>
pub struct Prependable<S>
where
S: Stream,
{
stream: S,
pub session: VecDeque<<S as Stream>::Item>,
items: VecDeque<<S as Stream>::Item>,
}

pub fn new<S>(stream: S, session: VecDeque<<S as Stream>::Item>) -> Prepend<S>
pub fn new<S>(stream: S) -> Prependable<S>
where
S: Stream,
{
Prepend { stream, session }
Prependable {
stream,
items: VecDeque::new(),
}
}

impl<S> Prepend<S>
impl<S> Prependable<S>
where
S: futures::Stream,
{
pub fn merge_session(&mut self, session: VecDeque<<S as Stream>::Item>) {
self.session.extend(session)
/// Insert items in between present items and wrapped stream
pub fn insert(&mut self, items: impl IntoIterator<Item = <S as Stream>::Item>) {
self.items.extend(items)
}
}

impl<S> Stream for Prepend<S>
impl<S> Stream for Prependable<S>
where
S: Stream,
{
type Item = <S as Stream>::Item;
type Error = <S as Stream>::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(v) = self.session.pop_front() {
debug!("Sending previous session data");
return Ok(Async::Ready(Some(v)));
if let Some(v) = self.items.pop_front() {
Ok(Async::Ready(Some(v)))
} else {
self.stream.poll()
}

self.stream.poll()
}
}
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum NetworkError {
Timer(timer::Error),
#[fail(display = "Tokio timer error = {}", _0)]
TimeOut(timeout::Error<IoError>),
#[fail(display = "Tokio timer error")]
ThrottleError,
#[fail(display = "User requested for reconnect")]
UserReconnect,
#[fail(display = "User requested for disconnect")]
Expand Down

0 comments on commit 5660643

Please sign in to comment.