-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
send_optimizer.go
271 lines (244 loc) · 9.07 KB
/
send_optimizer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"bytes"
"encoding/binary"
"hash/crc32"
"time"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
)
// sendOptimizer handles the general task of optimizing AppendRowsRequest messages sent to the backend.
//
// The general premise is that the ordering of AppendRowsRequests on a connection provides some opportunities
// to reduce payload size, thus potentially increasing throughput. Care must be taken, however, as deep inspection
// of requests is potentially more costly (in terms of CPU usage) than gains from reducing request sizes.
type sendOptimizer interface {
	// signalReset is used to signal to the optimizer that the connection is freshly (re)opened, or that a previous
	// send yielded an error.
	signalReset()
	// optimizeSend handles possible manipulation of a request, and triggers the send.
	optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error
	// isMultiplexing tracks if we've actually sent writes to more than a single stream on this connection.
	isMultiplexing() bool
}
// verboseOptimizer is primarily a testing optimizer; it always sends the full request.
type verboseOptimizer struct {
}

// signalReset is a no-op, as this optimizer keeps no connection state.
func (vo *verboseOptimizer) signalReset() {}

// optimizeSend builds and sends a complete request (traceID included) every time.
func (vo *verboseOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	req := pw.constructFullRequest(true)
	return arc.Send(req)
}

// isMultiplexing reports false so that schema changes always force a reconnect.
func (vo *verboseOptimizer) isMultiplexing() bool {
	return false
}
// simplexOptimizer is used for connections bearing AppendRowsRequest for only a single stream.
//
// The optimizations here are straightforward:
// * The first request on a connection is unmodified.
// * Subsequent requests can redact WriteStream, WriterSchema, and TraceID.
//
// Behavior of schema evolution differs based on the type of stream.
// * For an explicit stream, the connection must reconnect to signal schema change (handled in connection).
// * For default streams, the new descriptor (inside WriterSchema) can simply be sent.
type simplexOptimizer struct {
	haveSent bool
}

// signalReset clears send state so the next request goes out in full.
func (so *simplexOptimizer) signalReset() {
	so.haveSent = false
}

// optimizeSend emits a full request on the first send after a reset, and the
// already-redacted base request on subsequent sends. A failed send leaves
// haveSent false, so the following attempt is again sent in full.
func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	req := pw.req
	if !so.haveSent {
		// First request on this connection: construct the complete request.
		req = pw.constructFullRequest(true)
	}
	err := arc.Send(req)
	so.haveSent = err == nil
	return err
}

// isMultiplexing reports false; a simplex optimizer serves a single stream.
func (so *simplexOptimizer) isMultiplexing() bool {
	return false
}
// multiplexOptimizer is used for connections where requests for multiple default streams are sent on a common
// connection. Only default streams can currently be multiplexed.
//
// In this case, the optimizations are as follows:
// * We must send the WriteStream on all requests.
// * For sequential requests to the same stream, schema can be redacted after the first request.
// * Trace ID can be redacted from all requests after the first.
//
// Schema evolution is simply a case of sending the new WriterSchema as part of the request(s). No explicit
// reconnection is necessary.
type multiplexOptimizer struct {
	prevStream       string             // stream ID of the most recent successful send ("" until the first success).
	prevTemplate     *versionedTemplate // request template in effect for prevStream.
	multiplexStreams bool               // set once sends have targeted more than one stream on this connection.
}

// signalReset returns the optimizer to its initial state, e.g. after a
// reconnect or a send error.
func (mo *multiplexOptimizer) signalReset() {
	mo.prevStream = ""
	mo.multiplexStreams = false
	mo.prevTemplate = nil
}
// optimizeSend sends pw on arc, redacting request fields based on what was
// previously sent on this connection.
//
// Cases:
//   - First send after (re)connect/reset: full request, traceID included.
//   - Same stream as the previous send: the optimized request, unless the
//     request template changed, in which case a full request (minus traceID)
//     carries the new template.
//   - Different stream: full request minus traceID, and the connection is
//     marked as multiplexing.
func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	var err error
	if mo.prevStream == "" {
		// startup case, send a full request (with traceID).
		req := pw.constructFullRequest(true)
		err = arc.Send(req)
		if err == nil {
			// Record state only on success; an error is expected to trigger signalReset.
			mo.prevStream = req.GetWriteStream()
			mo.prevTemplate = pw.reqTmpl
		}
	} else {
		// We have a previous send. Determine if it's the same stream or a different one.
		if mo.prevStream == pw.writeStreamID {
			// add the stream ID to the optimized request, as multiplex-optimization wants it present.
			if pw.req.GetWriteStream() == "" {
				pw.req.WriteStream = pw.writeStreamID
			}
			// swapOnSuccess tracks if we need to update schema versions on successful send.
			swapOnSuccess := false
			req := pw.req
			if mo.prevTemplate != nil {
				if !mo.prevTemplate.Compatible(pw.reqTmpl) {
					// Template changed (e.g. schema evolution on a default stream);
					// send a full request so the backend sees the new template.
					swapOnSuccess = true
					req = pw.constructFullRequest(false) // full request minus traceID.
				}
			}
			err = arc.Send(req)
			if err == nil && swapOnSuccess {
				mo.prevTemplate = pw.reqTmpl
			}
		} else {
			// The previous send was for a different stream. Send a full request, minus traceId.
			req := pw.constructFullRequest(false)
			err = arc.Send(req)
			if err == nil {
				// Send successful. Update state to reflect this send is now the "previous" state.
				mo.prevStream = pw.writeStreamID
				mo.prevTemplate = pw.reqTmpl
			}
			// Also, note that we've sent traffic for multiple streams, which means the backend recognizes this
			// is a multiplex stream as well.
			// NOTE(review): this flag is set even when Send fails; the error path is
			// expected to invoke signalReset (which clears it) — confirm callers do so.
			mo.multiplexStreams = true
		}
	}
	return err
}
// isMultiplexing reports whether writes have been sent to more than one
// stream on this connection.
func (mo *multiplexOptimizer) isMultiplexing() bool {
	return mo.multiplexStreams
}
// versionedTemplate is used for faster comparison of the templated part of
// an AppendRowsRequest, which bears settings-like fields related to schema
// and default value configuration. Direct proto comparison through something
// like proto.Equal is far too expensive, so versionedTemplate leverages a faster
// hash-based comparison to avoid the deep equality checks.
type versionedTemplate struct {
	versionTime time.Time // creation time of this revision; also the fallback hash input.
	hashVal     uint32    // CRC-32 (IEEE) of the serialized template, used for fast comparison.
	tmpl        *storagepb.AppendRowsRequest
}

