Skip to content

Commit 73b6f5e

Browse files
authored
feat(bigquery/storage/managedwriter): add opencensus instrumentation (#4512)
This starts to plumb in OpenCensus (oc) instrumentation, as we're already using it in bigquery proper and other veneers. Testing the instrumentation helped catch another double-close in the recv processor, so this change addresses that as well. Towards #4366.
1 parent 95a27f7 commit 73b6f5e

File tree

5 files changed

+225
-43
lines changed

5 files changed

+225
-43
lines changed

Diff for: bigquery/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/golang/protobuf v1.5.2
99
github.com/google/go-cmp v0.5.6
1010
github.com/googleapis/gax-go/v2 v2.0.5
11+
go.opencensus.io v0.23.0
1112
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
1213
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
1314
google.golang.org/api v0.51.0

Diff for: bigquery/storage/managedwriter/client.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*M
7474
}
7575

7676
func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) {
77-
7877
ctx, cancel := context.WithCancel(ctx)
7978

8079
ms := &ManagedStream{
@@ -122,6 +121,9 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
122121
}
123122
}
124123
if ms.streamSettings != nil {
124+
if ms.ctx != nil {
125+
ms.ctx = keyContextWithStreamID(ms.ctx, ms.streamSettings.streamID)
126+
}
125127
ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
126128
} else {
127129
ms.fc = newFlowController(0, 0)

Diff for: bigquery/storage/managedwriter/instrumentation.go

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import (
18+
"context"
19+
"log"
20+
"sync"
21+
22+
"go.opencensus.io/stats"
23+
"go.opencensus.io/stats/view"
24+
"go.opencensus.io/tag"
25+
)
26+
27+
var (
28+
// Metrics on a stream are tagged with the stream ID.
29+
keyStream = tag.MustNewKey("streamID")
30+
)
31+
32+
const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
33+
34+
var (
35+
// AppendRequests is a measure of the number of append requests sent.
36+
// It is EXPERIMENTAL and subject to change or removal without notice.
37+
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
38+
39+
// AppendResponses is a measure of the number of append responses received.
40+
// It is EXPERIMENTAL and subject to change or removal without notice.
41+
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
42+
43+
// FlushRequests is a measure of the number of FlushRows requests sent.
44+
// It is EXPERIMENTAL and subject to change or removal without notice.
45+
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
46+
47+
// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
48+
// It is EXPERIMENTAL and subject to change or removal without notice.
49+
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
50+
51+
// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
52+
// It is EXPERIMENTAL and subject to change or removal without notice.
53+
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
54+
)
55+
56+
var (
57+
// AppendRequestsView is a cumulative sum of AppendRequests.
58+
// It is EXPERIMENTAL and subject to change or removal without notice.
59+
AppendRequestsView *view.View
60+
61+
// AppendResponsesView is a cumulative sum of AppendResponses.
62+
// It is EXPERIMENTAL and subject to change or removal without notice.
63+
AppendResponsesView *view.View
64+
65+
// FlushRequestsView is a cumulative sum of FlushRequests.
66+
// It is EXPERIMENTAL and subject to change or removal without notice.
67+
FlushRequestsView *view.View
68+
69+
// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
70+
// It is EXPERIMENTAL and subject to change or removal without notice.
71+
AppendClientOpenView *view.View
72+
73+
// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
74+
// It is EXPERIMENTAL and subject to change or removal without notice.
75+
AppendClientOpenRetryView *view.View
76+
)
77+
78+
func init() {
79+
AppendRequestsView = createCountView(stats.Measure(AppendRequests), keyStream)
80+
AppendResponsesView = createCountView(stats.Measure(AppendResponses), keyStream)
81+
FlushRequestsView = createCountView(stats.Measure(FlushRequests), keyStream)
82+
AppendClientOpenView = createCountView(stats.Measure(AppendClientOpenCount), keyStream)
83+
AppendClientOpenRetryView = createCountView(stats.Measure(AppendClientOpenRetryCount), keyStream)
84+
}
85+
86+
func createCountView(m stats.Measure, keys ...tag.Key) *view.View {
87+
return &view.View{
88+
Name: m.Name(),
89+
Description: m.Description(),
90+
TagKeys: keys,
91+
Measure: m,
92+
Aggregation: view.Sum(),
93+
}
94+
}
95+
96+
var logOnce sync.Once
97+
98+
// keyContextWithStreamID returns a new context modified with the streamID tag.
99+
func keyContextWithStreamID(ctx context.Context, streamID string) context.Context {
100+
ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
101+
if err != nil {
102+
logOnce.Do(func() {
103+
log.Printf("managedwriter: error creating tag map for 'stream' key: %v", err)
104+
})
105+
}
106+
return ctx
107+
}
108+
109+
func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
110+
stats.Record(ctx, m.M(n))
111+
}

