Skip to content

Commit ff488c8

Browse files
authored
feat(bigquery/storage/managedwriter): more instrumentation support (#4601)
This PR adds a bytes metric to the list of defined instrumentation metrics, and adds an additional key to track data origin. Ability for users to set the data origin comes a new WithDataOrigin option that can be passed to the managed stream constructor. This also does some minor refactoring of how opencensus view creation is handled. Towards: #4366
1 parent 04424f4 commit ff488c8

File tree

5 files changed

+60
-13
lines changed

5 files changed

+60
-13
lines changed

Diff for: bigquery/storage/managedwriter/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
126126
}
127127
if ms.streamSettings != nil {
128128
if ms.ctx != nil {
129-
ms.ctx = keyContextWithStreamID(ms.ctx, ms.streamSettings.streamID)
129+
ms.ctx = keyContextWithTags(ms.ctx, ms.streamSettings.streamID, ms.streamSettings.dataOrigin)
130130
}
131131
ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
132132
} else {

Diff for: bigquery/storage/managedwriter/instrumentation.go

+36-12
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ import (
2727
var (
2828
// Metrics on a stream are tagged with the stream ID.
2929
keyStream = tag.MustNewKey("streamID")
30+
31+
// We allow users to annotate streams with a data origin for monitoring purposes.
32+
// See the WithDataOrigin writer option for providing this.
33+
keyDataOrigin = tag.MustNewKey("dataOrigin")
3034
)
3135

3236
const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
@@ -36,6 +40,10 @@ var (
3640
// It is EXPERIMENTAL and subject to change or removal without notice.
3741
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
3842

43+
// AppendBytes is a measure of the bytes sent as append requests.
44+
// It is EXPERIMENTAL and subject to change or removal without notice.
45+
AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
46+
3947
// AppendResponses is a measure of the number of append responses received.
4048
// It is EXPERIMENTAL and subject to change or removal without notice.
4149
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
@@ -58,6 +66,10 @@ var (
5866
// It is EXPERIMENTAL and subject to change or removal without notice.
5967
AppendRequestsView *view.View
6068

69+
// AppendBytesView is a cumulative sum of AppendBytes.
70+
// It is EXPERIMENTAL and subject to change or removal without notice.
71+
AppendBytesView *view.View
72+
6173
// AppendResponsesView is a cumulative sum of AppendResponses.
6274
// It is EXPERIMENTAL and subject to change or removal without notice.
6375
AppendResponsesView *view.View
@@ -76,31 +88,43 @@ var (
7688
)
7789

7890
func init() {
79-
AppendRequestsView = createCountView(stats.Measure(AppendRequests), keyStream)
80-
AppendResponsesView = createCountView(stats.Measure(AppendResponses), keyStream)
81-
FlushRequestsView = createCountView(stats.Measure(FlushRequests), keyStream)
82-
AppendClientOpenView = createCountView(stats.Measure(AppendClientOpenCount), keyStream)
83-
AppendClientOpenRetryView = createCountView(stats.Measure(AppendClientOpenRetryCount), keyStream)
91+
AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
92+
AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin)
93+
AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
94+
FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
95+
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
96+
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
8497
}
8598

86-
func createCountView(m stats.Measure, keys ...tag.Key) *view.View {
99+
func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
87100
return &view.View{
88101
Name: m.Name(),
89102
Description: m.Description(),
90103
TagKeys: keys,
91104
Measure: m,
92-
Aggregation: view.Sum(),
105+
Aggregation: agg,
93106
}
94107
}
95108

96-
var logOnce sync.Once
109+
func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
110+
return createView(m, view.Sum(), keys...)
111+
}
112+
113+
var logTagStreamOnce sync.Once
114+
var logTagOriginOnce sync.Once
97115

98-
// keyContextWithStreamID returns a new context modified with the streamID tag.
99-
func keyContextWithStreamID(ctx context.Context, streamID string) context.Context {
116+
// keyContextWithStreamID returns a new context modified with the instrumentation tags.
117+
func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context {
100118
ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
101119
if err != nil {
102-
logOnce.Do(func() {
103-
log.Printf("managedwriter: error creating tag map for 'stream' key: %v", err)
120+
logTagStreamOnce.Do(func() {
121+
log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err)
122+
})
123+
}
124+
ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin))
125+
if err != nil {
126+
logTagOriginOnce.Do(func() {
127+
log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err)
104128
})
105129
}
106130
return ctx

Diff for: bigquery/storage/managedwriter/managed_stream.go

+4
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ type streamSettings struct {
112112
// TraceID can be set when appending data on a stream. It's
113113
// purpose is to aid in debug and diagnostic scenarios.
114114
TraceID string
115+
116+
// dataOrigin can be set for classifying metrics generated
117+
// by a stream.
118+
dataOrigin string
115119
}
116120

117121
func defaultStreamSettings() *streamSettings {

Diff for: bigquery/storage/managedwriter/writer_option.go

+8
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,11 @@ func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption {
7777
ms.schemaDescriptor = dp
7878
}
7979
}
80+
81+
// WithDataOrigin is used to attach an origin context to the instrumentation metrics
82+
// emitted by the library.
83+
func WithDataOrigin(dataOrigin string) WriterOption {
84+
return func(ms *ManagedStream) {
85+
ms.streamSettings.dataOrigin = dataOrigin
86+
}
87+
}

Diff for: bigquery/storage/managedwriter/writer_option_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,17 @@ func TestWriterOptions(t *testing.T) {
8383
return ms
8484
}(),
8585
},
86+
{
87+
desc: "WithDataOrigin",
88+
options: []WriterOption{WithDataOrigin("origin")},
89+
want: func() *ManagedStream {
90+
ms := &ManagedStream{
91+
streamSettings: defaultStreamSettings(),
92+
}
93+
ms.streamSettings.dataOrigin = "origin"
94+
return ms
95+
}(),
96+
},
8697
{
8798
desc: "multiple",
8899
options: []WriterOption{

0 commit comments

Comments
 (0)