Skip to content

Commit

Permalink
ddl: fix runnable ingest job checking (pingcap#52503)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored and 3AceShowHand committed Apr 16, 2024
1 parent d766f7b commit 5203cb8
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 87 deletions.
8 changes: 1 addition & 7 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if err != nil {
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
}
d.runningJobs.clear()
})

d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
Expand Down Expand Up @@ -869,13 +870,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
defer d.sessPool.Put(ctx)

ingest.InitGlobalLightningEnv()
d.ownerManager.SetRetireOwnerHook(func() {
// Since this instance is not DDL owner anymore, we clean up the processing job info.
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
d.runningJobs.clear()
})

return nil
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/ddl/ddl_running_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

type runningJobs struct {
Expand All @@ -36,6 +39,11 @@ type runningJobs struct {
// It is not necessarily being processed by a worker.
unfinishedIDs map[int64]struct{}
unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}

// processingReorgJobID records the ID of the ingest job that is being processed by a worker.
// TODO(tangenta): remove this when we support running multiple concurrent ingest jobs.
processingIngestJobID int64
lastLoggingTime time.Time
}

func newRunningJobs() *runningJobs {
Expand All @@ -47,6 +55,8 @@ func newRunningJobs() *runningJobs {
}

func (j *runningJobs) clear() {
j.Lock()
defer j.Unlock()
j.unfinishedIDs = make(map[int64]struct{})
j.unfinishedSchema = make(map[string]map[string]struct{})
}
Expand All @@ -56,6 +66,9 @@ func (j *runningJobs) add(job *model.Job) {
defer j.Unlock()
j.processingIDs[job.ID] = struct{}{}
j.updateInternalRunningJobIDs()
if isIngestJob(job) {
j.processingIngestJobID = job.ID
}

if _, ok := j.unfinishedIDs[job.ID]; ok {
// Already exists, no need to add it again.
Expand All @@ -75,6 +88,9 @@ func (j *runningJobs) remove(job *model.Job) {
defer j.Unlock()
delete(j.processingIDs, job.ID)
j.updateInternalRunningJobIDs()
if isIngestJob(job) && job.ID == j.processingIngestJobID {
j.processingIngestJobID = 0
}

if job.IsFinished() || job.IsSynced() {
delete(j.unfinishedIDs, job.ID)
Expand Down Expand Up @@ -115,6 +131,16 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
// Already processing by a worker. Skip running it again.
return false
}
if isIngestJob(job) && j.processingIngestJobID != 0 {
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
if time.Since(j.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", j.processingIngestJobID))
j.lastLoggingTime = time.Now()
}
return false
}
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
return false
Expand All @@ -136,3 +162,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
}
return true
}

func isIngestJob(job *model.Job) bool {
return (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg
}
11 changes: 0 additions & 11 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -693,7 +692,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
startTime := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
markJobFinish(job)
}()

if JobNeedGC(job) {
Expand Down Expand Up @@ -743,15 +741,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

func markJobFinish(job *model.Job) {
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
Expand Down
4 changes: 4 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/metrics"
Expand Down Expand Up @@ -853,6 +854,9 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error {
logutil.Logger(ctx).Warn(ingest.LitErrCleanSortPath, zap.Error(err))
return nil
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
close(local.WaitRMFolderChForTest)
})
}
}
return nil
Expand Down
43 changes: 10 additions & 33 deletions pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ import (
"fmt"
"math"
"strconv"
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/logutil"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -48,18 +49,12 @@ type BackendCtxMgr interface {
) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)

MarkJobProcessing(jobID int64) (ok bool)
MarkJobFinish()
}

type litBackendCtxMgr struct {
generic.SyncMap[int64, *litBackendCtx]
memRoot MemRoot
diskRoot DiskRoot
processingJobID int64
lastLoggingTime time.Time
mu sync.Mutex
memRoot MemRoot
diskRoot DiskRoot
}

func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
Expand All @@ -80,30 +75,6 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
return mgr
}

// MarkJobProcessing marks ingest backfill is processing.
func (m *litBackendCtxMgr) MarkJobProcessing(jobID int64) bool {
m.mu.Lock()
defer m.mu.Unlock()
if m.processingJobID == 0 || m.processingJobID == jobID {
m.processingJobID = jobID
return true
}
if time.Since(m.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", m.processingJobID))
m.lastLoggingTime = time.Now()
}
return false
}

// MarkJobFinish marks ingest backfill is finished.
func (m *litBackendCtxMgr) MarkJobFinish() {
m.mu.Lock()
m.processingJobID = 0
m.mu.Unlock()
}

