Skip to content

Commit

Permalink
Revert "[BEAM-6374] Emit PCollection metrics from GoSDK (#10942)" (#1…
Browse files Browse the repository at this point in the history
…1061)

This reverts commit ded686a.
  • Loading branch information
lostluck authored Mar 6, 2020
1 parent 9a3ba93 commit 7097850
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 491 deletions.
77 changes: 21 additions & 56 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type DataSource struct {
Name string
Coder *coder.Coder
Out Node
PCol PCollection // Handles size metrics. Value instead of pointer so it's initialized by default in tests.

source DataManager
state StateReader

index int64
splitIdx int64
start time.Time
// TODO(lostluck) 2020/02/06: refactor to support more general PCollection metrics on nodes.
outputPID string // The index is the output count for the PCollection.
index int64
splitIdx int64
start time.Time

mu sync.Mutex
}
Expand All @@ -70,39 +70,13 @@ func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContex
return n.Out.StartBundle(ctx, id, data)
}

// ByteCountReader is a passthrough reader that counts all the bytes read through it.
// It trusts the nested reader to return accurate byte information.
type byteCountReader struct {
count *int
reader io.ReadCloser
}

func (r *byteCountReader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
*r.count += n
return n, err
}

func (r *byteCountReader) Close() error {
return r.reader.Close()
}

func (r *byteCountReader) reset() int {
c := *r.count
*r.count = 0
return c
}

// Process opens the data source, reads and decodes data, kicking off element processing.
func (n *DataSource) Process(ctx context.Context) error {
r, err := n.source.OpenRead(ctx, n.SID)
if err != nil {
return err
}
defer r.Close()
n.PCol.resetSize() // initialize the size distribution for this bundle.
var byteCount int
bcr := byteCountReader{reader: r, count: &byteCount}

c := coder.SkipW(n.Coder)
wc := MakeWindowDecoder(n.Coder.Window)
Expand All @@ -125,7 +99,6 @@ func (n *DataSource) Process(ctx context.Context) error {
if n.incrementIndexAndCheckSplit() {
return nil
}
// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
ws, t, err := DecodeWindowedValueHeader(wc, r)
if err != nil {
if err == io.EOF {
Expand All @@ -135,7 +108,7 @@ func (n *DataSource) Process(ctx context.Context) error {
}

// Decode key or parallel element.
pe, err := cp.Decode(&bcr)
pe, err := cp.Decode(r)
if err != nil {
return errors.Wrap(err, "source decode failed")
}
Expand All @@ -144,7 +117,7 @@ func (n *DataSource) Process(ctx context.Context) error {

var valReStreams []ReStream
for _, cv := range cvs {
values, err := n.makeReStream(ctx, pe, cv, &bcr)
values, err := n.makeReStream(ctx, pe, cv, r)
if err != nil {
return err
}
Expand All @@ -154,15 +127,11 @@ func (n *DataSource) Process(ctx context.Context) error {
if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
return err
}
// Collect the actual size of the element, and reset the bytecounter reader.
n.PCol.addSize(int64(bcr.reset()))
bcr.reader = r
}
}

func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
// TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
size, err := coder.DecodeInt32(bcr.reader)
func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, r io.ReadCloser) (ReStream, error) {
size, err := coder.DecodeInt32(r)
if err != nil {
return nil, errors.Wrap(err, "stream size decoding failed")
}
Expand All @@ -171,16 +140,16 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
case size >= 0:
// Single chunk streams are fully read in and buffered in memory.
buf := make([]FullValue, 0, size)
buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
buf, err = readStreamToBuffer(cv, r, int64(size), buf)
if err != nil {
return nil, err
}
return &FixedReStream{Buf: buf}, nil
case size == -1:
case size == -1: // Shouldn't this be 0?
// Multi-chunked stream.
var buf []FullValue
for {
chunk, err := coder.DecodeVarInt(bcr.reader)
chunk, err := coder.DecodeVarInt(r)
if err != nil {
return nil, errors.Wrap(err, "stream chunk size decoding failed")
}
Expand All @@ -190,17 +159,17 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
return &FixedReStream{Buf: buf}, nil
case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
chunkBuf := make([]FullValue, 0, chunk)
chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
chunkBuf, err = readStreamToBuffer(cv, r, chunk, chunkBuf)
if err != nil {
return nil, err
}
buf = append(buf, chunkBuf...)
case chunk == -1: // State backed iterable!
chunk, err := coder.DecodeVarInt(bcr.reader)
chunk, err := coder.DecodeVarInt(r)
if err != nil {
return nil, err
}
token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
token, err := ioutilx.ReadN(r, (int)(chunk))
if err != nil {
return nil, err
}
Expand All @@ -212,9 +181,6 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
if err != nil {
return nil, err
}
// We can't re-use the original bcr, since we may get new iterables,
// or multiple of them at the same time, but we can re-use the count itself.
r = &byteCountReader{reader: r, count: bcr.count}
return &elementStream{r: r, ec: cv}, nil
},
},
Expand Down Expand Up @@ -275,11 +241,12 @@ func (n *DataSource) incrementIndexAndCheckSplit() bool {
}

// ProgressReportSnapshot captures the progress reading an input source.
//
// TODO(lostluck) 2020/02/06: Add a visitor pattern for collecting progress
// metrics from downstream Nodes.
type ProgressReportSnapshot struct {
ID, Name string
Count int64

pcol PCollectionSnapshot
ID, Name, PID string
Count int64
}

// Progress returns a snapshot of the source's progress.
Expand All @@ -288,7 +255,6 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
return ProgressReportSnapshot{}
}
n.mu.Lock()
pcol := n.PCol.snapshot()
// The count is the number of "completely processed elements"
// which matches the index of the currently processing element.
c := n.index
Expand All @@ -297,8 +263,7 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
if c < 0 {
c = 0
}
pcol.ElementCount = c
return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, Count: c, pcol: pcol}
return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
}

