forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tablet_stats_cache_wait.go
132 lines (114 loc) · 3.43 KB
/
tablet_stats_cache_wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package discovery
import (
"sync"
"time"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/topo"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// waitAvailableTabletInterval is the pause between two consecutive checks
// while waiting for tablets to show up.
var waitAvailableTabletInterval = 100 * time.Millisecond
// keyspaceShard names one shard of one keyspace. It only holds strings, so
// it is comparable and is used in this file as a map key.
type keyspaceShard struct {
	keyspace, shard string
}
// WaitForTablets blocks until tc reports at least one tablet of every
// requested type for the given keyspace / shard. The tablets do not have to
// be healthy. The cell argument is accepted but is not consulted by this
// method. It returns ctx.Err() if the context is done before the tablets
// are seen.
func (tc *TabletStatsCache) WaitForTablets(ctx context.Context, cell, keyspace, shard string, types []topodatapb.TabletType) error {
	// Only the single requested keyspace / shard is awaited.
	target := keyspaceShard{keyspace: keyspace, shard: shard}
	pending := make(map[keyspaceShard]bool, 1)
	pending[target] = true

	// false: tablets only need to be present, they need not be serving.
	return tc.waitForTablets(ctx, pending, types, false)
}
// WaitForAllServingTablets collects every keyspace / shard referenced by the
// serving topology of the given cell, then blocks until tc reports at least
// one healthy tablet of every requested type for each of them.
// An error from reading the topology is returned unchanged, and ctx.Err() is
// returned if the context is done before all tablets are seen.
func (tc *TabletStatsCache) WaitForAllServingTablets(ctx context.Context, ts topo.SrvTopoServer, cell string, types []topodatapb.TabletType) error {
	pending, err := findAllKeyspaceShards(ctx, ts, cell)
	if err == nil {
		// true: only healthy tablets are counted.
		err = tc.waitForTablets(ctx, pending, types, true)
	}
	return err
}
// findAllKeyspaceShards returns the set of keyspace / shard pairs referenced
// by any partition of the SrvKeyspace records of the given cell.
// The keyspaces are read concurrently, one goroutine per keyspace name. If
// any read fails, a nil set is returned together with the recorded errors.
func findAllKeyspaceShards(ctx context.Context, ts topo.SrvTopoServer, cell string) (map[keyspaceShard]bool, error) {
	names, err := ts.GetSrvKeyspaceNames(ctx, cell)
	if err != nil {
		return nil, err
	}

	var (
		wg     sync.WaitGroup
		mu     sync.Mutex // guards result
		errs   concurrency.AllErrorRecorder
		result = make(map[keyspaceShard]bool)
	)

	// collect reads the SrvKeyspace of one keyspace and merges all the shard
	// names it references into result.
	collect := func(name string) {
		defer wg.Done()

		srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, name)
		if err != nil {
			errs.RecordError(err)
			return
		}

		mu.Lock()
		defer mu.Unlock()
		for _, partition := range srvKeyspace.Partitions {
			for _, ref := range partition.ShardReferences {
				result[keyspaceShard{keyspace: name, shard: ref.Name}] = true
			}
		}
	}

	wg.Add(len(names))
	for _, name := range names {
		go collect(name)
	}
	wg.Wait()

	if errs.HasErrors() {
		return nil, errs.Error()
	}
	return result, nil
}
// waitForTablets polls tc until every entry of keyspaceShards has at least
// one tablet of each type listed in types. When requireServing is true the
// lookup uses GetHealthyTabletStats, otherwise GetTabletStats.
// Entries are deleted from keyspaceShards as soon as they are satisfied, so
// the map passed in by the caller is modified. Between two rounds the method
// sleeps for waitAvailableTabletInterval; it returns ctx.Err() if the context
// is done during that sleep, and nil once the map is empty.
func (tc *TabletStatsCache) waitForTablets(ctx context.Context, keyspaceShards map[keyspaceShard]bool, types []topodatapb.TabletType, requireServing bool) error {
	// missing reports whether at least one requested tablet type has no
	// tablet yet for the given keyspace / shard.
	missing := func(target keyspaceShard) bool {
		for _, tabletType := range types {
			var found []TabletStats
			if !requireServing {
				found = tc.GetTabletStats(target.keyspace, target.shard, tabletType)
			} else {
				found = tc.GetHealthyTabletStats(target.keyspace, target.shard, tabletType)
			}
			if len(found) == 0 {
				return true
			}
		}
		return false
	}

	for {
		// Drop every keyspace / shard that is now fully covered.
		for target := range keyspaceShards {
			if !missing(target) {
				delete(keyspaceShards, target)
			}
		}

		// Nothing left to wait for.
		if len(keyspaceShards) == 0 {
			return nil
		}

		// Sleep until the next round, unless the context ends first.
		pause := time.NewTimer(waitAvailableTabletInterval)
		select {
		case <-pause.C:
			continue
		case <-ctx.Done():
		}
		pause.Stop()
		return ctx.Err()
	}
}