Diff for: bigquery/storage/managedwriter/integration_test.go

+104-42
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package managedwriter
1717
import (
1818
"context"
1919
"fmt"
20+
"math"
2021
"testing"
2122
"time"
2223

@@ -25,6 +26,7 @@ import (
2526
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
2627
"cloud.google.com/go/internal/testutil"
2728
"cloud.google.com/go/internal/uid"
29+
"go.opencensus.io/stats/view"
2830
"google.golang.org/api/option"
2931
"google.golang.org/protobuf/encoding/protojson"
3032
"google.golang.org/protobuf/proto"
@@ -45,6 +47,14 @@ var testSimpleSchema = bigquery.Schema{
4547
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
4648
}
4749

50+
var testSimpleData = []*testdata.SimpleMessage{
51+
{Name: "one", Value: 1},
52+
{Name: "two", Value: 2},
53+
{Name: "three", Value: 3},
54+
{Name: "four", Value: 1},
55+
{Name: "five", Value: 2},
56+
}
57+
4858
func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
4959
if testing.Short() {
5060
t.Skip("Integration tests skipped in short mode")
@@ -162,12 +172,15 @@ func TestIntegration_ManagedWriter(t *testing.T) {
162172
t.Parallel()
163173
testPendingStream(ctx, t, mwClient, bqClient, dataset)
164174
})
175+
t.Run("Instrumentation", func(t *testing.T) {
176+
// Don't run this in parallel, we only want to collect stats from this subtest.
177+
testInstrumentation(ctx, t, mwClient, bqClient, dataset)
178+
})
165179
})
180+
166181
}
167182

168183
func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
169-
170-
// prep a suitable destination table.
171184
testTable := dataset.Table(tableIDs.New())
172185
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
173186
t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
@@ -188,17 +201,9 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
188201
}
189202
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
190203

