Skip to content
This repository has been archived by the owner on Jan 17, 2020. It is now read-only.

Feature: Connection status notifications and notifications drop handling #143

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
142 changes: 95 additions & 47 deletions src/client/connection.rs
Expand Up @@ -2,26 +2,35 @@ use crate::client::{
mqttstate::{MqttConnectionStatus, MqttState},
network::stream::NetworkStream,
prepend::Prepend,
Command, Notification, Request, UserHandle,
Command, Notification, Request,
};
use crate::codec::MqttCodec;
use crate::error::{ConnectError, NetworkError};
use crate::mqttoptions::{ConnectionMethod, MqttOptions, Proxy, ReconnectOptions};
use crossbeam_channel::{self, Sender};
use futures::{
future::{self, Either},
stream::{self, poll_fn, SplitStream},
stream::{once, poll_fn, SplitStream},
sync::mpsc::{self, Receiver},
sync::oneshot,
Async, Future, Poll, Sink, Stream,
};
use mqtt311::Packet;
use std::{cell::RefCell, rc::Rc, thread, time::Duration};
use std::{cell::RefCell, rc::Rc, thread, time::{Duration}};
use tokio::runtime::current_thread::Runtime;
use tokio::codec::Framed;
use tokio::prelude::StreamExt;
use tokio::runtime::current_thread::Runtime;
use tokio::timer::{timeout, Timeout};

