br: use common scan-split-scatter logic (pingcap#52433)
lance6716 authored and 3AceShowHand committed Apr 16, 2024
1 parent 66f93c3 commit b7217f9
Showing 19 changed files with 201 additions and 272 deletions.
17 changes: 11 additions & 6 deletions br/pkg/restore/client.go
@@ -214,12 +214,13 @@ func NewRestoreClient(
pdHTTPCli pdhttp.Client,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
isRawKv bool,
) *Client {
return &Client{
pdClient: pdClient,
pdHTTPClient: pdHTTPCli,
toolClient: split.NewSplitClient(pdClient, pdHTTPCli, tlsConf, isRawKv, maxSplitKeysOnce),
pdClient: pdClient,
pdHTTPClient: pdHTTPCli,
// toolClient reuses the split.SplitClient to do miscellaneous things. It doesn't
// call split-related functions, so the split arguments are set to arbitrary values.
toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3),
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
switchCh: make(chan struct{}),
@@ -555,7 +556,11 @@ func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBack
useTokenBucket = true
}

metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode, maxSplitKeysOnce)
var splitClientOpts []split.ClientOptionalParameter
if isRawKvMode {
splitClientOpts = append(splitClientOpts, split.WithRawKV())
}
metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, rc.GetStoreCount()+1, splitClientOpts...)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, stores, rc.rewriteMode, concurrencyPerStore, useTokenBucket)
}
@@ -1423,7 +1428,7 @@ func (rc *Client) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int
execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor()
splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx)
log.Info("get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys))
client := split.NewSplitClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), false, maxSplitKeysOnce)
client := split.NewClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), maxSplitKeysOnce, 3)
return NewLogFilesIterWithSplitHelper(logIter, rules, client, splitSize, splitKeys), nil
}

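The hunks above replace split.NewSplitClient with the variadic split.NewClient plus optional parameters. Below is a minimal sketch of the new call shape, using only the split.NewClient / split.WithRawKV signatures visible in this diff; the import paths follow the ones BR already uses for these clients, the return type is assumed to be the same SplitClient interface the old constructor returned, and every other name (newSplitClient, pdCli, storeCount, and so on) is an illustrative placeholder rather than part of the change.

// Sketch only: the split.NewClient / split.WithRawKV calls mirror the hunks
// above; the package name, newSplitClient, pdCli and storeCount are placeholders.
package sketch

import (
	"crypto/tls"

	"github.com/pingcap/tidb/br/pkg/restore/split"
	pd "github.com/tikv/pd/client"
	pdhttp "github.com/tikv/pd/client/http"
)

// maxSplitKeysOnce mirrors the constant in br/pkg/restore/split.go.
const maxSplitKeysOnce = 10240

func newSplitClient(
	pdCli pd.Client,
	pdHTTPCli pdhttp.Client,
	tlsConf *tls.Config,
	storeCount int,
	isRawKv bool,
) split.SplitClient {
	// Raw-KV handling moves from a positional bool to an optional parameter.
	var opts []split.ClientOptionalParameter
	if isRawKv {
		opts = append(opts, split.WithRawKV())
	}
	// The second count argument appears to be a split concurrency hint; callers
	// that never split anything (e.g. toolClient above) pass an arbitrary small
	// value such as 3, as the comment in the first hunk notes.
	return split.NewClient(pdCli, pdHTTPCli, tlsConf, maxSplitKeysOnce, storeCount+1, opts...)
}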
32 changes: 16 additions & 16 deletions br/pkg/restore/client_test.go
@@ -47,7 +47,7 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{
func TestCreateTables(t *testing.T) {
m := mc
g := gluetidb.New()
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

@@ -106,7 +106,7 @@ func TestCreateTables(t *testing.T) {
func TestIsOnline(t *testing.T) {
m := mc
g := gluetidb.New()
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand All @@ -130,7 +130,7 @@ func TestNeedCheckTargetClusterFresh(t *testing.T) {
defer cluster.Stop()

g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -160,7 +160,7 @@ func TestCheckTargetClusterFresh(t *testing.T) {
defer cluster.Stop()

g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand All @@ -177,7 +177,7 @@ func TestCheckTargetClusterFreshWithTable(t *testing.T) {
defer cluster.Stop()

g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -212,7 +212,7 @@ func TestCheckTargetClusterFreshWithTable(t *testing.T) {
func TestCheckSysTableCompatibility(t *testing.T) {
cluster := mc
g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -288,7 +288,7 @@ func TestCheckSysTableCompatibility(t *testing.T) {
func TestInitFullClusterRestore(t *testing.T) {
cluster := mc
g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand All @@ -313,7 +313,7 @@ func TestInitFullClusterRestore(t *testing.T) {
func TestPreCheckTableClusterIndex(t *testing.T) {
m := mc
g := gluetidb.New()
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -402,23 +402,23 @@ func TestGetTSWithRetry(t *testing.T) {
t.Run("PD leader is healthy:", func(t *testing.T) {
retryTimes := -1000
pDClient := fakePDClient{notLeader: false, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg)
_, err := client.GetTSWithRetry(context.Background())
require.NoError(t, err)
})

t.Run("PD leader failure:", func(t *testing.T) {
retryTimes := -1000
pDClient := fakePDClient{notLeader: true, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg)
_, err := client.GetTSWithRetry(context.Background())
require.Error(t, err)
})

t.Run("PD leader switch successfully", func(t *testing.T) {
retryTimes := 0
pDClient := fakePDClient{notLeader: true, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg)
_, err := client.GetTSWithRetry(context.Background())
require.NoError(t, err)
})
@@ -450,7 +450,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {
g := gluetidb.New()
client := restore.NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -574,7 +574,7 @@ func TestSetSpeedLimit(t *testing.T) {
// 1. The cost of concurrent communication is expected to be less than the cost of serial communication.
client := restore.NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)
ctx := context.Background()

recordStores = NewRecordStores()
Expand All @@ -600,7 +600,7 @@ func TestSetSpeedLimit(t *testing.T) {
mockStores[5].Id = SET_SPEED_LIMIT_ERROR // setting a fault store
client = restore.NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)

// Concurrency needs to be less than the number of stores
err = restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 2)
Expand Down Expand Up @@ -680,7 +680,7 @@ func TestDeleteRangeQuery(t *testing.T) {
g := gluetidb.New()
client := restore.NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -730,7 +730,7 @@ func TestDeleteRangeQueryExec(t *testing.T) {
client := restore.NewRestoreClient(fakePDClient{
stores: mockStores,
retryTimes: &retryCnt,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

37 changes: 37 additions & 0 deletions br/pkg/restore/import_retry_test.go
@@ -50,6 +50,43 @@ func assertRegions(t *testing.T, regions []*split.RegionInfo, keys ...string) {
}
}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
func initTestClient(isRawKv bool) *TestClient {
peers := make([]*metapb.Peer, 1)
peers[0] = &metapb.Peer{
Id: 1,
StoreId: 1,
}
keys := [6]string{"", "aay", "bba", "bbh", "cca", ""}
regions := make(map[uint64]*split.RegionInfo)
for i := uint64(1); i < 6; i++ {
startKey := []byte(keys[i-1])
if len(startKey) != 0 {
startKey = codec.EncodeBytesExt([]byte{}, startKey, isRawKv)
}
endKey := []byte(keys[i])
if len(endKey) != 0 {
endKey = codec.EncodeBytesExt([]byte{}, endKey, isRawKv)
}
regions[i] = &split.RegionInfo{
Leader: &metapb.Peer{
Id: i,
},
Region: &metapb.Region{
Id: i,
Peers: peers,
StartKey: startKey,
EndKey: endKey,
},
}
}
stores := make(map[uint64]*metapb.Store)
stores[1] = &metapb.Store{
Id: 1,
}
return NewTestClient(stores, regions, 6)
}

func TestScanSuccess(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
92 changes: 7 additions & 85 deletions br/pkg/restore/split.go
@@ -36,12 +36,6 @@ const (
maxSplitKeysOnce = 10240
)

type SplitContext struct {
isRawKv bool
storeCount int
onSplit OnSplitFunc
}

// RegionSplitter is an executor of region split by rules.
type RegionSplitter struct {
client split.SplitClient
@@ -66,9 +60,6 @@ type OnSplitFunc func(key [][]byte)
func (rs *RegionSplitter) ExecuteSplit(
ctx context.Context,
ranges []rtree.Range,
storeCount int,
isRawKv bool,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
log.Info("skip split regions, no range")
@@ -97,22 +88,12 @@ func (rs *RegionSplitter) ExecuteSplit(
sortedKeys = append(sortedKeys, r.EndKey)
totalRangeSize += r.Size
}
// need use first range's start key to scan region
// and the range size must be greater than 0 here
scanStartKey := sortedRanges[0].StartKey
sctx := SplitContext{
isRawKv: isRawKv,
onSplit: onSplit,
storeCount: storeCount,
}
// the range size must be greater than 0 here
return rs.executeSplitByRanges(ctx, sctx, scanStartKey, sortedKeys)
return rs.executeSplitByRanges(ctx, sortedKeys)
}

func (rs *RegionSplitter) executeSplitByRanges(
ctx context.Context,
splitContext SplitContext,
scanStartKey []byte,
sortedKeys [][]byte,
) error {
startTime := time.Now()
@@ -125,14 +106,14 @@ func (rs *RegionSplitter) executeSplitByRanges(
roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex])
}
if len(roughSortedSplitKeys) > 0 {
if err := rs.executeSplitByKeys(ctx, splitContext, scanStartKey, roughSortedSplitKeys); err != nil {
if err := rs.executeSplitByKeys(ctx, roughSortedSplitKeys); err != nil {
return errors.Trace(err)
}
}
log.Info("finish spliting regions roughly", zap.Duration("take", time.Since(startTime)))

// Then send split requests to each TiKV.
if err := rs.executeSplitByKeys(ctx, splitContext, scanStartKey, sortedKeys); err != nil {
if err := rs.executeSplitByKeys(ctx, sortedKeys); err != nil {
return errors.Trace(err)
}

@@ -143,65 +124,13 @@ // executeSplitByKeys will split regions by **sorted** keys with following steps.
// executeSplitByKeys will split regions by **sorted** keys with following steps.
// 1. locate regions with correspond keys.
// 2. split these regions with correspond keys.
// 3. make sure new splitted regions are balanced.
// 3. make sure new split regions are balanced.
func (rs *RegionSplitter) executeSplitByKeys(
ctx context.Context,
splitContext SplitContext,
scanStartKey []byte,
sortedKeys [][]byte,
) error {
var mutex sync.Mutex
startTime := time.Now()
minKey := codec.EncodeBytesExt(nil, scanStartKey, splitContext.isRawKv)
maxKey := codec.EncodeBytesExt(nil, sortedKeys[len(sortedKeys)-1], splitContext.isRawKv)
scatterRegions := make([]*split.RegionInfo, 0)
regionsMap := make(map[uint64]*split.RegionInfo)

err := utils.WithRetry(ctx, func() error {
clear(regionsMap)
regions, err := split.PaginateScanRegion(ctx, rs.client, minKey, maxKey, split.ScanRegionPaginationLimit)
if err != nil {
return err
}
splitKeyMap := split.GetSplitKeysOfRegions(sortedKeys, regions, splitContext.isRawKv)
workerPool := util.NewWorkerPool(uint(splitContext.storeCount)+1, "split keys")
eg, ectx := errgroup.WithContext(ctx)
for region, splitKeys := range splitKeyMap {
region := region
keys := splitKeys
sctx := splitContext
workerPool.ApplyOnErrorGroup(eg, func() error {
log.Info("get split keys for split regions",
logutil.Region(region.Region), logutil.Keys(keys))
newRegions, err := rs.splitAndScatterRegions(ectx, region, keys, sctx.isRawKv)
if err != nil {
return err
}
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
zap.Int("split key count", len(keys)))
}
log.Info("scattered regions", zap.Int("count", len(newRegions)))
mutex.Lock()
for _, r := range newRegions {
regionsMap[r.Region.Id] = r
}
mutex.Unlock()
sctx.onSplit(keys)
return nil
})
}
err = eg.Wait()
if err != nil {
return err
}
for _, r := range regionsMap {
// merge all scatter regions
scatterRegions = append(scatterRegions, r)
}
return nil
}, newSplitBackoffer())
scatterRegions, err := rs.client.SplitKeysAndScatter(ctx, sortedKeys)
if err != nil {
return errors.Trace(err)
}
@@ -215,13 +144,6 @@ func (rs *RegionSplitter) executeSplitByKeys(
return nil
}

func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte, _ bool,
) ([]*split.RegionInfo, error) {
newRegions, err := rs.client.SplitWaitAndScatter(ctx, regionInfo, keys)
return newRegions, err
}

// waitRegionsScattered tries to wait for multiple regions to be scattered within 3 minutes.
// This could time out, but if many regions are scattered the restore can continue,
// so we don't wait a long time here.
@@ -409,7 +331,7 @@ func (helper *LogSplitHelper) splitRegionByPoints(
}

helper.pool.ApplyOnErrorGroup(helper.eg, func() error {
newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints, false)
newRegions, errSplit := regionSplitter.client.SplitWaitAndScatter(ctx, region, splitPoints)
if errSplit != nil {
log.Warn("failed to split the scaned region", zap.Error(errSplit))
_, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil)
@@ -419,7 +341,7 @@ startKey = point
startKey = point
}

return regionSplitter.ExecuteSplit(ctx, ranges, 3, false, func([][]byte) {})
return regionSplitter.ExecuteSplit(ctx, ranges)
}
select {
case <-ctx.Done():
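With the deletions above, RegionSplitter no longer pages through regions, runs its own worker pool, or retries the scatter loop itself; that scan-split-scatter logic now lives behind the client's SplitKeysAndScatter. Below is a minimal sketch of the simplified call path, assuming only the shapes visible in this diff (the rtree.Range fields, the split.SplitClient field, and the SplitKeysAndScatter call); the helper name and surrounding wiring are illustrative placeholders.

// Sketch of the post-refactor split path: collect end keys from the restore
// ranges and hand them to the split client in one call. Only the client field
// and the SplitKeysAndScatter call mirror this diff; the rest is illustrative.
package sketch

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/restore/split"
	"github.com/pingcap/tidb/br/pkg/rtree"
)

type regionSplitter struct {
	client split.SplitClient
}

// splitByRanges assumes the ranges are already sorted and non-empty, as the
// real ExecuteSplit ensures before it reaches this point.
func (rs *regionSplitter) splitByRanges(ctx context.Context, ranges []rtree.Range) error {
	sortedKeys := make([][]byte, 0, len(ranges))
	for _, r := range ranges {
		sortedKeys = append(sortedKeys, r.EndKey)
	}
	// SplitKeysAndScatter replaces the hand-rolled loop the deleted lines above
	// implemented: PaginateScanRegion + per-region SplitWaitAndScatter + retry
	// and scatter bookkeeping.
	scatteredRegions, err := rs.client.SplitKeysAndScatter(ctx, sortedKeys)
	if err != nil {
		return err
	}
	// The real code goes on to wait for these regions via waitRegionsScattered.
	_ = scatteredRegions
	return nil
}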
