Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] cherry pick PR of minRegionNumber #53013

Open
wants to merge 1 commit into
base: release-7.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type Backend interface {
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
// it means duplicate keys were detected. In this situation, all data in the engine must be imported.
// It's safe to reset or cleanup this engine.
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys, minRegionNum int64) error

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -358,12 +358,12 @@ func NewClosedEngine(backend Backend, logger log.Logger, uuid uuid.UUID, id int3
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys, minRegionNum int64) error {
var err error

for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys, minRegionNum)
if !common.IsRetryableError(err) {
if common.ErrFoundDuplicateKeys.Equal(err) {
task.End(zap.WarnLevel, err)
Expand Down
20 changes: 16 additions & 4 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,7 @@ func readAndSplitIntoRange(
engine common.Engine,
sizeLimit int64,
keysLimit int64,
minRegionNum int64,
) ([]common.Range, error) {
startKey, endKey, err := engine.GetKeyRange()
if err != nil {
Expand All @@ -1085,13 +1086,23 @@ func readAndSplitIntoRange(
}

engineFileTotalSize, engineFileLength := engine.KVStatistics()
logger := log.FromContext(ctx).With(zap.String("engine", engine.ID()))
if minRegionNum > 0 && engineFileTotalSize/sizeLimit < minRegionNum {
sizeLimit = engineFileTotalSize / minRegionNum
logger.Info("enforce minRegionNum",
zap.Int64("totalSize", engineFileTotalSize), zap.Int64("minRegionNum", minRegionNum), zap.Int64("sizeLimit", sizeLimit))
}
if minRegionNum > 0 && engineFileLength/keysLimit < minRegionNum {
keysLimit = engineFileLength / minRegionNum
logger.Info("enforce minRegionNum",
zap.Int64("totalCount", engineFileLength), zap.Int64("minRegionNum", minRegionNum), zap.Int64("keysLimit", keysLimit))
}

if engineFileTotalSize <= sizeLimit && engineFileLength <= keysLimit {
ranges := []common.Range{{Start: startKey, End: endKey}}
return ranges, nil
}

logger := log.FromContext(ctx).With(zap.String("engine", engine.ID()))
ranges, err := engine.SplitRanges(startKey, endKey, sizeLimit, keysLimit, logger)
logger.Info("split engine key ranges",
zap.Int64("totalSize", engineFileTotalSize), zap.Int64("totalCount", engineFileLength),
Expand Down Expand Up @@ -1476,6 +1487,7 @@ func (local *Backend) ImportEngine(
ctx context.Context,
engineUUID uuid.UUID,
regionSplitSize, regionSplitKeys int64,
minRegionNum int64,
) error {
var e common.Engine
if externalEngine, ok := local.externalEngine[engineUUID]; ok {
Expand Down Expand Up @@ -1509,7 +1521,7 @@ func (local *Backend) ImportEngine(
}

// split sorted file into range about regionSplitSize per file
regionRanges, err := readAndSplitIntoRange(ctx, e, regionSplitSize, regionSplitKeys)
regionRanges, err := readAndSplitIntoRange(ctx, e, regionSplitSize, regionSplitKeys, minRegionNum)
if err != nil {
return err
}
Expand Down Expand Up @@ -1812,15 +1824,15 @@ func (local *Backend) GetDupeController(dupeConcurrency int, errorMgr *errormana
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (local *Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
func (local *Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys, minRegionNum int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
logger := log.FromContext(ctx).With(
zap.String("engineTag", "<import-and-reset>"),
zap.Stringer("engineUUID", engineUUID),
)
closedEngine := backend.NewClosedEngine(local, logger, engineUUID, 0)
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys, minRegionNum); err != nil {
return err
}
return local.ResetEngine(ctx, engineUUID)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func (*tidbBackend) CleanupEngine(context.Context, uuid.UUID) error {
return nil
}

func (*tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error {
// ImportEngine is a no-op for the TiDB backend: rows are written directly to the
// target cluster via SQL as they are encoded, so there is no local engine data to
// import. The region-split-size, region-split-keys, and min-region-num parameters
// are therefore ignored.
func (*tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64, int64) error {
	return nil
}

Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,7 @@ type TikvImporter struct {
CompressKVPairs CompressionType `toml:"compress-kv-pairs" json:"compress-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"`
MinRegionNum int `toml:"min-region-num" json:"min-region-num"`
RegionSplitBatchSize int `toml:"region-split-batch-size" json:"region-split-batch-size"`
RegionSplitConcurrency int `toml:"region-split-concurrency" json:"region-split-concurrency"`
RegionCheckBackoffLimit int `toml:"region-check-backoff-limit" json:"region-check-backoff-limit"`
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ const (
SplitRegionSize ByteSize = 96 * units.MiB
SplitRegionKeys int = 1_280_000
MaxSplitRegionSizeRatio int = 10

defaultMaxAllowedPacket = 64 * units.MiB
MinRegionNum int = 0 // disable minRegionNum by default
defaultMaxAllowedPacket = 64 * units.MiB
)

// static vars
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -2011,7 +2011,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) {
var importErr error
for _, engine := range largeEngines {
// Use a larger split region size to avoid split the same region by many times.
if err := localBackend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
if err := localBackend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio), int64(config.MinRegionNum)); err != nil {
importErr = multierr.Append(importErr, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@ func (tr *TableImporter) importKV(
regionSplitKeys = int64(config.SplitRegionKeys)
}
}
err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys)
err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys, int64(rc.cfg.TikvImporter.MinRegionNum))
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, closedEngine.GetID(), err, checkpoints.CheckpointStatusImported)
// Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart.
if err == nil && saveCpErr == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
if err != nil {
return err
}
err = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
err = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys), int64(config.MinRegionNum))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {

regionSplitSize := int64(lightning.SplitRegionSize) * int64(lightning.MaxSplitRegionSizeRatio)
regionSplitKeys := int64(lightning.SplitRegionKeys)
if err := ei.closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys); err != nil {
if err := ei.closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys, int64(lightning.MinRegionNum)); err != nil {
logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (ei *engineInfo) ImportAndClean() error {
logutil.Logger(ei.ctx).Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID),
zap.Int64("index ID", ei.indexID),
zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10)))
err := ei.closedEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
err := ei.closedEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys), int64(config.MinRegionNum))
if err != nil {
logLevel := zap.ErrorLevel
if common.ErrFoundDuplicateKeys.Equal(err) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr
if err != nil {
return err
}
return localBackend.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
return localBackend.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys), int64(config.MinRegionNum))
}

func (e *writeAndIngestStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error {
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func NewTableImporter(param *JobImportParam, e *LoadDataController, taskID int64
// todo: use different default for single-node import and distributed import.
regionSplitSize: 2 * int64(config.SplitRegionSize),
regionSplitKeys: 2 * int64(config.SplitRegionKeys),
minRegionNum: int64(config.MinRegionNum),
diskQuota: adjustDiskQuota(int64(e.DiskQuota), dir, e.logger),
diskQuotaLock: new(syncutil.RWMutex),
}, nil
Expand All @@ -250,6 +251,7 @@ type TableImporter struct {
logger *zap.Logger
regionSplitSize int64
regionSplitKeys int64
minRegionNum int64
diskQuota int64
diskQuotaLock *syncutil.RWMutex
}
Expand Down Expand Up @@ -477,7 +479,7 @@ func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*b
// ImportAndCleanup imports the engine and cleanup the engine data.
func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) (int64, error) {
var kvCount int64
importErr := closedEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys)
importErr := closedEngine.Import(ctx, ti.regionSplitSize, ti.regionSplitKeys, ti.minRegionNum)
if closedEngine.GetID() != common.IndexEngineID {
// todo: change to a finer-grain progress later.
// each row is encoded into 1 data key
Expand Down Expand Up @@ -571,6 +573,7 @@ func (ti *TableImporter) CheckDiskQuota(ctx context.Context) {
engine,
int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio),
int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio),
int64(config.MinRegionNum),
); err != nil {
importErr = multierr.Append(importErr, err)
}
Expand Down