// CheckAvailable checks if the ingest backfill is available.
func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
if err := m.diskRoot.PreCheckUsage(); err != nil {
Expand All @@ -113,6 +84,9 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
}

// ResignOwnerForTest is only used for test.
var ResignOwnerForTest = atomic.NewBool(false)

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(
ctx context.Context,
Expand All @@ -137,6 +111,9 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
failpoint.Inject("beforeCreateLocalBackend", func() {
ResignOwnerForTest.Store(true)
})
bd, err := createLocalBackend(ctx, cfg, pdSvcDiscovery)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
Expand Down
9 changes: 0 additions & 9 deletions pkg/ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
}
}

// MarkJobProcessing implements BackendCtxMgr.MarkJobProcessing interface.
func (*MockBackendCtxMgr) MarkJobProcessing(_ int64) bool {
return true
}

// MarkJobFinish implements BackendCtxMgr.MarkJobFinish interface.
func (*MockBackendCtxMgr) MarkJobFinish() {
}

// CheckAvailable implements BackendCtxMgr.Available interface.
func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
return len(m.runningJobs) == 0, nil
Expand Down
20 changes: 9 additions & 11 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,6 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
if !d.runningJobs.checkRunnable(job) {
return false, nil
}
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.State == model.JobStateQueueing &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
succeed := ingest.LitBackCtxMgr.MarkJobProcessing(job.ID)
if !succeed {
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
return false, nil
}
}
// Check if there is any block ddl running, like drop schema and flashback cluster.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
Expand Down Expand Up @@ -292,6 +281,15 @@ func (d *ddl) startDispatchLoop() {
time.Sleep(dispatchLoopWaitingDuration)
continue
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
if ingest.ResignOwnerForTest.Load() {
err2 := d.ownerManager.ResignOwner(context.Background())
if err2 != nil {
logutil.BgLogger().Info("resign meet error", zap.Error(err2))
}
ingest.ResignOwnerForTest.Store(false)
}
})
select {
case <-d.ddlJobCh:
case <-ticker.C:
Expand Down
1 change: 1 addition & 0 deletions pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_library(
"//pkg/util/compress",
"//pkg/util/engine",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
26 changes: 26 additions & 0 deletions pkg/lightning/backend/local/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"math"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
Expand All @@ -33,6 +36,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/manual"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
"go.uber.org/atomic"
Expand Down Expand Up @@ -581,6 +585,25 @@ func (em *engineManager) getBufferPool() *membuf.Pool {
return em.bufferPool
}

// only used in tests
type slowCreateFS struct {
vfs.FS
}

// WaitRMFolderChForTest is a channel for testing.
var WaitRMFolderChForTest = make(chan struct{})

func (s slowCreateFS) Create(name string) (vfs.File, error) {
if strings.Contains(name, "temporary") {
select {
case <-WaitRMFolderChForTest:
case <-time.After(1 * time.Second):
logutil.BgLogger().Info("no one removes folder")
}
}
return s.FS.Create(name)
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
dbPath := filepath.Join(storeDir, duplicateDBName)
// TODO: Optimize the opts for better write.
Expand All @@ -589,6 +612,9 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
newRangePropertiesCollector,
},
}
failpoint.Inject("slowCreateFS", func() {
opts.FS = slowCreateFS{vfs.Default}
})
return pebble.Open(dbPath, opts)
}

Expand Down
12 changes: 1 addition & 11 deletions pkg/owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ type Manager interface {

// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
SetBeOwnerHook(hook func())
// SetRetireOwnerHook will be called after retiring the owner.
SetRetireOwnerHook(hook func())
}

const (
Expand Down Expand Up @@ -118,8 +116,7 @@ type ownerManager struct {
wg sync.WaitGroup
campaignCancel context.CancelFunc

beOwnerHook func()
retireOwnerHook func()
beOwnerHook func()
}

// NewOwnerManager creates a new Manager.
Expand Down Expand Up @@ -164,10 +161,6 @@ func (m *ownerManager) SetBeOwnerHook(hook func()) {
m.beOwnerHook = hook
}

func (m *ownerManager) SetRetireOwnerHook(hook func()) {
m.retireOwnerHook = hook
}

// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
var ManagerSessionTTL = 60

Expand Down Expand Up @@ -230,9 +223,6 @@ func (m *ownerManager) toBeOwner(elec *concurrency.Election) {

// RetireOwner make the manager to be a not owner.
func (m *ownerManager) RetireOwner() {
if m.retireOwnerHook != nil {
m.retireOwnerHook()
}
atomic.StorePointer(&m.elec, nil)
}

Expand Down

0 comments on commit 5203cb8

Please sign in to comment.