208 changes: 152 additions & 56 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -115,28 +115,28 @@ func (r *byteCountReader) reset() int {

// Process opens the data source, reads and decodes data, kicking off element processing.
func (n *DataSource) Process(ctx context.Context) error {
r, err := n.source.OpenRead(ctx, n.SID)
dataReader, err := n.source.OpenRead(ctx, n.SID)
if err != nil {
return err
}
defer r.Close()
defer dataReader.Close()
n.PCol.resetSize() // initialize the size distribution for this bundle.
var byteCount int
bcr := byteCountReader{reader: r, count: &byteCount}
dataReaderCounted := byteCountReader{reader: dataReader, count: &byteCount}

c := coder.SkipW(n.Coder)
wc := MakeWindowDecoder(n.Coder.Window)

var cp ElementDecoder // Decoder for the primary element or the key in CoGBKs.
var cvs []ElementDecoder // Decoders for each value stream in CoGBKs.
var cp ElementDecoder // Decoder for the primary element or the key in CoGBKs.
var valueCoders []*coder.Coder // Decoders for each value stream in CoGBKs.

switch {
case coder.IsCoGBK(c):
cp = MakeElementDecoder(c.Components[0])

// TODO(https://github.com/apache/beam/issues/18032): Support multiple value streams (coder components) with
// with CoGBK.
cvs = []ElementDecoder{MakeElementDecoder(c.Components[1])}
valueCoders = []*coder.Coder{c.Components[1]}
default:
cp = MakeElementDecoder(c)
}
@@ -146,7 +146,7 @@ func (n *DataSource) Process(ctx context.Context) error {
return nil
}
// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
ws, t, pn, err := DecodeWindowedValueHeader(wc, r)
ws, t, pn, err := DecodeWindowedValueHeader(wc, dataReader)
if err != nil {
if err == io.EOF {
return nil
@@ -155,7 +155,7 @@ func (n *DataSource) Process(ctx context.Context) error {
}

// Decode key or parallel element.
pe, err := cp.Decode(&bcr)
pe, err := cp.Decode(&dataReaderCounted)
if err != nil {
return errors.Wrap(err, "source decode failed")
}
@@ -164,70 +164,83 @@ func (n *DataSource) Process(ctx context.Context) error {
pe.Pane = pn

var valReStreams []ReStream
for _, cv := range cvs {
values, err := n.makeReStream(ctx, pe, cv, &bcr)
reStreamCloser := &multiOnceCloser{}
defer reStreamCloser.Close()
for _, cod := range valueCoders {
values, closer, err := n.makeReStream(ctx, pe, cod, &dataReaderCounted)
if err != nil {
return err
}
valReStreams = append(valReStreams, values)
reStreamCloser.children = append(reStreamCloser.children, closer)
}

if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
return err
}
// Collect the actual size of the element, and reset the bytecounter reader.
n.PCol.addSize(int64(bcr.reset()))
bcr.reader = r
n.PCol.addSize(int64(dataReaderCounted.reset()))
dataReaderCounted.reader = dataReader

if err := reStreamCloser.Close(); err != nil {
return fmt.Errorf("error closing ReStream after processing element: %w", err)
}
}
}

func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, elemCoder *coder.Coder, bcr *byteCountReader) (ReStream, io.Closer, error) {
// TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
size, err := coder.DecodeInt32(bcr.reader)
if err != nil {
return nil, errors.Wrap(err, "stream size decoding failed")
return nil, nopCloser{}, errors.Wrap(err, "stream size decoding failed")
}

switch {
case size >= 0:
// Single chunk streams are fully read in and buffered in memory.
buf := make([]FullValue, 0, size)
buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
if err != nil {
return nil, err
}
return &FixedReStream{Buf: buf}, nil
stream, cleanupFn, err := readStreamToReStream(ctx, bcr, int64(size), elemCoder)
return stream, closeFunc(cleanupFn), err
case size == -1:
decoder := MakeElementDecoder(elemCoder)
// Multi-chunked stream.
var buf []FullValue
for {
chunk, err := coder.DecodeVarInt(bcr.reader)
if err != nil {
return nil, errors.Wrap(err, "stream chunk size decoding failed")
var chunkReStreams []ReStream
chunkReStreamsCloser := &multiOnceCloser{}
closeChunkReStreamsEarly := true
defer func() {
if !closeChunkReStreamsEarly {
return
}
// All done, escape out.
switch {
case chunk == 0: // End of stream, return buffer.
return &FixedReStream{Buf: buf}, nil
case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
chunkBuf := make([]FullValue, 0, chunk)
chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
if err != nil {
return nil, err
}
buf = append(buf, chunkBuf...)
case chunk == -1: // State backed iterable!
chunk, err := coder.DecodeVarInt(bcr.reader)
if err != nil {
return nil, err
}
token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
chunkReStreamsCloser.Close() // ignore error because makeReStream is already returning an error in this case.
}()
// createChunkReStreams appends to chunkReStreams and
// chunkReStreamsCloser.children.
createChunkReStreams := func() error {
Contributor: Indirecting the error through an anon closure doesn't help with code readability here. The closure is too long for a meaningful readability benefit.

I'd strongly prefer that we move this out to a named method and pass parameters in and out, instead of making hard-to-follow code harder to follow.

And looking at this again, is this just to avoid adding a nopCloser{} return parameter on error cases? That would be clearer than the indirection.

for {
chunkSize, err := coder.DecodeVarInt(bcr.reader)
if err != nil {
return nil, err
return errors.Wrap(err, "stream chunk size decoding failed")
}
return &concatReStream{
first: &FixedReStream{Buf: buf},
next: &proxyReStream{
// All done, escape out.
switch {
case chunkSize == 0: // End of stream.
return nil
case chunkSize > 0: // Non-zero chunk; read that many elements from the stream, and add a new ReStream to chunkReStreams.
chunkStream, closer, err := readStreamToReStream(ctx, bcr, chunkSize, elemCoder)
if err != nil {
return err
}
chunkReStreams = append(chunkReStreams, chunkStream)
chunkReStreamsCloser.children = append(chunkReStreamsCloser.children, closeFunc(closer))
case chunkSize == -1: // State backed iterable!
chunk, err := coder.DecodeVarInt(bcr.reader)
if err != nil {
return err
}
token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
if err != nil {
return err
}
chunkReStreams = append(chunkReStreams, &proxyReStream{
open: func() (Stream, error) {
r, err := n.state.OpenIterable(ctx, n.SID, token)
if err != nil {
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
// We can't re-use the original bcr, since we may get new iterables,
// or multiple of them at the same time, but we can re-use the count itself.
r = &byteCountReader{reader: r, count: bcr.count}
return &elementStream{r: r, ec: cv}, nil
return &elementStream{r: r, ec: decoder}, nil
},
},
}, nil
default:
return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
})
return nil
default:
return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
}
}
}
if err := createChunkReStreams(); err != nil {
return nil, nopCloser{}, err
}
closeChunkReStreamsEarly = false
return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
default:
return nil, errors.Errorf("received stream with marker size of %d", size)
return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
}
}
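For orientation, the byte layout that makeReStream consumes can be sketched from the encoding side. A minimal sketch, not part of this change, assuming coder.EncodeInt32 and coder.EncodeVarInt as the counterparts of the Decode calls above, with a hypothetical runner-issued state token:

	var w bytes.Buffer
	token := []byte("opaque-state-token")     // hypothetical
	coder.EncodeInt32(-1, &w)                 // size == -1: multi-chunk stream follows
	coder.EncodeVarInt(2, &w)                 // chunk of 2 elements...
	// ...two elements encoded with the element coder go here...
	coder.EncodeVarInt(-1, &w)                // -1: state-backed continuation
	coder.EncodeVarInt(int64(len(token)), &w) // token length,
	w.Write(token)                            // then the token bytes
	coder.EncodeVarInt(0, &w)                 // 0: end of stream

A non-negative initial size instead means exactly that many elements follow as a single, fully buffered chunk.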

func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
for i := int64(0); i < size; i++ {
value, err := cv.Decode(r)
var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream

// ReStreamFactory is a function that constructs a ReStream from an io.Reader
// and a coder for the type of elements to be decoded. A ReStreamFactory
// is used by the SDK harness to transform a byte stream into a stream of
// FullValues while executing a DoFn that takes an iterator as one of its
// arguments (GBK and CoGBK DoFns).
//
// The factory should return a ReStream that decodes numElements elements from
// the encodedStream reader. After the DoFn that uses the stream has finished,
// the second return value will be called to close the ReStream; this gives
// the factory an opportunity to release any resources associated with the
// returned ReStream.
//
// DefaultReadStreamToReStream is the default ReStreamFactory used by
// the exec package.
type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)

// SetReStreamFactory overrides the default behavior for constructing a ReStream
// for DoFns that iterate over values (GBK and CoGBK).
//
// The default factory is DefaultReadStreamToReStream.
func SetReStreamFactory(fn ReStreamFactory) {
Contributor: Per discussion, please label this as Experimental, and link to the issue we're attempting to solve with this.

readStreamToReStream = fn
}
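For illustration, a minimal sketch of overriding the factory, say to instrument how large materialized iterables get. The init placement, log line, and import path are assumptions, and the sketch presumes it compiles inside (or alongside) this exec package:

	package exec

	import (
		"context"
		"io"
		"log"

		"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" // path per the SDK version in use
	)

	func init() {
		SetReStreamFactory(func(ctx context.Context, r io.Reader, n int64, c *coder.Coder) (ReStream, func() error, error) {
			log.Printf("materializing iterable of %d elements", n) // hypothetical instrumentation
			return DefaultReadStreamToReStream(ctx, r, n, c)       // delegate to the in-memory default
		})
	}

A production override might instead spill iterables above a size threshold to disk and release the file in the returned close function.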

// DefaultReadStreamToReStream reads numElements from encodedStream, decoding
// them with the supplied coder, and returns an in-memory ReStream.
func DefaultReadStreamToReStream(_ context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error) {
Contributor: Per discussion, please unexport the default implementation.

buf, err := defaultReadStreamToBuffer(encodedStream, numElements, MakeElementDecoder(coder))
if err != nil {
return nil, func() error { return nil }, err
}
return &FixedReStream{buf}, func() error { return nil }, nil
}

func defaultReadStreamToBuffer(encodedStream io.Reader, numElements int64, dec ElementDecoder) ([]FullValue, error) {
buf := make([]FullValue, 0, numElements)
for i := int64(0); i < numElements; i++ {
value, err := dec.Decode(encodedStream)
if err != nil {
return nil, errors.Wrap(err, "stream value decode failed")
return nil, fmt.Errorf("stream value decode failed: %w", err)
}
buf = append(buf, *value)
}
@@ -619,6 +675,18 @@ type concatReStream struct {
first, next ReStream
}

func newConcatReStream(streams ...ReStream) *concatReStream {
if len(streams) == 0 {
streams = []ReStream{&FixedReStream{}}
}
first := streams[0]
rest := streams[1:]
if len(rest) == 0 {
return &concatReStream{first: first, next: nil}
}
return &concatReStream{first: first, next: newConcatReStream(rest...)}
Contributor: I'm not a huge fan of the recursive linked-list construction here, but I think in practice this will end up being minimally used, if ever. But this is probably less fiddly than alternative approaches.

}
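The alternative the comment alludes to is not spelled out; one iterative construction might look like the sketch below (not part of this change). The accumulator is deliberately typed as ReStream so an empty tail stays a nil interface, rather than a non-nil interface wrapping a nil *concatReStream:

	func newConcatReStreamIterative(streams ...ReStream) *concatReStream {
		if len(streams) == 0 {
			streams = []ReStream{&FixedReStream{}}
		}
		var next ReStream // stays a nil interface when there is no tail
		for i := len(streams) - 1; i >= 1; i-- {
			next = &concatReStream{first: streams[i], next: next}
		}
		return &concatReStream{first: streams[0], next: next}
	}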

func (c *concatReStream) Open() (Stream, error) {
firstStream, err := c.first.Open()
if err != nil {
@@ -671,3 +739,31 @@ func (s *concatStream) Read() (*FullValue, error) {
}
return nil, err
}

type multiOnceCloser struct {
once sync.Once
err error
children []io.Closer
}

// Close() calls Close() on all the children the first time it is called and
// returns the first non-nil error or nil. If called multiple times, returns the
// original error but does not call Close again on the children.
func (c *multiOnceCloser) Close() error {
c.once.Do(func() {
for _, ch := range c.children {
if err := ch.Close(); err != nil && c.err == nil {
c.err = err
}
}
})
return c.err
}
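A small usage sketch of the once-only semantics (hypothetical; assumes the io and strings imports):

	a := io.NopCloser(strings.NewReader("a"))
	b := io.NopCloser(strings.NewReader("b"))
	c := &multiOnceCloser{children: []io.Closer{a, b}}
	_ = c.Close() // closes a and b, recording the first non-nil error (nil here)
	_ = c.Close() // no-op: returns the recorded error without re-closing children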

type nopCloser struct{}

func (nopCloser) Close() error { return nil }

type closeFunc func() error

func (f closeFunc) Close() error { return f() }
20 changes: 15 additions & 5 deletions sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -183,7 +183,7 @@ func (n *ParDo) processSingleWindow(mainIn *MainInput) (sdf.ProcessContinuation,
if val != nil {
// Check for incomplete processing of a restriction without a checkpoint
if mainIn.RTracker != nil && !mainIn.RTracker.IsDone() && val.Continuation == nil {
return nil, rtErrHelper(mainIn.RTracker.GetError())
return nil, rtErrHelper(mainIn.RTracker)
}
// We do not forward a ProcessContinuation on its own
if val.Elm == nil {
@@ -193,17 +193,27 @@ func (n *ParDo) processSingleWindow(mainIn *MainInput) (sdf.ProcessContinuation,
}

if mainIn.RTracker != nil && !mainIn.RTracker.IsDone() {
return nil, rtErrHelper(mainIn.RTracker.GetError())
return nil, rtErrHelper(mainIn.RTracker)
}

return nil, nil
}

func rtErrHelper(err error) error {
if err != nil {
func rtErrHelper(rt sdf.RTracker) error {
if err := rt.GetError(); err != nil {
// TODO: Consider adding more context.
return err
}
return errors.New("DoFn terminated without fully processing restriction")
errSuffix := ""
if !rt.IsDone() {
done, remaining := rt.GetProgress()
denom := done + remaining
if denom != 0 {
errSuffix += fmt.Sprintf("; %.04f%% complete (done=%f, remaining=%f)", 100*done/denom, done, remaining)
}
}

return errors.New("DoFn terminated without fully processing restriction" + errSuffix)
}
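As a worked example of the suffix arithmetic, with assumed progress numbers done=2 and remaining=6:

	done, remaining := 2.0, 6.0
	denom := done + remaining // 8.0
	suffix := fmt.Sprintf("; %.04f%% complete (done=%f, remaining=%f)",
		100*done/denom, done, remaining)
	// suffix == "; 25.0000% complete (done=2.000000, remaining=6.000000)"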

// mustExplodeWindows returns true iff we need to call the function