Skip to content

Commit

Permalink
fix: DC painc at dropvchannel when disable compaction
Browse files Browse the repository at this point in the history
Make EnableCompaction able to change dynamicly

See also: milvus-io#31059

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn committed Mar 27, 2024
1 parent c299bde commit 6261377
Show file tree
Hide file tree
Showing 11 changed files with 605 additions and 479 deletions.
103 changes: 63 additions & 40 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ type compactionPlanContext interface {
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
updateCompaction(ts Timestamp) error
// isFull return true if the task pool is full
isFull() bool
// get compaction tasks by signal id
Expand Down Expand Up @@ -138,41 +136,6 @@ func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta C
}
}

func (c *compactionPlanHandler) checkResult() {
// deal results
ts, err := c.GetCurrentTS()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
err = c.updateCompaction(ts)
if err != nil {
log.Warn("fail to update compaction", zap.Error(err))
return
}
}

func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
c.stopCh = make(chan struct{})
Expand All @@ -189,7 +152,9 @@ func (c *compactionPlanHandler) start() {
log.Info("compaction handler check result loop quit")
return
case <-checkResultTicker.C:
c.checkResult()
if compactionEnabled() {
c.CheckResult()
}
}
}
}()
Expand All @@ -208,7 +173,9 @@ func (c *compactionPlanHandler) start() {
return

case <-scheduleTicker.C:
c.schedule()
if compactionEnabled() {
c.Schedule()
}
}
}
}()
Expand All @@ -223,12 +190,49 @@ func (c *compactionPlanHandler) start() {
log.Info("Compaction handler quit clean")
return
case <-cleanTicker.C:
c.Clean()
if compactionEnabled() {
c.Clean()
}
}
}
}()
}

func (c *compactionPlanHandler) CheckResult() {
// deal results
ts, err := c.getCurrentTs()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
err = c.updateCompaction(ts)
if err != nil {
log.Warn("fail to update compaction", zap.Error(err))
return
}
}

func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) Schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) Clean() {
current := tsoutil.GetCurrentTime()
c.mu.Lock()
Expand All @@ -253,6 +257,10 @@ func (c *compactionPlanHandler) stop() {
}

func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
if !compactionEnabled() {
return
}

c.mu.Lock()
defer c.mu.Unlock()
for id, task := range c.plans {
Expand Down Expand Up @@ -409,6 +417,9 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {

// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
if !compactionEnabled() {
return nil
}
return c.enqueuePlan(signal, plan)
}

Expand Down Expand Up @@ -518,12 +529,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact

// getCompaction return compaction task. If planId does not exist, return nil.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
if !compactionEnabled() {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

return c.plans[planID]
}

// updateCompaction is the inner func to check compaction task states from datanode
func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
Expand Down Expand Up @@ -629,6 +645,9 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou

// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
if !compactionEnabled() {
return false
}
return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}

Expand All @@ -646,6 +665,10 @@ func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*co

// get compaction tasks by signal id; if signalID == 0 return all tasks
func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
if !compactionEnabled() {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}

{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}
}

Expand Down
Loading

0 comments on commit 6261377

Please sign in to comment.