forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
restartable_result_reader.go
359 lines (315 loc) · 12.1 KB
/
restartable_result_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
// Copyright 2016, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"bytes"
"fmt"
"io"
"strings"
"time"
"golang.org/x/net/context"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/vttablet/queryservice"
"github.com/youtube/vitess/go/vt/vttablet/tabletconn"
querypb "github.com/youtube/vitess/go/vt/proto/query"
tabletmanagerdatapb "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// RestartableResultReader will stream all rows within a chunk.
// If the streaming query gets interrupted, it can resume the stream after
// the last row which was read.
type RestartableResultReader struct {
// ctx is passed to the streaming query, is the parent of the retry timeout
// context and is used when closing the previous tablet connection.
ctx context.Context
// logger receives the error messages for non-retryable restart failures.
logger logutil.Logger
// tp hands out the tablet to stream from and takes it back on Close().
tp tabletProvider
// td is used to get the list of primary key columns at a restart.
td *tabletmanagerdatapb.TableDefinition
// chunk holds the start and end values which bound the generated query.
chunk chunk
// allowMultipleRetries is true if we are allowed to retry more than once.
allowMultipleRetries bool
// query is the SQL text produced by the most recent generateQuery() call.
query string
// tablet is the tablet currently used for streaming. It is set by getTablet().
tablet *topodatapb.Tablet
// conn is the dialed connection to "tablet". It is set by getTablet().
conn queryservice.QueryService
// fields is the Fields list of the first result received by startStream().
fields []*querypb.Field
// output is the active result stream. It is set by startStream().
output sqltypes.ResultStream
// lastRow is used during a restart to determine after which row the restart
// should start.
lastRow []sqltypes.Value
}
// NewRestartableResultReader creates a new RestartableResultReader for
// the provided tablet and chunk.
// It will automatically create the necessary query to read all rows within
// the chunk.
// NOTE: We assume that the Columns field in "td" was ordered by a preceding
// call to reorderColumnsPrimaryKeyFirst().
//
// The constructor gets a tablet from "tp", dials it and starts the streaming
// query. If any of these steps fails, no retry is attempted and a nil reader
// plus an error (prefixed with the tablet alias, if known) is returned.
// On success the caller owns the reader and must call Close() on it.
func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp tabletProvider, td *tabletmanagerdatapb.TableDefinition, chunk chunk, allowMultipleRetries bool) (*RestartableResultReader, error) {
	r := &RestartableResultReader{
		ctx:                  ctx,
		logger:               logger,
		tp:                   tp,
		td:                   td,
		chunk:                chunk,
		allowMultipleRetries: allowMultipleRetries,
	}

	// If the initial connection fails, we do not restart.
	if _ /* retryable */, err := r.getTablet(); err != nil {
		return nil, fmt.Errorf("tablet=unknown: %v", err)
	}
	if _ /* retryable */, err := r.startStream(); err != nil {
		alias := topoproto.TabletAliasString(r.tablet.Alias)
		// getTablet() succeeded, i.e. we hold a connection and a tablet from the
		// tablet provider. The caller only receives a nil reader and therefore
		// cannot call Close() itself. Release both here to avoid leaking them.
		r.Close(ctx)
		return nil, fmt.Errorf("tablet=%v: %v", alias, err)
	}
	return r, nil
}
// getTablet (re)sets the tablet which is used for the streaming query.
// If a tablet was set before, its connection is closed, it is returned to the
// tabletProvider and the reader's tablet, conn, fields and output are cleared
// before a new tablet is requested and dialed.
// If the method returns an error, the first return value specifies if it is
// okay to retry.
func (r *RestartableResultReader) getTablet() (bool, error) {
	if r.tablet != nil {
		// If there was a tablet before, return it to the tabletProvider.
		r.Close(r.ctx)

		r.tablet = nil
		r.conn = nil
		r.fields = nil
		r.output = nil
	}

	// Get a tablet from the tablet provider.
	tablet, err := r.tp.getTablet()
	if err != nil {
		return true /* retryable */, fmt.Errorf("failed get tablet for streaming query: %v", err)
	}

	// Connect (dial) to the tablet.
	conn, err := tabletconn.GetDialer()(tablet, *remoteActionsTimeout)
	if err != nil {
		// The tablet was handed out to us, but it will never be stored in
		// r.tablet. Consequently Close() would never return it, so we have to
		// give it back to the tablet provider here.
		r.tp.returnTablet(tablet)
		return false /* retryable */, fmt.Errorf("failed to get dialer for tablet: %v", err)
	}
	r.tablet = tablet
	r.conn = conn
	return false /* retryable */, nil
}
// startStream assumes that getTablet() was successfully called before. It
// regenerates the query, starts it as a streaming query on the set tablet and
// reads the first result to obtain the fields.
// On success, the fields and the stream are stored in the reader.
// If the method returns an error, the first return value specifies if it is
// okay to retry.
func (r *RestartableResultReader) startStream() (bool, error) {
	// (Re)build the query. After a restart it continues after the last row.
	r.generateQuery()

	target := &querypb.Target{
		Keyspace:   r.tablet.Keyspace,
		Shard:      r.tablet.Shard,
		TabletType: r.tablet.Type,
	}
	bindVars := make(map[string]interface{})
	stream := queryservice.ExecuteWithStreamer(r.ctx, r.conn, target, r.query, bindVars, nil)

	// The first result is only used for its fields information.
	first, err := stream.Recv()
	if err != nil {
		return true /* retryable */, fmt.Errorf("cannot read Fields for query '%v': %v", r.query, err)
	}
	r.fields, r.output = first.Fields, stream

	tabletAlias := topoproto.TabletAliasString(r.tablet.Alias)
	statsStreamingQueryCounters.Add(tabletAlias, 1)
	log.V(2).Infof("tablet=%v table=%v chunk=%v: Starting to stream rows using query '%v'.", tabletAlias, r.td.Name, r.chunk, r.query)
	return false, nil
}
// Next returns the next result on the stream. It implements ResultReader.
// If reading fails with anything other than io.EOF, the stream is restarted
// via nextWithRetries(). The last row of every non-empty result is remembered
// such that a restart can continue after it.
func (r *RestartableResultReader) Next() (*sqltypes.Result, error) {
	res, recvErr := r.output.Recv()
	if failed := recvErr != nil && recvErr != io.EOF; failed {
		// The retry machinery (including its timeout timer) is only set up
		// here, i.e. after the first read failed. This keeps the regular,
		// error-free Next() call cheap.
		tabletAlias := topoproto.TabletAliasString(r.tablet.Alias)
		statsStreamingQueryErrorsCounters.Add(tabletAlias, 1)
		log.V(2).Infof("tablet=%v table=%v chunk=%v: Failed to read next rows from active streaming query. Trying to restart stream on the same tablet. Original Error: %v", tabletAlias, r.td.Name, r.chunk, recvErr)
		res, recvErr = r.nextWithRetries()
	}

	if res != nil {
		if rowCount := len(res.Rows); rowCount > 0 {
			r.lastRow = res.Rows[rowCount-1]
		}
	}
	return res, recvErr
}
// nextWithRetries is called by Next() after reading from the active stream
// failed. It restarts the streaming query and returns the first result (or
// io.EOF) read from the restarted stream.
// The first restart (attempt 2) reuses the current tablet. Every later attempt
// first requests a tablet via getTablet() again.
// It gives up and returns an error if
// - getTablet() or startStream() report a non-retryable error,
// - the first restart failed and allowMultipleRetries is false,
// - or the retry context (r.ctx limited to *retryDuration) is done.
// Between two attempts it pauses for *executeFetchRetryTime.
func (r *RestartableResultReader) nextWithRetries() (*sqltypes.Result, error) {
// In case of errors we will keep retrying until retryCtx is done.
retryCtx, retryCancel := context.WithTimeout(r.ctx, *retryDuration)
defer retryCancel()
// Note: The first retry will be the second attempt.
attempt := 1
start := time.Now()
for {
attempt++
// These variables must be declared before any "goto retry" below because
// the code after the label reads "err".
var result *sqltypes.Result
var retryable bool
var err error
if attempt > 2 {
// Do not failover to a different tablet at the 2nd attempt, but always
// after that.
// That's because the first restart is meant to fix transient problems
// which go away at the next retry. For example, when MySQL killed the
// vttablet connection due to net_write_timeout being reached.
retryable, err = r.getTablet()
if err != nil {
if !retryable {
// NOTE(review): r.tablet is nil here because getTablet() clears it and
// only sets it on success. The "(%v)" below therefore prints a nil tablet.
r.logger.Errorf("table=%v chunk=%v: Failed to restart streaming query (attempt %d) and failover to a different tablet (%v) due to a non-retryable error: %v", r.td.Name, r.chunk, attempt, r.tablet, err)
return nil, err
}
goto retry
}
}
// NOTE(review): attempt is at least 2 at this point, i.e. this condition is
// always true.
if attempt > 1 {
// Restart streaming query.
retryable, err = r.startStream()
if err != nil {
if !retryable {
r.logger.Errorf("tablet=%v table=%v chunk=%v: Failed to restart streaming query (attempt %d) with query '%v' and stopped due to a non-retryable error: %v", topoproto.TabletAliasString(r.tablet.Alias), r.td.Name, r.chunk, attempt, r.query, err)
return nil, err
}
goto retry
}
}
// The stream was restarted. Read the first result from it.
result, err = r.output.Recv()
if err == nil || err == io.EOF {
alias := topoproto.TabletAliasString(r.tablet.Alias)
log.V(2).Infof("tablet=%v table=%v chunk=%v: Successfully restarted streaming query with query '%v' after %.1f seconds.", alias, r.td.Name, r.chunk, r.query, time.Now().Sub(start).Seconds())
// Only attempt 2 skips getTablet(). It is counted as a restart on the same
// tablet, all later attempts as a restart on a different tablet.
if attempt == 2 {
statsStreamingQueryRestartsSameTabletCounters.Add(alias, 1)
} else {
statsStreamingQueryRestartsDifferentTablet.Add(1)
}
// Recv() was successful.
return result, err
}
// The attempt failed with a retryable error in getTablet(), startStream() or
// Recv(). "err" holds that error.
retry:
if attempt == 2 && !r.allowMultipleRetries {
// Offline source tablets must not be retried forever. Fail early.
return nil, fmt.Errorf("%v: first retry to restart the streaming query on the same tablet failed. We're failing at this point because we're not allowed to keep retrying. err: %v", r.tp.description(), err)
}
alias := "unknown"
if r.tablet != nil {
alias = topoproto.TabletAliasString(r.tablet.Alias)
// tablet may be nil if e.g. the HealthCheck module currently does not
// return a tablet.
statsStreamingQueryErrorsCounters.Add(alias, 1)
}
// The "ok" return value can be ignored: retryCtx was created with
// WithTimeout() and therefore always has a deadline.
deadline, _ := retryCtx.Deadline()
log.V(2).Infof("tablet=%v table=%v chunk=%v: Failed to restart streaming query (attempt %d) with query '%v'. Retrying to restart stream on a different tablet (for up to %.1f minutes). Next retry is in %.1f seconds. Error: %v", alias, r.td.Name, r.chunk, attempt, r.query, deadline.Sub(time.Now()).Minutes(), executeFetchRetryTime.Seconds(), err)
// Wait for the pause to elapse or stop if the retry context is done first.
select {
case <-retryCtx.Done():
// Distinguish between the retry timeout and any other context error.
if retryCtx.Err() == context.DeadlineExceeded {
return nil, fmt.Errorf("%v: failed to restart the streaming connection after retrying for %v", r.tp.description(), *retryDuration)
}
return nil, fmt.Errorf("%v: interrupted (context error: %v) while trying to restart the streaming connection (%.1f minutes elapsed so far)", r.tp.description(), retryCtx.Err(), time.Now().Sub(start).Minutes())
case <-time.After(*executeFetchRetryTime):
// Make a pause between the retries to avoid hammering the servers.
}
}
}
// Fields returns the fields which were read when the current stream was
// started. It implements ResultReader.
func (r *RestartableResultReader) Fields() []*querypb.Field {
	fields := r.fields
	return fields
}
// Close closes the connection to the tablet and hands the tablet back to the
// tablet provider. It does nothing if there is no connection.
func (r *RestartableResultReader) Close(ctx context.Context) {
	if r.conn == nil {
		return
	}
	r.conn.Close(ctx)
	r.tp.returnTablet(r.tablet)
}
// generateQuery builds the SELECT statement for the chunk and stores it in
// r.query.
// The statement selects all columns of the table, is bounded by the chunk's
// start (inclusive) and end (exclusive) value on the first primary key column
// and is ordered by the primary key columns.
// After at least one row was read (r.lastRow is set), the lower bound is
// replaced by a "greater than the last row" condition such that a restarted
// stream continues where the previous one stopped.
func (r *RestartableResultReader) generateQuery() {
	sql := "SELECT " + strings.Join(escapeAll(r.td.Columns), ",") + " FROM " + escape(r.td.Name)
	pkColumns := r.td.PrimaryKeyColumns

	var where []string

	// Lower bound.
	switch {
	case r.lastRow != nil:
		// Restart: read after the last row.
		// The new lower bound cannot exceed the upper bound: lastRow < end
		// always holds because the initial query already had the clause
		// 'WHERE PrimaryKeyColumns[0] < end'.
		// TODO(mberlin): Write an e2e test to verify that restarts also work with
		// string types and MySQL collation rules.
		where = append(where, greaterThanTupleWhereClause(pkColumns, r.lastRow)...)
	case !r.chunk.start.IsNull():
		// Initial query with a start value.
		var lower bytes.Buffer
		writeEscaped(&lower, pkColumns[0])
		lower.WriteString(">=")
		r.chunk.start.EncodeSQL(&lower)
		where = append(where, lower.String())
	}

	// Upper bound.
	if !r.chunk.end.IsNull() {
		var upper bytes.Buffer
		writeEscaped(&upper, pkColumns[0])
		upper.WriteString("<")
		r.chunk.end.EncodeSQL(&upper)
		where = append(where, upper.String())
	}

	if len(where) != 0 {
		sql += " WHERE " + strings.Join(where, " AND ")
	}
	if len(pkColumns) != 0 {
		sql += " ORDER BY " + strings.Join(escapeAll(pkColumns), ",")
	}
	r.query = sql
}
// greaterThanTupleWhereClause returns the WHERE clause expressions which
// select all rows greater than (">") "row", compared on "columns".
// Only the first len(columns) values of "row" are used. The caller has to
// ensure that "columns" matches with these values.
// Examples (column names are additionally escaped):
// one column:    a>1
// two columns:   a>=1 and (a,b)>(1,2)
//   (Input for that would be: columns{"a", "b"}, row{1, 2}.)
// three columns: a>=1 and (a,b,c)>(1,2,3)
// For more than one column, two separate clauses are returned. The caller is
// expected to combine them with AND.
//
// Note that we are using the short-form for row comparisons. This is defined
// by MySQL. See: http://dev.mysql.com/doc/refman/5.5/en/comparison-operators.html
// <quote>
// For row comparisons, (a, b) < (x, y) is equivalent to:
// (a < x) OR ((a = x) AND (b < y))
// </quote>
//
// NOTE: If there is more than one column, we add an extra clause for the
// first column because older MySQL versions seem to do a full table scan
// when we use the short-form. With the additional clause we skip the full
// table scan up to the primary key we're interested in.
func greaterThanTupleWhereClause(columns []string, row []sqltypes.Value) []string {
	multiColumn := len(columns) > 1

	var result []string
	if multiColumn {
		// Extra clause on the first column only (see NOTE above).
		var first bytes.Buffer
		writeEscaped(&first, columns[0])
		first.WriteString(">=")
		row[0].EncodeSQL(&first)
		result = append(result, first.String())
	}

	// Tuples are only wrapped in parentheses if they have multiple elements.
	lparen, rparen := "", ""
	if multiColumn {
		lparen, rparen = "(", ")"
	}

	var tuple bytes.Buffer
	// Left-hand side: list of columns.
	tuple.WriteString(lparen)
	tuple.WriteString(strings.Join(escapeAll(columns), ","))
	tuple.WriteString(rparen)
	// Operator.
	tuple.WriteString(">")
	// Right-hand side: list of values.
	tuple.WriteString(lparen)
	for i := range columns {
		if i > 0 {
			tuple.WriteByte(',')
		}
		row[i].EncodeSQL(&tuple)
	}
	tuple.WriteString(rparen)

	return append(result, tuple.String())
}