WIP: Add stream method to channel #12
Conversation
Hum, seems like I ran rustfmt which explains most of the modifications. Is there a specific reason why the repo is not rustfmt'ed?
Hi! Thanks for looking into this - this is definitely something super-cool to have! I recently thought about it myself, and basically about the same kind of implementation (store the last `Future` inside the `Stream`). This should then also enable the use of those channels as trait objects, which is very good to have.
There is a question about the logical behavior mentioned in the review: I'm currently not sure whether storing the `Future` for a longer amount of time could have any unexpected side-effect (other receivers can't progress). I would need to read some code again for this.
The general concept and the pinning look reasonable from my side. But maybe @Nemo157, who has looked at Future/Stream conversions before, could take a short peek and give an opinion on whether it looks correct. It might also be easier if this started using one of the pin projection crates. But since those are not yet used inside the remaining codebase either, that is certainly not a must-have for this change.
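The implementation idea being discussed (store the last receive `Future` inside the `Stream`) can be sketched in isolation. The following is a minimal, self-contained model, not the crate's actual API: the channel and `ChannelReceiveFuture` are replaced by a closure that produces the next future, a stand-in `Stream` trait is defined locally, and everything is `Unpin` for simplicity, whereas the real adapter has to deal with pinned futures.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Stand-in for futures_core::Stream so the sketch is self-contained.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// A Stream that lazily creates a "receive" future and stores it
// across polls, recreating it after each delivered item.
struct ReceiveStream<F, Fut> {
    make_future: F,      // produces the next receive future (hypothetical)
    future: Option<Fut>, // the stored, possibly still pending future
}

impl<T, F, Fut> Stream for ReceiveStream<F, Fut>
where
    F: FnMut() -> Fut + Unpin,
    Fut: Future<Output = Option<T>> + Unpin,
{
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = &mut *self;
        // Create a fresh receive future if none is currently stored.
        let fut = this.future.get_or_insert_with(&mut this.make_future);
        match Pin::new(fut).poll(cx) {
            Poll::Ready(item) => {
                // Drop the completed future; the next poll starts a new one.
                this.future = None;
                Poll::Ready(item)
            }
            // Keep the pending future stored - this is the state the
            // review discusses: the future may stay registered as a
            // waiter inside the channel for an arbitrary time.
            Poll::Pending => Poll::Pending,
        }
    }
}

// Helper to poll without a real executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

The `Poll::Pending` arm is where the logical question below arises: the stored future remains parked inside the channel until the stream is polled again or dropped.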
> Hum, seems like I ran rustfmt which explains most of the modifications. Is there a specific reason why the repo is not rustfmt'ed?

No. The only reason is that I didn't spend the effort on figuring out how to use it when I wrote it. I'm definitely good with having things formatted. However, it might be a bit nicer to keep formatting and new content in separate CRs, to make things easier to review.
```rust
    A: RingBuf<Item = T>,
{
    channel: Option<&'a GenericChannel<MutexType, T, A>>,
    future: Option<ChannelReceiveFuture<'a, MutexType, T>>,
```
That definitely seems the way to go for a `Future` -> `Stream` adapter. From a pinning point of view it looks correct, since the `Stream` also needs to be pinned.

What I'm however not yet sure about is whether it's ok from a logical point of view. If a user would start polling on the `Stream` but not get an item, the `Future` would still be stuck in a signaled state in the `Stream`. If that prevents other waiters on the `Stream` from getting woken up, it might be problematic. I think I would need to review the actual wakeup logic to understand the impact - e.g. whether the `Channel` keeps track of `Future`s it notified but which did not yet pick up an item. The `Mutex` implementation definitely does this in order to guarantee fairness. The `Channel` might need it for rendezvous semantics?
It has been a while since I last looked into it. Will do on the weekend.
So a scenario would be: a user creates a stream, polls the stream once, which returns `Poll::NotReady`, then drops the stream future. The inner future is stuck in the stream and might be blocking subsequent futures.
Right. This can also happen with the `Future`-based API. But there you are a lot more likely to see that an old `Future` is still around - especially since you can't move the `Future`s. With `Stream`s it is somewhat hidden.

But now since I wrote that: at least the `Stream` also needs to be pinned and cannot be moved - so the user moving around the `Stream` without knowing that some registration is still active is not possible.
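That pinning guarantee can be illustrated in isolation. In this hypothetical sketch (not the crate's types), the stream type opts out of `Unpin` via `PhantomPinned`, so once it is pinned, safe code cannot obtain a `&mut` to it and move it while a registration might still be active:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical stand-in for the channel stream: `PhantomPinned`
// makes the type !Unpin, so it cannot be moved out of a `Pin`.
struct ChannelStream {
    _pinned: PhantomPinned,
    // the stored receive future / channel reference would live here
}

fn poll_demo(stream: Pin<&mut ChannelStream>) {
    // Because ChannelStream is !Unpin, `Pin::get_mut` is unavailable
    // here; only unsafe code could move the value and invalidate a
    // waiter registered inside the channel.
    let _ = stream;
}
```

The type stays at a fixed address for as long as it is pinned, which is exactly what makes the "hidden registration" safe against accidental moves.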
I'll write a separate PR for
If you merge said
Force-pushed from 723d67d to b497f49
I have reviewed the code again. As mentioned, there could be an issue when someone starts reading from the `Stream`. That means e.g.:

then the second task will wait for the notification. If during that situation another producer would send something to the channel, it would

Then that task would actually extract the old value, and the value from the second sender is shifted into the channel buffer. Therefore it seems like we might only lose buffer capacity and not run into a permanent deadlock state, but I'm still not 100% sure. I need to think further about other edge-cases. In order to avoid the issue entirely, each send could wake up all stored receivers instead of only a single one. However, that would introduce a lot of churn on normally used channels with multiple producers and consumers. E.g. if there are 5 consumers, each send would wake all of them, which means lots of churn in the executor. Therefore I'm not too keen on implementing a change to wake up all receivers.
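The churn argument can be made concrete with a toy count, which is purely illustrative and not code from the crate: under wake-all semantics every send wakes every parked receiver, but only one of them can take the item, so the others are woken just to re-register their wakers.

```rust
/// Toy count of executor wakeups for `items` sends while `consumers`
/// receivers are parked on the channel. With wake-one semantics each
/// item wakes exactly one receiver; with wake-all semantics every
/// parked receiver is woken, but only one gets the item and the
/// remaining `consumers - 1` re-register.
fn total_wakeups(items: usize, consumers: usize, wake_all: bool) -> usize {
    let wakeups_per_item = if wake_all { consumers } else { 1 };
    items * wakeups_per_item
}
```

For the 5-consumer example above, wake-all multiplies the executor work per item by five while delivering the same number of items.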
Let's say we would run into a permanent deadlock: couldn't we circumvent it with a drop implementation so that the future cleans up after itself? For instance, by retrieving the value on drop by polling one last time?
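That suggestion can be sketched with a toy model (hypothetical names, no relation to the crate's internals): a receive handle that was already handed a value returns it to the shared buffer when dropped, so the value is not silently lost.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

// Toy model of "the future cleans up after itself": if a value was
// already handed to this receive handle but never consumed, return
// it to the shared buffer on drop instead of losing it.
struct RecvGuard<'a> {
    buffer: &'a RefCell<VecDeque<u32>>,
    received: Option<u32>,
}

impl<'a> RecvGuard<'a> {
    // Simulates the wakeup path: a sender moved a value into this waiter.
    fn deliver(&mut self, v: u32) {
        self.received = Some(v);
    }

    // Normal consumption path.
    fn take(&mut self) -> Option<u32> {
        self.received.take()
    }
}

impl Drop for RecvGuard<'_> {
    fn drop(&mut self) {
        if let Some(v) = self.received.take() {
            // Put the unconsumed value back at the front so ordering
            // is preserved for the next receiver.
            self.buffer.borrow_mut().push_front(v);
        }
    }
}
```

The open question raised in the reply below is where such a value should go in the real implementation, since the state lives hidden inside the stream.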
Where would you put the value? The concern here is mainly that there is some hidden state in the `Stream`.
I did some very basic testing in e96b7ff and it seems like for the low-level future API it's not a big deal. From what I understand a forgotten future (send & receive) will eat up one wakeup, but everything works as normal after that. The values stored in the buffer won't be affected, so it's basically like a wakeup was lost in the aether. If a receive future is forgotten, the next time a receive future is polled the issue is fixed, because the buffer contains an element. The same goes for a forgotten send future.
Lost wakeups can lead to deadlocks if the task is not concurrently doing anything else or never encounters a spurious wakeup. That could be allowable though; leaking a future/stream instead of dropping it already sort of violates the expected usage. I'll try and take a look at this in a bit.
I think I now have a better understanding of what will happen regarding the lost wakeups: Every lost waker will cause one sender to be blocked. However, on each consecutive receive one more blocked sender gets unblocked. That means, in my understanding, each non-used wakeup would cause at most exactly one element in the channel being not used, but successive reads and writes should be successful. That means a latency of 1 element would be introduced into the system. I think that might be acceptable in typical MPMC workflows, where elements are unrelated and latency is likely unpredictable. In single-consumer workflows this should never happen, since that would mean the only consumer would not read from the channel.

To answer the question from yesterday why we don't run into the debug assert: The receivers always read elements from the buffer. While reading one element from the buffer, they then also copy one element from the oldest send waiter into the buffer in order to unblock one more sender => There is no direct handover from a sender to the receiver. The buffer is always used in between.
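The handover described above (no direct sender-to-receiver transfer; the buffer is always used in between) can be modeled without any async machinery. This is an illustrative toy with made-up names, not the crate's implementation:

```rust
use std::collections::VecDeque;

// Toy model of the channel's handover: values always pass through
// the buffer; a receive frees one slot and immediately refills it
// from the oldest parked sender, unblocking that sender.
struct ToyChannel {
    capacity: usize,
    buffer: VecDeque<u32>,
    parked_senders: VecDeque<u32>, // values of senders waiting for space
}

impl ToyChannel {
    fn new(capacity: usize) -> Self {
        ToyChannel {
            capacity,
            buffer: VecDeque::new(),
            parked_senders: VecDeque::new(),
        }
    }

    fn send(&mut self, v: u32) {
        if self.buffer.len() < self.capacity {
            self.buffer.push_back(v);
        } else {
            // Buffer full: the sender parks and keeps its value.
            self.parked_senders.push_back(v);
        }
    }

    fn receive(&mut self) -> Option<u32> {
        let item = self.buffer.pop_front()?;
        // No direct sender -> receiver handover: the oldest parked
        // sender's value moves into the buffer slot we just freed.
        if let Some(v) = self.parked_senders.pop_front() {
            self.buffer.push_back(v);
        }
        Some(item)
    }
}
```

In this model a receiver never takes a value directly from a sender, which matches the explanation of why the debug assert is never hit.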
For a continuous workflow this is acceptable indeed. For a temporary workflow, one of the senders would close the channel after it's done, which would wake up all waiters.
Force-pushed from d079f5a to 4a5ab99
Just some minor comment improvements. Maybe you can merge those.
I was away for a while and I forgot about this PR. What do you think would be necessary before this can be merged? What kind of additional tests do you think would be needed?
Force-pushed from f289447 to 03a2bc3
Sorry for the late feedback. I convinced myself that this is good to have that way. If we just degenerate on buffers instead of locking, it seems acceptable. And @Nemo157's comment that even locking might be acceptable due to the fact that not using a

One random thought I had was whether the

Another random thought I had is that the
It would be interesting if we could add a compiler warning if the user misuses the stream. Kinda like

edit: It's probably impossible to do that statically.
Ah, good point around testing. I just reviewed again what this has and have added a few suggestions based on it. Apart from that, I think just checking what the other channel tests have, and whether we miss an important edge condition, would be good. But since we just delegate to the existing stream, most things should be pretty well covered.
That sounds cool. But I have no idea how, since it's a dynamic property. It would somehow need to track that an operation returned

I guess runtime checks could be used: When the object last returns
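A minimal sketch of such a runtime check, assuming a hypothetical tracker embedded in the stream: it remembers whether the last poll returned `Poll::Pending` and reports on drop if the owner abandoned the operation in that state. Here an atomic counter stands in for a `debug_assert!` or log.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Global counter standing in for a debug log / debug_assert channel.
static ABANDONED_WHILE_PENDING: AtomicUsize = AtomicUsize::new(0);

// Hypothetical runtime misuse check: remember whether the last poll
// of a stream/future returned Pending, and report on drop if the
// owner abandoned it in that state.
struct PollTracker {
    last_poll_was_pending: bool,
}

impl PollTracker {
    fn new() -> Self {
        PollTracker { last_poll_was_pending: false }
    }

    // Call when a poll returned Poll::Pending.
    fn record_pending(&mut self) {
        self.last_poll_was_pending = true;
    }

    // Call when a poll returned Poll::Ready.
    fn record_ready(&mut self) {
        self.last_poll_was_pending = false;
    }
}

impl Drop for PollTracker {
    fn drop(&mut self) {
        if self.last_poll_was_pending {
            // A real implementation might debug_assert! or log here.
            ABANDONED_WHILE_PENDING.fetch_add(1, Ordering::Relaxed);
        }
    }
}
```

This catches the dynamic misuse at drop time rather than statically, which seems to be the best that can be done without compiler support.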
Force-pushed from 03a2bc3 to d6bb965
Force-pushed from 360d294 to e577b5f
I also added some very vague doc concerning the
> This method returns a type which implements the `Stream` trait.
Force-pushed from e577b5f to cabfbcc
I merged this now. Thanks for working on it!