// newVersionedTemplate returns an empty template revision with its hash
// already computed.
func newVersionedTemplate() *versionedTemplate {
	vt := &versionedTemplate{
		versionTime: time.Now(),
		tmpl:        &storagepb.AppendRowsRequest{},
	}
	vt.computeHash()
	return vt
}
// computeHash is an internal utility function for calculating the hash value
// for faster comparison.
//
// The hash is the CRC-32 (IEEE) checksum of the serialized template proto.
// Hashing the marshaled bytes directly avoids the intermediate buffer copy
// the common path otherwise pays on every revision.
func (vt *versionedTemplate) computeHash() {
	if b, err := proto.Marshal(vt.tmpl); err == nil {
		vt.hashVal = crc32.ChecksumIEEE(b)
		return
	}
	// If we fail to serialize the proto (unlikely), consume the version
	// timestamp as the hash input instead so the revision still gets a
	// stable value. Writing an int64 to a bytes.Buffer cannot fail.
	buf := new(bytes.Buffer)
	binary.Write(buf, binary.LittleEndian, vt.versionTime.UnixNano())
	vt.hashVal = crc32.ChecksumIEEE(buf.Bytes())
}
// templateRevisionF mutates an AppendRowsRequest template as part of
// producing a new revision.
type templateRevisionF func(m *storagepb.AppendRowsRequest)

// revise makes a new versionedTemplate from the existing template, applying any changes.
// The original revision is returned if there's no effective difference after changes are
// applied.
func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate {
	base := vt
	if base == nil {
		base = newVersionedTemplate()
	}
	if len(changes) == 0 {
		// Nothing to apply; hand back the base revision immediately.
		return base
	}
	candidate := &versionedTemplate{
		versionTime: time.Now(),
		tmpl:        proto.Clone(base.tmpl).(*storagepb.AppendRowsRequest),
	}
	for _, change := range changes {
		change(candidate.tmpl)
	}
	candidate.computeHash()
	if candidate.Compatible(base) {
		// The changes produced no measurable difference. Return the base
		// revision to avoid possible connection churn from no-op revisions.
		return base
	}
	return candidate
}
// Compatible is effectively a fast equality check, that relies on the hash value
// and avoids the potentially very costly deep comparison of the proto message templates.
//
// Both the receiver and the argument may be nil; two templates are compatible
// when both are nil or both carry the same hash. (The previous implementation
// dereferenced a nil receiver when compared against a non-nil template.)
func (vt *versionedTemplate) Compatible(other *versionedTemplate) bool {
	if vt == nil || other == nil {
		// Only "both nil" counts as compatible.
		return vt == other
	}
	return vt.hashVal == other.hashVal
}
// reviseProtoSchema returns a revision function that replaces the template's
// row payload with the given descriptor (cloned) as the writer schema.
func reviseProtoSchema(newSchema *descriptorpb.DescriptorProto) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m == nil {
			return
		}
		// Clone the descriptor so later mutation of newSchema can't alter the template.
		cloned := proto.Clone(newSchema).(*descriptorpb.DescriptorProto)
		m.Rows = &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: cloned,
				},
			},
		}
	}
}
// reviseMissingValueInterpretations returns a revision function that sets the
// per-column missing value interpretation map on the template.
func reviseMissingValueInterpretations(vi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m == nil {
			return
		}
		m.MissingValueInterpretations = vi
	}
}
// reviseDefaultMissingValueInterpretation returns a revision function that sets
// the stream-wide default missing value interpretation on the template.
func reviseDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m == nil {
			return
		}
		m.DefaultMissingValueInterpretation = def
	}
}