tokio integration #72

Closed
diwic opened this Issue Jan 29, 2017 · 58 comments

Owner

diwic commented Jan 29, 2017

It would be nice if it were easier to use this dbus library in an async fashion.

brson commented Mar 31, 2017

@antoyo, @zeenix and I spent some time hacking on a proof of concept of tokio integration. We managed to create a basic example client, started hacking on an example server, and then began feeling a bit over our heads w/r/t dbus-rs internals. I hope that, at the least, this work can serve as guidance toward the real solution.

Here's the branch: https://github.com/brson/dbus-rs/tree/tokio

The entire implementation is contained in tokio.rs. There is a client example and server.

The comment at the top of tokio.rs describes the integration strategy, which is derived from tokio-curl.

This patch integrates directly into dbus-rs for simplicity, but it would work just as well as a standalone crate.

One unfortunate thing about this integration is that it needs to know directly about tokio-core and mio in order to communicate information about the fds that come out of dbus. So it's not completely abstracted from the underlying event loop, and will need specific integration into future tokio event loop implementations. I figure solutions there will shake out in time.

Since we didn't make much progress on the server implementation, we just wrote the server example to demonstrate what might be a plausible API. In the client code we took a strategy of encapsulating Connection in TokioConnection; for the server we might similarly encapsulate it in TokioServer, or possibly give TokioConnection a method that returns a server future of some kind, which can then be used to drive an event loop (this seems like it may be more consistent with the existing Connection design).

The big thing we got stuck on was understanding the parameterization of Factory over a function abstraction, and in particular what MTFn does. I suspect we need some serious guidance at this point on how to proceed in a way that is congruent with the existing design.

FYI, I don't personally expect to continue working on this, but I'm hopeful @antoyo and @zeenix can continue in collaboration with @diwic.

Owner

diwic commented Mar 31, 2017

MTFn, MTFnMut and MTSync represent method callbacks that are Fn, FnMut and Fn + Send + Sync, respectively. This was needed because someone asked to be able to run the tree from several threads in parallel, and then someone else asked for tree methods to be able to mutate their environment. So I made the tree generic over different function types. But it does complicate matters quite a bit, so I'm not totally happy about it. Not sure how to integrate async-ness into that...
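
To illustrate roughly what the three flavours boil down to, here is a sketch; the structs below are just stand-ins for which closure bound the tree stores its handlers under, not the actual tree types:

// Rough illustration only; not the real dbus-rs types.
struct MTFnLike(Box<dyn Fn(&str) -> String>);                 // shared, immutable calls
struct MTFnMutLike(Box<dyn FnMut(&str) -> String>);           // may mutate captured state
struct MTSyncLike(Box<dyn Fn(&str) -> String + Send + Sync>); // callable from several threads in parallel

fn main() {
    let mut count = 0u32;
    let f = MTFnLike(Box::new(|s: &str| format!("hello {}", s)));
    let mut g = MTFnMutLike(Box::new(move |s: &str| { count += 1; format!("{} #{}", s, count) }));
    let h = MTSyncLike(Box::new(|s: &str| s.to_uppercase()));
    println!("{} {} {}", (f.0)("a"), (g.0)("b"), (h.0)("c"));
}

Which of these bounds the tree stores is all that the MTFn/MTFnMut/MTSync parameter selects.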

Owner

diwic commented Apr 1, 2017

@brson Looking at the server example, I see you added an f.async_method which is not implemented. Could you answer:

  1. This function takes a callback. Does this callback return an AsyncMethodResult? Is an AsyncMethodResult a Future of some sort?
  2. When dbus-rs gets this AsyncMethodResult back from a method, I guess it's supposed to hand it over to an event loop somehow, wait for it to complete into a normal MethodResult, and then handle that MethodResult. But I'm lost on how to do that, i.e. how to hand an AsyncMethodResult over to the event loop, and how it then hands me a MethodResult later?

brson commented Apr 1, 2017

> This function takes a callback. Does this callback return an AsyncMethodResult? Is an AsyncMethodResult a Future of some sort?

Yes, that was the intent.

> When dbus-rs gets this AsyncMethodResult back from a method, I guess it's supposed to hand it over to an event loop somehow, wait for it to complete into a normal MethodResult, and then handle that MethodResult. But I'm lost on how to do that, i.e. how to hand an AsyncMethodResult over to the event loop, and how it then hands me a MethodResult later?

So async_method itself does not return a future; it's only the callback provided to async_method that returns a future. That callback can be considered a method-call future factory. The idea is that the 'engine' itself calls that callback (factory) to construct a new future, then itself is responsible for scheduling a task based on that future.

I cannot say that this is a correct idea, but it felt right at a first pass.
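
To make that a bit more concrete, here is roughly the shape of it with futures 0.1 and tokio-core; none of these names exist in the branch, it's just a sketch of "callback = future factory, engine spawns the result as a task":

// Sketch only; ReplyFactory and the String "reply" are made up for illustration.
// Cargo deps assumed: futures = "0.1", tokio-core = "0.1".
extern crate futures;
extern crate tokio_core;

use futures::{future, Future};
use tokio_core::reactor::Core;

// The callback handed to async_method: invoked once per incoming method call,
// it returns a future that eventually yields the reply.
type ReplyFactory = Box<dyn Fn(String) -> Box<dyn Future<Item = String, Error = ()>>>;

fn main() {
    let mut core = Core::new().unwrap();
    let factory: ReplyFactory = Box::new(|sender: String| -> Box<dyn Future<Item = String, Error = ()>> {
        Box::new(future::ok::<String, ()>(format!("Hello {}!", sender)))
    });

    // This is the part the "engine" would do for each method call: build a
    // future via the factory and schedule it as a task on the event loop.
    let (tx, rx) = futures::sync::oneshot::channel();
    let task = factory("world".to_string()).map(move |reply| {
        println!("would send reply here: {}", reply);
        let _ = tx.send(());
    });
    core.handle().spawn(task);

    core.run(rx).unwrap(); // drive the loop until the spawned task has finished
}

Whether the engine spawns the task itself like this, or hands the future back up to the caller, is exactly the open design question.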

Owner

diwic commented Apr 2, 2017

> then itself is responsible for scheduling a task based on that future.

Right, it's specifically this part I'm wondering about. How do I schedule a task based on a future?
(I'm not even sure about the conceptual difference between a task and a future.)

Owner

diwic commented Apr 3, 2017

Having read up a little about futures and tasks, I think maybe it will be better for tree.handle_async (or something like that) to return those futures directly and leave it up to the caller to combine, turning into tasks etc.

Owner

diwic commented Apr 5, 2017

@brson @zeenix

Ok, so I tried from the other end, i.e. trying to build a tree with async methods. I figured this would be the easier part, since it does not do much fd-related stuff. But I haven't come very far yet. I'm stuck on two things:

  1. The example fails to compile here with "cannot move out of captured outer variable in an Fn closure". I don't know how to resolve that - and_then takes an FnOnce closure, so the error doesn't make sense, but maybe it has to do with the fact that we're inside an Fn closure...?

  2. As for AMethodResult (short for brson's AsyncMethodResult), I'm not sure what it should contain. A BoxFuture? Or were we supposed to avoid boxes? Or should the callback return an R: IntoFuture perhaps? And what about the Err type of the future?

zeenix commented Apr 11, 2017

I wish I could reply, but when it comes to code I was only looking over the shoulder of @brson and making observations and suggestions based on my guesses about the code. So I hope @brson will be able to remember and respond.

antoyo commented Apr 20, 2017

@diwic:
For your first issue, it is indeed caused by using an Fn. You see, you move the signal into the FnOnce closure passed to and_then, but the outer closure (the Fn one) can be called again.
A fix is to clone() it:

                let signal2 = signal.clone();
                sleep_future.and_then(move |_| {
                    let s = format!("Hello {}!", sender);
                    let mret = mret.append1(s);
                    let sig = signal2.msg(&pname, &iname).append1(&*sender);

                    // Two messages will be returned - one is the method return (and should always be there),
                    // and in our case we also have a signal we want to send at the same time.
                    Ok(vec!(mret, sig))
                })

For your other issue, you should most probably avoid BoxFuture as it is confusing (Box<Future> behaves more as expected).
In any case, you probably want to avoid boxing the future right away.
One way to avoid that is to use -> impl Future which is currently unstable.
All the ways to return a Future are described here.
Your Err type will probably be MethodErr.
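
To show the difference concretely (futures 0.1, toy example, all names made up):

// futures 0.1 defines BoxFuture<T, E> as Box<Future<Item = T, Error = E> + Send>,
// so it silently adds a Send bound; a plain Box<Future<...>> does not, which is
// why the latter behaves more as expected.
extern crate futures; // futures = "0.1"

use futures::future::{self, Future};

type SendBoxed = Box<dyn Future<Item = u32, Error = ()> + Send>; // what BoxFuture expands to
type PlainBoxed = Box<dyn Future<Item = u32, Error = ()>>;       // no Send requirement

fn send_boxed() -> SendBoxed { Box::new(future::ok::<u32, ()>(1)) }
fn plain_boxed() -> PlainBoxed { Box::new(future::ok::<u32, ()>(2)) }

fn main() {
    // wait() just blocks the current thread until the future resolves.
    assert_eq!(send_boxed().wait().unwrap(), 1);
    assert_eq!(plain_boxed().wait().unwrap(), 2);
}

If your callback captures something non-Send (an Rc from the tree, for example), a BoxFuture simply won't compile, while a plain Box<Future> will.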

Owner

diwic commented Apr 21, 2017

@antoyo

> You see, you move the signal

Ah, right. It would have been helpful if the compiler could have given a hint about what variable it was complaining about. Fixed now.

As for AMethodResult:

> One way to avoid that is to use -> impl Future which is currently unstable.
> All the ways to return a Future are described here.

AMethodResult is not a generic type, so -> impl Future makes no sense. From the options in the link, AMethodResult is the "Custom type" option.

> Your Err type will probably be MethodErr.

Okay, I'll start with this (see the just-committed code). One piece fell into place when I changed the example to convert a TimerError to a MethodErr. I'll continue later, need to go to work now :-)
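
For reference, roughly what I mean by the "Custom type" option (futures 0.1; Message and MethodErr below are placeholder structs standing in for the real dbus types, and the real AMethodResult will of course look different inside):

// Rough sketch only; Message and MethodErr are placeholders, not the dbus types.
extern crate futures; // futures = "0.1"

use futures::{Future, Poll};

#[derive(Debug)]
struct Message(String);   // placeholder for a D-Bus message
#[derive(Debug)]
struct MethodErr(String); // placeholder for a method error

// The concrete type an async method returns: it wraps some inner future and
// just forwards poll() to it.
struct AMethodResult {
    inner: Box<dyn Future<Item = Vec<Message>, Error = MethodErr>>,
}

impl Future for AMethodResult {
    type Item = Vec<Message>;
    type Error = MethodErr;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.inner.poll()
    }
}

fn main() {
    let r = AMethodResult {
        inner: Box::new(futures::future::ok::<Vec<Message>, MethodErr>(vec![Message("reply".into())])),
    };
    // wait() is only for this demo; in dbus-tokio the tree/event loop would poll it.
    println!("{:?}", r.wait().unwrap());
}

Whether inner is a boxed future or some concrete combinator type then stays an implementation detail that doesn't leak into the tree's API.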

antoyo commented Apr 28, 2017

@diwic: I reported an issue for the hint and it was fixed.

For the -> impl Future thing, it is not for generic types, but for returning any type that implements a trait.
So your AMethodResult would work there, since it will implement Future.
But since you already have a concrete type, it is better to use it than to use the impl Trait feature, which is unstable.

Owner

diwic commented May 14, 2017

As you have perhaps noticed, not much has happened during the last few weeks. This is partially due to other things taking priority (and I expect the same for a few weeks more), but also due to the Tokio architecture being hard to understand for me. I've filed tokio-rs/tokio#9 now, let's see if that gives something.

Owner

diwic commented May 19, 2017

Ok, so with some help in that other thread, I was able to get somewhere, so I have somewhat of a working client. Here's my test case:

// Set up a D-Bus connection and a tokio-core reactor, then hook the
// connection up to the reactor through an AWatcher.
let conn = Rc::new(Connection::get_private(BusType::Session).unwrap());
let mut core = Core::new().unwrap();
let awatcher = AWatcher::new(conn.clone(), &core.handle()).unwrap();

// Fire off a method call; the reply callback signals the oneshot channel so
// that core.run() below knows when to stop.
let (tx, rx) = futures::sync::oneshot::channel();
let m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
awatcher.append(conn.send_with_reply(m, move |r| {
    let z: Vec<&str> = r.get1().unwrap();
    println!("got reply: {:?}", z);
    tx.send(()).unwrap();
}));

// Spawn the watcher stream on the event loop and run it until the reply has arrived.
core.handle().spawn(awatcher.for_each(|_| Ok(())));
core.run(rx).unwrap();

Feel free to review / try it out etc. I think the main differences between mine and brson's implementations are:

  • Calling a method does not return a future. Instead you append it to an instance of AWatcher, as you see in the example. Maybe this can be improved in the future (pun not intended :-) ).
  • Internally, I have one Future per fd.
  • My version does not interfere with the original dbus crate's internals; dbus-tokio is a separate crate. It instead interacts with the somewhat new MsgHandler trait I've been working on, which can also be used without tokio.

zeenix commented May 22, 2017

@diwic cool. :)

Contributor

manuels commented Jun 3, 2017

This tokio integration would be tremendously helpful! I'd like to refactor my crate bulletinboard to make it work with tokio.

Do you already have a rough idea when this feature will land in your dbus crate?

Owner

diwic commented Jun 4, 2017

@manuels It will not land in the dbus crate; dbus-tokio will be a separate crate.

The slow progress last few weeks is basically me prioritizing other things. Those things will hopefully calm down a week from now but it's hard to know for sure. Also expect redesigns as I learn more of Tokio.

Contributor

manuels commented Jun 5, 2017

Thanks for the feedback, @diwic!

Owner

diwic commented Jun 25, 2017

Good news! I finally got something up and working well enough that I have just made the first release of dbus-tokio.

What should be working is:

  • Client: Make method calls and wait asynchronously for them to be replied to - see AConnection::method_call
  • Get a stream of incoming messages (so you can listen to signals etc) - see AConnection::messages
  • Server: Make a tree handle that stream of incoming messages - see tree::ATreeServer
  • Server: Add asynchronous methods to the tree - in case you cannot reply right away, you can return a future that will reply when that future resolves - see tree::AFactory::amethod

So @manuels , @zeenix , @antoyo - this is where I hand over to you to try it out and see what you like and what you don't like, if it's working or if it's buggy etc... (and @albel727 might be interested as well).

Contributor

manuels commented Jun 25, 2017

Looks great. The interface looks quite simple.
I'll give it a try in the next few days!

Contributor

albel727 commented Jun 30, 2017

I've taken a look at the implementation, and I'm starting to think that implementing a tokio wrapper around libdbus isn't possible in a clean and reliable way, at least not with PollEvented. I haven't checked my conjectures experimentally yet, but bear with me.

The problem I see is that PollEvented is always registered with PollOpt::edge() in tokio-core, and libdbus doesn't seem to play well with edge-triggered polling. Neither dbus_connection_read_write() nor dbus_watch_handle() always exhausts the socket, which is a requirement for edge-triggered polling. For instance, those functions seem to return prematurely when

  1. connection authentication has just completed, or
  2. the amount of read/written data exceeds 2048 bytes (not configurable).

See the socket_handle_watch() and socket_do_iteration() functions, and in particular the do_reading()/do_writing() functions they call.

In the current implementation, Future for ADriver calls dbus_watch_handle() once, and then, if that failed to produce items, it calls dbus_connection_read_write_dispatch() at least once due to iter(0) in self.handle_items(items.unwrap_or(cc.iter(0))). If that still failed to produce messages, the ADriver is left with Ok(Async::NotReady). And if at that point the socket is still not exhausted, the ADriver may never be polled again, despite the need_read()/need_write() calls.

Well, it's a bit more complicated than this, since everything that calls _dbus_connection_do_iteration_unlocked() (e.g. dbus_connection_send()) will make some ~2048 bytes of progress. And so it might exhaust the socket, which may convince epoll() to generate an event for it, which may cause the ADriver future to get polled again. But needless to say, that probably won't happen in a timely manner.

So, if my understanding is correct, if sent/received messages are more than 4096 bytes in size on average, the current implementation will grind to a halt. I don't see how this can be fixed in general, as none of the dbus functions report whether they encountered EAGAIN, which is what would let an edge-triggered event loop work correctly.

I encourage someone to test my guess, since I don't have time for it just now.

As for possible workarounds, maybe issuing read()/write() on the socket with zero byte count will return EAGAIN and so would allow us to check for exhaustion and run dbus_watch_handle() until the socket is exhausted. But read()/write() with zero count is not guaranteed to return errors, so it probably won't work on all Unixes, let alone Windows.
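
For reference, this is the contract edge-triggered readiness imposes: after a readiness event you must keep reading until the kernel reports WouldBlock, or you may never be woken again for the bytes you left behind. A standalone illustration with a plain nonblocking socket, nothing dbus-specific:

use std::io::{ErrorKind, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread::sleep;
use std::time::Duration;

// Keep reading until the kernel says WouldBlock; this is what an edge-triggered
// consumer must do, and what libdbus's 2048-bytes-per-iteration reads don't guarantee.
fn drain(sock: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    let mut all = Vec::new();
    let mut buf = [0u8; 2048]; // small buffer, like libdbus's per-iteration limit
    loop {
        match sock.read(&mut buf) {
            Ok(0) => break,                                           // peer closed
            Ok(n) => all.extend_from_slice(&buf[..n]),                // keep going
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, // socket exhausted
            Err(e) => return Err(e),
        }
    }
    Ok(all)
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let mut client = TcpStream::connect(addr)?;
    let (mut server_side, _) = listener.accept()?;
    server_side.set_nonblocking(true)?;

    client.write_all(&[42u8; 8000])?; // several 2048-byte buffers' worth

    // A real edge-triggered loop would wait for a readiness event here; a short
    // sleep is enough for this demo to let the bytes land in the receive buffer.
    sleep(Duration::from_millis(50));
    println!("drained {} bytes on one readiness event", drain(&mut server_side)?.len());
    Ok(())
}

The point being: since dbus_watch_handle() doesn't tell us whether it hit EAGAIN, we can't wrap the equivalent of that drain loop around it.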

antoyo commented Jun 30, 2017

@albel727: Do you think it could work with alternative event loops like futures-glib?

Contributor

albel727 commented Jul 1, 2017

It probably could, though I'm not sure how that integrates with tokio, and this is the "Tokio integration" issue, after all. Anything with level-triggered signaling should work. Glib uses poll() internally, so it could potentially work.

The possible workarounds are numerous, though few are elegant. Maybe even something as simple as overriding PollOpt setting to level() in mio::Evented for AWatch would work. Maybe checking exhaustion with read(0)/write(0)/poll() ourselves in Future for ADriver would work. Though the latter would still require Stream/Sink to poll ADriver directly and not through UnboundedSender/Receiver, so that it would be driven by user demand, when we couldn't exhaust the socket even with continuous calling of dbus_watch_handle() due to libdbus internal incoming queue limits (63 megabytes, 4096 fds total).

But first we better make sure that it indeed fails the way I predict it. Then potential fixes can be thought about and tried.

antoyo commented Jul 1, 2017

futures-glib was intended to be used with tokio-based libraries.
Currently, I'm waiting for this pull request to be merged.
After that, it should be possible to use tokio-based libraries like hyper with futures-glib.

Contributor

albel727 commented Jul 2, 2017

So I've tested receipt of messages, with a simple server and dbus-test-tool, and yeah, it fails like I expected.

fn main() {
    let conn = Rc::new(Connection::get_private(::dbus::BusType::Session).unwrap());
    //println!("Conn: {}", conn.unique_name());
    conn.register_name("org.dbus.tokio.Test", 7).unwrap();

    let mut old_cb = conn.replace_message_callback(None).unwrap();
    conn.replace_message_callback(Some(Box::new(move |conn, msg| {
        old_cb(conn, msg);
        true // Never reply to method calls, to suppress "vanished sender" errors.
    })));

    let mut core = ::tokio_core::reactor::Core::new().unwrap();
    let aconn = AConnection::new(conn.clone(), core.handle()).unwrap();

    let items: AMessageStream = aconn.messages().unwrap();
    let signals = items.for_each(|m| {
        println!("Message from {:?} was: {:?}", m.sender(), m);
        //println!("{:?}", m.get_items());
        Ok(())
    });
    core.run(signals).unwrap();
}

Executing

echo 8000 | dbus-test-tool spam --dest=org.dbus.tokio.Test --no-reply --random-size --count=1

will only result in a receipt roughly every second time, because the messages fill the socket with twice as many bytes as are processed per wakeup.

Observing the same failure with sending is harder because, unlike reading, which is limited to 2048 bytes per iteration, send() tries to send everything at once and just gives up until the next iteration if it managed to send more than 2048 bytes. So even big messages can be sent in one go, but if they aren't, writing the rest might not happen for a long time.

A simple PollOpt::level() hack like opts.remove(mio::PollOpt::edge()); opts.insert(mio::PollOpt::level()); appears to work, though it should be checked more extensively than I did.

Contributor

albel727 commented Jul 2, 2017

Speaking of PollOpt hacks, I'm not sure delegating reregister() to self.register() is a good idea. The latter maps to EPOLL_CTL_ADD, while the former is expected to be EPOLL_CTL_MOD. I don't see a place where our reregister() might be called, but if it ever does get called, it will fail with EEXIST panic, due to the descriptor already being in the epoll set.
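
Roughly what I mean, as a sketch (mio 0.6; AWatchFd is just an illustrative stand-in for AWatch): a wrapper whose register() forces level triggering, and whose reregister() really forwards to poll.reregister() (EPOLL_CTL_MOD) instead of calling register() again (EPOLL_CTL_ADD):

// Illustrative only; AWatchFd is a stand-in, not the real AWatch. mio = "0.6".
extern crate mio;

use std::io;
use std::net::UdpSocket;
use std::os::unix::io::{AsRawFd, RawFd};

use mio::unix::EventedFd;
use mio::{Evented, Poll, PollOpt, Ready, Token};

struct AWatchFd(RawFd);

fn force_level(mut opts: PollOpt) -> PollOpt {
    // The hack from above: strip edge(), insert level(), so unread data keeps
    // generating events instead of being signalled only once.
    opts.remove(PollOpt::edge());
    opts.insert(PollOpt::level());
    opts
}

impl Evented for AWatchFd {
    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
        EventedFd(&self.0).register(poll, token, interest, force_level(opts))
    }
    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
        // A real reregister (EPOLL_CTL_MOD), not a second register (EPOLL_CTL_ADD).
        EventedFd(&self.0).reregister(poll, token, interest, force_level(opts))
    }
    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        EventedFd(&self.0).deregister(poll)
    }
}

fn main() -> io::Result<()> {
    // Smoke test: register a socket fd, then reregister it without panicking.
    let sock = UdpSocket::bind("127.0.0.1:0")?;
    let watch = AWatchFd(sock.as_raw_fd());
    let poll = Poll::new()?;
    poll.register(&watch, Token(0), Ready::readable(), PollOpt::edge())?;
    poll.reregister(&watch, Token(0), Ready::readable() | Ready::writable(), PollOpt::edge())?;
    Ok(())
}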

Contributor

albel727 commented Jul 2, 2017

I assume the ww.readable() == w.readable() || ww.readable() == w.readable() line in ADriver::modify_watch() was a copy-paste error. I think && rather than ||, and writable() rather than readable() twice, was meant to be there. I changed it and added it to PR #90.

But something is subtly broken anyway. I observe that aconnection_test() sometimes hangs, forever waiting for a reply from org.freedesktop.DBus::ListNames() call.

Owner

diwic commented Jul 2, 2017

Thanks a lot for your review, @albel727! I've merged your code, looks all good to me.

Contributor

albel727 commented Jul 3, 2017

I have figured out the hanging bug. This is how it happens:

ADriver falls back to iter(0) when watched socket is not ready.

iter(0) calls dbus_connection_read_write_dispatch().

If the data arrived after the socket check but before dbus_connection_read_write_dispatch(), then the latter would empty the socket but wouldn't yield the message to filter_message_cb() until the next invocation of itself.

But the next invocation doesn't happen, due to the Nothing => return in handle_items().

So neither is ADriver polled again (because the socket is already empty), nor is read_write_dispatch() called by anything again.

And thus aconnection_test() hangs indefinitely.

iter(0) with read_write_dispatch() is not needed, and removing it and relying on watch_handle() and iter(None) with pure dispatch() fixes the bug.

albel727 referenced this issue Jul 3, 2017: Fix poll hanging #92 (merged)

Contributor

albel727 commented Jul 3, 2017

One way to look at it is that the root of the problem is incomplete event handling.

Either dispatch_status callback or wakeup_main callback or both would be triggered in the above situation, but we don't have them.

In our defence, they're not very straightforward to handle. I recall, one is not allowed to call libdbus functions from the callbacks, so any additional dispatching work has to be deferred in some way.

But I'm 50% sure, now that read_write() calls are avoided, nothing else would try to surreptitiously read the socket outside of dbus_watch_handle(). Only writing to it might happen elsewhere, and that shouldn't lead to failures, so we may be safe without the callbacks.

Another thing I'm uneasy about is the fact that watch settings may change during dbus_watch_handle(). And we call dbus_watch_handle() while iterating on a watch collection in ADriver, so our watch callbacks may happen to get called in order to change settings, which will mutate said watch collections.

But it doesn't immediately seem like anything bad will happen in the case of a single watched fd, which is apparently the only possible thing for client connections. And I have no time to reason about the correctness of iterating further.

Contributor

albel727 commented Jul 3, 2017

> so our watch callbacks may happen to get called in order to change settings, which will mutate said watch collections.

Well it might not propagate to the fds collection in ADriver until WatchFd is processed, which is only after iterating on fds, now that I think of it. But I'm not sure yet whether the rest of the collections are fine.

Owner

diwic commented Jul 3, 2017

Thanks for the research. This sounds rather bad:

> If the data arrived after the socket check but before dbus_connection_read_write_dispatch(), then the latter would empty the socket but wouldn't yield the message to filter_message_cb() until the next invocation of itself.

Why would libdbus not yield the message if it has (completely) read a message?

Contributor

albel727 commented Jul 4, 2017

> Why would libdbus not yield the message if it has (completely) read a message?

Beats me. Maybe they didn't think it would make a difference, if you loop it the standard while (read_write_dispatch()) { /*do work*/ } way. See for yourself in _dbus_connection_read_write_dispatch().

First they try to dispatch the already-read messages. If there are none, and it's because they failed to read them due to lack of memory, they sleep. Otherwise they try to read/write the socket with _dbus_connection_do_iteration_unlocked(). The latter would also, via many delegates, call the wakeup_main callback if it managed to enqueue some messages to the incoming queue, so it's not like they don't warn us at all. :) Having done that, they return.

Contributor

albel727 commented Jul 4, 2017

> to the incoming queue

And to avoid confusion, by that I mean not our pending_items queue, but the inner incoming queue of libdbus, from which it takes messages in dbus_connection_dispatch().

So if a message is read by read_write_dispatch(), only the next call to it, which would call dispatch(), would present it to us.

Owner

diwic commented Jul 4, 2017

Hmm, sounds like we should insert an extra call to dbus_connection_dispatch after every call to dbus_connection_read_write_dispatch (in which case we can probably replace dbus_connection_read_write_dispatch with dbus_connection_read_write)?

Contributor

albel727 commented Jul 4, 2017

> Hmm, sounds like we should insert an extra call to dbus_connection_dispatch after every call to dbus_connection_read_write_dispatch

I'd rather we never use dbus_connection_read_write_dispatch() at all in dbus-tokio. At least I don't see a place for it in the current architecture of things. And it wouldn't be literally just one call. More than one small message might have been buffered, so it's "keep calling as long as dispatch status is ffi::DBusDispatchStatus::DataRemains".
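
In pseudo-Rust, the shape of the loop I mean (DispatchStatus and Conn below are placeholders that only mimic ffi::DBusDispatchStatus and the libdbus connection; the control flow is the point, not the bindings):

// Placeholder sketch of the "keep dispatching while DataRemains" idea;
// these types are not real libdbus bindings.
#[derive(PartialEq)]
enum DispatchStatus { DataRemains, Complete }

struct Conn { queued: u32 } // pretend this is libdbus's internal incoming queue

impl Conn {
    // Stand-in for dbus_connection_dispatch(): hand one buffered message to the
    // filter/handler and report whether more are still queued.
    fn dispatch(&mut self) -> DispatchStatus {
        if self.queued > 0 { self.queued -= 1; }
        if self.queued > 0 { DispatchStatus::DataRemains } else { DispatchStatus::Complete }
    }
}

// After doing I/O (dbus_watch_handle / read_write), drain everything that is
// already buffered so no message is left stranded until the next wakeup.
fn drain(conn: &mut Conn) {
    while conn.dispatch() == DispatchStatus::DataRemains {}
}

fn main() {
    let mut c = Conn { queued: 3 };
    drain(&mut c);
    assert_eq!(c.queued, 0);
}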

Contributor

albel727 commented Jul 4, 2017

If you talk about replacing iter(timeout) with read_write(timeout) + while remains { dispatch() }, then I'm not sure there's really a benefit? It will only save us a few Nothings.

Owner

diwic commented Jul 5, 2017

> If you talk about replacing iter(timeout) with read_write(timeout) + while remains { dispatch() }, then I'm not sure there's really a benefit? It will only save us a few Nothings.

Well, a few Nothings is at least something? :-) I'm thinking of people making loops like:

loop {
    for msg in conn.iter(0) { /* do something */ }
    /* process other things for a second */
}

In this case, messages will arrive faster if we change the order of read_write and while remains { dispatch }?

Contributor

albel727 commented Jul 5, 2017

To nitpick, that loop can't work the way you wrote it, since iter(0), unlike iter(None), never ends until the connection is closed and the "Disconnected" message is dispatched, which you surely know better than me. But let's assume you meant

for msg in conn.iter(0) { 
  match msg {
    /* process msgs and Nothings */
  }
  /* do something else */ 
}

Then yeah, the messages will arrive one iteration faster. Whether this is really worth it, I'm still not sure. If the user is OK with a one-second delay between each message, I'm not sure they'd be picky about latency anyway. But I guess it wouldn't hurt, so do as you wish.

I hadn't thought it through before, but while remains { dispatch() } is not how it would look inside next(). Trying one dispatch() after a fruitless read_write_dispatch(), before giving up and returning Nothing, is probably how it should look indeed.

So it's like you'd copy-paste the contents of the None => {} case before return Some(Nothing). If there were a separate iter(None), the user could achieve the same early-message-return effect with

for msg in conn.iter(0) { 
  let msg = match msg {
    Nothing => conn.iter(None).next().unwrap_or(Nothing),
    msg => msg
  };
  match msg {
    /* process msgs and Nothings */
  }
  /* do something else */ 
}

if they really wish it. Considering that we already require iter(None) in dbus-tokio, that may be another argument for exposing it properly.

Speaking of required things: another thing that seems to be required for dbus-tokio is a user-customizable callback for either "watches changed" notifications or "pending_items not empty" notifications. At least I don't see how to wake up the ADriver otherwise in the "all watches disabled" case. So, which would you prefer to have?

Contributor

albel727 commented Jul 5, 2017

> At least I don't see how to wake up the ADriver otherwise

Well, actually there are other hacky ways, like never really disabling the PollEvented even if watches tell us otherwise, so it would poll us whenever there'd be data, but that's kinda ugly.

On the other hand, tokio never deregisters Eventeds internally, so the event loop keeps waking up on things even if nobody wants to read them. So at least we wouldn't be much worse than what's already there.

But then again, in tokio's case it's at least mitigated by edge-triggering, so it doesn't wake up all that often. Our level-triggered fd will not let the event loop sleep at all if there's as much as one byte ready for reading.

Contributor

albel727 commented Jul 6, 2017

> if there's as much as one byte ready for reading.

Which we'd be unable to consume, I must clarify. Oh, and

> in which case we can probably replace dbus_connection_read_write_dispatch with dbus_connection_read_write()?

That depends on the actual code you have in mind, but no, I don't think this would be a good idea. We don't want to do IO and fill libdbus's inner queue before we dispatch everything that's already in there, as it would only mean a larger memory footprint with no benefit. Worst case, message dispatching will not be able to keep up with message arrival at all, and we'd have bufferbloat on our hands.

So unconditional read_write() on every next() call is ruled out. And if you try to implement some sort of conditional logic like if dispatch_status != Remains { read_write() }; dispatch(), you'd find you've more or less reimplemented the read_write_dispatch(); if pending_items still empty { dispatch() } combo.

@albel727

albel727 commented Jul 6, 2017

Contributor

Another reason not to use read_write() is that its return value semantics aren't equivalent to read_write_dispatch()'s. The former returns false as soon as the connection is closed, whereas the latter keeps returning true until the connection is closed and the incoming message queue is exhausted too.

@diwic

diwic commented Jul 6, 2017

Owner

I meant like this:

for msg in conn.iter(0) {
    match msg {
        // Nothing ready right now: do other things for a second.
        ConnectionItem::Nothing => { /* do other things for a second */ }
        // Handle the other ConnectionItem variants (method calls, signals, etc).
        _ => { /* ... */ }
    }
}

To summarize:

  • We do need an iter(None), or iter_no_io().
  • Calling dispatch() after read_write_dispatch() is probably a good idea

I don't get the "All watches disabled" case. What is it that re-enables the watch? Is it not done during a call to dbus_connection_dispatch? Since we always dispatch while data remains in dbus-tokio, the only thing we could possibly get is a watch disabled and then re-enabled in the same iteration of ADriver::handle_items, which should not be a problem.

@diwic

diwic commented Jul 6, 2017

Owner

the only thing we possibly get is a watch disabled and then re-enabled in the same iteration of ADriver::handle_items, which should not be a problem.

I think we should also call need_read in this case, so I pushed a patch that does this. It rarely happens though so I don't have a good test case for it.

@albel727

albel727 commented Jul 6, 2017

Contributor

I meant like this

Yeah, this is a receipt delay only when we're supposedly idle, but the user still finds it OK to delay message reception for a second, so latency isn't their biggest concern. A message might arrive 5 milliseconds after they received Nothing, and this code considers it OK not to pick it up sooner: it keeps "doing other things" for the full second rather than for just those 5 ms. One can argue that that's their conscious choice. But anyway, "Calling dispatch() after read_write_dispatch() is probably a good idea", so yeah.

I don't get the "All watches disabled" case. What is it that re-enables the watch, is it not during a call to dbus_connection_dispatch?

Oh that's a fun thing really. They in fact don't re-enable the watch during dbus_connection_dispatch().

You see, as I mentioned in #92, there are dbus_connection_set_max_received_size()/dbus_connection_set_max_received_unix_fds() limits.

And every received message counts toward those limits. But a message doesn't stop being counted when it's dispatched. Even while messages sit in our pending_items, they still refer to libdbus's inner bytes/fds DBusCounter for the incoming queue. They are only decremented from it when they're freed, i.e. on the last call to dbus_message_unref() somewhere in user code. When that happens, the DBusCounter notifies libdbus's inner code, which rechecks whether the total size of the remaining incoming messages is below the limit, so that the watches can finally be re-enabled. But by the time that happens, no handle_items() or ADriver::poll() code is executing anymore.

@albel727

albel727 commented Jul 6, 2017

Contributor

Well, as an alternative to watches/pending_items notifications, we can try to defeat libdbus's counting by using dbus_message_copy() and dropping the originals. But that's an extra full copy, so there's that.
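
Sketched out, that workaround would be something along these lines (illustration only; it assumes the ffi module exposes dbus_message_copy, which is a standard libdbus call, and it ignores error handling):

// Deep-copy the message so the copy no longer counts toward libdbus's
// incoming-queue byte/fd counters, then drop the original, which decrements
// those counters and lets the watches be re-enabled sooner.
unsafe fn detach_from_counter(msg: *mut ffi::DBusMessage) -> *mut ffi::DBusMessage {
    let copy = ffi::dbus_message_copy(msg);
    ffi::dbus_message_unref(msg);
    copy
}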

@albel727

albel727 commented Jul 6, 2017

Contributor

So

a watch disabled and then re-enabled in the same iteration

thing never really happens. But calling need_read()/need_write() like you did is the correct thing to do for proper handling. Now, another piece of the solution you're missing is calling PollEvented::deregister(self, &self.core) on the dropped PollEvented before creating a new PollEvented, or you'll get an EEXIST panic. I was still thinking about how to solve the halted ADriver problem, or I would've committed this already.
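
In other words, roughly this shape (a sketch only: self.fds and self.core stand for the ADriver's fd map and reactor handle, and W for whatever Evented wrapper gets put around a dbus Watch):

fn replace_poll_evented(&mut self, fd: RawFd, io: W) -> io::Result<()> {
    if let Some(old) = self.fds.remove(&fd) {
        // The dropped PollEvented must be explicitly deregistered from the
        // reactor first, or registering the same fd again blows up with EEXIST.
        old.deregister(&self.core)?;
    }
    self.fds.insert(fd, PollEvented::new(io, &self.core)?);
    Ok(())
}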

@albel727

albel727 commented Jul 6, 2017

Contributor

You can test the halting driver problem by utilizing #93 and setting the byte limit to some small value. Say

conn.register_name("org.dbus.tokio.Test", 7).unwrap();
unsafe { ffi::dbus_connection_set_max_received_size(conn.conn(), 1); }

in my test server code above.

@diwic

diwic commented Jul 6, 2017

Owner

Hrm...thanks for the explanation. Annoying. :-/ This is just a bit of brainstorming, but would it work if we, in ADriver, wrapped Message into a newtype TokioMessage (or something), with Deref/DerefMut to Message, and on Drop called iter(None), task::notify, or something like that?

@albel727

albel727 commented Jul 6, 2017

Contributor

iter(None)

No, as we need to run modify_watch() from inside the ADriver's future task for it to be registered for wakeups, not just in whatever task the Drop happens to be executed in.

task::notify()

Calling that on every Drop would mean a lot of spurious wakeups for ADriver under normal conditions.

It would also mean a rather large TokioMessage(Message, Task) (Task is 72 bytes on x86_64. Ouch.), or at least a TokioMessage(Message, Box<Task>), which is twice as large as a plain Message and heap-allocates.

It would also depend on the user not doing anything with dbus_message_ref(), and on the continued nonexistence of a shallow Message::clone() that would call dbus_message_ref().

And worst of all, TokioMessage would have to not be Send; otherwise, if the user passes it to another thread and drops it there, it would be subject to a race condition:

  1. The dropping thread calls Task::notify() to awaken ADriver.
  2. The dropping thread happens to stall, or doesn't complete the dbus_message_unref() call in time.
  3. ADriver's task wakes up as asked, but can't make any progress because the message is still alive, and goes back to sleep.
  4. The dropping thread unfreezes and frees the message, which pushes a WatchFd onto pending_items, but nobody is going to read it anymore.

I'm afraid I don't see anything working here, except an honest notification from the watch update callback, or at least some high-level callback of our own invention triggered after every pending_items.push().

@albel727

albel727 commented Jul 6, 2017

Contributor

Well, maybe with TokioMessage(Option<Message>, Box<Task>) (which is 3 times the size of plain Message) you'd be able to avoid the race with something like

std::mem::drop(self.0.take()); 
self.1.notify();

in Drop, and it might even work, but I'd rather not resort to that.
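
Spelled out, the wrapper being discussed might look roughly like this (a sketch of the idea only, not anything that exists in dbus-rs; it assumes futures 0.1.14+, where Task has notify()):

use std::ops::{Deref, DerefMut};
use dbus::Message;
use futures::task::Task;

struct TokioMessage(Option<Message>, Box<Task>);

impl Deref for TokioMessage {
    type Target = Message;
    fn deref(&self) -> &Message { self.0.as_ref().unwrap() }
}

impl DerefMut for TokioMessage {
    fn deref_mut(&mut self) -> &mut Message { self.0.as_mut().unwrap() }
}

impl Drop for TokioMessage {
    fn drop(&mut self) {
        // Free the inner Message first, so libdbus decrements its
        // incoming-queue counters and can re-enable the watches...
        drop(self.0.take());
        // ...and only then wake the ADriver task so it can pick that up.
        self.1.notify();
    }
}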

@albel727

albel727 commented Jul 6, 2017

Contributor

Well, if we're going to heap-allocate anyway, TokioMessage(Box<(Option<Message>, Task)>) would at least be the same size as Message. But meh.

@albel727

albel727 commented Jul 6, 2017

Contributor

And yeah, unsafe impl Send for Message together with that unref notification business means that pending_items access had better be synchronized regardless of tokio use. Watch callbacks are called while the connection lock is held (for now at least, though the comments indicate the libdbus authors would prefer that not to be the case), but filter callbacks are called without any lock held, and in general neither is ConnectionItems::next(). So a push(WatchFd) from some Message::drop() may coincide with default_filter_callback() pushing to, or the user popping messages from, pending_items, with all the hilarity that entails.
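
For illustration, the kind of synchronization this implies (field and type names are made up; dbus-rs's actual pending_items is not structured like this):

use std::collections::VecDeque;
use std::sync::Mutex;
use dbus::ConnectionItem;

struct PendingItems {
    queue: Mutex<VecDeque<ConnectionItem>>,
}

impl PendingItems {
    // Safe to call from filter callbacks, watch callbacks and Message::drop()
    // alike, whichever thread they happen to run on.
    fn push(&self, item: ConnectionItem) {
        self.queue.lock().unwrap().push_back(item);
        // This is also the natural place for a "pending_items not empty"
        // notification to fire.
    }

    fn pop(&self) -> Option<ConnectionItem> {
        self.queue.lock().unwrap().pop_front()
    }
}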

@diwic

diwic commented Jul 8, 2017

Owner

Ok ok. So that means either we need to protect pending_items with a Mutex, or we need to somehow "send over" the watchfd things to the connection thread. If we went for the mutex approach, could we also use a condvar to wake up tokio?

@albel727

albel727 commented Jul 8, 2017

Contributor

If we went for the mutex approach, could we also use a condvar to wakeup tokio?

I'm not sure how. We can't block waiting on a condition variable inside ADriver, or the whole event loop would halt, so we'd need a dedicated thread to block on the condvar; and then I don't see how that would work at all, or why we would do that instead of notifying ADriver's task directly.

either we need to protect pending_items with a Mutex, or somehow "send over" the watchfd things to the connection thread somehow.

I doubt that deferred handling of events from watch callbacks via a queue is a good idea at all. Not only does it result in the halted driver problem, but also, when the DBusConnection disconnects, it notifies us about the watch removal and then immediately closes the file descriptor. This means that if we don't deregister the fd from epoll in the watch callback itself, it can't be deregistered anymore. Luckily, if the fd is not dup()ed, closing it effectively deregisters it, and it's surely not dup()ed.

But here's the problem. The current WatchFd model conflates watch removal and watch disablement, so we can't distinguish those two cases. And in the case of watch disablement we want to be sure the fd gets deregistered, or the event loop will never sleep. But if an incoming WatchFd(false, false) turns out to come from a watch removal, and we try to deregister the fd after it was closed, we either get an ENOENT panic or unwittingly deregister some new fd that happened to reuse the descriptor number.

Luckily, as far as I can tell, watch removals should currently happen only while we're still in ADriver::poll(). But if we expose dbus_connection_close() in some form to the user, then ADriver in its current state wouldn't be able to handle connection closure at all, as events from the fd will not wake it up anymore.

If, instead of scraping the queue for WatchFds in ADriver, we used synchronous handling of watch callbacks, then fd registration could be handled correctly and the handler would be able to wake up ADriver when needed. Of course, that would mean ADriver::fds would need to be guarded by a mutex too, and that ADriver's Task handle would somehow have to be accessible to the watch callback.
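
One way to make the driver's Task reachable from a synchronous watch callback would be a small shared handle along these lines (names are made up; assumes futures 0.1.14+ for task::current() and Task::notify()):

use std::sync::{Arc, Mutex};
use futures::task::{self, Task};

#[derive(Clone)]
struct DriverHandle(Arc<Mutex<Option<Task>>>);

impl DriverHandle {
    fn new() -> DriverHandle {
        DriverHandle(Arc::new(Mutex::new(None)))
    }

    // Called from inside ADriver::poll(), so the stored Task is the one the
    // reactor will actually wake.
    fn register_current(&self) {
        *self.0.lock().unwrap() = Some(task::current());
    }

    // Called from the watch callback (or anywhere else) to wake the driver.
    fn wake(&self) {
        if let Some(ref t) = *self.0.lock().unwrap() {
            t.notify();
        }
    }
}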

@albel727

albel727 commented Jul 8, 2017

Contributor

Luckily, as far as I can tell, watch removals should now happen only while we're still in ADriver::poll()

Ah, no, I forgot about the send() that we do outside of the ADriver task.

@diwic

diwic commented Jul 8, 2017

Owner

We do need an iter(None), or iter_no_io().
Calling dispatch() after read_write_dispatch() is probably a good idea

These two are now pushed.

@manuels

manuels commented Jul 30, 2017

Contributor

Just a quick comment: I played around with the dbus-tokio code over the last few days and it works quite well! So far no complaints ;)

@diwic

diwic commented Aug 24, 2017

Owner

I'm going to close this issue as we now have something up and running and nobody seems to be actively complaining or requesting new features.

I've opened #99 for further discussion/fix of the watchfd issues.

Thanks everybody for helping out!

@diwic diwic closed this Aug 24, 2017
