forked from dolthub/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
topo_utils.go
149 lines (129 loc) · 6.47 KB
/
topo_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"flag"
"fmt"
"math/rand"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/wrangler"
"golang.org/x/net/context"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// waitForHealthyTabletsTimeout bounds how long we wait for the healthcheck
// to automatically return rdonly instances which have been taken out by
// previous *Clone or *Diff runs. For that reason the default of this flag
// must be higher than vttablet's -health_check_interval.
var waitForHealthyTabletsTimeout = flag.Duration("wait_for_healthy_rdonly_tablets_timeout", 60*time.Second, "maximum time to wait at the start if less than --min_healthy_rdonly_tablets are available")
// FindHealthyRdonlyTablet returns the alias of a randomly chosen healthy
// RDONLY tablet in the given cell, keyspace and shard.
//
// Since we don't want to use them all, we require at least
// minHealthyRdonlyTablets servers to be healthy before picking one.
// If tsc is nil, a healthcheck, a tablet stats cache and a shard replication
// watcher are created for this call; the watcher is stopped and the
// healthcheck is closed when the function returns.
//
// May block up to -wait_for_healthy_rdonly_tablets_timeout.
// Returns an error if waiting fails or if there is no healthy tablet to
// choose from.
func FindHealthyRdonlyTablet(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int) (*topodatapb.TabletAlias, error) {
	if tsc == nil {
		// No healthcheck instance provided. Create one.
		healthCheck := discovery.NewHealthCheck(*remoteActionsTimeout, *healthcheckRetryDelay, *healthCheckTimeout)
		tsc = discovery.NewTabletStatsCache(healthCheck, cell)
		watcher := discovery.NewShardReplicationWatcher(wr.TopoServer(), healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
		defer watcher.Stop()
		defer healthCheck.Close()
	}

	healthyTablets, err := waitForHealthyRdonlyTablets(ctx, wr, tsc, cell, keyspace, shard, minHealthyRdonlyTablets, *waitForHealthyTabletsTimeout)
	if err != nil {
		return nil, err
	}
	// waitForHealthyRdonlyTablets is satisfied by an empty list when
	// minHealthyRdonlyTablets <= 0. rand.Intn panics on a non-positive
	// argument, so report the situation as an error instead.
	if len(healthyTablets) == 0 {
		return nil, fmt.Errorf("no healthy RDONLY tablets to choose from in (%v,%v/%v)", cell, keyspace, shard)
	}

	// random server in the list is what we want
	index := rand.Intn(len(healthyTablets))
	return healthyTablets[index].Tablet.Alias, nil
}
// waitForHealthyRdonlyTablets polls tsc until at least
// minHealthyRdonlyTablets healthy RDONLY tablets are known for
// keyspace/shard and returns that list. It returns an error when the
// initial wait for RDONLY tablets fails, or when ctx is done or timeout
// has elapsed before enough healthy tablets showed up.
func waitForHealthyRdonlyTablets(ctx context.Context, wr *wrangler.Wrangler, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int, timeout time.Duration) ([]discovery.TabletStats, error) {
	waitCtx, cancelWait := context.WithTimeout(ctx, timeout)
	defer cancelWait()

	begin := time.Now()
	// The deadline of a context does not change, so it is read once and
	// reused for every "time remaining" log line below.
	deadline, _ := waitCtx.Deadline()
	log.V(2).Infof("Waiting for enough healthy RDONLY tablets to become available in (%v,%v/%v). required: %v Waiting up to %.1f seconds.",
		cell, keyspace, shard, minHealthyRdonlyTablets, deadline.Sub(time.Now()).Seconds())

	// Wait for at least one RDONLY tablet initially before checking the list.
	rdonlyOnly := []topodatapb.TabletType{topodatapb.TabletType_RDONLY}
	if err := tsc.WaitForTablets(waitCtx, cell, keyspace, shard, rdonlyOnly); err != nil {
		return nil, fmt.Errorf("error waiting for RDONLY tablets for (%v,%v/%v): %v", cell, keyspace, shard, err)
	}

	var healthy []discovery.TabletStats
	for {
		// Give up first if the wait budget is already used up.
		select {
		case <-waitCtx.Done():
			return nil, fmt.Errorf("not enough healthy RDONLY tablets to choose from in (%v,%v/%v), have %v healthy ones, need at least %v Context error: %v",
				cell, keyspace, shard, len(healthy), minHealthyRdonlyTablets, waitCtx.Err())
		default:
		}

		healthy = discovery.RemoveUnhealthyTablets(tsc.GetTabletStats(keyspace, shard, topodatapb.TabletType_RDONLY))
		if len(healthy) >= minHealthyRdonlyTablets {
			log.V(2).Infof("At least %v healthy RDONLY tablets are available in (%v,%v/%v) (required: %v). Took %.1f seconds to find this out.",
				len(healthy), cell, keyspace, shard, minHealthyRdonlyTablets, time.Since(begin).Seconds())
			return healthy, nil
		}

		wr.Logger().Infof("Waiting for enough healthy RDONLY tablets to become available (%v,%v/%v). available: %v required: %v Waiting up to %.1f more seconds.",
			cell, keyspace, shard, len(healthy), minHealthyRdonlyTablets, deadline.Sub(time.Now()).Seconds())

		// Block for 1 second because 2 seconds is the -health_check_interval flag value in integration tests.
		pause := time.NewTimer(1 * time.Second)
		select {
		case <-pause.C:
		case <-waitCtx.Done():
			// The next loop iteration reports the timeout/cancellation.
			pause.Stop()
		}
	}
}
// FindWorkerTablet reserves a tablet for this worker process. It will:
// - find a healthy rdonly instance in the keyspace / shard
// - tag it with the URL of our worker process and a drain reason
// - change its type to DRAINED
// - record the matching clean-up actions in cleaner
// It returns the alias of the chosen tablet.
func FindWorkerTablet(ctx context.Context, wr *wrangler.Wrangler, cleaner *wrangler.Cleaner, tsc *discovery.TabletStatsCache, cell, keyspace, shard string, minHealthyRdonlyTablets int) (*topodatapb.TabletAlias, error) {
	alias, err := FindHealthyRdonlyTablet(ctx, wr, tsc, cell, keyspace, shard, minHealthyRdonlyTablets)
	if err != nil {
		return nil, err
	}
	aliasStr := topoproto.TabletAliasString(alias)

	// We add the tag before calling ChangeSlaveType, so the destination
	// vttablet reloads the worker URL when it reloads the tablet.
	workerURL := servenv.ListeningURL.String()
	addWorkerTags := func(tablet *topodatapb.Tablet) error {
		if tablet.Tags == nil {
			tablet.Tags = make(map[string]string)
		}
		tablet.Tags["worker"] = workerURL
		tablet.Tags["drain_reason"] = "Used by vtworker"
		return nil
	}
	wr.Logger().Infof("Adding tag[worker]=%v to tablet %v", workerURL, aliasStr)
	tagCtx, tagCancel := context.WithTimeout(ctx, *remoteActionsTimeout)
	_, err = wr.TopoServer().UpdateTabletFields(tagCtx, alias, addWorkerTags)
	tagCancel()
	if err != nil {
		return nil, err
	}

	// Using "defer" here because we remove the tag *before* calling
	// ChangeSlaveType back, so we need to record this tag change after the
	// change slave type change in the cleaner. The deferred calls also run
	// when ChangeSlaveType below fails.
	defer wrangler.RecordTabletTagAction(cleaner, alias, "worker", "")
	defer wrangler.RecordTabletTagAction(cleaner, alias, "drain_reason", "")

	wr.Logger().Infof("Changing tablet %v to '%v'", aliasStr, topodatapb.TabletType_DRAINED)
	drainCtx, drainCancel := context.WithTimeout(ctx, *remoteActionsTimeout)
	err = wr.ChangeSlaveType(drainCtx, alias, topodatapb.TabletType_DRAINED)
	drainCancel()
	if err != nil {
		return nil, err
	}

	// Record a clean-up action to take the tablet back to rdonly.
	wrangler.RecordChangeSlaveTypeAction(cleaner, alias, topodatapb.TabletType_DRAINED, topodatapb.TabletType_RDONLY)
	return alias, nil
}
// init seeds the global math/rand source from the current time, so the
// random tablet pick in FindHealthyRdonlyTablet varies between processes.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}