forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 13
/
rebuild.go
307 lines (277 loc) · 9.85 KB
/
rebuild.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package wrangler
import (
"bytes"
"encoding/hex"
"fmt"
"sync"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/topotools"
"golang.org/x/net/context"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
)
// RebuildShardGraph rebuilds the serving and replication rollup data for a
// single shard while locking out other changes.
//
// It is a thin wrapper: the wrangler's logger, ts and lockTimeout fields are
// forwarded to topotools.RebuildShard together with the caller's arguments,
// and that call's results are handed back unchanged.
func (wr *Wrangler) RebuildShardGraph(ctx context.Context, keyspace, shard string, cells []string) (*topo.ShardInfo, error) {
	shardInfo, err := topotools.RebuildShard(ctx, wr.logger, wr.ts, keyspace, shard, cells, wr.lockTimeout)
	return shardInfo, err
}
// RebuildKeyspaceGraph rebuilds the serving graph data of a keyspace while
// locking out other changes.
//
// The keyspace lock is taken with a RebuildKeyspace action node. If locking
// fails, that error is returned and nothing is rebuilt. Otherwise
// rebuildKeyspace runs and its error (possibly nil) is passed to
// unlockKeyspace, whose return value becomes the result of this method.
func (wr *Wrangler) RebuildKeyspaceGraph(ctx context.Context, keyspace string, cells []string, rebuildSrvShards bool) error {
	node := actionnode.RebuildKeyspace()

	path, lockErr := wr.lockKeyspace(ctx, keyspace, node)
	if lockErr != nil {
		return lockErr
	}

	// The lock is always released, whether or not the rebuild succeeded.
	rebuildErr := wr.rebuildKeyspace(ctx, keyspace, cells, rebuildSrvShards)
	return wr.unlockKeyspace(ctx, keyspace, node, path, rebuildErr)
}
// findCellsForRebuild walks every cell of every shard in shardMap and makes
// sure srvKeyspaceMap has an entry for each cell accepted by
// topo.InCellList(cell, cells).
//
// Entries already present in srvKeyspaceMap are left untouched. New entries
// are seeded from ki: its sharding column name and type, its split shard
// count, and the ServedFrom value computed for that cell.
func (wr *Wrangler) findCellsForRebuild(ki *topo.KeyspaceInfo, shardMap map[string]*topo.ShardInfo, cells []string, srvKeyspaceMap map[string]*pb.SrvKeyspace) {
	for _, shardInfo := range shardMap {
		for _, cellName := range shardInfo.Cells {
			// Skip cells the caller did not ask for.
			if !topo.InCellList(cellName, cells) {
				continue
			}
			// Skip cells that already have a SrvKeyspace being built.
			if _, present := srvKeyspaceMap[cellName]; present {
				continue
			}
			srvKeyspaceMap[cellName] = &pb.SrvKeyspace{
				ShardingColumnName: ki.ShardingColumnName,
				ShardingColumnType: ki.ShardingColumnType,
				ServedFrom:         ki.ComputeCellServedFrom(cellName),
				SplitShardCount:    ki.SplitShardCount,
			}
		}
	}
}
// rebuildKeyspace takes data from the global keyspace and rebuilds the local
// serving copies in each cell.
//
// This function should only be used with an action lock on the keyspace
// - otherwise the consistency of the serving graph data can't be
// guaranteed.
//
// When rebuildSrvShards is true, every shard named by GetShardNames is
// rebuilt concurrently through RebuildShardGraph and the returned ShardInfo
// objects are reused for the rest of the function; otherwise the shards are
// loaded with FindAllShardsInKeyspace.
//
// An error from any step aborts the rebuild and is returned. All per-cell
// partitions are validated with orderAndCheckPartitions before the first
// UpdateSrvKeyspace call is made; a failure while saving may leave some
// cells updated and others not.
func (wr *Wrangler) rebuildKeyspace(ctx context.Context, keyspace string, cells []string, rebuildSrvShards bool) error {
	wr.logger.Infof("rebuildKeyspace %v", keyspace)

	ki, err := wr.ts.GetKeyspace(ctx, keyspace)
	if err != nil {
		return err
	}

	var shardCache map[string]*topo.ShardInfo
	if rebuildSrvShards {
		shards, err := wr.ts.GetShardNames(ctx, keyspace)
		if err != nil {
			// Propagate the failure: returning nil here would report a
			// successful rebuild even though nothing was rebuilt.
			return err
		}

		// Rebuild all shards in parallel, and keep the resulting ShardInfo
		// objects so they do not have to be read again below.
		shardCache = make(map[string]*topo.ShardInfo, len(shards))
		wg := sync.WaitGroup{}
		mu := sync.Mutex{} // guards shardCache
		rec := concurrency.FirstErrorRecorder{}
		for _, shard := range shards {
			wg.Add(1)
			go func(shard string) {
				// Deferred so the WaitGroup is released on every exit path.
				defer wg.Done()

				shardInfo, err := wr.RebuildShardGraph(ctx, keyspace, shard, cells)
				if err != nil {
					rec.RecordError(fmt.Errorf("RebuildShardGraph failed: %v/%v %v", keyspace, shard, err))
					return
				}
				mu.Lock()
				shardCache[shard] = shardInfo
				mu.Unlock()
			}(shard)
		}
		wg.Wait()
		if rec.HasErrors() {
			return rec.Error()
		}
	} else {
		shardCache, err = wr.ts.FindAllShardsInKeyspace(ctx, keyspace)
		if err != nil {
			return err
		}
	}

	// Build the list of cells to work on: we get the union
	// of all the Cells of all the Shards, limited to the provided cells.
	//
	// srvKeyspaceMap is a map:
	//   key: cell
	//   value: topo.SrvKeyspace object being built
	srvKeyspaceMap := make(map[string]*pb.SrvKeyspace)
	wr.findCellsForRebuild(ki, shardCache, cells, srvKeyspaceMap)

	// Then we add the cells from the keyspaces we might be 'ServedFrom'.
	for _, ksf := range ki.ServedFroms {
		servedFromShards, err := wr.ts.FindAllShardsInKeyspace(ctx, ksf.Keyspace)
		if err != nil {
			return err
		}
		wr.findCellsForRebuild(ki, servedFromShards, cells, srvKeyspaceMap)
	}

	// For each entry in srvKeyspaceMap, we do the following:
	// - for every shard in shardCache, look up the tablet types it serves
	//   in that cell
	// - add a reference to the shard (name + key range) to the partition
	//   of each of those types, creating the partition on first use
	// - sort each partition and check its ranges are compatible
	//   (no hole, covers everything)
	for cell, srvKeyspace := range srvKeyspaceMap {
		for _, si := range shardCache {
			servedTypes := si.GetServedTypesPerCell(cell)

			// for each type this shard is supposed to serve,
			// add it to srvKeyspace.Partitions
			for _, tabletType := range servedTypes {
				partition := topoproto.SrvKeyspaceGetPartition(srvKeyspace, tabletType)
				if partition == nil {
					partition = &pb.SrvKeyspace_KeyspacePartition{
						ServedType: tabletType,
					}
					srvKeyspace.Partitions = append(srvKeyspace.Partitions, partition)
				}
				partition.ShardReferences = append(partition.ShardReferences, &pb.ShardReference{
					Name:     si.ShardName(),
					KeyRange: si.KeyRange,
				})
			}
		}

		if err := wr.orderAndCheckPartitions(cell, srvKeyspace); err != nil {
			return err
		}
	}

	// and then finally save the keyspace objects
	for cell, srvKeyspace := range srvKeyspaceMap {
		wr.logger.Infof("updating keyspace serving graph in cell %v for %v", cell, keyspace)
		if err := wr.ts.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace); err != nil {
			return fmt.Errorf("writing serving data failed: %v", err)
		}
	}
	return nil
}
// orderAndCheckPartitions sorts the shard references of every partition in
// srvKeyspace in place and then checks that each partition is well formed.
//
// For each partition it verifies that:
//   - there is at least one shard reference,
//   - the first reference has no Start key and the last has no End key
//     (references with a nil KeyRange pass these checks),
//   - neighbouring references either all have a nil KeyRange (the custom
//     sharding case) or have contiguous ranges: End[i] == Start[i+1].
//
// cell is only used to build error messages. The first violation found is
// returned as an error; nil means every partition passed.
func (wr *Wrangler) orderAndCheckPartitions(cell string, srvKeyspace *pb.SrvKeyspace) error {
	for _, partition := range srvKeyspace.Partitions {
		tabletType := partition.ServedType
		refs := partition.ShardReferences

		// Guard the refs[0] / refs[len-1] accesses below: an empty
		// partition would otherwise panic with an index out of range.
		if len(refs) == 0 {
			return fmt.Errorf("keyspace partition for %v in cell %v has no shards", tabletType, cell)
		}

		// The conversion shares the backing array with
		// partition.ShardReferences, so the partition itself ends up sorted.
		topoproto.ShardReferenceArray(refs).Sort()

		// check the first Start is MinKey, the last End is MaxKey,
		// and the values in between match: End[i] == Start[i+1]
		first := refs[0]
		if first.KeyRange != nil && len(first.KeyRange.Start) != 0 {
			return fmt.Errorf("keyspace partition for %v in cell %v does not start with min key", tabletType, cell)
		}
		last := refs[len(refs)-1]
		if last.KeyRange != nil && len(last.KeyRange.End) != 0 {
			return fmt.Errorf("keyspace partition for %v in cell %v does not end with max key", tabletType, cell)
		}

		for i := 0; i+1 < len(refs); i++ {
			cur, next := refs[i], refs[i+1]
			curNil := cur.KeyRange == nil
			nextNil := next.KeyRange == nil
			if curNil != nextNil {
				return fmt.Errorf("shards with unconsistent KeyRanges for %v in cell %v at shard %v", tabletType, cell, i)
			}
			if curNil {
				// this is the custom sharding case, all KeyRanges must be nil
				continue
			}
			if !bytes.Equal(cur.KeyRange.End, next.KeyRange.Start) {
				return fmt.Errorf("non-contiguous KeyRange values for %v in cell %v at shard %v to %v: %v != %v", tabletType, cell, i, i+1, hex.EncodeToString(cur.KeyRange.End), hex.EncodeToString(next.KeyRange.Start))
			}
		}
	}

	return nil
}
// strInList reports whether s is equal to at least one element of sl.
// A nil or empty sl never contains anything, so the result is false.
func strInList(sl []string, s string) bool {
	found := false
	// Stop scanning as soon as a match has been seen.
	for idx := 0; idx < len(sl) && !found; idx++ {
		found = sl[idx] == s
	}
	return found
}
// RebuildReplicationGraph is a quick and dirty tool to resurrect the
// TopologyServer data from the canonical data stored in the tablet nodes.
//
// cells: local vt cells to scan for all tablets
// keyspaces: list of keyspaces to rebuild
//
// Both lists must be non-empty. The function proceeds in this order:
//  1. read all tablets of every listed cell (any error aborts),
//  2. call DeleteKeyspaceShards for every listed keyspace (any error
//     aborts),
//  3. concurrently, for every tablet that is in the replication graph and
//     belongs to a listed keyspace: create its shard if that has not been
//     done yet (ErrNodeExists is tolerated) and update its replication data,
//  4. concurrently, call RebuildKeyspaceGraph (nil cells, rebuilding shards)
//     for every keyspace that had at least one such tablet.
//
// Failures in steps 3 and 4 are logged as warnings and do not stop the
// remaining work; if any occurred, a single generic error is returned.
func (wr *Wrangler) RebuildReplicationGraph(ctx context.Context, cells []string, keyspaces []string) error {
	// len of a nil slice is 0, so no separate nil comparison is needed.
	if len(cells) == 0 {
		return fmt.Errorf("must specify cells to rebuild replication graph")
	}
	if len(keyspaces) == 0 {
		return fmt.Errorf("must specify keyspaces to rebuild replication graph")
	}

	allTablets := make([]*topo.TabletInfo, 0, 1024)
	for _, cell := range cells {
		tablets, err := topotools.GetAllTablets(ctx, wr.ts, cell)
		if err != nil {
			return err
		}
		allTablets = append(allTablets, tablets...)
	}

	for _, keyspace := range keyspaces {
		wr.logger.Infof("delete keyspace shards: %v", keyspace)
		if err := wr.ts.DeleteKeyspaceShards(ctx, keyspace); err != nil {
			return err
		}
	}

	// mu guards keyspacesToRebuild, shardsCreated and hasErr, all of which
	// are written from the goroutines below.
	keyspacesToRebuild := make(map[string]bool)
	shardsCreated := make(map[string]bool)
	hasErr := false
	mu := sync.Mutex{}
	wg := sync.WaitGroup{}
	for _, ti := range allTablets {
		wg.Add(1)
		go func(ti *topo.TabletInfo) {
			defer wg.Done()
			if !ti.IsInReplicationGraph() {
				return
			}
			if !strInList(keyspaces, ti.Keyspace) {
				return
			}
			mu.Lock()
			keyspacesToRebuild[ti.Keyspace] = true
			shardPath := ti.Keyspace + "/" + ti.Shard
			// CreateShard runs under the lock so that the check of
			// shardsCreated and the creation happen as one step per shard.
			if !shardsCreated[shardPath] {
				if err := wr.ts.CreateShard(ctx, ti.Keyspace, ti.Shard); err != nil && err != topo.ErrNodeExists {
					wr.logger.Warningf("failed re-creating shard %v: %v", shardPath, err)
					hasErr = true
				} else {
					shardsCreated[shardPath] = true
				}
			}
			mu.Unlock()
			// The replication data update is attempted even when the shard
			// creation above failed.
			err := topo.UpdateTabletReplicationData(ctx, wr.ts, ti.Tablet)
			if err != nil {
				mu.Lock()
				hasErr = true
				mu.Unlock()
				wr.logger.Warningf("failed updating replication data: %v", err)
			}
		}(ti)
	}
	wg.Wait()

	// All first-phase goroutines have finished, so keyspacesToRebuild can be
	// read here without holding mu.
	for keyspace := range keyspacesToRebuild {
		wg.Add(1)
		go func(keyspace string) {
			defer wg.Done()
			if err := wr.RebuildKeyspaceGraph(ctx, keyspace, nil, true); err != nil {
				mu.Lock()
				hasErr = true
				mu.Unlock()
				wr.logger.Warningf("RebuildKeyspaceGraph(%v) failed: %v", keyspace, err)
			}
		}(keyspace)
	}
	wg.Wait()

	if hasErr {
		return fmt.Errorf("some errors occurred rebuilding replication graph, consult log")
	}
	return nil
}