diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index b4ed1c1144dd..d4254c60adbf 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -4,7 +4,6 @@ package restore import ( "context" "io" - "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" @@ -80,14 +79,13 @@ func NewStoreMeta(storeId uint64) StoreMeta { // for test type Recovery struct { - allStores []*metapb.Store - StoreMetas []StoreMeta - RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest - MaxAllocID uint64 - mgr *conn.Mgr - progress glue.Progress - concurrency uint32 - totalFlashbackRegions uint64 + allStores []*metapb.Store + StoreMetas []StoreMeta + RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest + MaxAllocID uint64 + mgr *conn.Mgr + progress glue.Progress + concurrency uint32 } func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery { @@ -95,14 +93,13 @@ func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progres var StoreMetas = make([]StoreMeta, totalStores) var regionRecovers = make(map[uint64][]*recovpb.RecoverRegionRequest, totalStores) return Recovery{ - allStores: allStores, - StoreMetas: StoreMetas, - RecoveryPlan: regionRecovers, - MaxAllocID: 0, - mgr: mgr, - progress: progress, - concurrency: concurrency, - totalFlashbackRegions: 0} + allStores: allStores, + StoreMetas: StoreMetas, + RecoveryPlan: regionRecovers, + MaxAllocID: 0, + mgr: mgr, + progress: progress, + concurrency: concurrency} } func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) { @@ -305,12 +302,8 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) { // prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) { - var totalRegions atomic.Uint64 - totalRegions.Store(0) - handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r) - totalRegions.Add(uint64(stats.CompletedRegions)) return stats, err } @@ -321,8 +314,7 @@ func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolve log.Error("region flashback prepare get error") return errors.Trace(err) } - - recovery.totalFlashbackRegions = totalRegions.Load() + recovery.progress.Inc() log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions())) return nil @@ -330,14 +322,8 @@ func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolve // flashback the region data to version resolveTS func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error) { - var completedRegions atomic.Uint64 - - // only know the total progress of tikv, progress is total state of the whole restore flow. - ratio := int(recovery.totalFlashbackRegions) / len(recovery.allStores) - handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { stats, err := ddl.SendFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, commitTS-1, commitTS, r) - completedRegions.Add(uint64(stats.CompletedRegions)) return stats, err } @@ -352,13 +338,12 @@ func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint return errors.Trace(err) } - recovery.progress.IncBy(int64(completedRegions.Load()) / int64(ratio)) - log.Info("region flashback complete", zap.Uint64("resolveTS", resolveTS), zap.Uint64("commitTS", commitTS), zap.Int("regions", runner.CompletedRegions())) + recovery.progress.Inc() return nil } diff --git a/br/pkg/task/restore_data.go b/br/pkg/task/restore_data.go index 5b177faa9c05..74663ea28d39 100644 --- a/br/pkg/task/restore_data.go +++ b/br/pkg/task/restore_data.go @@ -139,9 +139,9 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto } log.Debug("total tikv", zap.Int("total", numBackupStore), zap.String("progress file", cfg.ProgressFile)) - // progress = read meta + send recovery + iterate tikv + flashback. - progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*4), !cfg.LogProgress) - go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*4), cfg.ProgressFile) + // progress = read meta + send recovery + iterate tikv + (1 * prepareflashback + 1 * flashback) + progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*3+2), !cfg.LogProgress) + go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*3+2), cfg.ProgressFile) // restore tikv data from a snapshot volume var totalRegions int