Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[BEAM-9459] Revert "[BEAM-6374] Emit PCollection metrics from GoSDK" #11061

Merged
merged 1 commit into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 21 additions & 56 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type DataSource struct {
Name string
Coder *coder.Coder
Out Node
PCol PCollection // Handles size metrics. Value instead of pointer so it's initialized by default in tests.

source DataManager
state StateReader

index int64
splitIdx int64
start time.Time
// TODO(lostluck) 2020/02/06: refactor to support more general PCollection metrics on nodes.
outputPID string // The index is the output count for the PCollection.
index int64
splitIdx int64
start time.Time

mu sync.Mutex
}
Expand All @@ -70,39 +70,13 @@ func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContex
return n.Out.StartBundle(ctx, id, data)
}

// byteCountReader wraps an io.ReadCloser and tallies every byte read through
// it into the int that count points to. The tally is only as accurate as the
// byte counts reported by the wrapped reader.
type byteCountReader struct {
	count  *int
	reader io.ReadCloser
}

// Read forwards to the wrapped reader and adds the number of bytes it reports
// to the counter, including when the wrapped reader also returns an error.
func (r *byteCountReader) Read(p []byte) (read int, err error) {
	read, err = r.reader.Read(p)
	*r.count += read
	return read, err
}

// Close closes the wrapped reader. The counter is left untouched.
func (r *byteCountReader) Close() error {
	return r.reader.Close()
}

// reset zeroes the counter and returns the value it held beforehand.
func (r *byteCountReader) reset() int {
	prev := *r.count
	*r.count = 0
	return prev
}

// Process opens the data source, reads and decodes data, kicking off element processing.
func (n *DataSource) Process(ctx context.Context) error {
r, err := n.source.OpenRead(ctx, n.SID)
if err != nil {
return err
}
defer r.Close()
n.PCol.resetSize() // initialize the size distribution for this bundle.
var byteCount int
bcr := byteCountReader{reader: r, count: &byteCount}

c := coder.SkipW(n.Coder)
wc := MakeWindowDecoder(n.Coder.Window)
Expand All @@ -125,7 +99,6 @@ func (n *DataSource) Process(ctx context.Context) error {
if n.incrementIndexAndCheckSplit() {
return nil
}
// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
ws, t, err := DecodeWindowedValueHeader(wc, r)
if err != nil {
if err == io.EOF {
Expand All @@ -135,7 +108,7 @@ func (n *DataSource) Process(ctx context.Context) error {
}

// Decode key or parallel element.
pe, err := cp.Decode(&bcr)
pe, err := cp.Decode(r)
if err != nil {
return errors.Wrap(err, "source decode failed")
}
Expand All @@ -144,7 +117,7 @@ func (n *DataSource) Process(ctx context.Context) error {

var valReStreams []ReStream
for _, cv := range cvs {
values, err := n.makeReStream(ctx, pe, cv, &bcr)
values, err := n.makeReStream(ctx, pe, cv, r)
if err != nil {
return err
}
Expand All @@ -154,15 +127,11 @@ func (n *DataSource) Process(ctx context.Context) error {
if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
return err
}
// Collect the actual size of the element, and reset the bytecounter reader.
n.PCol.addSize(int64(bcr.reset()))
bcr.reader = r
}
}

func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
// TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
size, err := coder.DecodeInt32(bcr.reader)
func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, r io.ReadCloser) (ReStream, error) {
size, err := coder.DecodeInt32(r)
if err != nil {
return nil, errors.Wrap(err, "stream size decoding failed")
}
Expand All @@ -171,16 +140,16 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
case size >= 0:
// Single chunk streams are fully read in and buffered in memory.
buf := make([]FullValue, 0, size)
buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
buf, err = readStreamToBuffer(cv, r, int64(size), buf)
if err != nil {
return nil, err
}
return &FixedReStream{Buf: buf}, nil
case size == -1:
case size == -1: // Shouldn't this be 0?
// Multi-chunked stream.
var buf []FullValue
for {
chunk, err := coder.DecodeVarInt(bcr.reader)
chunk, err := coder.DecodeVarInt(r)
if err != nil {
return nil, errors.Wrap(err, "stream chunk size decoding failed")
}
Expand All @@ -190,17 +159,17 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
return &FixedReStream{Buf: buf}, nil
case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
chunkBuf := make([]FullValue, 0, chunk)
chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
chunkBuf, err = readStreamToBuffer(cv, r, chunk, chunkBuf)
if err != nil {
return nil, err
}
buf = append(buf, chunkBuf...)
case chunk == -1: // State backed iterable!
chunk, err := coder.DecodeVarInt(bcr.reader)
chunk, err := coder.DecodeVarInt(r)
if err != nil {
return nil, err
}
token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
token, err := ioutilx.ReadN(r, (int)(chunk))
if err != nil {
return nil, err
}
Expand All @@ -212,9 +181,6 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
if err != nil {
return nil, err
}
// We can't re-use the original bcr, since we may get new iterables,
// or multiple of them at the same time, but we can re-use the count itself.
r = &byteCountReader{reader: r, count: bcr.count}
return &elementStream{r: r, ec: cv}, nil
},
},
Expand Down Expand Up @@ -275,11 +241,12 @@ func (n *DataSource) incrementIndexAndCheckSplit() bool {
}

// ProgressReportSnapshot captures the progress reading an input source.
//
// TODO(lostluck) 2020/02/06: Add a visitor pattern for collecting progress
// metrics from downstream Nodes.
type ProgressReportSnapshot struct {
ID, Name string
Count int64

pcol PCollectionSnapshot
ID, Name, PID string
Count int64
}

// Progress returns a snapshot of the source's progress.
Expand All @@ -288,7 +255,6 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
return ProgressReportSnapshot{}
}
n.mu.Lock()
pcol := n.PCol.snapshot()
// The count is the number of "completely processed elements"
// which matches the index of the currently processing element.
c := n.index
Expand All @@ -297,8 +263,7 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
if c < 0 {
c = 0
}
pcol.ElementCount = c
return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, Count: c, pcol: pcol}
return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
}

