Skip to content

Commit

Permalink
Restore checksum (pingcap#34)
Browse files Browse the repository at this point in the history
* add restore checksum

Signed-off-by: linning <linningde25@gmail.com>
  • Loading branch information
NingLin-P committed Nov 4, 2019
1 parent 2646a49 commit 63c7eed
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 144 deletions.
40 changes: 18 additions & 22 deletions cmd/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ func NewBackupCommand() *cobra.Command {
newTableBackupCommand(),
)

bp.PersistentFlags().BoolP("checksum", "", false, "The checksum verification switch")

bp.PersistentFlags().StringP("timeago", "", "", "The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")

bp.PersistentFlags().Uint64P(
Expand Down Expand Up @@ -87,12 +85,11 @@ func newFullBackupCommand() *cobra.Command {
return errors.New("at least one thread required")
}

checksumSwitch, err := command.Flags().GetBool("checksum")
if err != nil {
return err
}

ranges, err := client.GetAllBackupTableRanges(backupTS, checksumSwitch)
ranges, err := client.GetAllBackupTableRanges(backupTS)
if err != nil {
return err
}
Expand All @@ -115,16 +112,16 @@ func newFullBackupCommand() *cobra.Command {
return err
}
}
if checksumSwitch {
valid, err := client.FastChecksum()
if err != nil {
return err
}

if !valid {
log.Error("backup checksumSwitch not passed!")
}
valid, err := client.FastChecksum()
if err != nil {
return err
}

if !valid {
log.Error("backup FastChecksum not passed!")
}

done <- struct{}{}
return client.SaveBackupMeta(u)
},
Expand Down Expand Up @@ -196,12 +193,11 @@ func newTableBackupCommand() *cobra.Command {
return errors.New("at least one thread required")
}

checksumSwitch, err := command.Flags().GetBool("checksum")
if err != nil {
return err
}

ranges, err := client.GetBackupTableRanges(db, table, u, backupTS, rate, concurrency, checksumSwitch)
ranges, err := client.GetBackupTableRanges(db, table, u, backupTS, rate, concurrency)
if err != nil {
return err
}
Expand All @@ -227,16 +223,16 @@ func newTableBackupCommand() *cobra.Command {
return err
}
}
if checksumSwitch {
valid, err := client.FastChecksum()
if err != nil {
return err
}

if !valid {
log.Error("backup checksumSwitch not passed!")
}
valid, err := client.FastChecksum()
if err != nil {
return err
}

if !valid {
log.Error("backup FastChecksum not passed!")
}

done <- struct{}{}
return client.SaveBackupMeta(u)
},
Expand Down
15 changes: 13 additions & 2 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func newFullRestoreCommand() *cobra.Command {
Table: tableRules,
Data: dataRules,
}

err = splitter.Split(ctx, restore.GetRanges(files), rewriteRules)
if err != nil {
return errors.Trace(err)
Expand All @@ -100,7 +101,10 @@ func newFullRestoreCommand() *cobra.Command {
}

err = client.SwitchToNormalMode(ctx)

if err != nil {
return errors.Trace(err)
}
err = client.ValidateChecksum(rewriteRules.Table)
return errors.Trace(err)
},
}
Expand Down Expand Up @@ -176,7 +180,10 @@ func newDbRestoreCommand() *cobra.Command {
}

err = client.SwitchToNormalMode(ctx)