// Split takes a sorted set of potential split indices, selects and actuates
Expand Down
39 changes: 11 additions & 28 deletions sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,6 @@ func TestDataSource_Iterators(t *testing.T) {
if got, want := iVals, expectedKeys; !equalList(got, want) {
t.Errorf("DataSource => %#v, want %#v", extractValues(got...), extractValues(want...))
}

// We're using integers that encode to 1 byte, so do some quick math to validate.
sizeOfSmallInt := 1
snap := quickTestSnapshot(source, int64(len(test.keys)))
snap.pcol.SizeSum = int64(len(test.keys) * (1 + len(test.vals)) * sizeOfSmallInt)
snap.pcol.SizeMin = int64((1 + len(test.vals)) * sizeOfSmallInt)
snap.pcol.SizeMax = int64((1 + len(test.vals)) * sizeOfSmallInt)
if got, want := source.Progress(), snap; got != want {
t.Errorf("progress didn't match: got %v, want %v", got, want)
}
})
}
}
Expand Down Expand Up @@ -368,6 +358,15 @@ func TestDataSource_Split(t *testing.T) {
})

validateSource(t, out, source, makeValues(test.expected...))

// Adjust expectations to maximum number of elements.
adjustedExpectation := test.splitIdx
if adjustedExpectation > int64(len(elements)) {
adjustedExpectation = int64(len(elements))
}
if got, want := source.Progress().Count, adjustedExpectation; got != want {
t.Fatalf("progress didn't match split: got %v, want %v", got, want)
}
})
}
})
Expand Down Expand Up @@ -465,29 +464,13 @@ func constructAndExecutePlanWithContext(t *testing.T, us []Unit, dc DataContext)
}
}

func quickTestSnapshot(source *DataSource, count int64) ProgressReportSnapshot {
return ProgressReportSnapshot{
Name: source.Name,
ID: source.SID.PtransformID,
Count: count,
pcol: PCollectionSnapshot{
ElementCount: count,
SizeCount: count,
SizeSum: count,
// We're only encoding small ints here, so size will only be 1.
SizeMin: 1,
SizeMax: 1,
},
}
}

func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected []FullValue) {
t.Helper()
if got, want := len(out.Elements), len(expected); got != want {
t.Fatalf("lengths don't match: got %v, want %v", got, want)
}
if got, want := source.Progress(), quickTestSnapshot(source, int64(len(expected))); got != want {
t.Fatalf("progress snapshot didn't match: got %v, want %v", got, want)
if got, want := source.Progress().Count, int64(len(expected)); got != want {
t.Fatalf("progress count didn't match: got %v, want %v", got, want)
}
if !equalList(out.Elements, expected) {
t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
Expand Down
Loading

0 comments on commit 7097850

Please sign in to comment.