-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
instrumentation.go
188 lines (148 loc) · 7.75 KB
/
instrumentation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"log"
"sync"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
var (
// Metrics on a stream are tagged with the stream ID.
keyStream = tag.MustNewKey("streamID")
// We allow users to annotate streams with a data origin for monitoring purposes.
// See the WithDataOrigin writer option for providing this.
keyDataOrigin = tag.MustNewKey("dataOrigin")
// keyError tags metrics using the status code of returned errors.
keyError = tag.MustNewKey("error")
)
// DefaultOpenCensusViews retains the set of all opencensus views that this
// library has instrumented, to add view registration for exporters.
var DefaultOpenCensusViews []*view.View
const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
var (
// AppendClientOpenCount is a measure of the number of times the AppendRowsClient was opened.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
// AppendClientOpenRetryCount is a measure of the number of times the AppendRowsClient open was retried.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
// AppendRequests is a measure of the number of append requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
// AppendRequestBytes is a measure of the bytes sent as append requests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
// AppendRequestErrors is a measure of the number of append requests that errored on send.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)
// AppendRequestRows is a measure of the number of append rows sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)
// AppendResponses is a measure of the number of append responses received.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
// AppendResponseErrors is a measure of the number of append responses received with an error attached.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)
// FlushRequests is a measure of the number of FlushRows requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
)
var (
// AppendClientOpenView is a cumulative sum of AppendClientOpenCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenView *view.View
// AppendClientOpenRetryView is a cumulative sum of AppendClientOpenRetryCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendClientOpenRetryView *view.View
// AppendRequestsView is a cumulative sum of AppendRequests.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestsView *view.View
// AppendRequestBytesView is a cumulative sum of AppendRequestBytes.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestBytesView *view.View
// AppendRequestErrorsView is a cumulative sum of AppendRequestErrors.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestErrorsView *view.View
// AppendRequestRowsView is a cumulative sum of AppendRows.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRequestRowsView *view.View
// AppendResponsesView is a cumulative sum of AppendResponses.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponsesView *view.View
// AppendResponseErrorsView is a cumulative sum of AppendResponseErrors.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrorsView *view.View
// FlushRequestsView is a cumulative sum of FlushRequests.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequestsView *view.View
)
func init() {
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError)
AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin)
AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)
FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
DefaultOpenCensusViews = []*view.View{
AppendClientOpenView,
AppendClientOpenRetryView,
AppendRequestsView,
AppendRequestBytesView,
AppendRequestErrorsView,
AppendRequestRowsView,
AppendResponsesView,
AppendResponseErrorsView,
FlushRequestsView,
}
}
func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
return &view.View{
Name: m.Name(),
Description: m.Description(),
TagKeys: keys,
Measure: m,
Aggregation: agg,
}
}
func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
return createView(m, view.Sum(), keys...)
}
var logTagStreamOnce sync.Once
var logTagOriginOnce sync.Once
// keyContextWithStreamID returns a new context modified with the instrumentation tags.
func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context {
ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
if err != nil {
logTagStreamOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err)
})
}
ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin))
if err != nil {
logTagOriginOnce.Do(func() {
log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err)
})
}
return ctx
}
func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
stats.Record(ctx, m.M(n))
}