// Split takes a sorted set of potential split indices, selects and actuates
Expand Down
39 changes: 11 additions & 28 deletions sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,6 @@ func TestDataSource_Iterators(t *testing.T) {
if got, want := iVals, expectedKeys; !equalList(got, want) {
t.Errorf("DataSource => %#v, want %#v", extractValues(got...), extractValues(want...))
}

// We're using integers that encode to 1 byte, so do some quick math to validate.
sizeOfSmallInt := 1
snap := quickTestSnapshot(source, int64(len(test.keys)))
snap.pcol.SizeSum = int64(len(test.keys) * (1 + len(test.vals)) * sizeOfSmallInt)
snap.pcol.SizeMin = int64((1 + len(test.vals)) * sizeOfSmallInt)
snap.pcol.SizeMax = int64((1 + len(test.vals)) * sizeOfSmallInt)
if got, want := source.Progress(), snap; got != want {
t.Errorf("progress didn't match: got %v, want %v", got, want)
}
})
}
}
Expand Down Expand Up @@ -368,6 +358,15 @@ func TestDataSource_Split(t *testing.T) {
})

validateSource(t, out, source, makeValues(test.expected...))

// Adjust expectations to maximum number of elements.
adjustedExpectation := test.splitIdx
if adjustedExpectation > int64(len(elements)) {
adjustedExpectation = int64(len(elements))
}
if got, want := source.Progress().Count, adjustedExpectation; got != want {
t.Fatalf("progress didn't match split: got %v, want %v", got, want)
}
})
}
})
Expand Down Expand Up @@ -465,29 +464,13 @@ func constructAndExecutePlanWithContext(t *testing.T, us []Unit, dc DataContext)
}
}

// quickTestSnapshot builds the ProgressReportSnapshot a test expects from
// source once count elements have been accounted for. Name and ID are copied
// from source; the element and size counts are all derived from count.
func quickTestSnapshot(source *DataSource, count int64) ProgressReportSnapshot {
	// We're only encoding small ints here, so every element has size 1:
	// the size sum equals the element count and min/max are both 1.
	sizes := PCollectionSnapshot{
		ElementCount: count,
		SizeCount:    count,
		SizeSum:      count,
		SizeMin:      1,
		SizeMax:      1,
	}
	return ProgressReportSnapshot{
		ID:    source.SID.PtransformID,
		Name:  source.Name,
		Count: count,
		pcol:  sizes,
	}
}

func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected []FullValue) {
t.Helper()
if got, want := len(out.Elements), len(expected); got != want {
t.Fatalf("lengths don't match: got %v, want %v", got, want)
}
if got, want := source.Progress(), quickTestSnapshot(source, int64(len(expected))); got != want {
t.Fatalf("progress snapshot didn't match: got %v, want %v", got, want)
if got, want := source.Progress().Count, int64(len(expected)); got != want {
t.Fatalf("progress count didn't match: got %v, want %v", got, want)
}
if !equalList(out.Elements, expected) {
t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
Expand Down