Reconnection issue #34

Closed
mojzu opened this issue Feb 12, 2020 · 13 comments

Comments

@mojzu

mojzu commented Feb 12, 2020

Hi, I'm in the process of migrating from the rumqtt client to this library and I'm running into reconnection problems during testing. When I disconnect the client from the network the broker is on, I get the following error:

{
  "file": "/build/.cargo/registry/src/github.com-1ecc6299db9ec823/rumq-client-0.1.0-alpha.4/src/state.rs",
  "level": "ERROR",
  "line": 263,
  "message": "Error awaiting for last ping response",
  "module_path": "rumq_client::state",
  "target": "rumq_client::state",
  "time": "2020-02-12T14:30:57.027601409+00:00"
}

However, when I reconnect the client to the network, I continue getting the same error and the client doesn't reconnect. Based on the logs, I'm not receiving a Reconnection or Disconnection notification, but with each error I am getting a StreamEnd:

{
  "file": "app/src/mqtt/mod.rs",
  "level": "DEBUG",
  "line": 551,
  "message": "notification: StreamEnd(MqttState(AwaitPingResp))",
  "module_path": "app::mqtt",
  "target": "app::mqtt",
  "time": "2020-02-12T15:28:10.837594182+00:00"
}

In case I've done something wrong, here is my code for setting up the client options:

    fn rumq_options(&self) -> rumq_client::MqttOptions {
        let mut options = rumq_client::MqttOptions::new(&self.client_id, &self.host, self.port);
        options
            .set_keep_alive(10)
            .set_throttle(Duration::from_secs(1))
            .set_clean_session(false)
            .set_request_channel_capacity(128)
            .set_notification_channel_capacity(128);

        if let Some(user_password) = &self.user_password {
            options.set_credentials(&user_password.user, &user_password.password);
        }
        if let Some(tls) = &self.tls {
            options.set_ca(tls.ca_pem.clone());
            // mqttoptions.set_client_auth(client_cert, client_key);
        }
        options
    }

And here is my code for spawning the tasks for sending/receiving:

    /// Start asynchronous loops to handle events.
    pub async fn start(self) -> (task::JoinHandle<()>, task::JoinHandle<()>) {
        let mqtt = Arc::new(self);

        let (mut requests_tx, requests_rx) = channel(10);
        let mut eventloop = eventloop(mqtt.options.rumq_options(), requests_rx);

        mqtt.metric_connected().set(1);
        mqtt.rumq_subscribe(&mut requests_tx).await;

        let mqtt1 = mqtt.clone();
        let mut requests_tx1 = requests_tx.clone();
        let poll_task = task::spawn(async move {
            loop {
                let mqtt = mqtt1.clone();
                mqtt.poll_handler(&mut requests_tx1).await;
                time::delay_for(Duration::from_secs(1)).await;
            }
        });

        let notification_task = task::spawn(async move {
            loop {
                let mqtt = mqtt.clone();
                let mut stream = eventloop.stream();
                while let Some(item) = stream.next().await {
                    mqtt.notification_handler(item, &mut requests_tx).await;
                }
                time::delay_for(Duration::from_secs(1)).await;
            }
        });

        (poll_task, notification_task)
    }

If there's any other information I can provide, let me know. Thanks for the help and the great library; migrating was much easier than I expected, and the new interface works very well with my other async code 👍

@tekjar
Contributor

tekjar commented Feb 13, 2020

The code looks correct to me.

> However when I reconnect the client to the network I continue getting the same error and the client doesn't reconnect. Based on the logs I'm not receiving a Reconnection or Disconnection notification, but with each error I am getting a StreamEnd

eventloop.stream() should reconnect, but notifications like Connected aren't implemented in the current master yet. They are implemented in the next release, though, and can be used to resubscribe when the connection is established.

> "message": "notification: StreamEnd(MqttState(AwaitPingResp))",

StreamEnd is the last element of the stream when it encounters an error. The while let Some(item) = stream.next().await loop should break after that, and the next let mut stream = eventloop.stream(); should trigger a new connection. Let me know if that's not happening.

> If there's any other information I can provide let me know.

If things aren't working as expected, please provide a minimal example. It helps me fix things faster :)

> new interface works very well with my other async code

I'm really happy with the current design. It's simple to maintain, unlike the last one, and easy to customize :)

@mojzu
Author

mojzu commented Feb 13, 2020

Thanks for the help, that explains not getting reconnection/disconnect notifications yet.

I also tried switching to #[tokio::main(basic_scheduler)], since I noticed that was a difference between my code and the example, but it doesn't appear to have changed the behaviour.

I'm a bit swamped at the minute (as we all are), but I'll try and put together a minimal reproduction over the weekend.

@mojzu
Author

mojzu commented Feb 17, 2020

This is what I have so far to try and reproduce the issue. If I run the Mosquitto server using Docker:

docker run --rm -p 1883:1883 eclipse-mosquitto:latest

And then run the reconnection example:

cargo run --example reconnection

I get the following output from the reconnection example when I stop the server to simulate disconnection:

Received = Puback(PacketIdentifier(9))
Received = Puback(PacketIdentifier(10))
Stream done
Received = StreamEnd(Network(Io(Os { code: 111, kind: ConnectionRefused, message: "Connection refused" })))
Stream done
Received = StreamEnd(Network(Io(Os { code: 111, kind: ConnectionRefused, message: "Connection refused" })))
Stream done
Received = StreamEnd(Network(Io(Os { code: 111, kind: ConnectionRefused, message: "Connection refused" })))
Stream done
Received = Puback(PacketIdentifier(11))
Received = Puback(PacketIdentifier(12))

The client is reconnecting successfully. If I then modify the reconnection example to connect to our production MQTT server (TLS + credentials) and disconnect the network to test disconnection, I get the following:

Received = Puback(PacketIdentifier(4))
Received = Puback(PacketIdentifier(5))
 ERROR rumq_client::state > Error awaiting for last ping response
Received = StreamEnd(MqttState(AwaitPingResp))
Stream done
Received = Puback(PacketIdentifier(6))
Received = Puback(PacketIdentifier(7))

I'm now getting the same error as my other application, although reconnection is still working, which suggests I may be doing something else wrong. I tried to modify the reconnection example to be more similar to my application, but this didn't change the behaviour (https://github.com/mojzu/rumq/blob/master/rumq-client/examples/reconnection.rs).

When I get another chance I'll keep trying to modify the reconnection example until it either reproduces the error or shows me what I've done wrong in my application.

@tekjar
Contributor

tekjar commented Feb 18, 2020

You mean all the minimal examples are working fine, as expected?

@qm3ster
Contributor

qm3ster commented Feb 23, 2020

I feel like I'm in the same boat.
If a packet is dropped (e.g. if I unplug the cable) and I get even a single AwaitPingResp, I keep getting AwaitPingResp forever (which causes the stream to break), despite actually having a valid connection (I get Subacks, Publishes, and everything).

@qm3ster
Contributor

qm3ster commented Feb 23, 2020

"minimum" repro:

use rumq_client::MqttOptions;
use rumq_client::QoS::AtLeastOnce;
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio::sync::mpsc;
use tokio::time::delay_for;
#[tokio::main]
async fn main() {
    let mut options = MqttOptions::new("Obama", "10.22.22.6", 1883);
    options
        .set_credentials("mqttuser", "mqttpass")
        .set_keep_alive(5);
    let (mut req_tx, req_rx) = mpsc::channel(10);
    let mut eventloop = rumq_client::eventloop(options, req_rx);
    loop {
        let mut stream = eventloop.stream();
        while let Some(notif) = stream.next().await {
            println!("{:?}", notif);
            use rumq_client::Notification as N;
            use rumq_client::Request as R;
            use rumq_client::{publish, subscribe};
            let req = match notif {
                N::Connected => Some(R::Subscribe(subscribe("in", AtLeastOnce))),
                N::Publish(pubar) => Some(R::Publish(publish("out", AtLeastOnce, pubar.payload))),
                _ => None,
            };
            if let Some(req) = req {
                req_tx
                    .send(req)
                    .await
                    .expect("`requests` dropped by eventstream")
            }
        }
        eprintln!("stream ended, retrying in 100ms");
        delay_for(Duration::from_millis(100)).await;
    }
}
Cargo.toml:

[package]
name = "min-example"
version = "0.1.0"
edition = "2018"

[dependencies]
rumq-client = "0.1.0-alpha.5"
tokio = {version = "0.2.11", features = ["stream", "macros", "sync"]}

Output:

Connected
Suback(Suback { pkid: PacketIdentifier(1), return_codes: [Success(AtLeastOnce)] })
StreamEnd(MqttState(AwaitPingResp))
stream ended, retrying in 100ms
StreamEnd(Timeout(Elapsed(())))
stream ended, retrying in 100ms
Connected
Suback(Suback { pkid: PacketIdentifier(2), return_codes: [Success(AtLeastOnce)] })
StreamEnd(MqttState(AwaitPingResp))
stream ended, retrying in 100ms
Connected
Suback(Suback { pkid: PacketIdentifier(3), return_codes: [Success(AtLeastOnce)] })
StreamEnd(MqttState(AwaitPingResp))
stream ended, retrying in 100ms
Connected
Suback(Suback { pkid: PacketIdentifier(4), return_codes: [Success(AtLeastOnce)] })
StreamEnd(MqttState(AwaitPingResp))
stream ended, retrying in 100ms
Connected
Suback(Suback { pkid: PacketIdentifier(5), return_codes: [Success(AtLeastOnce)] })
Publish(Topic = in, Qos = AtMostOnce, Retain = false, Pkid = None, Payload Size = 8)
Puback(PacketIdentifier(6))
StreamEnd(MqttState(AwaitPingResp))
stream ended, retrying in 100ms
Connected
Suback(Suback { pkid: PacketIdentifier(7), return_codes: [Success(AtLeastOnce)] })
Publish(Topic = in, Qos = AtMostOnce, Retain = false, Pkid = None, Payload Size = 8)
Puback(PacketIdentifier(8))
StreamEnd(MqttState(AwaitPingResp))
stream ended, retrying in 100ms
Connected
Suback(Suback { pkid: PacketIdentifier(9), return_codes: [Success(AtLeastOnce)] })
StreamEnd(MqttState(AwaitPingResp))
stream ended, retrying in 100ms
Connected
Suback(Suback { pkid: PacketIdentifier(10), return_codes: [Success(AtLeastOnce)] })

@mojzu
Author

mojzu commented Feb 26, 2020

I've rebuilt my code with the branch from qm3ster and retested using our production MQTT server. I'm no longer getting the repeated "Error awaiting for last ping response" error, and I'm getting new messages afterwards, which suggests this issue is fixed!

I think I was struggling to recreate it because I was testing with an MQTT server running on localhost and simulating a network disconnect by stopping the server. I'm guessing this wasn't dropping packets in the same way as when I ran these tests over a network and physically removed the Ethernet cable.

@tekjar
Contributor

tekjar commented Mar 2, 2020

Sorry for the delay. This will be fixed in the next few days.

@qm3ster
Contributor

qm3ster commented Mar 2, 2020

@mojzu were you still getting some or all messages (together with the repeating error)? Or did this error completely block incoming messages in your case?

@qm3ster
Contributor

qm3ster commented Mar 2, 2020

It's just that the only branch that could reset await_pingresp was successfully receiving a pingresp, and the branch that sent a pingreq asking for one was conveniently the only thing blocked by checking for this error. Nothing else checked the flag, so it didn't actually interfere with handling any other packet (unless I got spooked by the error and dropped the eventloop myself).

@tekjar
Contributor

tekjar commented Mar 3, 2020

I'll probably work on this in a couple of days. One problem I see (as @qm3ster rightly pointed out) is that await_pingresp isn't reset correctly. The current master will work correctly as long as there are remaining publishes in the request stream, but an idle connection will be a problem after the initial (pingreq) error: the eventloop will try to send a second pingreq while await_pingresp is still true from the previous iteration, and this repeats for every subsequent ping request.

I think the reset should happen here.
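A minimal sketch of the state machine being described, with hypothetical names (this is not the actual rumq-client code): the flag is set when a PINGREQ goes out, was only ever cleared by a PINGRESP, and the proposed fix additionally clears it when a fresh connection is established.

```rust
// Hypothetical, simplified model of the keep-alive flag discussed above;
// names are illustrative, not taken from rumq-client internals.
#[derive(Default)]
struct PingState {
    await_pingresp: bool,
}

impl PingState {
    // Sending a PINGREQ fails if the previous one is still unanswered,
    // which corresponds to the repeated AwaitPingResp error in the logs.
    fn send_pingreq(&mut self) -> Result<(), &'static str> {
        if self.await_pingresp {
            return Err("AwaitPingResp");
        }
        self.await_pingresp = true;
        Ok(())
    }

    // Receiving a PINGRESP was the only branch that cleared the flag.
    fn handle_pingresp(&mut self) {
        self.await_pingresp = false;
    }

    // The proposed fix: also clear the flag when a new connection is
    // established, so a response lost with the old connection cannot
    // poison every keep-alive cycle on the new one.
    fn on_reconnect(&mut self) {
        self.await_pingresp = false;
    }
}

fn main() {
    let mut state = PingState::default();

    // First PINGREQ goes out; its response is lost with the connection.
    assert!(state.send_pingreq().is_ok());

    // Without a reset, every later keep-alive attempt errors forever.
    assert_eq!(state.send_pingreq(), Err("AwaitPingResp"));
    assert_eq!(state.send_pingreq(), Err("AwaitPingResp"));

    // Clearing the flag on reconnect lets the next cycle succeed.
    state.on_reconnect();
    assert!(state.send_pingreq().is_ok());
    state.handle_pingresp();
    println!("keep-alive state resets correctly after reconnect");
}
```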

@tekjar
Contributor

tekjar commented Mar 7, 2020

Fixed in #42

@mojzu
Author

mojzu commented Mar 8, 2020

I've upgraded to the alpha.6 version and retested, and the issue is fixed.

I did have a compilation error while upgrading, unresolved import tokio::select; it turns out some of my other dependencies were pulling the tokio version down to 0.2.6. I don't think it's a bug, but in case anyone else runs into it, it was resolved by setting the tokio version in my project's Cargo.toml to "0.2.13" rather than "0.2".
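For anyone else hitting this, the Cargo.toml change being described amounts to raising the declared tokio requirement (version numbers taken from the comment above):

```toml
[dependencies]
# With "0.2", other dependencies were pulling the resolved tokio
# down to 0.2.6, which caused `unresolved import tokio::select`.
# Requiring 0.2.13 explicitly fixed the build.
tokio = "0.2.13"
```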

Thanks for the fix and again for the great library! 👍


3 participants