
Messages priority & inbox unification #85

Merged: 72 commits from inbox-refactor into master on Jun 22, 2022

Conversation

Restioson (Owner) commented Jun 13, 2022

Summary

This change unifies the three existing sources of messages for an actor to handle (immediate notifications, address messages, and broadcasts) into one channel implementation in inbox.rs. It additionally upgrades immediate notifications, address messages, and broadcasts with support for assigning priorities to them. A separate ordered queue of default-priority address messages is still kept to provide FIFO ordering for them, since binary heaps do not preserve insertion order and the priority queue therefore cannot be used for default-priority messages.
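To make this concrete, here is a minimal sketch of such a unified inbox in safe Rust, using illustrative names (`Inbox`, `ByPriority`, `Priority`) rather than the actual types in inbox.rs; how the real channel arbitrates between the queues may differ.

```rust
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};

/// Hypothetical priority type: `Default` for ordinary sends, `High(n)` for
/// prioritised ones (larger `n` is handled sooner).
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority {
    Default,
    High(u32),
}

/// Heap entries compare by priority only, so the message type needs no `Ord`.
struct ByPriority<M> {
    priority: Priority,
    msg: M,
}

impl<M> PartialEq for ByPriority<M> {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl<M> Eq for ByPriority<M> {}
impl<M> PartialOrd for ByPriority<M> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<M> Ord for ByPriority<M> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.priority.cmp(&other.priority)
    }
}

struct Inbox<M> {
    /// Default-priority address messages: a FIFO queue preserves send order,
    /// which a binary heap would not.
    ordered: VecDeque<M>,
    /// Explicitly prioritised messages (notifications, priority sends).
    prioritised: BinaryHeap<ByPriority<M>>,
    // Broadcast mailboxes (one per receiver) are omitted from this sketch.
}

impl<M> Inbox<M> {
    /// Pop the next message: anything above default priority first, then FIFO.
    fn pop(&mut self) -> Option<M> {
        let has_high_priority = self
            .prioritised
            .peek()
            .map_or(false, |p| p.priority > Priority::Default);

        if has_high_priority {
            self.prioritised.pop().map(|p| p.msg)
        } else {
            self.ordered
                .pop_front()
                .or_else(|| self.prioritised.pop().map(|p| p.msg))
        }
    }
}
```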

Motivation

Pros

  1. Merge barrage & flume into one channel that is simpler than the sum of its parts (from a behind-the-scenes perspective). This can be done since the resulting channel is specialised for xtra. The downstream channel code of xtra is a lot simpler as a result.
  2. Support for priority messages. This can be used to make sure that important tasks (like pings/health checks) are not missed whilst handling other messages.
  3. Less memory overhead. Since the inbox channel is specialised to xtra, much more can be exposed. We can reuse its inner allocation for all kinds of things, including reference counting. The result is that Address can be pointer sized!
  4. Less polling overhead. A minimal amount of work can be done in each poll whilst still easily maintaining priority invariants. Contrast this to the implementation before which required either a defined order of broadcast and select passed to futures::select, or try_recvs to be interspersed to prevent starvation.
  5. Fewer special cases in the manage loop. Because anti-starvation polls can be removed, there are fewer special cases in the manage loop. This helps to bring us closer to something like Refactor context #51 with less boxing overhead. It also simplifies join and select - the difference is night and day!
  6. Flexibility. The flexibility of being able to implement whatever we need from a channel, even if it is something super specific (like storing specialised reference counts inside), is very nice.

Cons

  1. More code to maintain in xtra. Xtra now needs to maintain channel code upstream. However, the code that needs to be maintained is a lot simpler as it's very specialised and only needs to support exactly what xtra needs - e.g., support for synchronous sending, receiving, and channel draining can all be dropped.
  2. Immaturity. Flume is a more mature channel than the new inbox channel. This can hopefully be combated with a robust suite of tests in xtra itself, which would benefit the library in any case, even if we decide to stick with flume/barrage.

Conclusion

Personally, I believe that the pros outweigh the cons, so I began to implement this proof of concept.

Channel design

I leaned heavily on my experience working on flume and barrage to create this channel. I borrowed the idea of simply locking the inner channel from flume: as far as I understand it, channels will always have contention on the head and tail, and whether this is resolved through lockless CAS loops or simple locks, it will usually be the limiting factor in speed. Therefore, what is far more important than atomic trickery is just reducing contention on the lock itself. Using a lock does not produce performance that far off atomic trickery anyway, and is more than sufficient for this use case. This keeps the code much simpler to reason about than a lockless channel with the same feature set, and it can be written in fully safe Rust.

The main channel is locked with a std Mutex, as contention is expected (though not for long). One consideration could be to use a fair Mutex here. The individual broadcast channels and other things that are not expected to be heavily contended are locked with spinlocks, as this minimises the time needed to lock and unlock when no contention is present.
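As an illustration, here is a simplified sketch of that locking layout; the field names and the use of the `spin` crate's `Mutex` as the spinlock are assumptions made for this sketch, and the real structure in inbox.rs may differ.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use spin::Mutex as Spinlock; // any spinlock would do; the `spin` crate is assumed here

/// Shared channel state: every send and receive takes this lock, but only
/// for a short critical section, so a plain std Mutex is sufficient.
struct Chan<M> {
    inner: Mutex<ChanInner<M>>,
}

/// Guarded by the std Mutex above, since contention here is expected.
struct ChanInner<M> {
    ordered: VecDeque<M>,
    // priority queue, waiting senders/receivers, capacity bookkeeping, ...
    broadcast_mailboxes: Vec<Arc<BroadcastMailbox<M>>>,
}

/// Per-receiver broadcast mailbox: rarely contended, so a spinlock keeps the
/// uncontended lock/unlock path as cheap as possible.
struct BroadcastMailbox<M> {
    messages: Spinlock<VecDeque<Arc<M>>>,
}
```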

Diagram of channel operations

[Image: a UML activity diagram of all of the channel operations]

Unresolved questions

  • Use a fair mutex or a parking lot mutex for the inner channel? Can be resolved in the future if there is interest.
  • Does broadcast need a bound?
  • Does the priority stealing heap need a bound? This could make sense for messages with a below-default priority.
  • Is sink impl flexible enough? Can be resolved in the future if there is interest.
  • Is MessageChannel not having a broadcast method a blocker for this PR?

TBD

  • Pop waiters in priority-order

  • Lifecycle management (drop notice & last strong address handling)

  • Add some tests from Refactor context #51

  • Rename StolenMessage and friends to something else

  • Documentation

    • Resolve all TODO(doc) comments
    • Document how the channel works and handles bounding, backpressure, disconnection, and message order
  • Sink impl

  • Priority sending API

    • Broadcast
    • To one (i.e. remove send_priority in favour of send(...).priority(..))
  • Tests

    • Sink
    • Broadcast
    • Handle order
    • Priority send future wait order
    • MessageChannel send priority

Restioson changed the title from "[POCMessages priority & inbox unification" to "[POC] Messages priority & inbox unification" on Jun 13, 2022
Optimised for the case that most priorities are the same - in this way it is optimistic that the user is not mixing and matching priorities a huge amount.

I'm not sure if this is actually better in a real-world case, but it should beat std's binary heap for all *existing* Address patterns, as they all use effectively the same priority. This could change in future, though, so std's might actually be better in those cases. This brings us down from 320ns+ to 200ns.
thomaseizinger (Collaborator) commented

Interesting. I've also considered suggesting the idea of message priorities to replace the concept of "self notifications", but I wasn't sure if this is a feature worth having. I am imagining that it could be confusing for users if they have to choose a priority, so I'd vote for a good default here (probably lowest priority).

thomaseizinger (Collaborator) left a comment

I guess this might also completely replace #51?

Very interesting work, I'll tag along :)

Restioson (Owner, Author) commented Jun 14, 2022

> I guess this might also completely replace #51?

The plan would be either to rebase #51 on this, or do essentially what #51 does, since the API is good, but based on the master after this is merged. It might just be more work to get #51 rebased, though, so my plan is to try get this in and then evaluate.

Restioson mentioned this pull request on Jun 14, 2022
Restioson (Owner, Author) commented

As far as default priorities go, I am planning to add a separate queue for default-priority messages, as binary heaps are not stable with respect to insertion order, and this order is important for regular, non-priority-based messages.

The default queue preserves order, unlike the binary heap. Plus, my implementation of the binary heap was subtly wrong. It would have been fixable, but not really worth it: now that the default queue exists, most priority messages probably won't have the same priority (or same-priority ones will be so infrequent that it doesn't matter), so std's strategy will probably beat it out anyway.
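For reference, here is a small self-contained demonstration (illustrative, not code from this PR) of why the default queue matters: when all entries share a priority, `BinaryHeap` gives no guarantee about pop order, whereas a `VecDeque` preserves send order.

```rust
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};

/// Entries compare by priority only, like equal-priority messages would.
struct Entry {
    priority: u32,
    seq: usize, // payload: the order in which the message was "sent"
}

impl PartialEq for Entry {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl Eq for Entry {}
impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.priority.cmp(&other.priority)
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    let mut fifo = VecDeque::new();

    for seq in 0..8 {
        heap.push(Entry { priority: 0, seq });
        fifo.push_back(seq);
    }

    let heap_order: Vec<_> = std::iter::from_fn(|| heap.pop()).map(|e| e.seq).collect();
    let fifo_order: Vec<_> = fifo.into_iter().collect();

    // The heap's order for equal-priority entries is arbitrary; the
    // VecDeque's is always [0, 1, 2, ...].
    println!("heap: {heap_order:?}");
    println!("fifo: {fifo_order:?}");
}
```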
This helps handle its drop better (future change)
This should help reduce some out of order receiving when used properly. Theoretically, if `cancel` is used prior to cancelling a `ReceiveFuture`, then out of order messages should never happen. I only expect this to actually become a problem when custom event loops are implemented, or with `select`/`join` being used
Restioson added this to the 0.6.0 milestone on Jun 16, 2022
Also added an example to join and refactored it to fit the inbox-refactor changes well

# Conflicts:
#	Cargo.toml
#	src/address.rs
#	src/context.rs
thomaseizinger (Collaborator) left a comment

I've given this a (mostly stylistic) review, take as you wish :)

I think I understand the overall design and agree with your conclusion that a specialised impl is better than composing two libraries.

One question: Do you think it would make sense to try making a design that is based on std::task::Waker for waking the correct task to pop the next message?

Instead of remembering the sender/receiver we need to fulfill, storing a waker from the async context when the particular sender/receiver tried to read/write a message could be used to wake exactly that task to try its operation again (which should succeed then).

I've seen such a design in libp2p-mplex to implement a multiplexer on top of TCP with individual substreams.

I am thinking that such a waker based approach could be simpler because we might not need the "try fulfilling" logic at the time of sending a message but instead just need to consume the waker.

Thoughts?

Restioson (Owner, Author) commented Jun 17, 2022

> Instead of remembering the sender/receiver we need to fulfill, storing a waker from the async context when the particular sender/receiver tried to read/write a message could be used to wake exactly that task to try its operation again (which should succeed then).

As I understand it, this is basically what we do. The difference is a slight implementation thing, I think. Try fulfill can fail in the following ways:

  1. Nothing waiting (shared failure case)
  2. Waiting future was dropped (this is why it is stored as a Weak).
  3. Waiting future was canceled (this is what WakeReason::Canceled is for)

2 & 3 essentially only exist to move the cleanup of deregistered waiters to the send side rather than the receive side - this way, drop doesn't need to iterate through all waiters to see which one it is. The caller of try fulfill does not observe failures 2 & 3, since it will retry until the first failure case is hit, which cannot be helped.
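To illustrate those three cases, here is a rough sketch of a `try_fulfil`-style routine with hypothetical names (`Waiter`, `WakeReason`) and a simplified waiter order; it is not the exact code in this PR.

```rust
use std::sync::{Mutex, Weak};

enum WakeReason<M> {
    Fulfilled(M),
    Cancelled,
}

struct Waiter<M> {
    // The stored Waker is omitted from this sketch.
    state: Mutex<Option<WakeReason<M>>>,
}

/// Try to hand `msg` to a waiting receiver, skipping dead and cancelled
/// waiters on the send path. (The real waiter order may differ.)
fn try_fulfil<M>(waiters: &mut Vec<Weak<Waiter<M>>>, msg: M) -> Result<(), M> {
    while let Some(weak) = waiters.pop() {
        // Case 2: the waiting future was dropped, so the Weak is dead.
        // Skipping it here means the future's Drop never scans the waiter list.
        let waiter = match weak.upgrade() {
            Some(waiter) => waiter,
            None => continue,
        };

        let mut state = waiter.state.lock().unwrap();

        // Case 3: the future was cancelled after registering itself.
        if matches!(&*state, Some(WakeReason::Cancelled)) {
            continue;
        }

        // Hand the message over; the real code would also wake the waker.
        *state = Some(WakeReason::Fulfilled(msg));
        return Ok(());
    }

    // Case 1: nothing was waiting; the caller keeps the message.
    Err(msg)
}
```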

Otherwise, is the difference that this solution skips the receiver or sender having to lock the channel again after waking to complete its operation? I'll elaborate further if this is what you are referring to when I'm back at my PC.

Can you elaborate more on how the other solution would differ?

thomaseizinger (Collaborator) commented Jun 17, 2022

> Can you elaborate more on how the other solution would differ?

It would use the async runtime scheduler (like tokio) to figure out which task needs to be polled again to read the next message from the heap, via a waker that is woken.

The workflow would be something like:

  • An event-loop calls next on the inbox's receiver via some kind of poll_next function
  • We check if we have an item on the heap.
    • If not, we grab a Waker from the std::task::Context and store it in a queue.
    • If we have an item, pop it off and return it as Poll::Ready

If a new item gets put on the heap, we check if we have a pending Waker. If yes, pop the Waker off and call wake on it. This should wake the task which will try to call poll_next again and read that item from the heap.
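A minimal sketch of that waker-based approach, assuming a single mutex-guarded heap and illustrative names (`Shared`, `Receiver`, `Sender`); this is not code from this PR.

```rust
use std::collections::{BinaryHeap, VecDeque};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

struct Shared<M: Ord> {
    heap: BinaryHeap<M>,
    waiting: VecDeque<Waker>,
}

struct Receiver<M: Ord>(Arc<Mutex<Shared<M>>>);

impl<M: Ord> Receiver<M> {
    fn poll_next(&self, cx: &mut Context<'_>) -> Poll<M> {
        let mut shared = self.0.lock().unwrap();
        match shared.heap.pop() {
            // An item is ready: hand it to the caller immediately.
            Some(item) => Poll::Ready(item),
            // Nothing queued: remember this task's waker and go to sleep.
            None => {
                shared.waiting.push_back(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

struct Sender<M: Ord>(Arc<Mutex<Shared<M>>>);

impl<M: Ord> Sender<M> {
    fn send(&self, item: M) {
        let mut shared = self.0.lock().unwrap();
        shared.heap.push(item);
        // Wake one waiting receiver; it will call poll_next again and
        // (hopefully) find the item, though another receiver may race it;
        // that is the spurious-wakeup case discussed further down.
        if let Some(waker) = shared.waiting.pop_front() {
            waker.wake();
        }
    }
}
```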

I think one advantage of this approach could be that a task that is waiting for a new message (i.e. an event-loop) will then be able to do more work within a single poll without context switching, i.e. once woken, a message will be popped off and the handler should be invoked immediately until that one returns Pending.

On a different note, the implementation may be easier to understand but that is a taste thing. I am biased because most of https://github.com/libp2p/rust-libp2p is implemented as manual, poll-based state machines :)

Restioson (Owner, Author) commented Jun 17, 2022

I had just finished writing out a detailed comment on the differences between the two strategies, and then my power cut out 😢. I'm on my phone now and will summarize. The solution you are proposing differs in that the receiver must wake up, lock the channel, and try to take off a message. This contends with other receive operations, and another receiver might already have taken the message that the receive future was woken for, resulting in a spurious wakeup. So, keeping a slot that is fulfilled reduces spurious wakeups as well as contention on the inner channel. This is the same strategy used by flume. I'm not sure what you mean by doing less work - as far as I understand, it would be doing more work given it has to lock the contended channel as opposed to the uncontended waiting receiver slot.

PS: there is a diagram of how receive currently works in the original PR comment, which shows how the waiting happens outside of the critical section of the main channel lock.

thomaseizinger (Collaborator) commented

Interesting, thanks for the detailed response.

I guess the wake-up strategy only makes sense when there is a dedicated receiver for an item like a substream in the case of a multiplexer and the item cannot be handled by someone else.

Restioson (Owner, Author) commented

I believe if each item has a designated receiver at send time, then there will be no spurious wakeups, and probably less contention on the lock around whatever contains that item too, yes.

Restioson (Owner, Author) commented Jun 17, 2022

Power's back, here's what was saved as draft by GitHub, for posterity:

>   • An event-loop calls next on the inbox's receiver via some kind of poll_next function
>   • We check if we have an item on the heap.
>     • If not, we grab a Waker from the std::task::Context and store it in a queue.
>     • If we have an item, pop it off and return it as Poll::Ready

This is very similar to the way receive is currently implemented, as per the diagram. The only difference is in the last step. What is done currently instead is the following (a rough sketch follows the list):

  • An event-loop calls next on the inbox's receiver via some kind of poll_next function
  • Is this the first poll?
    • If so, check if there is an item on the heap.
      • If there is, grab it and return Ready.
      • If not, we grab a Waker from the std::task::Context and store it in a queue along with a slot for a message.
    • If not, that means we were registered as waiting and have been woken by a sender. Check the slot for a message and return it.
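A rough sketch of this slot-based, two-phase strategy (similar to flume's) with hypothetical names; priorities, broadcasts, and the cancellation handling discussed above are omitted, and the actual code in inbox.rs differs.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll, Waker};

/// Per-receiver slot that a sender fulfils directly.
struct WaitingReceiver<M> {
    slot: Option<M>,
    waker: Option<Waker>,
}

struct ChanInner<M> {
    queue: VecDeque<M>,
    waiting: Vec<Weak<Mutex<WaitingReceiver<M>>>>,
}

enum ReceiveFuture<M> {
    New(Arc<Mutex<ChanInner<M>>>),
    Waiting(Arc<Mutex<WaitingReceiver<M>>>),
}

impl<M> ReceiveFuture<M> {
    fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<M> {
        match self {
            // First poll: try the channel directly; otherwise register a
            // slot plus waker to be fulfilled by a sender later.
            ReceiveFuture::New(chan) => {
                let mut inner = chan.lock().unwrap();
                if let Some(msg) = inner.queue.pop_front() {
                    return Poll::Ready(msg);
                }
                let waiter = Arc::new(Mutex::new(WaitingReceiver {
                    slot: None,
                    waker: Some(cx.waker().clone()),
                }));
                inner.waiting.push(Arc::downgrade(&waiter));
                drop(inner); // release the main channel lock
                *self = ReceiveFuture::Waiting(waiter);
                Poll::Pending
            }
            // Later polls: a sender has (hopefully) put a message straight
            // into our slot, so only the small per-waiter lock is taken and
            // the contended main channel lock is never touched again.
            ReceiveFuture::Waiting(waiter) => match waiter.lock().unwrap().slot.take() {
                Some(msg) => Poll::Ready(msg),
                None => Poll::Pending,
            },
        }
    }
}
```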

Restioson and others added 3 commits June 21, 2022 14:00
Thanks beta clippy!
* Initial draft of broadcast future

* Add some docs

* Add TODOS

* Add docs for `BroadcastFuture`

* Use `Priority::default` in `BroadcastFuture`
Restioson (Owner, Author) commented Jun 21, 2022

I came up with some changes that allow for MessageChannel to have a broadcast method, but there is one issue: it is not trivial to restrict Return = () as this is not supported in where clauses (rust-lang/rust#20041). Self: MessageChannel<M, Return = ()> also does not work as this is not object safe.

I am not sure if this is a blocker for this PR, since it is still possible to make a fan out wrapper. Personally, I'd say that it's something that could go in a followup, as it's not really an issue with the channel here but rather that the API of MessageChannel as is was never going to be able to support this. Therefore, from my side I am ready to merge. Let me know what you think @thomaseizinger and maybe we can merge soon!

thomaseizinger (Collaborator) left a comment

A few doc nits but happy to merge this!

Comment on lines +63 to +65:

```rust
impl<R, F, TResolveMarker> SendFuture<R, F, TResolveMarker>
where
    F: SetPriority,
```

Collaborator:

I think I would have probably created two impl blocks instead of a trait but that is more of a style question.

This solution might be slightly more expensive to compile.

Restioson (Owner, Author):

I originally planned to use SetPriority to let BroadcastFuture with an erased actor type set a priority too. I can push that branch, but it didn't work out for other reasons that I discussed, so maybe we can move to two impl blocks.

Restioson (Owner, Author):

I think we can leave this as-is for now, since when MessageChannels get refactored they could use SetPriority too.

thomaseizinger (Collaborator) commented Jun 21, 2022

> I came up with some changes that allow for MessageChannel to have a broadcast method, but there is one issue: it is not trivial to restrict Return = () as this is not supported in where clauses (rust-lang/rust#20041). Self: MessageChannel<M, Return = ()> also does not work as this is not object safe.

This should be solvable if we move away from a trait object and introduce a dedicated type right? Then we can restrict where this type can be constructed with bounds like Handler<M, Return = ()>.

This might also solve #68.
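A rough sketch of that dedicated-type idea, assuming a hypothetical `Handler` trait with an associated `Return` type and a stand-in `Address`; this is not xtra's actual API.

```rust
use std::marker::PhantomData;

/// Stand-in for xtra's handler trait for the purposes of this sketch.
trait Handler<M> {
    type Return;
}

/// Stand-in for xtra's Address type.
struct Address<A>(PhantomData<A>);

/// Can only be constructed for actors whose handler for `M` returns `()`,
/// which is what broadcasting needs: the `Return = ()` restriction lives on
/// the constructor bound instead of on a (non-object-safe) trait object.
struct BroadcastChannel<M> {
    // A type-erased broadcast sender would live here in a real version.
    _marker: PhantomData<M>,
}

impl<M> BroadcastChannel<M> {
    fn from_address<A>(_address: Address<A>) -> Self
    where
        A: Handler<M, Return = ()>,
    {
        BroadcastChannel { _marker: PhantomData }
    }
}
```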

Restioson and others added 2 commits June 21, 2022 22:28
Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
Restioson (Owner, Author) commented

> This should be solvable if we move away from a trait object and introduce a dedicated type right? Then we can restrict where this type can be constructed with bounds like Handler<M, Return = ()>.

That's true. If so, that would be for a followup PR.

klochowicz (Contributor) left a comment

This is really cool! I have looked at it mostly to understand xtra internals a bit more; I'm looking forward to using these features! :)

Restioson merged commit 2f0f9f0 into master on Jun 22, 2022
Restioson (Owner, Author) commented

🎉

Restioson deleted the inbox-refactor branch on June 22, 2022 at 19:39