-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Stream.Cancel() #5
Conversation
I think we should add "Cancel" to Stream instead of changing how Remove() before Close() works, feels more explicit about this new behavior and makes us less likely to break existing behavior. Remove() can already be called before Close, it'll wait for everything to Close writers & readers. I think Cancel could work like:
Cancel() is "stop writing, prevent new readers, prevent existing readers from reading further and drop them, remove underlying file" Close() is "stop writing, allow new readers, allow existing readers to read to end of written content" Remove() is "prevent new readers, wait for existing readers & writer to finish, remove underlying file" -- I really need to add more test cases to assert this behavior, sorry! What do you think of that? |
Also meant to say, thanks for sending a pull request! I appreciate it! |
Good call, I've updated the behaviour to your spec and added missing tests for Remove I've also added additional guard to not write after closing stream |
@djherbis I've also added a possibility to cancel reader, hope you'll find time to review it next week |
Hey, I've done some reviewing (haven't released it yet). Going a little slow since I've been busy with work / travel the past couple days (had a lot of work to catch up on after returning from holidays, GitHub is my side projects). The code is also a little tricky in parts since there is some caveats with all the concurrency involved (what invariants need to hold, what needs to be guarded by a Read/Write Lock, atomics, other sync stuff etc.) I may just make a pull req. to your branch to contribute my suggestions back, I've tried to make some simplifications to hopefully make it easier to reason about too. I'm assuming Cancel() on a Reader is just to unblock blocking reads & close the Reader? How quickly do you want Stream.Cancel() to return? |
// It immediately cancels all pending reads | ||
func (r *Reader) Cancel() error { | ||
r.cancel = true | ||
r.s.b.Unblock(r.id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will unblock all blocking all Readers (thanks to Broadcast), I wish there was a way to signal a specific one to unblock as unblocking all readers just to check if a single one has canceled is a bit of a pain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I'm aware. But there's no easy way to implement canceling on readers other way, no? I think unblocking all readers upon canceling single one is fair tradeoff, especially because they are continously unblocking on every write. I think the performance won't hurt that much and scales linearly with number of readers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There isn't a nice way to do it in Go right now, something like golang/go#16620 would be nice, but it's just a proposal. I'm not criticizing your implementation, just lamenting that I wish this didn't have to wake all blocking reads for nothing (would be great if we had something like cond.SignalWaiter(waiterId) or similar).
The main diff. I feel here is that yes all Writes broadcast, but every woken Reader likely has something to do (read the written data). In this case, it's very unlikely that each woken Reader has something to do.
Do you have an immediate use case for this feature? I'm also wondering about the case where the trigger for Reader.Cancel() might happen for all the Readers, causing n Readers to all call broadcast => n^2 wakeups.
Also, what's your use case for Stream.Cancel feature as well, it's always nice to know the problem we're solving.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm using stream for work queue where client is sending streaming work request and workers can subscribe for this work request (and perform work). Let's say it's video processing and workers are converting video to multiple resolutions.
Stream.Cancel() is used for cancelling request for work, which should cancel all subscribers as well, so they stop performing work they maybe started already doing (e.g. someone removed video immediately after uploading, before workers converted it).
Workers can fail or hang for some reasons (e.g. broken network or deadlock). Server should note it, cancel reader, and re-try work on other node, but most importantly it should not expect failed worker to perform any more Read from the stream, so ignoring unfinished read when e.g. waiting for all readers to finish when doing Remove.
If Stream.Cancel() doesn't work, workers perform unnecessary work.
If Reader.Cancel() doesn't work, server will wait forever for broken workers to finish read and call .Done() on WaitGroup, what will never happen.
So all to all, I'm thinking just doing .closeUnsafe() and decrementing waitgroup instead of unlocking reader should suffice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Writing this makes me want a Stream.Wait() method that just waits for all readers to finish, without Removing the file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With Stream.Remove() you get the guarantee that no new Readers will be created, for Stream.Wait() are you okay with the race between the return from Stream.Wait() and the possibility of new Readers being started?
This kind of race is normal with sync.WaitGroup, it's usually prevented by insuring you don't call "Add()" after Wait() might have completed. In this case, it would mean you would not call NextReader() after Wait() has been called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's forget about Wait() for now, I'd leave it for later as it needs such discussion
Feel free to make any changes :) There's no rush as well.
Yes, it should unblock it, return ErrCanceled error from last read (kind of important, because I don't want reader of Reader to think it reached end of stream, e.g. ioutil.ReadAll should error when cancelling reader), close the reader and return any error if closing fails.
I suppose after writer and all readers are fully cancelled and file is removed
Touche. Maybe add wg.WaitGroup un sync.go and and wg.Done() in Close() (even if closing fails)? Then wg.Wait() for it after performClose() in Cancel of Stream. If any cancelation fails, then return immediately with error, but skip removing file. |
The WaitGroup waits for all file handles to be closed, and the places that call Wait() guarantee that no more handles will be opened (In my unreleased edits I've rewritten this to make it that clearer), so we know it's safe to call os.Remove after Wait(), the problem is Wait(). How would you detect that Cancellation has failed? We don't really have a deadline for them to finish, I'm considering just having Cancel() not do the Remove(), make it non-blocking, and have it so you can call Remove() optionally after Cancel(). Only other options is to track all the open file handles in Stream, and force close them there, though I'm not sure about the bookkeeping cost of this. |
Optional removal after cancellation can be useful as I imagine someone would want to inspect partial write to check what caused stream to hang or process too slowly, so I'd opt-in for this one. |
Instead of adding Reader.Cancel() I just made Reader.Close() unblock blocking Reads. Let me know if this works for what you want. |
Great work :) My comments:
I'll report any issues I have when I'll use it in my project. |
The mutex used for Reader.Read is only there to gate concurrent calls to Read() (also protecting Reader.readOff from concurrent access). I don't think I originally specified that Read/Close should be safe to call concurrently, the Reader was only meant to be concurrent safe with Stream operations. That being said, with Cancel closing Readers, I guess we have introduced the possibility for concurrent Read/Close, it looks like it might be safe to call Close() and Read() concurrently in go1.10, but I won't rely on that. Just submitted a fix for that. Can you elaborate on when you think closeOnce.Do doesn't need locking inside? Do you mean the locking done by DropReader? |
closeOnce.Do is used two times:
It's not a bug, I'm just wondering if it's necessary |
Technically we could probably get away without the closeOnce lock, but I'm using sync.Once because it prevents the inner func from being run twice, and I don't have to do the state management to guarantee that. Each of the locks has a specific thing it's guarding, instead of locks pulling double-duty. Since this is Close(), it's not a hot-spot in the code, so I'm more willing to sacrifice a little performance for simpler to reason about concurrency. |
This PR adds ability to run Remove before Cancel, effectively cancelling streaming transfer.
This is so to support cancelling of streaming (i.e. notifying readers that write was closed prematurely), and they should not interpret their result as complete byte stream.
So: