Skip to content

Commit

Permalink
refactor: simplify options, instrumentation for multiplexing (#7257)
Browse files Browse the repository at this point in the history
This PR consolidates where options apply in managedwriter by moving the destinationTable inside streamSettings.

This PR also makes changes to how instrumentation works.  Currently, we retain a long-lived context on the ManagedStream and that's used for both connection management and to attach tag context to instrumentation reporting (e.g. to tag metrics with the appropriate keys).  With this change, connection-oriented metrics no longer tag with the writer tags (stream ID, data origin).  We also remove the stream's keys from the default views the package exposes for these metrics.

When we cut over to connections and pools, the context retained on the ManagedStream will only be used for metrics tagging, and thus will come from the parent context set up elsewhere (namely the pool context).

Towards: https://togithub.com/googleapis/google-cloud-go/issues/7103
  • Loading branch information
shollyman committed Jan 18, 2023
1 parent edf3c24 commit 9018b83
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 43 deletions.
12 changes: 5 additions & 7 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient

if ms.streamSettings.streamID == "" {
// not instantiated with a stream, construct one.
streamName := fmt.Sprintf("%s/streams/_default", ms.destinationTable)
streamName := fmt.Sprintf("%s/streams/_default", ms.streamSettings.destinationTable)
if ms.streamSettings.streamType != DefaultStream {
// For everything but a default stream, we create a new stream on behalf of the user.
req := &storagepb.CreateWriteStreamRequest{
Parent: ms.destinationTable,
Parent: ms.streamSettings.destinationTable,
WriteStream: &storagepb.WriteStream{
Type: streamTypeToEnum(ms.streamSettings.streamType),
}}
Expand All @@ -149,13 +149,11 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
}
}
if ms.streamSettings != nil {
if ms.ctx != nil {
ms.ctx = keyContextWithTags(ms.ctx, ms.streamSettings.streamID, ms.streamSettings.dataOrigin)
}
ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
} else {
ms.fc = newFlowController(0, 0)
}
ms.ctx = setupWriterStatContext(ms)
return ms, nil
}

Expand All @@ -173,9 +171,9 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
}
// update type and destination based on stream metadata
ms.streamSettings.streamType = StreamType(info.Type.String())
ms.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
ms.streamSettings.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
}
if ms.destinationTable == "" {
if ms.streamSettings.destinationTable == "" {
return fmt.Errorf("no destination table specified")
}
// we could auto-select DEFAULT here, but let's force users to be specific for now.
Expand Down
51 changes: 31 additions & 20 deletions bigquery/storage/managedwriter/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package managedwriter

import (
"context"
"log"
"sync"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -129,8 +127,8 @@ var (
)

func init() {
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount))
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount))

AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
Expand Down Expand Up @@ -173,24 +171,37 @@ func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
return createView(m, view.Sum(), keys...)
}

var logTagStreamOnce sync.Once
var logTagOriginOnce sync.Once

// keyContextWithStreamID returns a new context modified with the instrumentation tags.
func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context {
ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
if err != nil {
logTagStreamOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err)
})
// setupWriterStatContext returns a new context modified with the instrumentation tags.
// This will panic if no managedstream is provided
func setupWriterStatContext(ms *ManagedStream) context.Context {
if ms == nil {
panic("no ManagedStream provided")
}
kCtx := ms.ctx
if ms.streamSettings == nil {
return kCtx
}
if ms.streamSettings.streamID != "" {
ctx, err := tag.New(kCtx, tag.Upsert(keyStream, ms.streamSettings.streamID))
if err != nil {
return kCtx // failed to add a tag, return the original context.
}
kCtx = ctx
}
ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin))
if err != nil {
logTagOriginOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err)
})
if ms.streamSettings.dataOrigin != "" {
ctx, err := tag.New(kCtx, tag.Upsert(keyDataOrigin, ms.streamSettings.dataOrigin))
if err != nil {
return kCtx
}
kCtx = ctx
}
return ctx
return kCtx
}

// recordWriterStat records a measure which may optionally contain writer-related tags like stream ID
// or data origin.
func recordWriterStat(ms *ManagedStream, m *stats.Int64Measure, n int64) {
stats.Record(ms.ctx, m.M(n))
}

func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
Expand Down
31 changes: 29 additions & 2 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,16 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
// to report.
time.Sleep(time.Second)

// metric to key tag names
wantTags := map[string][]string{
"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count": nil,
"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_retry_count": nil,
"cloud.google.com/go/bigquery/storage/managedwriter/append_requests": []string{"streamID"},
"cloud.google.com/go/bigquery/storage/managedwriter/append_request_bytes": []string{"streamID"},
"cloud.google.com/go/bigquery/storage/managedwriter/append_request_errors": []string{"streamID"},
"cloud.google.com/go/bigquery/storage/managedwriter/append_rows": []string{"streamID"},
}

for _, tv := range testedViews {
// Attempt to further improve race failures by retrying metrics retrieval.
metricData, err := func() ([]*view.Row, error) {
Expand All @@ -894,8 +904,25 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
t.Errorf("%q: expected 1 row of metrics, got %d", tv.Name, mlen)
continue
}
if len(metricData[0].Tags) != 1 {
t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags))
if wantKeys, ok := wantTags[tv.Name]; ok {
if wantKeys == nil {
if n := len(tv.TagKeys); n != 0 {
t.Errorf("expected view %q to have no keys, but %d present", tv.Name, n)
}
} else {
for _, wk := range wantKeys {
var found bool
for _, gk := range tv.TagKeys {
if gk.Name() == wk {
found = true
break
}
}
if !found {
t.Errorf("expected view %q to have key %q, but wasn't present", tv.Name, wk)
}
}
}
}
entry := metricData[0].Data
sum, ok := entry.(*view.SumData)
Expand Down
30 changes: 19 additions & 11 deletions bigquery/storage/managedwriter/managed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ type ManagedStream struct {

streamSettings *streamSettings
schemaDescriptor *descriptorpb.DescriptorProto
destinationTable string
c *Client
fc *flowController
retry *statelessRetryer
Expand Down Expand Up @@ -127,6 +126,9 @@ type streamSettings struct {
// dataOrigin can be set for classifying metrics generated
// by a stream.
dataOrigin string

// retains reference to the target table when resolving settings
destinationTable string
}

func defaultStreamSettings() *streamSettings {
Expand Down Expand Up @@ -166,7 +168,7 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...ga
},
}
resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...)
recordStat(ms.ctx, FlushRequests, 1)
recordWriterStat(ms, FlushRequests, 1)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -338,9 +340,9 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error {
numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))
statsOnExit = func() {
// these will get recorded once we exit the critical section.
recordStat(ms.ctx, AppendRequestRows, numRows)
recordStat(ms.ctx, AppendRequests, 1)
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
recordWriterStat(ms, AppendRequestRows, numRows)
recordWriterStat(ms, AppendRequests, 1)
recordWriterStat(ms, AppendRequestBytes, int64(pw.reqSize))
}
ch <- pw
return nil
Expand All @@ -362,8 +364,11 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio
// Append yielded an error. Retry by continuing or return.
status := grpcstatus.Convert(appendErr)
if status != nil {
ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
recordStat(ctx, AppendRequestErrors, 1)
recordCtx := ms.ctx
if ctx, err := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())); err == nil {
recordCtx = ctx
}
recordStat(recordCtx, AppendRequestErrors, 1)
}
bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount)
if shouldRetry {
Expand Down Expand Up @@ -471,6 +476,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
//
// The ManagedStream reference is used for performing re-enqueing of failed writes.
func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {

for {
select {
case <-ms.ctx.Done():
Expand Down Expand Up @@ -498,14 +504,16 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie
continue
}
// Record that we did in fact get a response from the backend.
recordStat(ms.ctx, AppendResponses, 1)
recordWriterStat(ms, AppendResponses, 1)

if status := resp.GetError(); status != nil {
// The response from the backend embedded a status error. We record that the error
// occurred, and tag it based on the response code of the status.
if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
recordStat(tagCtx, AppendResponseErrors, 1)
recordCtx := ms.ctx
if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr != nil {
recordCtx = tagCtx
}
recordStat(recordCtx, AppendResponseErrors, 1)
respErr := grpcstatus.ErrorProto(status)
if _, shouldRetry := ms.statelessRetryer().Retry(respErr, nextWrite.attemptCount); shouldRetry {
// We use the status error to evaluate and possible re-enqueue the write.
Expand Down Expand Up @@ -540,7 +548,7 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.Ap
}
// Break out of the loop, we were successful and the write has been
// re-inserted.
recordStat(ms.ctx, AppendRetryCount, 1)
recordWriterStat(ms, AppendRetryCount, 1)
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func WithStreamName(name string) WriterOption {
// projects/{projectid}/datasets/{dataset}/tables/{table}
func WithDestinationTable(destTable string) WriterOption {
return func(ms *ManagedStream) {
ms.destinationTable = destTable
ms.streamSettings.destinationTable = destTable
}
}

Expand Down
4 changes: 2 additions & 2 deletions bigquery/storage/managedwriter/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func TestWriterOptions(t *testing.T) {
options: []WriterOption{WithDestinationTable("foo")},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
destinationTable: "foo",
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.destinationTable = "foo"
return ms
}(),
},
Expand Down

0 comments on commit 9018b83

Please sign in to comment.