package worker

import (
	"fmt"
	"time"

	"github.com/youtube/vitess/go/vt/discovery"
	"github.com/youtube/vitess/go/vt/throttler"
	"github.com/youtube/vitess/go/vt/topo/topoproto"
	"github.com/youtube/vitess/go/vt/wrangler"

	"golang.org/x/net/context"

	topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)

// executor takes care of the write-side of the copy.
// There is one executor for each destination shard and writer thread.
// To-be-written data will be passed in through a channel.
// The main purpose of this struct is to aggregate the objects which won't
// change during the execution and to remove them from method signatures.
type executor struct {
	wr          *wrangler.Wrangler
	healthCheck discovery.HealthCheck
	throttler   *throttler.Throttler
	keyspace    string
	shard       string
	threadID    int

	// statsKey is the cached metric key which we use to increment the stats
	// variable when we get throttled.
	statsKey []string
}

func newExecutor(wr *wrangler.Wrangler, healthCheck discovery.HealthCheck, throttler *throttler.Throttler, keyspace, shard string, threadID int) *executor {
	return &executor{
		wr:          wr,
		healthCheck: healthCheck,
		throttler:   throttler,
		keyspace:    keyspace,
		shard:       shard,
		threadID:    threadID,
		statsKey:    []string{keyspace, shard, fmt.Sprint(threadID)},
	}
}
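
// A minimal wiring sketch, for reference. The wrangler, healthcheck, and
// throttler values, the shard name, and the database name below are
// hypothetical; only newExecutor and fetchLoop are from this file:
//
//	e := newExecutor(wr, healthCheck, t, "test_keyspace", "-80", 0)
//	insertChannel := make(chan string)
//	go func() {
//		insertChannel <- "`users`(`id`) VALUES (1)"
//		close(insertChannel)
//	}()
//	err := e.fetchLoop(ctx, "vt_test_keyspace", insertChannel)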

// fetchLoop loops over the provided insertChannel and sends the commands to
// the current master.
func (e *executor) fetchLoop(ctx context.Context, dbName string, insertChannel chan string) error {
	for {
		select {
		case cmd, ok := <-insertChannel:
			if !ok {
				// No more to read, we're done.
				return nil
			}
			cmd = "INSERT INTO `" + dbName + "`." + cmd
			if err := e.fetchWithRetries(ctx, cmd); err != nil {
				return fmt.Errorf("ExecuteFetch failed: %v", err)
			}
		case <-ctx.Done():
			// It doesn't really matter if this select gets starved, because the
			// other case will also return an error due to executeFetch's context
			// being closed. This case does prevent us from blocking indefinitely
			// on insertChannel when the worker is canceled.
			return nil
		}
	}
}
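
// Values sent on insertChannel are expected to be the tail of an INSERT
// statement, i.e. everything after "INSERT INTO `<dbName>`.", since fetchLoop
// prepends that prefix before executing. A hypothetical example value (table
// and columns made up for illustration):
//
//	"`users`(`id`, `name`) VALUES (1, 'alice'), (2, 'bob')"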

// fetchWithRetries will attempt to run ExecuteFetch for a single command, with
// a reasonably small timeout.
// It will keep retrying the ExecuteFetch (for a finite but longer duration) if
// it fails due to a timeout or a retriable application error.
//
// fetchWithRetries will always get the current MASTER tablet from the
// healthcheck instance. If no MASTER is available, it will keep retrying.
func (e *executor) fetchWithRetries(ctx context.Context, command string) error {
	retryDuration := 2 * time.Hour
	// We should keep retrying up until the retryCtx runs out.
	retryCtx, retryCancel := context.WithTimeout(ctx, retryDuration)
	defer retryCancel()
	// Is this current attempt a retry of a previous attempt?
	isRetry := false
	for {
		var master *discovery.TabletStats
		var err error

		// Get the current master from the HealthCheck.
		masters := discovery.GetCurrentMaster(
			e.healthCheck.GetTabletStatsFromTarget(e.keyspace, e.shard, topodatapb.TabletType_MASTER))
		if len(masters) == 0 {
			e.wr.Logger().Warningf("ExecuteFetch failed for keyspace/shard %v/%v because no MASTER is available; will retry until there is MASTER again", e.keyspace, e.shard)
			statsRetryCount.Add(1)
			statsRetryCounters.Add(retryCategoryNoMasterAvailable, 1)
			goto retry
		}
		master = masters[0]

		// Block if we are throttled.
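		// (Throttle returns how long this thread has to back off before it may
		// send the next request, or throttler.NotThrottled if it may proceed
		// immediately, so we keep sleeping and asking until we are admitted.)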
		if e.throttler != nil {
			for {
				backoff := e.throttler.Throttle(e.threadID)
				if backoff == throttler.NotThrottled {
					break
				}
				statsThrottledCounters.Add(e.statsKey, 1)
				time.Sleep(backoff)
			}
		}

		// Run the command (in a block, since the goto above does not allow
		// introducing new variables until the label is reached).
		{
			tryCtx, cancel := context.WithTimeout(retryCtx, 2*time.Minute)
			_, err = e.wr.TabletManagerClient().ExecuteFetchAsApp(tryCtx, master.Tablet, command, 0)
			cancel()

			if err == nil {
				// Success!
				return nil
			}

			succeeded, finalErr := e.checkError(tryCtx, err, isRetry, master)
			if succeeded {
				// We can ignore the error and don't have to retry.
				return nil
			}
			if finalErr != nil {
				// Non-retryable error.
				return finalErr
			}
		}

	retry:
		masterAlias := "no-master-was-available"
		if master != nil {
			masterAlias = topoproto.TabletAliasString(master.Tablet.Alias)
		}
		tabletString := fmt.Sprintf("%v (%v/%v)", masterAlias, e.keyspace, e.shard)

		select {
		case <-retryCtx.Done():
			if retryCtx.Err() == context.DeadlineExceeded {
				return fmt.Errorf("failed to connect to destination tablet %v after retrying for %v", tabletString, retryDuration)
			}
			return fmt.Errorf("interrupted while trying to run %v on tablet %v", command, tabletString)
		case <-time.After(*executeFetchRetryTime):
			// Retry 30s after the failure using the current master seen by the HealthCheck.
		}
		isRetry = true
	}
}
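
// Retry timing, for reference: each individual attempt gets a 2 minute
// timeout (tryCtx), failed attempts wait *executeFetchRetryTime between tries
// (30 seconds per the comment above; the flag itself is defined elsewhere in
// this package), and the loop as a whole gives up once retryDuration
// (2 hours) has elapsed.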

// checkError classifies the error of an ExecuteFetch attempt. It returns true
// if the error can be ignored because the command effectively succeeded, false
// (with a nil error) if the error is retryable, and a non-nil error if the
// command must not be retried.
func (e *executor) checkError(ctx context.Context, err error, isRetry bool, master *discovery.TabletStats) (bool, error) {
	tabletString := fmt.Sprintf("%v (%v/%v)", topoproto.TabletAliasString(master.Tablet.Alias), e.keyspace, e.shard)

	// First see if it was a context timeout.
	select {
	case <-ctx.Done():
		if ctx.Err() == context.DeadlineExceeded {
			e.wr.Logger().Warningf("ExecuteFetch failed on %v; will retry because it was a timeout error on the context", tabletString)
			statsRetryCount.Add(1)
			statsRetryCounters.Add(retryCategoryTimeoutError, 1)
			return false, nil
		}
	default:
	}

	// If the ExecuteFetch call failed because of an application error, try to
	// figure out why: extract the MySQL error number and retry if the error
	// looks recoverable.
	match := errExtract.FindStringSubmatch(err.Error())
	var errNo string
	if len(match) == 2 {
		errNo = match[1]
	}
	switch {
	case errNo == "1290":
		e.wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", tabletString, err)
		statsRetryCount.Add(1)
		statsRetryCounters.Add(retryCategoryReadOnly, 1)
	case errNo == "2002" || errNo == "2006":
		e.wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL connection error: %v", tabletString, err)
		statsRetryCount.Add(1)
		statsRetryCounters.Add(retryCategoryConnectionError, 1)
	case errNo == "1062":
		if !isRetry {
			return false, fmt.Errorf("ExecuteFetch failed on %v on the first attempt; not retrying as this is not a recoverable error: %v", tabletString, err)
		}
		e.wr.Logger().Infof("ExecuteFetch failed on %v with a duplicate entry error; marking this as a success, because of the likelihood that this query has already succeeded before being retried: %v", tabletString, err)
		return true, nil
	default:
		// Unknown error.
		return false, err
	}
	return false, nil
}
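
// Note: errExtract, executeFetchRetryTime, statsRetryCount, statsRetryCounters,
// statsThrottledCounters, and the retryCategory* constants are defined
// elsewhere in this package. For reference, errExtract is a regexp that pulls
// the MySQL error number out of the error text; a plausible shape (an
// assumption, not the exact definition from this repository) would be:
//
//	var errExtract = regexp.MustCompile(`\(errno (\d+)\)`)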