191-
testData := []*testdata.SimpleMessage{
192-
{Name: "one", Value: 1},
193-
{Name: "two", Value: 2},
194-
{Name: "three", Value: 3},
195-
{Name: "four", Value: 1},
196-
{Name: "five", Value: 2},
197-
}
198-
199204
// First, send the test rows individually.
200205
var results []*AppendResult
201-
for k, mesg := range testData {
206+
for k, mesg := range testSimpleData {
202207
b, err := proto.Marshal(mesg)
203208
if err != nil {
204209
t.Errorf("failed to marshal message %d: %v", k, err)
@@ -211,12 +216,12 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
211216
}
212217
// wait for the result to indicate ready, then validate.
213218
results[0].Ready()
214-
wantRows := int64(len(testData))
219+
wantRows := int64(len(testSimpleData))
215220
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after first send")
216221

217222
// Now, send the test rows grouped into in a single append.
218223
var data [][]byte
219-
for k, mesg := range testData {
224+
for k, mesg := range testSimpleData {
220225
b, err := proto.Marshal(mesg)
221226
if err != nil {
222227
t.Errorf("failed to marshal message %d: %v", k, err)
@@ -313,16 +318,8 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
313318

314319
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
315320

316-
testData := []*testdata.SimpleMessage{
317-
{Name: "one", Value: 1},
318-
{Name: "two", Value: 2},
319-
{Name: "three", Value: 3},
320-
{Name: "four", Value: 1},
321-
{Name: "five", Value: 2},
322-
}
323-
324321
var expectedRows int64
325-
for k, mesg := range testData {
322+
for k, mesg := range testSimpleData {
326323
b, err := proto.Marshal(mesg)
327324
if err != nil {
328325
t.Errorf("failed to marshal message %d: %v", k, err)
@@ -368,16 +365,8 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
368365
}
369366
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
370367

371-
testData := []*testdata.SimpleMessage{
372-
{Name: "one", Value: 1},
373-
{Name: "two", Value: 2},
374-
{Name: "three", Value: 3},
375-
{Name: "four", Value: 1},
376-
{Name: "five", Value: 2},
377-
}
378-
379368
var results []*AppendResult
380-
for k, mesg := range testData {
369+
for k, mesg := range testSimpleData {
381370
b, err := proto.Marshal(mesg)
382371
if err != nil {
383372
t.Errorf("failed to marshal message %d: %v", k, err)
@@ -390,7 +379,7 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
390379
}
391380
// wait for the result to indicate ready, then validate.
392381
results[0].Ready()
393-
wantRows := int64(len(testData))
382+
wantRows := int64(len(testSimpleData))
394383
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
395384
}
396385

@@ -413,17 +402,9 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
413402
}
414403
validateRowCount(ctx, t, bqClient, testTable, 0, "before send")
415404

416-
testData := []*testdata.SimpleMessage{
417-
{Name: "one", Value: 1},
418-
{Name: "two", Value: 2},
419-
{Name: "three", Value: 3},
420-
{Name: "four", Value: 1},
421-
{Name: "five", Value: 2},
422-
}
423-
424405
// Send data.
425406
var results []*AppendResult
426-
for k, mesg := range testData {
407+
for k, mesg := range testSimpleData {
427408
b, err := proto.Marshal(mesg)
428409
if err != nil {
429410
t.Errorf("failed to marshal message %d: %v", k, err)
@@ -435,7 +416,7 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
435416
}
436417
}
437418
results[0].Ready()
438-
wantRows := int64(len(testData))
419+
wantRows := int64(len(testSimpleData))
439420

440421
// Mark stream complete.
441422
trackedOffset, err := ms.Finalize(ctx)
@@ -457,3 +438,84 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
457438
}
458439
validateRowCount(ctx, t, bqClient, testTable, wantRows, "after send")
459440
}
441+
442+
func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
443+
testedViews := []*view.View{
444+
AppendRequestsView,
445+
AppendResponsesView,
446+
AppendClientOpenView,
447+
}
448+
449+
if err := view.Register(testedViews...); err != nil {
450+
t.Fatalf("couldn't register views: %v", err)
451+
}
452+
453+
testTable := dataset.Table(tableIDs.New())
454+
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testSimpleSchema}); err != nil {
455+
t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
456+
}
457+
458+
m := &testdata.SimpleMessage{}
459+
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
460+
461+
// setup a new stream.
462+
ms, err := mwClient.NewManagedStream(ctx,
463+
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
464+
WithType(DefaultStream),
465+
WithSchemaDescriptor(descriptorProto),
466+
)
467+
if err != nil {
468+
t.Fatalf("NewManagedStream: %v", err)
469+
}
470+
471+
var results []*AppendResult
472+
for k, mesg := range testSimpleData {
473+
b, err := proto.Marshal(mesg)
474+
if err != nil {
475+
t.Errorf("failed to marshal message %d: %v", k, err)
476+
}
477+
data := [][]byte{b}
478+
results, err = ms.AppendRows(ctx, data, NoStreamOffset)
479+
if err != nil {
480+
t.Errorf("single-row append %d failed: %v", k, err)
481+
}
482+
}
483+
// wait for the result to indicate ready.
484+
results[0].Ready()
485+
// Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance
486+
// to report.
487+
time.Sleep(time.Second)
488+
489+
for _, tv := range testedViews {
490+
metricData, err := view.RetrieveData(tv.Name)
491+
if err != nil {
492+
t.Errorf("view %q RetrieveData: %v", tv.Name, err)
493+
}
494+
if len(metricData) > 1 {
495+
t.Errorf("%q: only expected 1 row, got %d", tv.Name, len(metricData))
496+
}
497+
if len(metricData[0].Tags) != 1 {
498+
t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags))
499+
}
500+
entry := metricData[0].Data
501+
sum, ok := entry.(*view.SumData)
502+
if !ok {
503+
t.Errorf("unexpected metric type: %T", entry)
504+
}
505+
got := sum.Value
506+
var want int64
507+
switch tv {
508+
case AppendRequestsView:
509+
want = int64(len(testSimpleData))
510+
case AppendResponsesView:
511+
want = int64(len(testSimpleData))
512+
case AppendClientOpenView:
513+
want = 1
514+
}
515+
516+
// float comparison; diff more than error bound is error
517+
if math.Abs(got-float64(want)) > 0.1 {
518+
t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want)
519+
}
520+
}
521+
}

0 commit comments

Comments
 (0)