Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restore: support table partition #137

Merged
merged 13 commits into from Jan 7, 2020
24 changes: 18 additions & 6 deletions pkg/checksum/executor.go
Expand Up @@ -59,14 +59,26 @@ func buildChecksumRequest(
}

reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
rs, err := buildRequest(newTable, newTable.ID, oldTable, startTS)
var oldTableID int64
if oldTable != nil {
oldTableID = oldTable.Schema.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS)
if err != nil {
return nil, err
}
reqs = append(reqs, rs...)

for _, partDef := range partDefs {
rs, err := buildRequest(newTable, partDef.ID, oldTable, startTS)
var oldPartID int64
if oldTable != nil {
for _, oldPartDef := range oldTable.Schema.Partition.Definitions {
if oldPartDef.Name == partDef.Name {
oldPartID = oldPartDef.ID
}
}
}
rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -80,10 +92,11 @@ func buildRequest(
tableInfo *model.TableInfo,
tableID int64,
oldTable *utils.Table,
oldTableID int64,
startTS uint64,
) ([]*kv.Request, error) {
reqs := make([]*kv.Request, 0)
req, err := buildTableRequest(tableID, oldTable, startTS)
req, err := buildTableRequest(tableID, oldTable, oldTableID, startTS)
if err != nil {
return nil, err
}
Expand All @@ -93,13 +106,11 @@ func buildRequest(
if indexInfo.State != model.StatePublic {
continue
}
var oldTableID int64
var oldIndexInfo *model.IndexInfo
if oldTable != nil {
for _, oldIndex := range oldTable.Schema.Indices {
if oldIndex.Name == indexInfo.Name {
oldIndexInfo = oldIndex
oldTableID = oldTable.Schema.ID
break
}
}
Expand All @@ -124,12 +135,13 @@ func buildRequest(
func buildTableRequest(
tableID int64,
oldTable *utils.Table,
oldTableID int64,
startTS uint64,
) (*kv.Request, error) {
var rule *tipb.ChecksumRewriteRule
if oldTable != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.GenTableRecordPrefix(oldTable.Schema.ID),
OldPrefix: tablecodec.GenTableRecordPrefix(oldTableID),
NewPrefix: tablecodec.GenTableRecordPrefix(tableID),
}
}
Expand Down
23 changes: 1 addition & 22 deletions pkg/restore/client.go
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"sort"
"sync"
"time"

Expand All @@ -17,7 +16,6 @@ import (
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
Expand Down Expand Up @@ -200,11 +198,6 @@ func (rc *Client) CreateTables(
Data: make([]*import_sstpb.RewriteRule, 0),
}
newTables := make([]*model.TableInfo, 0, len(tables))
// Sort the tables by id for ensuring the new tables has same id ordering as the old tables.
// We require this constrain since newTableID of tableID+1 must be not bigger
// than newTableID of tableID.
sort.Sort(utils.Tables(tables))
Copy link
Member

@overvenus overvenus Jan 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove utils.Tables?

tableIDMap := make(map[int64]int64)
for _, table := range tables {
err := rc.db.CreateTable(rc.ctx, table)
if err != nil {
Expand All @@ -215,21 +208,10 @@ func (rc *Client) CreateTables(
return nil, nil, err
}
rules := GetRewriteRules(newTableInfo, table.Schema, newTS)
tableIDMap[table.Schema.ID] = newTableInfo.ID
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
newTables = append(newTables, newTableInfo)
}
// If tableID + 1 has already exist, then we don't need to add a new rewrite rule for it.
for oldID, newID := range tableIDMap {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update validate.go since we change the rewrite process.

if _, ok := tableIDMap[oldID+1]; !ok {
rewriteRules.Table = append(rewriteRules.Table, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTablePrefix(oldID + 1),
NewKeyPrefix: tablecodec.EncodeTablePrefix(newID + 1),
NewTimestamp: newTS,
})
}
}
return rewriteRules, newTables, nil
}

Expand Down Expand Up @@ -277,9 +259,6 @@ func (rc *Client) RestoreTable(
errCh := make(chan error, len(table.Files))
wg := new(sync.WaitGroup)
defer close(errCh)
// We should encode the rewrite rewriteRules before using it to import files
encodedRules := encodeRewriteRules(rewriteRules)

err = rc.setSpeedLimit()
if err != nil {
return err
Expand All @@ -294,7 +273,7 @@ func (rc *Client) RestoreTable(
select {
case <-rc.ctx.Done():
errCh <- nil
case errCh <- rc.fileImporter.Import(fileReplica, encodedRules):
case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules):
updateCh <- struct{}{}
}
})
Expand Down
1 change: 0 additions & 1 deletion pkg/restore/client_test.go
Expand Up @@ -85,7 +85,6 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {

for i := 0; i < len(tables); i++ {
c.Assert(oldTableIDExist[int64(i)], IsTrue, Commentf("table rule does not exist"))
c.Assert(oldTableIDExist[int64(i+1)], IsTrue, Commentf("table rule does not exist"))
}
}

Expand Down
58 changes: 40 additions & 18 deletions pkg/restore/import.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/codec"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -163,25 +164,26 @@ func NewFileImporter(
// Import tries to import a file.
// All rules must contain encoded keys.
func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_util.RewriteRules) error {
log.Debug("import file", zap.Stringer("file", file))
// Rewrite the start key and end key of file to scan regions
scanStartKey, startRule := rewriteRawKeyWithEncodedRules(file.GetStartKey(), rewriteRules)
if startRule == nil {
log.Error("cannot find a rewrite rule for file start key", zap.Stringer("file", file))
return errRewriteRuleNotFound
}
scanEndKey, endRule := rewriteRawKeyWithEncodedRules(file.GetEndKey(), rewriteRules)
if endRule == nil {
log.Error("cannot find a rewrite rule for file end key", zap.Stringer("file", file))
return errRewriteRuleNotFound
startKey, endKey, err := rewriteFileKeys(file, rewriteRules)
if err != nil {
return err
}
err := withRetry(func() error {
log.Debug("rewrite file keys",
zap.Stringer("file", file),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
)
err = withRetry(func() error {
ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime)
defer cancel()
// Scan regions covered by the file range
regionInfos, err := importer.metaClient.ScanRegions(ctx, scanStartKey, scanEndKey, 0)
regionInfos, err := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
if err != nil {
return errors.Trace(err)
}
log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
for _, regionInfo := range regionInfos {
var downloadMeta *import_sstpb.SSTMeta
Expand All @@ -196,8 +198,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut
log.Warn("download file failed",
zap.Stringer("file", file),
zap.Stringer("region", info.Region),
zap.ByteString("scanStartKey", scanStartKey),
zap.ByteString("scanEndKey", scanEndKey),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
zap.Error(err),
)
}
Expand Down Expand Up @@ -261,19 +263,38 @@ func (importer *FileImporter) downloadSST(
if err != nil {
return nil, true, errors.Trace(err)
}
regionRule := findRegionRewriteRule(regionInfo.Region, rewriteRules)
// Assume one region reflects to one rewrite rule
_, key, err := codec.DecodeBytes(regionInfo.Region.GetStartKey())
if err != nil {
return nil, true, err
}
regionRule := matchNewPrefix(key, rewriteRules)
if regionRule == nil {
log.Debug("cannot find rewrite rule, skip region",
zap.Stringer("region", regionInfo.Region),
zap.Array("tableRule", rules(rewriteRules.Table)),
zap.Array("dataRule", rules(rewriteRules.Data)),
zap.Binary("key", key),
)
return nil, true, errRewriteRuleNotFound
}
sstMeta := getSSTMetaFromFile(id, file, regionInfo.Region, regionRule)
rule := import_sstpb.RewriteRule{
OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(regionRule.GetNewKeyPrefix()),
}
sstMeta := getSSTMetaFromFile(id, file, regionInfo.Region, &rule)
sstMeta.RegionId = regionInfo.Region.GetId()
sstMeta.RegionEpoch = regionInfo.Region.GetRegionEpoch()
req := &import_sstpb.DownloadRequest{
Sst: sstMeta,
StorageBackend: importer.backend,
Name: file.GetName(),
RewriteRule: *regionRule,
RewriteRule: rule,
}
log.Debug("download SST",
zap.Stringer("sstMeta", &sstMeta),
zap.Stringer("region", regionInfo.Region),
)
var resp *import_sstpb.DownloadResponse
for _, peer := range regionInfo.Region.GetPeers() {
resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req)
Expand All @@ -290,7 +311,7 @@ func (importer *FileImporter) downloadSST(
}

func (importer *FileImporter) ingestSST(
fileMeta *import_sstpb.SSTMeta,
sstMeta *import_sstpb.SSTMeta,
regionInfo *restore_util.RegionInfo,
) error {
leader := regionInfo.Leader
Expand All @@ -304,8 +325,9 @@ func (importer *FileImporter) ingestSST(
}
req := &import_sstpb.IngestRequest{
Context: reqCtx,
Sst: fileMeta,
Sst: sstMeta,
}
log.Debug("download SST", zap.Stringer("sstMeta", sstMeta))
resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req)
if err != nil {
return err
Expand Down