pub struct Connection {
/// Combines handles returned by the eventloop
pub(crate) struct UserHandle {
pub request_tx: mpsc::Sender<Request>,
pub command_tx: mpsc::Sender<Command>,
pub notification_rx: crossbeam_channel::Receiver<Notification>,
pub notification_closed_rx: oneshot::Receiver<()>,
}

pub(crate) struct Connection {
mqtt_state: Rc<RefCell<MqttState>>,
notification_tx: Sender<Notification>,
connection_tx: Option<Sender<Result<(), ConnectError>>>,
Expand All @@ -33,8 +42,9 @@ pub struct Connection {
impl Connection {
/// Takes mqtt options and tries to create initial connection on current thread and handles
/// connection events in a new thread if the initial connection is successful
pub fn run(mqttoptions: MqttOptions) -> Result<UserHandle, ConnectError> {
pub(crate) fn run(mqttoptions: MqttOptions) -> Result<UserHandle, ConnectError> {
let (notification_tx, notification_rx) = crossbeam_channel::bounded(mqttoptions.notification_channel_capacity());
let (notification_closed_tx, notification_closed_rx) = oneshot::channel::<()>();
let (request_tx, request_rx) = mpsc::channel::<Request>(mqttoptions.request_channel_capacity());
let (command_tx, command_rx) = mpsc::channel::<Command>(5);

Expand All @@ -43,7 +53,7 @@ impl Connection {

// start the network thread to handle all mqtt network io
thread::spawn(move || {
let mqtt_state = Rc::new(RefCell::new(MqttState::new(mqttoptions.clone())));
let mqtt_state = Rc::new(RefCell::new(MqttState::new(mqttoptions.clone(), notification_closed_tx)));
let mut connection = Connection {
mqtt_state,
notification_tx,
Expand All @@ -61,6 +71,7 @@ impl Connection {
request_tx,
command_tx,
notification_rx,
notification_closed_rx,
};

match reconnect_option {
Expand Down Expand Up @@ -151,34 +162,44 @@ impl Connection {
}

fn mqtt_io(&mut self, mut runtime: Runtime, mqtt_future: impl Future<Item = (), Error = NetworkError>) -> Result<(), bool> {
let mqtt_state = self.mqtt_state.clone();
let notification_tx = self.notification_tx.clone();
let notify = move |n| {
if mqtt_state.borrow().send_notifications() {
let err = notification_tx.send(n);
drop(err);
}
};

match runtime.block_on(mqtt_future) {
// don't use user defined reconnection behaviour here
Err(NetworkError::UserDisconnect) => {
debug!("User commanded for network disconnect");
self.is_network_enabled = false;
notify(Notification::Disconnected(Ok(())));
Err(true)
}
// don't use user defined reconnection behaviour here
Err(NetworkError::UserReconnect) => {
debug!("User commanded for network reconnect");
// No connection notification here!
self.is_network_enabled = true;
Err(true)
}
Err(NetworkError::NetworkStreamClosed) => {
Err(e) => {
let mqtt_state = self.mqtt_state.borrow();
if mqtt_state.connection_status() == MqttConnectionStatus::Disconnecting {
debug!("Shutting down gracefully");
notify(Notification::Disconnected(Ok(())));
Err(false)
} else {
notify(Notification::Disconnected(Err(ConnectError::NetworkError(e))));
Err(self.should_reconnect_again())
}
}
Err(e) => {
error!("Event loop returned. Error = {:?}", e);
Err(self.should_reconnect_again())
}
Ok(_v) => {
warn!("Strange!! Evenloop finished");
notify(Notification::Disconnected(Ok(())));
Err(self.should_reconnect_again())
}
}
Expand Down Expand Up @@ -225,9 +246,16 @@ impl Connection {
fn handle_connection_success(&mut self) {
self.connection_count += 1;

if self.connection_count == 1 {
let connection_tx = self.connection_tx.take().unwrap();
connection_tx.send(Ok(())).unwrap();
if self.mqtt_state.borrow().send_notifications() {
// Ignore SendError here because the notification handle could be dropped
// after checking it here. That's ok.
let result = self.notification_tx.send(Notification::Connected);
drop(result);
}

if let Some(connection_tx) = self.connection_tx.take() {
// TODO: Error handling
connection_tx.send(Ok(())).expect("Channel error")
}
}

Expand All @@ -239,9 +267,15 @@ impl Connection {
None => Err(ConnectError::Timeout),
};

if self.connection_count == 1 {
let connection_tx = self.connection_tx.take().unwrap();
connection_tx.send(error).unwrap();

if let Some(connection_tx) = self.connection_tx.take() {
// TODO: Error handling
connection_tx.send(error).expect("Channel error");
} else if self.mqtt_state.borrow().send_notifications() {
let result = self.notification_tx.send(Notification::Disconnected(error));
// Ignore SendError here because the notification handle could be dropped
// after checking it here. That's ok.
drop(result);
}
}

Expand Down Expand Up @@ -295,32 +329,37 @@ impl Connection {
/// channel) and creates a stream of packets to send on network
fn network_reply_stream(&self, network_stream: SplitStream<MqttFramed>) -> impl PacketStream {
let mqtt_state = self.mqtt_state.clone();
let mqtt_state_notification = self.mqtt_state.clone();
let keep_alive = self.mqttoptions.keep_alive();
let notification_tx = self.notification_tx.clone();

let network_stream = network_stream
.map_err(NetworkError::Io)
.and_then(move |packet| {
debug!("Incoming packet = {:?}", packet_info(&packet));
let reply = mqtt_state.borrow_mut().handle_incoming_mqtt_packet(packet);
future::result(reply)
let mut mqtt_state = mqtt_state.borrow_mut();
mqtt_state.handle_incoming_mqtt_packet(packet)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why you've moved linear and_then to nested and_then? IMO chaining promises one after the other is much more readable than nesting combinators, which can lead to call back hell sort of code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This two lines just remove the explicit future::ok(). The Result<> returned by handle_incoming_mqtt_packet can be used directly and coverts because of IntoFuture for Result. Not sure what you mean by linear and nested her.

Copy link

@tekjar tekjar Mar 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. Got it.

What I meant was that method 2 below will keep combinators like and_then one after the other instead of one inside the other

.and_then(move |packet| {
                debug!("Incoming packet = {:?}", packet_info(&packet));
                let mut mqtt_state = mqtt_state.borrow_mut();
                mqtt_state.handle_incoming_mqtt_packet(packet)
                    .and_then(|(notification, reply)| {
                        handle_notification(notification)
                        Ok(reply)
                })
})
.filter(should_forward_packet);

vs

.and_then(move |packet| {
        debug!("Incoming packet = {:?}", packet_info(&packet));
        let mut mqtt_state = mqtt_state.borrow_mut();
        let o = mqtt_state.handle_incoming_mqtt_packet(packet)
        future::result(o)
})
.map(|(notification, reply)| {
        handle_notification(notification)
        reply
})
.filter(should_forward_packet);

})
.and_then(move |(notification, reply)| {
handle_notification(notification, &notification_tx);
future::ok(reply)
let mqtt_state = mqtt_state_notification.borrow();
match notification {
Notification::None => (),
// Ignore error on notification_tx send, since the receiver can be dropped at any time
_ if mqtt_state.send_notifications() => drop(notification_tx.send(notification)),
Copy link

@tekjar tekjar Mar 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this will block the event loop. What happens when the receiver is not dropped and the channel is full?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This blocks the loop. As far a I got it that's ok in order to backpressure on the broker.
A option would be to use a unbounded channel - but this is probably not what we want here.

I'm not fluent in the MQTT spec: What is a broker allowed to do with clients that back pressure?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking the event loop is not ok just because the receiver is not able to catch up. Pings should still continue to happen to prevent broker disconnecting the client and publisher should not be slow because the receiver is doing some heavy computation.

This is getting a little tricky than I anticipated. I'll give proper thought to this and ping you in a few days.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. You're right. I forgot the pings...sorry. Will think about that too.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool :). Sorry for the back and forths on this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Options regarding the pings:

  1. Accept that an overload of the client will stop sending pings. The broker will probably disconnect in such a situation (or has to according the spec?). The timeouts are quite relaxed to be fine with a short delayed ping. This is the behaviour on notification_refactor (this PR).
  2. A unbound notification queue: sorry - bad idea...
  3. Discard notifications when the notification queue is full. Behaviour on master. I'd really prefer option 1 one over 3 since at least you see what happened.
  4. Another idea is to split the notification queue into several ones to give the client more flexibility what to handle or which of the Receivers to drop. e.g Notification::Puback could be less interesting that Notification::Connected or Notification::Disconnected(_). Here still the decision whether a Notification can be discarded or not has to be done.
  5. Configuration options to choose between option 1 and 3. Should not be a feature but could be added to the config struct.

What do you think?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @flxo. Sorry for the delay on this. Coming to the options that you've mentioned

  1. Blocking the event loop, in general, isn't a good idea. It's not just about pings and disconnects. If someone is doing publishes from a different thread and the receiver is doing heavy computations, publishers won't progress. Just because crossbeam channel is full, blocking the publisher doesn't make any sense. Also, the eventloop block will keep interfering with any future features we might need on the eventloop.

  2. Agreed :)

  3. Sounds good

Personally, I feel that we should provide an option for future channels along with crossbeam channel for notifications. Future channels won't have this problem of blocking the eventloop while doing a blocking send. Also we won't lose data silently. If people choose to use crossbeam channel, the get the flexibility of select but might miss incoming data.

I'm open to discussion on this. May on a new issue :)

Also, I'll busy this month and won't be able to do a lot of rumqtt work. But we can try to reach a conclusion this month and implement it in the next

_ => (),
}
Ok(reply)
})
.filter(|reply| should_forward_packet(reply))
.and_then(future::ok);
.filter(should_forward_packet)
.chain(once(Err(NetworkError::NetworkStreamClosed)));

let network_reply_stream = network_stream.chain(stream::once(Err(NetworkError::NetworkStreamClosed)));
let mqtt_state = self.mqtt_state.clone();
Timeout::new(network_reply_stream, keep_alive)
Timeout::new(network_stream, keep_alive)
.or_else(move |e| {
let mut mqtt_state = mqtt_state.borrow_mut();
handle_stream_timeout_error(e, &mut mqtt_state)
})
.filter(|reply| should_forward_packet(reply))
.and_then(move |packet| future::ok(packet.into()))
.filter(should_forward_packet)
.map(std::convert::Into::into)
}

/// Handles all incoming user and session requests and creates a stream of packets to send
Expand Down Expand Up @@ -409,16 +448,6 @@ fn validate_userrequest(userrequest: Request, mqtt_state: &mut MqttState) -> imp
}
}

fn handle_notification(notification: Notification, notification_tx: &Sender<Notification>) {
match notification {
Notification::None => (),
_ => match notification_tx.try_send(notification) {
Ok(()) => (),
Err(e) => error!("Notification send failed. Error = {:?}", e),
},
}
}

/// Checks if incoming packet is mqtt connack packet. Useful after mqtt
/// connect when we are waiting for connack but not any other packet.
fn check_and_validate_connack(packet: Option<Packet>, framed: MqttFramed, mqtt_state: &mut MqttState) -> impl FramedFuture {
Expand Down Expand Up @@ -547,7 +576,7 @@ impl Sink for BlackHole {
type SinkItem = Packet;
type SinkError = NetworkError;

fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
fn start_send(&mut self, _item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
Ok(AsyncSink::Ready)
}

Expand All @@ -562,11 +591,13 @@ impl Sink for BlackHole {

#[cfg(test)]
mod test {
use super::{Connection, MqttOptions, MqttState, NetworkError, ReconnectOptions};
use super::{Connection, MqttOptions, MqttState, ConnectError, NetworkError, Notification, ReconnectOptions};
use crossbeam_channel::Receiver;

use futures::{
future,
stream::{self, Stream},
sync::oneshot,
};
use mqtt311::Packet;
use mqtt311::Publish;
Expand All @@ -577,11 +608,12 @@ mod test {
use std::time::Instant;
use tokio::runtime::current_thread::Runtime;

fn mock_mqtt_connection(mqttoptions: MqttOptions) -> (Connection, Runtime) {
fn mock_mqtt_connection(mqttoptions: MqttOptions) -> (Connection, Runtime, (Receiver<Notification>, oneshot::Receiver<()>)) {
let (connection_tx, _connection_rx) = crossbeam_channel::bounded(1);
let (notification_tx, _notification_rx) = crossbeam_channel::bounded(10);
let (notification_tx, notification_rx) = crossbeam_channel::bounded(10);
let (notification_closed_tx, notification_closed_rx) = oneshot::channel::<()>();

let mqtt_state = MqttState::new(mqttoptions.clone());
let mqtt_state = MqttState::new(mqttoptions.clone(), notification_closed_tx);
let mqtt_state = Rc::new(RefCell::new(mqtt_state));
let connection = Connection {
mqtt_state,
Expand All @@ -593,7 +625,7 @@ mod test {
};

let runtime = Runtime::new().unwrap();
(connection, runtime)
(connection, runtime, (notification_rx, notification_closed_rx))
}

fn sample_outgoing_publishes() -> Vec<Packet> {
Expand Down Expand Up @@ -621,17 +653,33 @@ mod test {

let mqttoptions = MqttOptions::new("mqtt-io-test", "localhost", 1883).set_reconnect_opts(reconnect_opt);

let (mut connection, runtime) = mock_mqtt_connection(mqttoptions);
let (mut connection, runtime, (_, _)) = mock_mqtt_connection(mqttoptions);

let network_future = future::err::<(), _>(NetworkError::NetworkStreamClosed);
let out = connection.mqtt_io(runtime, network_future);
assert_eq!(out, Err(true));
}

#[test]
fn connection_status_connected_disconnected() {
let reconnect_opt = ReconnectOptions::Always(10);

let mqttoptions = MqttOptions::new("mqtt-io-test", "localhost", 1883).set_reconnect_opts(reconnect_opt);

let (mut connection, runtime, (notifications, _notification_closed_rx)) = mock_mqtt_connection(mqttoptions);

let network_future = future::err::<(), _>(NetworkError::NetworkStreamClosed);
let _ = connection.mqtt_io(runtime, network_future);
match notifications.recv().expect("Failed to recv") {
Notification::Disconnected(Err(ConnectError::NetworkError(NetworkError::NetworkStreamClosed))) => (),
n => panic!("Invalid notification: {:?}", n),
}
}

#[test]
fn rate_limiting_to_the_stream_behaves_right() {
let mqttoptions = MqttOptions::default().set_outgoing_ratelimit(5);
let (mut connection, mut runtime) = mock_mqtt_connection(mqttoptions);
let (mut connection, mut runtime, (_, _)) = mock_mqtt_connection(mqttoptions);

let publishes = sample_outgoing_publishes();
let packet_stream = stream::iter_ok(publishes);
Expand Down