From 9018b8398889167e5d91a21f62cb271ed250d814 Mon Sep 17 00:00:00 2001 From: shollyman Date: Wed, 18 Jan 2023 14:10:12 -0800 Subject: [PATCH] refactor: simplify options, instrumentation for multiplexing (#7257) This PR consolidates where in managedwriter that options apply by moving the destinationTable inside of streamSettings. This PR also make changes to how instrumentation works. Currently, we retain a long-lived context on the the ManagedStream and that's used for both connection management and to attach tag context to instrumentation reporting (e.g. to tag metrics with the appropriate keys). With this change, connection-oriented metrics no longer tag with the writer tags (stream ID, data origin). We also remove the stream's keys from the default views the package exposes for these metrics. When we cut over to connections and pools, the context retained on the ManagedStream will only used for metrics tagging, and thus will come from the parent context setup elsewhere (namely the pool context). Towards: https://togithub.com/googleapis/google-cloud-go/issues/7103 --- bigquery/storage/managedwriter/client.go | 12 ++--- .../storage/managedwriter/instrumentation.go | 51 +++++++++++-------- .../storage/managedwriter/integration_test.go | 31 ++++++++++- .../storage/managedwriter/managed_stream.go | 30 +++++++---- bigquery/storage/managedwriter/options.go | 2 +- .../storage/managedwriter/options_test.go | 4 +- 6 files changed, 87 insertions(+), 43 deletions(-) diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go index 6540e7a573f4..91252275aff1 100644 --- a/bigquery/storage/managedwriter/client.go +++ b/bigquery/storage/managedwriter/client.go @@ -131,11 +131,11 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient if ms.streamSettings.streamID == "" { // not instantiated with a stream, construct one. - streamName := fmt.Sprintf("%s/streams/_default", ms.destinationTable) + streamName := fmt.Sprintf("%s/streams/_default", ms.streamSettings.destinationTable) if ms.streamSettings.streamType != DefaultStream { // For everything but a default stream, we create a new stream on behalf of the user. req := &storagepb.CreateWriteStreamRequest{ - Parent: ms.destinationTable, + Parent: ms.streamSettings.destinationTable, WriteStream: &storagepb.WriteStream{ Type: streamTypeToEnum(ms.streamSettings.streamType), }} @@ -149,13 +149,11 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient } } if ms.streamSettings != nil { - if ms.ctx != nil { - ms.ctx = keyContextWithTags(ms.ctx, ms.streamSettings.streamID, ms.streamSettings.dataOrigin) - } ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes) } else { ms.fc = newFlowController(0, 0) } + ms.ctx = setupWriterStatContext(ms) return ms, nil } @@ -173,9 +171,9 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error { } // update type and destination based on stream metadata ms.streamSettings.streamType = StreamType(info.Type.String()) - ms.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID) + ms.streamSettings.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID) } - if ms.destinationTable == "" { + if ms.streamSettings.destinationTable == "" { return fmt.Errorf("no destination table specified") } // we could auto-select DEFAULT here, but let's force users to be specific for now. diff --git a/bigquery/storage/managedwriter/instrumentation.go b/bigquery/storage/managedwriter/instrumentation.go index 3eed1ad43970..f82d15444db0 100644 --- a/bigquery/storage/managedwriter/instrumentation.go +++ b/bigquery/storage/managedwriter/instrumentation.go @@ -16,8 +16,6 @@ package managedwriter import ( "context" - "log" - "sync" "go.opencensus.io/stats" "go.opencensus.io/stats/view" @@ -129,8 +127,8 @@ var ( ) func init() { - AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin) - AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin) + AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount)) + AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount)) AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin) AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin) @@ -173,24 +171,37 @@ func createSumView(m stats.Measure, keys ...tag.Key) *view.View { return createView(m, view.Sum(), keys...) } -var logTagStreamOnce sync.Once -var logTagOriginOnce sync.Once - -// keyContextWithStreamID returns a new context modified with the instrumentation tags. -func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context { - ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID)) - if err != nil { - logTagStreamOnce.Do(func() { - log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err) - }) +// setupWriterStatContext returns a new context modified with the instrumentation tags. +// This will panic if no managedstream is provided +func setupWriterStatContext(ms *ManagedStream) context.Context { + if ms == nil { + panic("no ManagedStream provided") + } + kCtx := ms.ctx + if ms.streamSettings == nil { + return kCtx + } + if ms.streamSettings.streamID != "" { + ctx, err := tag.New(kCtx, tag.Upsert(keyStream, ms.streamSettings.streamID)) + if err != nil { + return kCtx // failed to add a tag, return the original context. + } + kCtx = ctx } - ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin)) - if err != nil { - logTagOriginOnce.Do(func() { - log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err) - }) + if ms.streamSettings.dataOrigin != "" { + ctx, err := tag.New(kCtx, tag.Upsert(keyDataOrigin, ms.streamSettings.dataOrigin)) + if err != nil { + return kCtx + } + kCtx = ctx } - return ctx + return kCtx +} + +// recordWriterStat records a measure which may optionally contain writer-related tags like stream ID +// or data origin. +func recordWriterStat(ms *ManagedStream, m *stats.Int64Measure, n int64) { + stats.Record(ms.ctx, m.M(n)) } func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go index b99afc087533..f8cd8f42d5ab 100644 --- a/bigquery/storage/managedwriter/integration_test.go +++ b/bigquery/storage/managedwriter/integration_test.go @@ -871,6 +871,16 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq // to report. time.Sleep(time.Second) + // metric to key tag names + wantTags := map[string][]string{ + "cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count": nil, + "cloud.google.com/go/bigquery/storage/managedwriter/stream_open_retry_count": nil, + "cloud.google.com/go/bigquery/storage/managedwriter/append_requests": []string{"streamID"}, + "cloud.google.com/go/bigquery/storage/managedwriter/append_request_bytes": []string{"streamID"}, + "cloud.google.com/go/bigquery/storage/managedwriter/append_request_errors": []string{"streamID"}, + "cloud.google.com/go/bigquery/storage/managedwriter/append_rows": []string{"streamID"}, + } + for _, tv := range testedViews { // Attempt to further improve race failures by retrying metrics retrieval. metricData, err := func() ([]*view.Row, error) { @@ -894,8 +904,25 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq t.Errorf("%q: expected 1 row of metrics, got %d", tv.Name, mlen) continue } - if len(metricData[0].Tags) != 1 { - t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags)) + if wantKeys, ok := wantTags[tv.Name]; ok { + if wantKeys == nil { + if n := len(tv.TagKeys); n != 0 { + t.Errorf("expected view %q to have no keys, but %d present", tv.Name, n) + } + } else { + for _, wk := range wantKeys { + var found bool + for _, gk := range tv.TagKeys { + if gk.Name() == wk { + found = true + break + } + } + if !found { + t.Errorf("expected view %q to have key %q, but wasn't present", tv.Name, wk) + } + } + } } entry := metricData[0].Data sum, ok := entry.(*view.SumData) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index ca8051529dd5..22e878a92d5c 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -80,7 +80,6 @@ type ManagedStream struct { streamSettings *streamSettings schemaDescriptor *descriptorpb.DescriptorProto - destinationTable string c *Client fc *flowController retry *statelessRetryer @@ -127,6 +126,9 @@ type streamSettings struct { // dataOrigin can be set for classifying metrics generated // by a stream. dataOrigin string + + // retains reference to the target table when resolving settings + destinationTable string } func defaultStreamSettings() *streamSettings { @@ -166,7 +168,7 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...ga }, } resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...) - recordStat(ms.ctx, FlushRequests, 1) + recordWriterStat(ms, FlushRequests, 1) if err != nil { return 0, err } @@ -338,9 +340,9 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error { numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows())) statsOnExit = func() { // these will get recorded once we exit the critical section. - recordStat(ms.ctx, AppendRequestRows, numRows) - recordStat(ms.ctx, AppendRequests, 1) - recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize)) + recordWriterStat(ms, AppendRequestRows, numRows) + recordWriterStat(ms, AppendRequests, 1) + recordWriterStat(ms, AppendRequestBytes, int64(pw.reqSize)) } ch <- pw return nil @@ -362,8 +364,11 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio // Append yielded an error. Retry by continuing or return. status := grpcstatus.Convert(appendErr) if status != nil { - ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())) - recordStat(ctx, AppendRequestErrors, 1) + recordCtx := ms.ctx + if ctx, err := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())); err == nil { + recordCtx = ctx + } + recordStat(recordCtx, AppendRequestErrors, 1) } bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount) if shouldRetry { @@ -471,6 +476,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ... // // The ManagedStream reference is used for performing re-enqueing of failed writes. func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { + for { select { case <-ms.ctx.Done(): @@ -498,14 +504,16 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie continue } // Record that we did in fact get a response from the backend. - recordStat(ms.ctx, AppendResponses, 1) + recordWriterStat(ms, AppendResponses, 1) if status := resp.GetError(); status != nil { // The response from the backend embedded a status error. We record that the error // occurred, and tag it based on the response code of the status. - if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil { - recordStat(tagCtx, AppendResponseErrors, 1) + recordCtx := ms.ctx + if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr != nil { + recordCtx = tagCtx } + recordStat(recordCtx, AppendResponseErrors, 1) respErr := grpcstatus.ErrorProto(status) if _, shouldRetry := ms.statelessRetryer().Retry(respErr, nextWrite.attemptCount); shouldRetry { // We use the status error to evaluate and possible re-enqueue the write. @@ -540,7 +548,7 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.Ap } // Break out of the loop, we were successful and the write has been // re-inserted. - recordStat(ms.ctx, AppendRetryCount, 1) + recordWriterStat(ms, AppendRetryCount, 1) break } } diff --git a/bigquery/storage/managedwriter/options.go b/bigquery/storage/managedwriter/options.go index 342a04b3c1b2..9e364b0b25a6 100644 --- a/bigquery/storage/managedwriter/options.go +++ b/bigquery/storage/managedwriter/options.go @@ -48,7 +48,7 @@ func WithStreamName(name string) WriterOption { // projects/{projectid}/datasets/{dataset}/tables/{table} func WithDestinationTable(destTable string) WriterOption { return func(ms *ManagedStream) { - ms.destinationTable = destTable + ms.streamSettings.destinationTable = destTable } } diff --git a/bigquery/storage/managedwriter/options_test.go b/bigquery/storage/managedwriter/options_test.go index ad7cf96c47fd..4cbceddf8756 100644 --- a/bigquery/storage/managedwriter/options_test.go +++ b/bigquery/storage/managedwriter/options_test.go @@ -93,9 +93,9 @@ func TestWriterOptions(t *testing.T) { options: []WriterOption{WithDestinationTable("foo")}, want: func() *ManagedStream { ms := &ManagedStream{ - streamSettings: defaultStreamSettings(), - destinationTable: "foo", + streamSettings: defaultStreamSettings(), } + ms.streamSettings.destinationTable = "foo" return ms }(), },