Skip to content

Commit

Permalink
*: caller can use a fixed TS for fast reorg (#52993)
Browse files Browse the repository at this point in the history
close #46986, ref #49233
  • Loading branch information
lance6716 committed May 10, 2024
1 parent 49f09fe commit f46db20
Show file tree
Hide file tree
Showing 25 changed files with 335 additions and 125 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type BackfillSubTaskMeta struct {
RangeSplitKeys [][]byte `json:"range_split_keys,omitempty"`
DataFiles []string `json:"data-files,omitempty"`
StatFiles []string `json:"stat-files,omitempty"`
TS uint64 `json:"ts,omitempty"`
// Each group of MetaGroups represents a different index kvs meta.
MetaGroups []*external.SortedKVMeta `json:"meta_groups,omitempty"`
// Only used for adding one single index.
Expand Down
12 changes: 12 additions & 0 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/backoff"
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -405,6 +406,16 @@ func splitSubtaskMetaForOneKVMetaGroup(
// Skip global sort for empty table.
return nil, nil
}
pdCli := store.GetPDClient()
p, l, err := pdCli.GetTS(ctx)
if err != nil {
return nil, err
}
ts := oracle.ComposeTS(p, l)
failpoint.Inject("mockTSForGlobalSort", func(val failpoint.Value) {
i := val.(int)
ts = uint64(i)
})
splitter, err := getRangeSplitter(
ctx, store, cloudStorageURI, int64(kvMeta.TotalKVSize), instanceCnt, kvMeta.MultipleFilesStats, logger)
if err != nil {
Expand Down Expand Up @@ -446,6 +457,7 @@ func splitSubtaskMetaForOneKVMetaGroup(
DataFiles: dataFiles,
StatFiles: statFiles,
RangeSplitKeys: rangeSplitKeys,
TS: ts,
}
metaBytes, err := json.Marshal(m)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
TotalKVCount: 0,
CheckHotspot: true,
},
TS: sm.TS,
}, engineUUID)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down Expand Up @@ -88,6 +89,8 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_goleak//:goleak",
],
)
32 changes: 31 additions & 1 deletion pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,17 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
return err
}

err := bc.backend.ResetEngine(bc.ctx, ei.uuid)
resetFn := bc.backend.ResetEngineSkipAllocTS
mgr := bc.GetCheckpointManager()
if mgr == nil {
// disttask case, no need to refresh TS.
//
// TODO(lance6716): for disttask local sort case, we need to use a fixed TS. But
// it doesn't have checkpoint, so we need to find a way to save TS.
resetFn = bc.backend.ResetEngine
}

err := resetFn(bc.ctx, ei.uuid)
if err != nil {
logutil.Logger(bc.ctx).Error(LitErrResetEngineFail, zap.Int64("index ID", ei.indexID))
err1 := ei.closedEngine.Cleanup(bc.ctx)
Expand All @@ -262,13 +272,33 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
ei.closedEngine = nil
return err
}

if mgr == nil {
return nil
}

// for local disk case, we need to refresh TS because duplicate detection
// requires each ingest to have a unique TS.
//
// TODO(lance6716): there's still a chance that data is imported but because of
// checkpoint is low-watermark, the data will still be imported again with
// another TS after failover. Need to refine the checkpoint mechanism.
newTS, err := mgr.refreshTSAndUpdateCP()
if err != nil {
return errors.Trace(err)
}
ei.openedEngine.SetTS(newTS)
return nil
}

// ForceSyncFlagForTest is a flag to force sync only for test.
var ForceSyncFlagForTest = false

func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImport bool) {
failpoint.Inject("forceSyncFlagForTest", func() {
// used in a manual test
ForceSyncFlagForTest = true
})
if mode == FlushModeForceFlushAndImport || ForceSyncFlagForTest {
return true, true
}
Expand Down

0 comments on commit f46db20

Please sign in to comment.