if err != nil {
return errors.Trace(err)
}
err = client.ValidateChecksum(rewriteRules.Table)
return errors.Trace(err)
},
}
Expand Down Expand Up @@ -257,6 +264,10 @@ func newTableRestoreCommand() *cobra.Command {
return errors.Trace(err)
}
err = client.SwitchToNormalMode(ctx)
if err != nil {
return errors.Trace(err)
}
err = client.ValidateChecksum(rewriteRules.Table)
return errors.Trace(err)
},
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/pingcap/pd v1.1.0-beta.0.20191031081404-d0c4df68eb38
github.com/pingcap/tidb v0.0.0-20191022034824-9b4d7891def5
github.com/pingcap/tidb-tools v3.0.6-0.20191102064951-fd78257c02b6+incompatible
github.com/pingcap/tipb v0.0.0-20191101114505-cbd0e985c780
github.com/prometheus/client_golang v0.9.1
github.com/prometheus/common v0.4.1
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ github.com/pingcap/tidb-tools v3.0.6-0.20191102064951-fd78257c02b6+incompatible
github.com/pingcap/tidb-tools v3.0.6-0.20191102064951-fd78257c02b6+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20191101114505-cbd0e985c780 h1:SvFkjLhS/ou97Ey60r8Fq3ZF4wq6wuveWoiLtWLGpek=
github.com/pingcap/tipb v0.0.0-20191101114505-cbd0e985c780/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
72 changes: 49 additions & 23 deletions pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/store/tikv"
Expand Down Expand Up @@ -41,8 +42,8 @@ type Backer struct {
addrs []string
cli *http.Client
}
tikvCli tikv.Storage
backupClis struct {
tikvCli tikv.Storage
grpcClis struct {
mu sync.Mutex
clis map[uint64]*grpc.ClientConn
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func NewBacker(ctx context.Context, pdAddrs string) (*Backer, error) {
}
backer.pdHTTP.addrs = addrs
backer.pdHTTP.cli = cli
backer.backupClis.clis = make(map[uint64]*grpc.ClientConn)
backer.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
backer.PDHTTPGet = pdGet
return backer, nil
}
Expand Down Expand Up @@ -142,6 +143,8 @@ func (backer *Backer) GetClusterVersion() (string, error) {

// GetRegionCount returns the total region count in the cluster
func (backer *Backer) GetRegionCount() (int, error) {
var regionCountPrefix = "pd/api/v1/regions/count"

var err error
for _, addr := range backer.pdHTTP.addrs {
v, e := backer.PDHTTPGet(addr, regionCountPrefix, backer.pdHTTP.cli)
Expand Down Expand Up @@ -175,16 +178,7 @@ func (backer *Backer) Context() context.Context {
return backer.Ctx
}

// GetBackupClient get or create a backup client.
func (backer *Backer) GetBackupClient(storeID uint64) (backup.BackupClient, error) {
backer.backupClis.mu.Lock()
defer backer.backupClis.mu.Unlock()

if conn, ok := backer.backupClis.clis[storeID]; ok {
// Find a cached backup client.
return backup.NewBackupClient(conn), nil
}

func (backer *Backer) getGrpcConnLocked(storeID uint64) (*grpc.ClientConn, error) {
store, err := backer.PDClient.GetStore(backer.Ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -209,19 +203,51 @@ func (backer *Backer) GetBackupClient(storeID uint64) (backup.BackupClient, erro
return nil, errors.WithStack(err)
}
// Cache the conn.
backer.backupClis.clis[storeID] = conn
backer.grpcClis.clis[storeID] = conn
return conn, nil
}

// GetBackupClient gets or creates a backup client.
func (backer *Backer) GetBackupClient(storeID uint64) (backup.BackupClient, error) {
backer.grpcClis.mu.Lock()
defer backer.grpcClis.mu.Unlock()

if conn, ok := backer.grpcClis.clis[storeID]; ok {
// Find a cached backup client.
return backup.NewBackupClient(conn), nil
}

conn, err := backer.getGrpcConnLocked(storeID)
if err != nil {
return nil, errors.Trace(err)
}
return backup.NewBackupClient(conn), nil
}

client := backup.NewBackupClient(conn)
return client, nil
// GetTikvClient gets or creates a tikv client.
func (backer *Backer) GetTikvClient(storeID uint64) (tikvpb.TikvClient, error) {
backer.grpcClis.mu.Lock()
defer backer.grpcClis.mu.Unlock()

if conn, ok := backer.grpcClis.clis[storeID]; ok {
// Find a cached connection and reuse it as a tikv client.
return tikvpb.NewTikvClient(conn), nil
}

conn, err := backer.getGrpcConnLocked(storeID)
if err != nil {
return nil, errors.Trace(err)
}
return tikvpb.NewTikvClient(conn), nil
}

// ResetBackupClient reset and close cached backup client.
func (backer *Backer) ResetBackupClient(storeID uint64) error {
backer.backupClis.mu.Lock()
defer backer.backupClis.mu.Unlock()
// ResetGrpcClient resets and closes the cached grpc connection for the store.
func (backer *Backer) ResetGrpcClient(storeID uint64) error {
backer.grpcClis.mu.Lock()
defer backer.grpcClis.mu.Unlock()

if conn, ok := backer.backupClis.clis[storeID]; ok {
delete(backer.backupClis.clis, storeID)
if conn, ok := backer.grpcClis.clis[storeID]; ok {
delete(backer.grpcClis.clis, storeID)
if err := conn.Close(); err != nil {
return errors.Trace(err)
}
Expand All @@ -248,7 +274,7 @@ func (backer *Backer) SendBackup(
bcli, err := client.Backup(ctx, &req)
if err != nil {
log.Warn("fail to create backup", zap.Uint64("StoreID", storeID))
if err1 := backer.ResetBackupClient(storeID); err1 != nil {
if err1 := backer.ResetGrpcClient(storeID); err1 != nil {
log.Warn("fail to reset backup client",
zap.Uint64("StoreID", storeID),
zap.Error(err1))
Expand Down
30 changes: 13 additions & 17 deletions pkg/raw/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func (bc *BackupClient) GetBackupTableRanges(
backupTS uint64,
rateLimit uint64,
concurrency uint32,
checksumSwitch bool,
) ([]Range, error) {
dbSession, err := session.CreateSession(bc.backer.GetTiKV())
if err != nil {
Expand Down Expand Up @@ -162,12 +161,11 @@ func (bc *BackupClient) GetBackupTableRanges(
tableInfo.AutoIncID = globalAutoID

var tblChecksum, tblKvs, tblBytes uint64
if checksumSwitch {
dbSession.GetSessionVars().SnapshotTS = backupTS
tblChecksum, tblKvs, tblBytes, err = bc.getChecksumFromTiDB(dbSession, dbInfo, tableInfo)
if err != nil {
return nil, errors.Trace(err)
}

dbSession.GetSessionVars().SnapshotTS = backupTS
tblChecksum, tblKvs, tblBytes, err = bc.getChecksumFromTiDB(dbSession, dbInfo, tableInfo)
if err != nil {
return nil, errors.Trace(err)
}

dbData, err := json.Marshal(dbInfo)
Expand Down Expand Up @@ -241,7 +239,7 @@ func buildTableRanges(tbl *model.TableInfo) []tableRange {
}

// GetAllBackupTableRanges gets the range of all tables.
func (bc *BackupClient) GetAllBackupTableRanges(backupTS uint64, checksumSwitch bool) ([]Range, error) {
func (bc *BackupClient) GetAllBackupTableRanges(backupTS uint64) ([]Range, error) {
SystemDatabases := [3]string{
"information_schema",
"performance_schema",
Expand All @@ -253,10 +251,8 @@ func (bc *BackupClient) GetAllBackupTableRanges(backupTS uint64, checksumSwitch
return nil, errors.Trace(err)
}

if checksumSwitch {
// make checksumSwitch snapshot is same as backup snapshot
dbSession.GetSessionVars().SnapshotTS = backupTS
}
// make sure the FastChecksum snapshot is the same as the backup snapshot
dbSession.GetSessionVars().SnapshotTS = backupTS

do := domain.GetDomain(dbSession.(sessionctx.Context))

Expand All @@ -282,12 +278,12 @@ LoadDb:
idAlloc := autoid.NewAllocator(bc.backer.GetTiKV(), dbInfo.ID, false)
for _, tableInfo := range dbInfo.Tables {
var tblChecksum, tblKvs, tblBytes uint64
if checksumSwitch {
tblChecksum, tblKvs, tblBytes, err = bc.getChecksumFromTiDB(dbSession, dbInfo, tableInfo)
if err != nil {
return nil, errors.Trace(err)
}

tblChecksum, tblKvs, tblBytes, err = bc.getChecksumFromTiDB(dbSession, dbInfo, tableInfo)
if err != nil {
return nil, errors.Trace(err)
}

globalAutoID, err := idAlloc.NextGlobalAutoID(tableInfo.ID)
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit 63c7eed

Please sign in to comment.