Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: no easy way to fan out to multiple readers #9051

Open
gopherbot opened this Issue Nov 3, 2014 · 7 comments

Comments

Projects
None yet
6 participants
@gopherbot
Copy link

gopherbot commented Nov 3, 2014

by recv@awesam.de:

There is currently no good way to fan out from an io.Reader to multiple readers. A
use-case we commonly encounter is piping data from one reader to another, and doing some
calculation over the piped data on the side in a goroutine.

io.TeeReader gets you only half the way, and we found a nice solution based on that and
io.Pipe.

See http://play.golang.org/p/xBBeaBdVJf for an example, piping a .tar.gz from stdin to
stdout, printing the tar headers to stderr along the way.

Would you be open to introduce a similar StreamReader implementation to io/ioutils?
@gopherbot

This comment has been minimized.

Copy link
Author

gopherbot commented Nov 3, 2014

Comment 1 by recv@awesam.de:

Additionally, see http://play.golang.org/p/vF3EDZf_rf for a StreamReader which is just
an io.Reader instead of an io.ReadCloser.
@jbardin

This comment has been minimized.

Copy link
Contributor

jbardin commented Nov 3, 2014

Comment 2:

Your example can be refactored much more succinctly, while still using io.TeeReader and
not hiding error handling:
http://play.golang.org/p/P-7siRTjzA
You also have io.MultiWriter available, which can tee out to an arbitrary number of
io.Writers.
@ianlancetaylor

This comment has been minimized.

Copy link
Contributor

ianlancetaylor commented Nov 3, 2014

Comment 3:

Labels changed: added release-none, repo-main.

@gopherbot

This comment has been minimized.

Copy link
Author

gopherbot commented Nov 3, 2014

Comment 4 by recv@awesam.de:

Please excuse me for picking a bad example. Using io.Copy to consume the reader is not
exactly the definition of composing multiple readers.
There are cases where you don't have an io.Writer available to pass into io.TeeReader,
and instead are required to pass in an io.Reader. For example, instead of writing to
stdout, let's say you'd like to make a request with the data to some http endpoint.
http.NewRequest (or http.Client.Post for that matter) takes an io.Reader, and thus you
can't just write to the request via io.TeeReader. (http://play.golang.org/p/BPMeTteFX7)
Can you clarify what you mean by "hiding error handling?".
@jbardin

This comment has been minimized.

Copy link
Contributor

jbardin commented Nov 4, 2014

Comment 5:

(don't worry about the error comment. "hiding" wasn't the right word, I originally
thought you were putting the read somewhere where the error couldn't be logically
handled)
This construct can still be handled more cleanly inline with just the TeeReader and a
Pipe. Maybe you're looking for something that patches a Reader into N Pipes and a
MultiWriter, so that you can have N Identical Readers?
@gopherbot

This comment has been minimized.

Copy link
Author

gopherbot commented Nov 4, 2014

Comment 6 by recv@awesam.de:

How so? Can you provide an example? As I see it, if you wanted to handle this scenario
without the goroutine, you'd have to manually read from the source until EOF. This will
be more convoluted than a clean solution that is piping into a seperate goroutine.

@gopherbot gopherbot added new labels Nov 4, 2014

@bradfitz bradfitz removed the new label Dec 18, 2014

@rsc rsc added this to the Unplanned milestone Apr 10, 2015

@rsc rsc removed release-none labels Apr 10, 2015

@wking

This comment has been minimized.

Copy link

wking commented Nov 8, 2018

Old issue, but I think the fundamental issue here is how to balance the consumers. For all of these fan-out cases, you'll have a primary consumer driving the source-reader consumption. Otherwise one runaway consumer could fill the pipe (or whatever you're using for buffering) before the slower consumers got around to draining the buffer. If the primary consumer gives up, the secondary consumer(s) may want to distinguish between the following cases:

  1. The source reader finished (e.g. with an io.EOF).
  2. The primary consumer gave up, and possibly wants to pass along a reason (so expose CloseWithError to the primary consumer? And some way to distinguish between source and primary-consumer errors to secondary consumers).
  3. After the primary consumer gives up, some of the secondary consumers may want to continue consuming the source reader.

To ground this in a real-world example, you could have a Response.Body as the source reader. You want to cache that body to disk, so you wrap it in this fanout structure:

response.Body = fanout(response.Body, func (readCloser io.ReadCloser) {
  defer readCloser.Close()
  diskv.WriteStream("some-key", readCloser, true)
})

caching to Diskv. The fanout signature is something like:

func fanout(source io.ReadCloser, secondaryConsumers ...func(readCloser io.ReadCloser)) (primaryConsumer io.ReadCloser)

although you might also need a context.Context to cancel and a channel to collect secondary consumer errors and handle secondary-consumer promotion on primary-consumer exit.

With the cache consumer, you may want to continue on and cache the upstream body into Diskv even if the primary consumer closes the wrapped response.Body without reading (all of) it. Or maybe you only want to cache what gets read. Or maybe you only want to cache a response if the primary consumer successfully reads the whole body.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.