pipeline_items.go (forked from pingcap/br)

// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
	"context"
	"sync"

	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	"github.com/pingcap/parser/model"
	"go.uber.org/zap"

	"github.com/pingcap/br/pkg/glue"
	"github.com/pingcap/br/pkg/metautil"
	"github.com/pingcap/br/pkg/rtree"
)

const (
	defaultChannelSize = 1024
)

// TableSink is the 'sink' of restored data by a sender.
type TableSink interface {
	EmitTables(tables ...CreatedTable)
	EmitError(error)
	Close()
}
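
// chanTableSink is a TableSink backed by a pair of channels.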
type chanTableSink struct {
	outCh chan<- []CreatedTable
	errCh chan<- error
}

func (sink chanTableSink) EmitTables(tables ...CreatedTable) {
	sink.outCh <- tables
}

func (sink chanTableSink) EmitError(err error) {
	sink.errCh <- err
}

func (sink chanTableSink) Close() {
	// errCh may have multiple senders, so don't close it here.
	close(sink.outCh)
}

// ContextManager is the interface for managing a TiKV 'context' for restore.
// The batcher calls Enter when a batch of tables is about to be restored,
// so you can do some preparatory work here (e.g. set placement rules for online restore).
type ContextManager interface {
	// Enter makes some tables 'enter' this context (i.e., prepares them for restore).
	Enter(ctx context.Context, tables []CreatedTable) error
	// Leave makes some tables 'leave' this context (i.e., restore is done; do some post-work).
	Leave(ctx context.Context, tables []CreatedTable) error
	// Close closes the context manager. It is called when the manager is 'killed'
	// and should do some cleanup.
	Close(ctx context.Context)
}

// NewBRContextManager makes a BR context manager, that is, one which
// sets placement rules for online restore on Enter (see splitPrepareWork)
// and unsets them on Leave.
func NewBRContextManager(client *Client) ContextManager {
	return &brContextManager{
		client:   client,
		hasTable: make(map[int64]CreatedTable),
	}
}
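
// brContextManager implements ContextManager by setting placement rules when
// tables enter and resetting them when tables leave.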
type brContextManager struct {
	client *Client

	// This 'set' of table IDs allows us to handle each table just once.
	hasTable map[int64]CreatedTable
}
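
// Close resets the placement rules of every table that has entered
// but not yet left this context.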
func (manager *brContextManager) Close(ctx context.Context) {
	tbls := make([]*model.TableInfo, 0, len(manager.hasTable))
	for _, tbl := range manager.hasTable {
		tbls = append(tbls, tbl.Table)
	}
	splitPostWork(ctx, manager.client, tbls)
}
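
// Enter sets placement rules for the tables that have not yet entered this context.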
func (manager *brContextManager) Enter(ctx context.Context, tables []CreatedTable) error {
	placementRuleTables := make([]*model.TableInfo, 0, len(tables))
	for _, tbl := range tables {
		if _, ok := manager.hasTable[tbl.Table.ID]; !ok {
			placementRuleTables = append(placementRuleTables, tbl.Table)
		}
		manager.hasTable[tbl.Table.ID] = tbl
	}
	return splitPrepareWork(ctx, manager.client, placementRuleTables)
}
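
// Leave resets the placement rules of the given tables and removes them from this context.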
func (manager *brContextManager) Leave(ctx context.Context, tables []CreatedTable) error {
	placementRuleTables := make([]*model.TableInfo, 0, len(tables))
	for _, table := range tables {
		placementRuleTables = append(placementRuleTables, table.Table)
	}
	splitPostWork(ctx, manager.client, placementRuleTables)
	log.Info("restore table done", ZapTables(tables))
	for _, tbl := range placementRuleTables {
		delete(manager.hasTable, tbl.ID)
	}
	return nil
}
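
// splitPostWork resets the placement rules that were set up for online restore.
// A failure here is only logged, since the data has already been restored.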
func splitPostWork(ctx context.Context, client *Client, tables []*model.TableInfo) {
	err := client.ResetPlacementRules(ctx, tables)
	if err != nil {
		log.Warn("reset placement rules failed", zap.Error(err))
	}
}
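
// splitPrepareWork sets placement rules for the tables and waits until the
// placement scheduling is done.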
func splitPrepareWork(ctx context.Context, client *Client, tables []*model.TableInfo) error {
	err := client.SetupPlacementRules(ctx, tables)
	if err != nil {
		log.Error("setup placement rules failed", zap.Error(err))
		return errors.Trace(err)
	}
	err = client.WaitPlacementSchedule(ctx, tables)
	if err != nil {
		log.Error("wait placement schedule failed", zap.Error(err))
		return errors.Trace(err)
	}
	return nil
}

// CreatedTable is a table that has been created during the restore process,
// but not yet filled with data.
type CreatedTable struct {
	RewriteRule *RewriteRules
	Table       *model.TableInfo
	OldTable    *metautil.Table
}

// TableWithRange is a CreatedTable that has been bound to some key ranges.
type TableWithRange struct {
	CreatedTable

	Range []rtree.Range
}

// Exhaust drains all remaining errors in the channel into a slice of errors.
func Exhaust(ec <-chan error) []error {
	out := make([]error, 0, len(ec))
	for {
		select {
		case err := <-ec:
			out = append(out, err)
		default:
			// errCh will NEVER be closed (since it has multiple senders),
			// so we just consume the current backlog of the channel, then return.
			return out
		}
	}
}

// BatchSender is the abstraction of how the batcher sends a batch.
type BatchSender interface {
	// PutSink sets the sink of this sender. Users of this interface promise
	// to call this function at least once before the first call to RestoreBatch.
	PutSink(sink TableSink)
	// RestoreBatch will send the restore request.
	RestoreBatch(ranges DrainResult)
	Close()
}
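
// tikvSender is a BatchSender that sends restore requests to TiKV through a
// two-stage pipeline: a split worker followed by a restore worker.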
type tikvSender struct {
	client   *Client
	updateCh glue.Progress

	sink TableSink
	inCh chan<- DrainResult

	wg *sync.WaitGroup
}

func (b *tikvSender) PutSink(sink TableSink) {
	// Don't worry about visibility: we will call this before the first call to
	// RestoreBatch, which is a synchronization point.
	b.sink = sink
}

func (b *tikvSender) RestoreBatch(ranges DrainResult) {
	b.inCh <- ranges
}

// NewTiKVSender makes a sender that sends restore requests to TiKV.
func NewTiKVSender(
	ctx context.Context,
	cli *Client,
	updateCh glue.Progress,
) (BatchSender, error) {
	inCh := make(chan DrainResult, defaultChannelSize)
	midCh := make(chan DrainResult, defaultChannelSize)

	sender := &tikvSender{
		client:   cli,
		updateCh: updateCh,
		inCh:     inCh,
		wg:       new(sync.WaitGroup),
	}

	sender.wg.Add(2)
	go sender.splitWorker(ctx, inCh, midCh)
	go sender.restoreWorker(ctx, midCh)
	return sender, nil
}
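
// splitWorker splits the key ranges of each incoming batch and forwards the
// batch to the restore worker. It closes the next channel when the input
// channel is drained, the context is canceled, or splitting fails.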
func (b *tikvSender) splitWorker(ctx context.Context, ranges <-chan DrainResult, next chan<- DrainResult) {
	defer log.Debug("split worker closed")
	defer func() {
		b.wg.Done()
		close(next)
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case result, ok := <-ranges:
			if !ok {
				return
			}
			if err := SplitRanges(ctx, b.client, result.Ranges, result.RewriteRules, b.updateCh); err != nil {
				log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
				b.sink.EmitError(err)
				return
			}
			next <- result
		}
	}
}
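
// restoreWorker restores the files of each batch coming from the split worker
// and emits the fully restored tables to the sink. The sink is closed when
// this worker exits.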
func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan DrainResult) {
	defer func() {
		log.Debug("restore worker closed")
		b.wg.Done()
		b.sink.Close()
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case result, ok := <-ranges:
			if !ok {
				return
			}
			files := result.Files()
			if err := b.client.RestoreFiles(ctx, files, result.RewriteRules, b.updateCh); err != nil {
				b.sink.EmitError(err)
				return
			}
			log.Info("restore batch done", rtree.ZapRanges(result.Ranges))
			b.sink.EmitTables(result.BlankTablesAfterSend...)
		}
	}
}
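
// Close shuts down the pipeline: closing the input channel lets both workers
// drain their backlog and exit, and Wait blocks until they have finished.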
func (b *tikvSender) Close() {
	close(b.inCh)
	b.wg.Wait()
	log.Debug("tikv sender closed")
}