Skip to content

Commit 9505384

Browse files
authored
feat(bigquery/storage/managedwriter): more metrics instrumentation (#4690)
* feat(bigquery/storage/managedwriter): more metrics instrumentation
1 parent 1cc9ce0 commit 9505384

File tree

2 files changed

+88
-21
lines changed

2 files changed

+88
-21
lines changed

Diff for: bigquery/storage/managedwriter/instrumentation.go

+74-21
Original file line numberDiff line numberDiff line change
@@ -31,69 +31,122 @@ var (
3131
// We allow users to annotate streams with a data origin for monitoring purposes.
3232
// See the WithDataOrigin writer option for providing this.
3333
keyDataOrigin = tag.MustNewKey("dataOrigin")
34+
35+
// keyError tags metrics using the status code of returned errors.
36+
keyError = tag.MustNewKey("error")
3437
)
3538

39+
// DefaultOpenCensusViews retains the set of all opencensus views that this
40+
// library has instrumented, to add view registration for exporters.
41+
var DefaultOpenCensusViews []*view.View
42+
3643
const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
3744

3845
var (
46+
// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
47+
// It is EXPERIMENTAL and subject to change or removal without notice.
48+
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
49+
50+
// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
51+
// It is EXPERIMENTAL and subject to change or removal without notice.
52+
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
53+
3954
// AppendRequests is a measure of the number of append requests sent.
4055
// It is EXPERIMENTAL and subject to change or removal without notice.
4156
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
4257

43-
// AppendBytes is a measure of the bytes sent as append requests.
58+
// AppendRequestBytes is a measure of the bytes sent as append requests.
59+
// It is EXPERIMENTAL and subject to change or removal without notice.
60+
AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
61+
62+
// AppendRequestErrors is a measure of the number of append requests that errored on send.
63+
// It is EXPERIMENTAL and subject to change or removal without notice.
64+
AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)
65+
66+
// AppendRequestRows is a measure of the number of append rows sent.
4467
// It is EXPERIMENTAL and subject to change or removal without notice.
45-
AppendBytes = stats.Int64(statsPrefix+"append_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
68+
AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)
4669

4770
// AppendResponses is a measure of the number of append responses received.
4871
// It is EXPERIMENTAL and subject to change or removal without notice.
4972
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
5073

74+
// AppendResponseErrors is a measure of the number of append responses received with an error attached.
75+
// It is EXPERIMENTAL and subject to change or removal without notice.
76+
AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)
77+
5178
// FlushRequests is a measure of the number of FlushRows requests sent.
5279
// It is EXPERIMENTAL and subject to change or removal without notice.
5380
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
81+
)
5482

55-
// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
83+
var (
84+
85+
// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
5686
// It is EXPERIMENTAL and subject to change or removal without notice.
57-
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
87+
AppendClientOpenView *view.View
5888

59-
// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
89+
// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
6090
// It is EXPERIMENTAL and subject to change or removal without notice.
61-
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
62-
)
91+
AppendClientOpenRetryView *view.View
6392

64-
var (
6593
// AppendRequestsView is a cumulative sum of AppendRequests.
6694
// It is EXPERIMENTAL and subject to change or removal without notice.
6795
AppendRequestsView *view.View
6896

69-
// AppendBytesView is a cumulative sum of AppendBytes.
97+
// AppendRequestBytesView is a cumulative sum of AppendRequestBytes.
7098
// It is EXPERIMENTAL and subject to change or removal without notice.
71-
AppendBytesView *view.View
99+
AppendRequestBytesView *view.View
72100

73-
// AppendResponsesView is a cumulative sum of AppendResponses.
101+
// AppendRequestErrorsView is a cumulative sum of AppendRequestErrors.
74102
// It is EXPERIMENTAL and subject to change or removal without notice.
75-
AppendResponsesView *view.View
103+
AppendRequestErrorsView *view.View
76104

77-
// FlushRequestsView is a cumulative sum of FlushRequests.
105+
// AppendRequestRowsView is a cumulative sum of AppendRows.
78106
// It is EXPERIMENTAL and subject to change or removal without notice.
79-
FlushRequestsView *view.View
107+
AppendRequestRowsView *view.View
80108

81-
// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
109+
// AppendResponsesView is a cumulative sum of AppendResponses.
82110
// It is EXPERIMENTAL and subject to change or removal without notice.
83-
AppendClientOpenView *view.View
111+
AppendResponsesView *view.View
84112

85-
// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
113+
// AppendResponseErrorsView is a cumulative sum of AppendResponseErrors.
86114
// It is EXPERIMENTAL and subject to change or removal without notice.
87-
AppendClientOpenRetryView *view.View
115+
AppendResponseErrorsView *view.View
116+
117+
// FlushRequestsView is a cumulative sum of FlushRequests.
118+
// It is EXPERIMENTAL and subject to change or removal without notice.
119+
FlushRequestsView *view.View
88120
)
89121

90122
func init() {
123+
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
124+
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
125+
91126
AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
92-
AppendBytesView = createSumView(stats.Measure(AppendBytes), keyStream, keyDataOrigin)
127+
AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
128+
AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError)
129+
AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin)
130+
93131
AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
132+
AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)
133+
94134
FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
95-
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
96-
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
135+
136+
DefaultOpenCensusViews = []*view.View{
137+
AppendClientOpenView,
138+
AppendClientOpenRetryView,
139+
140+
AppendRequestsView,
141+
AppendRequestBytesView,
142+
AppendRequestErrorsView,
143+
AppendRequestRowsView,
144+
145+
AppendResponsesView,
146+
AppendResponseErrorsView,
147+
148+
FlushRequestsView,
149+
}
97150
}
98151

99152
func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {

Diff for: bigquery/storage/managedwriter/managed_stream.go

+14
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
"sync"
2222

2323
"github.com/googleapis/gax-go/v2"
24+
"go.opencensus.io/tag"
2425
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
26+
"google.golang.org/grpc/codes"
2527
grpcstatus "google.golang.org/grpc/status"
2628
"google.golang.org/protobuf/types/descriptorpb"
2729
"google.golang.org/protobuf/types/known/wrapperspb"
@@ -269,7 +271,14 @@ func (ms *ManagedStream) append(pw *pendingWrite, opts ...gax.CallOption) error
269271
err = (*arc).Send(req)
270272
}
271273
recordStat(ms.ctx, AppendRequests, 1)
274+
recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
275+
recordStat(ms.ctx, AppendRequestRows, int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows())))
272276
if err != nil {
277+
status := grpcstatus.Convert(err)
278+
if status != nil {
279+
ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
280+
recordStat(ctx, AppendRequestErrors, 1)
281+
}
273282
bo, shouldRetry := r.Retry(err)
274283
if shouldRetry {
275284
if err := gax.Sleep(ms.ctx, bo); err != nil {
@@ -366,6 +375,11 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
366375
recordStat(ctx, AppendResponses, 1)
367376

368377
if status := resp.GetError(); status != nil {
378+
tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String()))
379+
if err != nil {
380+
tagCtx = ctx
381+
}
382+
recordStat(tagCtx, AppendResponseErrors, 1)
369383
nextWrite.markDone(NoStreamOffset, grpcstatus.ErrorProto(status), fc)
370384
continue
371385
}

0 commit comments

Comments
 (0)