{ kind: Interrupted, error: "Mailbox has closed" } #7

Closed
xudesheng opened this issue Mar 6, 2020 · 13 comments

Comments

@xudesheng
Contributor

xudesheng commented Mar 6, 2020

As a first step, I added a loop that publishes messages continuously, and it works:
`
let mut total_count = 0;
let mut rng = rand::thread_rng();

loop {
    total_count += 1;
    if total_count > 1000 {
        break;
    }

    println!("Publish: {}", total_count);
    // Build a JSON payload with randomized sensor readings.
    let payload = json!({
        "temperature": 707 + rng.gen_range(0, 70),
        "humidity": 202 + rng.gen_range(0, 20),
    });

    let payload = serde_json::to_string(&payload).unwrap();
    println!("payload:{}", payload);
    client
        .publish(
            //String::from("test"),
            TOPIC.to_owned(),
            QualityOfService::Level0,
            Vec::from(payload.as_bytes()),
        )
        .await?;
    println!("Sending");
    // Pause for 1 second before the next publish.
    println!("Wait for 1s");
    let delay_time = Instant::now() + Duration::new(1, 0);
    delay_until(delay_time).await;
    println!("Sent successfully, delayed 1s");
}

`

The second step is to simulate multiple clients by adding a for loop:

`
for client in clients {
    let future = ...

    let result = async move {
        // new mqtt-client
        // loop { sending msg }
    }

    Arbiter::spawn(future);
}

`

After adding the for loop, every client sends only its first message and then the actor stops. The error message is Custom { kind: Interrupted, error: "Mailbox has closed" }. I couldn't figure out why the actor was stopped.

Appreciate your help.

@Syndim
Owner

Syndim commented Mar 6, 2020

Hi, I did a quick test on my side but couldn't reproduce this issue. Can you provide more info:

  1. Which OS are you running?
  2. How did you create multiple clients? Do you just clone the client, or connect to the MQTT server multiple times?

In the meantime I created 2 tests:
This test clones the MQTT client 5 times and sends random packets

fn test_random_publish_level0_cloned_client() {

This test connects to the MQTT server (I'm using mosquitto) 5 times and sends random packets

fn test_random_publish_level0_created_client() {

Both tests run fine on my side (macOS 10.15.3). Can you also try running them on your side and see if they work?

@xudesheng
Contributor Author

xudesheng commented Mar 6, 2020

Thank you for your quick response:

  1. I'm on macOS 10.15.
  2. I created new clients instead of cloning. I printed peer_addr and local_addr:
    devicemqtt0001,Remote:V4(104.46.115.xxx:8883), Local:V4(192.168.1.9:57784)
    devicemqtt0000,Remote:V4(104.46.115.xxx:8883), Local:V4(192.168.1.9:57785)
    Because I need to simulate client load, I want to use separate clients rather than clones.

I appreciate you sharing your test code.

Let me follow your pattern exactly and try it.

@xudesheng
Contributor Author

xudesheng commented Mar 6, 2020

let mut client = MqttClient::new(
    r,
    w,
    format!("test_{}", client_id),
    MqttOptions::default(),
    MessageActor(sender.clone()).start().recipient(), // <-- sender.clone()
    ErrorActor.start().recipient(),
    None,
);

I feel this is the only difference: sender.clone(). I don't understand why it is needed here, and I couldn't find it in the docs. Have I missed anything?

After looking at your code more closely, I understand why you added this sender, but it doesn't seem to affect my issue.

Let me install a broker on my local computer and test your code.

@xudesheng
Contributor Author

xudesheng commented Mar 6, 2020

running 4 tests
test random_test::test_random_publish_level0_cloned_client ... FAILED
test tests::test_client ... FAILED
test random_test::test_random_publish_level2 ... FAILED
test random_test::test_random_publish_level0_created_client ... test random_test::test_random_publish_level0_created_client has been running for over 60 seconds

Your test code seems to fail on my machine.

Messages from the mosquitto console:
1583510015: New client connected from 127.0.0.1 as test_0 (p2, c0, k0).
1583510015: New client connected from 127.0.0.1 as test_1 (p2, c0, k0).
1583510015: New client connected from 127.0.0.1 as test_2 (p2, c0, k0).
1583510015: New client connected from 127.0.0.1 as test_3 (p2, c0, k0).
1583510015: New client connected from 127.0.0.1 as test_4 (p2, c0, k0).

@xudesheng
Contributor Author

xudesheng commented Mar 6, 2020

If I run only the test_client case, it succeeds.

cargo test test_client
Finished test [unoptimized + debuginfo] target(s) in 0.12s
Running target/debug/deps/actix_mqtt_client-21c06d40e896069d

running 1 test
test tests::test_client ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 3 filtered out

But if I run only `cargo test random_test`, it fails:

cargo test random_test
Finished test [unoptimized + debuginfo] target(s) in 0.10s
Running target/debug/deps/actix_mqtt_client-21c06d40e896069d

running 3 tests
test random_test::test_random_publish_level2 ... FAILED
test random_test::test_random_publish_level0_cloned_client ... FAILED
test random_test::test_random_publish_level0_created_client ... test random_test::test_random_publish_level0_created_client has been running for over 60 seconds

@Syndim
Owner

Syndim commented Mar 7, 2020

The random tests are actually infinite loops that send random packets forever, so they won't actually "pass" :).

Looks like you are doing exactly what test_random_publish_level0_created_client does. Can you try running the test with:

RUST_LOG=info cargo test test_random_publish_level0_created_client -- --nocapture

And see if there are any error logs? If everything goes fine you will only get "INFO" logs like this:

[2020-03-07T08:58:47Z INFO  actix_mqtt_client::random_test] Got message: id:0, topic: test, payload: [226, 104, 107, 32, 142, 174, 59, 194, 156, 86, 244, 56, 129, 94, 60, 62, 131, 60, 122, 247, 12, 187, 214, 64, 128, 176, 1, 119, 1, 139, 150, 66]
[2020-03-07T08:58:47Z INFO  actix_mqtt_client::random_test] Got message: id:0, topic: test, payload: [87, 50, 65, 193, 210, 48, 17, 234, 167, 240, 219, 169, 173, 46, 207, 207, 149, 234, 103, 177, 125, 115, 105, 113, 156, 174, 160, 96, 213, 176, 141, 129]
[2020-03-07T08:58:47Z INFO  actix_mqtt_client::random_test] Pending recv items: 0
[2020-03-07T08:58:47Z INFO  actix_mqtt_client::random_test] Pending recv items: 0
[2020-03-07T08:58:47Z INFO  actix_mqtt_client::actors::packets::publish] Handle message for RecvPublishActor
[2020-03-07T08:58:47Z INFO  actix_mqtt_client::actors::packets::publish] Handle message for RecvPublishActor

If you find any error logs please paste them here and I can take a look.

Or if you can share the full code snippet you use to connect to the local MQTT server, I can also check what's happening.

@xudesheng
Contributor Author

Thank you. I didn't find any error logs.

Let me clean up my code and narrow down the scope of the issue. If it persists, I will send the code to you. I appreciate your help in advance.

@xudesheng
Contributor Author

Hi, I created a repo: https://github.com/xudesheng/actixmqtttest
Since it includes credentials for testing, I had to make it private.

Main binary: actixmqtttest. I modified your sample code, but it always fails.
Another binary: single, which is also modified from your sample code, and it works.

I also ran single as two separate instances pointing to different devices, and it works as expected.
RUST_LOG=info cargo run --bin single 0

RUST_LOG=info cargo run --bin single 1

I included the error log in the repo too.

I appreciate your help.

@Syndim
Owner

Syndim commented Mar 8, 2020

Hi, I compared your working and non-working code and found that the client name in the non-working one is wrong:

let mut client = MqttClient::new(
    r,
    w,
    format!("test_{}", client_id),                // THIS LINE
    mqtt_option,
    MessageActor(sender.clone()).start().recipient(),
    ErrorActor.start().recipient(),
    None,
);

It should just be client_id. After updating the client name, the code runs fine.

I also made some improvements to the library's multi-client support and published v0.4.0; you can give it a try :).
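
To illustrate the client-name fix described above, here is a minimal standalone sketch. It assumes (this is not confirmed in the thread) that the broker only accepts client names matching a registered device, such as the devicemqtt0000 and devicemqtt0001 names from the earlier connection log. The helpers prefixed_client_name, plain_client_name and broker_accepts are hypothetical and are not part of actix-mqtt-client.

```rust
/// The naming used in the non-working code: a "test_" prefix is added.
fn prefixed_client_name(client_id: &str) -> String {
    format!("test_{}", client_id)
}

/// The naming after the fix: the client ID is passed through unchanged.
fn plain_client_name(client_id: &str) -> String {
    client_id.to_owned()
}

/// Hypothetical stand-in for a broker that only accepts known device names.
/// This is an assumption for illustration, not real broker behavior.
fn broker_accepts(registered: &[&str], name: &str) -> bool {
    registered.iter().any(|known| *known == name)
}

fn main() {
    let registered = ["devicemqtt0000", "devicemqtt0001"];
    for id in registered.iter() {
        let prefixed = prefixed_client_name(id);
        let plain = plain_client_name(id);
        println!("{} accepted: {}", prefixed, broker_accepts(&registered, &prefixed));
        println!("{} accepted: {}", plain, broker_accepts(&registered, &plain));
    }
}
```

Under that assumption, only the unprefixed name would match a registered device, which would be consistent with the connection being dropped right after the first message.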

@xudesheng
Contributor Author

What a shame! I wasted a whole day on that.
I really appreciate it, thank you very much. I will come back to this lib in my next release very soon.

@xudesheng
Contributor Author

@Syndim, sorry, one more question that I need your help with.
I have to simulate up to 1000 MQTT clients. If I understand correctly, one Arbiter is just one thread, which will obviously be a bottleneck.
I'm thinking about two approaches:

  1. Launch multiple threads, each with its own System::run to drive all its futures.
  2. Use multiple Arbiters? I have never seen examples.

I would appreciate any hints.
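
The first approach above (several threads, each driving a share of the clients) can be sketched with the standard library only. This is a rough illustration, not actix code: partition_clients, simulate_client and run_workers are hypothetical names, and the place where each thread would start its own event loop is only marked with a comment. The devicemqtt name format is borrowed from the connection log earlier in this thread.

```rust
use std::thread;

/// Split client indices 0..total round-robin into `workers` groups.
fn partition_clients(total: usize, workers: usize) -> Vec<Vec<usize>> {
    let mut groups: Vec<Vec<usize>> = vec![Vec::new(); workers.max(1)];
    let len = groups.len();
    for id in 0..total {
        groups[id % len].push(id);
    }
    groups
}

/// Placeholder for per-client work; here it only builds a device name.
fn simulate_client(id: usize) -> String {
    format!("devicemqtt{:04}", id)
}

/// Spawn one OS thread per group and return how many clients each handled.
fn run_workers(total: usize, workers: usize) -> Vec<usize> {
    let handles: Vec<_> = partition_clients(total, workers)
        .into_iter()
        .map(|group| {
            thread::spawn(move || {
                // In a real program, this thread would start its own event
                // loop here and spawn one publishing future per client.
                let mut started = 0;
                for id in group {
                    let _name = simulate_client(id);
                    started += 1;
                }
                started
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let handled = run_workers(1000, 4);
    println!("clients per worker: {:?}", handled);
}
```

Whether a single Arbiter is really a bottleneck for 1000 clients is not settled in this thread, so a benchmark would be needed before committing to this layout.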

@Syndim
Owner

Syndim commented Mar 9, 2020

@xudesheng sorry, I don't have much experience with this. I think the event loop itself is single-threaded but the executors aren't, so I guess it may not be a big issue?

@xudesheng
Contributor Author

I will run some benchmark tests. I have used another lib, 'mqtt-async-client', where I had to launch multiple threads and use tokio::local in each thread.
The new build is based on your lib and I will use the system executor. Hopefully I can find enough resources to run benchmark tests sometime later.
