diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 49ee00480343..5c07503dd59d 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -77,8 +78,8 @@ type compactionTrigger struct { compactionHandler compactionPlanContext globalTrigger *time.Ticker forceMu lock.Mutex - quit chan struct{} - wg sync.WaitGroup + closeCh lifetime.SafeChan + closeWaiter sync.WaitGroup indexEngineVersionManager IndexEngineVersionManager @@ -105,20 +106,20 @@ func newCompactionTrigger( estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, handler: handler, + closeCh: lifetime.NewSafeChan(), } } func (t *compactionTrigger) start() { - t.quit = make(chan struct{}) t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)) - t.wg.Add(2) + t.closeWaiter.Add(2) go func() { defer logutil.LogPanic() - defer t.wg.Done() + defer t.closeWaiter.Done() for { select { - case <-t.quit: + case <-t.closeCh.CloseCh(): log.Info("compaction trigger quit") return case signal := <-t.signals: @@ -145,7 +146,7 @@ func (t *compactionTrigger) start() { func (t *compactionTrigger) startGlobalCompactionLoop() { defer logutil.LogPanic() - defer t.wg.Done() + defer t.closeWaiter.Done() // If AutoCompaction disabled, global loop will not start if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { @@ -154,7 +155,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { for { select { - case <-t.quit: + case <-t.closeCh.CloseCh(): t.globalTrigger.Stop() log.Info("global compaction loop exit") return @@ -168,8 +169,8 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { } func (t *compactionTrigger) stop() { - close(t.quit) - t.wg.Wait() + t.closeCh.Close() + t.closeWaiter.Wait() } func (t *compactionTrigger) getCollection(collectionID UniqueID) (*collectionInfo, error) { @@ -241,7 +242,7 @@ func (t *compactionTrigger) triggerCompaction() error { // triggerSingleCompaction trigger a compaction bundled with collection-partition-channel-segment func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error { // If AutoCompaction disabled, flush request will not trigger compaction - if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { + if !paramtable.Get().DataCoordCfg.EnableAutoCompaction.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() { return nil } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 131b79cd5e7a..e05fa1b1b8cc 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -38,6 +38,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lifetime" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -494,6 +495,7 @@ func Test_compactionTrigger_force(t *testing.T) { globalTrigger: tt.fields.globalTrigger, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, + closeCh: lifetime.NewSafeChan(), testingOnly: true, } _, err := tr.triggerManualCompaction(tt.collectionID) @@ -519,6 +521,7 @@ func Test_compactionTrigger_force(t *testing.T) { globalTrigger: tt.fields.globalTrigger, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, + closeCh: lifetime.NewSafeChan(), testingOnly: true, } tt.collectionID = 1000 @@ -543,6 +546,7 @@ func Test_compactionTrigger_force(t *testing.T) { globalTrigger: tt.fields.globalTrigger, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, + closeCh: lifetime.NewSafeChan(), testingOnly: true, } @@ -781,6 +785,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { globalTrigger: tt.fields.globalTrigger, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, + closeCh: lifetime.NewSafeChan(), testingOnly: true, } _, err := tr.triggerManualCompaction(tt.args.collectionID) @@ -932,6 +937,7 @@ func Test_compactionTrigger_noplan(t *testing.T) { globalTrigger: tt.fields.globalTrigger, estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, + closeCh: lifetime.NewSafeChan(), testingOnly: true, } tr.start() @@ -1119,6 +1125,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) { signals: tt.fields.signals, compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, + closeCh: lifetime.NewSafeChan(), testingOnly: true, } tr.start() @@ -1312,6 +1319,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) { indexEngineVersionManager: newMockVersionManager(), estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, + closeCh: lifetime.NewSafeChan(), testingOnly: true, } tr.start() @@ -1501,6 +1509,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) { indexEngineVersionManager: newMockVersionManager(), estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, estimateNonDiskSegmentPolicy: calBySchemaPolicy, + closeCh: lifetime.NewSafeChan(), testingOnly: true, } tr.start() @@ -1675,6 +1684,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { compactionHandler: tt.fields.compactionHandler, globalTrigger: tt.fields.globalTrigger, indexEngineVersionManager: newMockVersionManager(), + closeCh: lifetime.NewSafeChan(), testingOnly: true, } tr.start() diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 08dc697e6e07..915beb8e8a10 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -45,6 +45,8 @@ type TriggerManager interface { ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) } +var _ TriggerManager = (*CompactionTriggerManager)(nil) + // CompactionTriggerManager registers Triggers to TriggerType // so that when the certain TriggerType happens, the corresponding triggers can // trigger the correct compaction plans. @@ -93,7 +95,7 @@ func (m *CompactionTriggerManager) Start() { go m.startLoop() } -func (m *CompactionTriggerManager) Close() { +func (m *CompactionTriggerManager) Stop() { close(m.closeSig) m.closeWg.Wait() } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 94e9ecb1df5d..df6783313f37 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -122,7 +122,7 @@ type Server struct { compactionTrigger trigger compactionHandler compactionPlanContext - compactionTriggerManager *CompactionTriggerManager + compactionTriggerManager TriggerManager syncSegmentsScheduler *SyncSegmentsScheduler metricsCacheManager *metricsinfo.MetricsCacheManager @@ -352,11 +352,10 @@ func (s *Server) initDataCoord() error { log.Info("init service discovery done") s.initTaskScheduler(storageCli) - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.createCompactionHandler() - s.createCompactionTrigger() - log.Info("init compaction scheduler done") - } + log.Info("init task scheduler done") + + s.initCompaction() + log.Info("init compaction done") if err = s.initSegmentManager(); err != nil { return err @@ -398,11 +397,6 @@ func (s *Server) Start() error { func (s *Server) startDataCoord() { s.taskScheduler.Start() - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.compactionHandler.start() - s.compactionTrigger.start() - s.compactionTriggerManager.Start() - } s.startServerLoop() // http.Register(&http.Handler{ @@ -497,24 +491,6 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ s.indexNodeCreator = f } -func (s *Server) createCompactionHandler() { - s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator, s.taskScheduler, s.handler) - s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta) -} - -func (s *Server) stopCompactionHandler() { - s.compactionHandler.stop() - s.compactionTriggerManager.Close() -} - -func (s *Server) createCompactionTrigger() { - s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) -} - -func (s *Server) stopCompactionTrigger() { - s.compactionTrigger.stop() -} - func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) { chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params) cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx) @@ -673,7 +649,44 @@ func (s *Server) initIndexNodeManager() { } } +func (s *Server) initCompaction() { + s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator, s.taskScheduler, s.handler) + s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta) + s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) +} + +func (s *Server) stopCompaction() { + if s.compactionTrigger != nil { + s.compactionTrigger.stop() + } + if s.compactionTriggerManager != nil { + s.compactionTriggerManager.Stop() + } + + if s.compactionHandler != nil { + s.compactionHandler.stop() + } +} + +func (s *Server) startCompaction() { + if s.compactionHandler != nil { + s.compactionHandler.start() + } + + if s.compactionTrigger != nil { + s.compactionTrigger.start() + } + + if s.compactionTriggerManager != nil { + s.compactionTriggerManager.Start() + } +} + func (s *Server) startServerLoop() { + if Params.DataCoordCfg.EnableCompaction.GetAsBool() { + s.startCompaction() + } + s.serverLoopWg.Add(2) s.startWatchService(s.serverLoopCtx) s.startFlushLoop(s.serverLoopCtx) @@ -1002,10 +1015,7 @@ func (s *Server) Stop() error { s.importChecker.Close() s.syncSegmentsScheduler.Stop() - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.stopCompactionTrigger() - s.stopCompactionHandler() - } + s.stopCompaction() logutil.Logger(s.ctx).Info("datacoord compaction stopped") s.taskScheduler.Stop() diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 8437a8bfd9ce..6407e97f0580 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -549,12 +549,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath s.flushCh <- req.SegmentID // notify compaction - if paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() { - err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(), - req.GetSegmentID(), req.GetChannel(), false) - if err != nil { - log.Warn("failed to trigger single compaction") - } + err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(), + req.GetSegmentID(), req.GetChannel(), false) + if err != nil { + log.Warn("failed to trigger single compaction") } }