diff --git a/internal/datacoord/channel.go b/internal/datacoord/channel.go
index 024c06d94766..e1f45e5f0f14 100644
--- a/internal/datacoord/channel.go
+++ b/internal/datacoord/channel.go
@@ -19,7 +19,7 @@ package datacoord
 import (
 	"fmt"
 
-	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/proto"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
diff --git a/internal/datacoord/compaction_policy_clustering.go b/internal/datacoord/compaction_policy_clustering.go
index 580282f6edff..847346ce026e 100644
--- a/internal/datacoord/compaction_policy_clustering.go
+++ b/internal/datacoord/compaction_policy_clustering.go
@@ -307,8 +307,8 @@ func (v *ClusteringSegmentsView) Append(segments ...*SegmentView) {
 }
 
 func (v *ClusteringSegmentsView) String() string {
-	strs := lo.Map(v.segments, func(v *SegmentView, _ int) string {
-		return v.String()
+	strs := lo.Map(v.segments, func(segView *SegmentView, _ int) string {
+		return segView.String()
 	})
 	return fmt.Sprintf("label=<%s>, segments=%v", v.label.String(), strs)
 }
diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go
index 423e9faeacce..8ea149ae3fd1 100644
--- a/internal/datacoord/segment_allocation_policy.go
+++ b/internal/datacoord/segment_allocation_policy.go
@@ -17,6 +17,7 @@
 package datacoord
 
 import (
+	"fmt"
 	"sort"
 	"time"
 
@@ -105,35 +106,47 @@ func AllocatePolicyL1(segments []*SegmentInfo, count int64,
 
 	return newSegmentAllocations, existedSegmentAllocations
 }
 
+type SegmentSealPolicy interface {
+	ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string)
+}
+
 // segmentSealPolicy seal policy applies to segment
-type segmentSealPolicy func(segment *SegmentInfo, ts Timestamp) bool
+type segmentSealPolicyFunc func(segment *SegmentInfo, ts Timestamp) (bool, string)
+
+func (f segmentSealPolicyFunc) ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string) {
+	return f(segment, ts)
+}
 
 // sealL1SegmentByCapacity get segmentSealPolicy with segment size factor policy
-func sealL1SegmentByCapacity(sizeFactor float64) segmentSealPolicy {
-	return func(segment *SegmentInfo, ts Timestamp) bool {
-		return float64(segment.currRows) >= sizeFactor*float64(segment.GetMaxRowNum())
+func sealL1SegmentByCapacity(sizeFactor float64) segmentSealPolicyFunc {
+	return func(segment *SegmentInfo, ts Timestamp) (bool, string) {
+		return float64(segment.currRows) >= sizeFactor*float64(segment.GetMaxRowNum()),
+			fmt.Sprintf("row count capacity full, current rows: %d, max rows: %d, seal factor: %f", segment.currRows, segment.GetMaxRowNum(), sizeFactor)
 	}
 }
 
 // sealL1SegmentByLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
-func sealL1SegmentByLifetime(lifetime time.Duration) segmentSealPolicy {
-	return func(segment *SegmentInfo, ts Timestamp) bool {
+func sealL1SegmentByLifetime(lifetime time.Duration) segmentSealPolicyFunc {
+	return func(segment *SegmentInfo, ts Timestamp) (bool, string) {
 		pts, _ := tsoutil.ParseTS(ts)
 		epts, _ := tsoutil.ParseTS(segment.GetLastExpireTime())
 		d := pts.Sub(epts)
-		return d >= lifetime
+		return d >= lifetime,
+			fmt.Sprintf("segment lifetime expired, last expire time: %v, now: %v, max lifetime: %v",
+				epts, pts, lifetime)
 	}
 }
 
 // sealL1SegmentByBinlogFileNumber seal L1 segment if binlog file number of segment exceed configured max number
-func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicy {
-	return func(segment *SegmentInfo, ts Timestamp) bool {
+func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicyFunc {
+	return func(segment *SegmentInfo, ts Timestamp) (bool, string) {
 		logFileCounter := 0
 		for _, fieldBinlog := range segment.GetStatslogs() {
 			logFileCounter += len(fieldBinlog.GetBinlogs())
 		}
-		return logFileCounter >= maxBinlogFileNumber
+		return logFileCounter >= maxBinlogFileNumber,
+			fmt.Sprintf("segment binlog number too large, binlog number: %d, max binlog number: %d", logFileCounter, maxBinlogFileNumber)
 	}
 }
 
@@ -145,11 +158,12 @@ func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicy
 // into this segment anymore, so sealLongTimeIdlePolicy will seal these segments to trigger handoff of query cluster.
 // Q: Why we don't decrease the expiry time directly?
 // A: We don't want to influence segments which are accepting `frequent small` batch entities.
-func sealL1SegmentByIdleTime(idleTimeTolerance time.Duration, minSizeToSealIdleSegment float64, maxSizeOfSegment float64) segmentSealPolicy {
-	return func(segment *SegmentInfo, ts Timestamp) bool {
+func sealL1SegmentByIdleTime(idleTimeTolerance time.Duration, minSizeToSealIdleSegment float64, maxSizeOfSegment float64) segmentSealPolicyFunc {
+	return func(segment *SegmentInfo, ts Timestamp) (bool, string) {
 		limit := (minSizeToSealIdleSegment / maxSizeOfSegment) * float64(segment.GetMaxRowNum())
 		return time.Since(segment.lastWrittenTime) > idleTimeTolerance &&
-			float64(segment.currRows) > limit
+			float64(segment.currRows) > limit,
+			fmt.Sprintf("segment idle, segment row number: %d, last written time: %v, max idle duration: %v", segment.currRows, segment.lastWrittenTime, idleTimeTolerance)
 	}
 }
 
diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go
index e27c4a2d680f..4f3b7cf3d2d9 100644
--- a/internal/datacoord/segment_allocation_policy_test.go
+++ b/internal/datacoord/segment_allocation_policy_test.go
@@ -172,10 +172,10 @@ func TestSealSegmentPolicy(t *testing.T) {
 			},
 		}
 
-		shouldSeal := p(segment, tsoutil.ComposeTS(nosealTs, 0))
+		shouldSeal, _ := p.ShouldSeal(segment, tsoutil.ComposeTS(nosealTs, 0))
 		assert.False(t, shouldSeal)
 
-		shouldSeal = p(segment, tsoutil.ComposeTS(sealTs, 0))
+		shouldSeal, _ = p.ShouldSeal(segment, tsoutil.ComposeTS(sealTs, 0))
 		assert.True(t, shouldSeal)
 	})
 }
@@ -186,9 +186,12 @@ func Test_sealLongTimeIdlePolicy(t *testing.T) {
 	maxSizeOfSegment := 512.0
 	policy := sealL1SegmentByIdleTime(idleTimeTolerance, minSizeToSealIdleSegment, maxSizeOfSegment)
 	seg1 := &SegmentInfo{lastWrittenTime: time.Now().Add(idleTimeTolerance * 5)}
-	assert.False(t, policy(seg1, 100))
+	shouldSeal, _ := policy.ShouldSeal(seg1, 100)
+	assert.False(t, shouldSeal)
 	seg2 := &SegmentInfo{lastWrittenTime: getZeroTime(), currRows: 1, SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000}}
-	assert.False(t, policy(seg2, 100))
+	shouldSeal, _ = policy.ShouldSeal(seg2, 100)
+	assert.False(t, shouldSeal)
 	seg3 := &SegmentInfo{lastWrittenTime: getZeroTime(), currRows: 1000, SegmentInfo: &datapb.SegmentInfo{MaxRowNum: 10000}}
-	assert.True(t, policy(seg3, 100))
+	shouldSeal, _ = policy.ShouldSeal(seg3, 100)
+	assert.True(t, shouldSeal)
 }
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index 6c7cfe0ef52a..e9d1814459b1 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -116,7 +116,7 @@ type SegmentManager struct {
 	segments            []UniqueID
 	estimatePolicy      calUpperLimitPolicy
 	allocPolicy         AllocatePolicy
-	segmentSealPolicies []segmentSealPolicy
+	segmentSealPolicies []SegmentSealPolicy
 	channelSealPolicies []channelSealPolicy
 	flushPolicy         flushPolicy
 }
@@ -161,7 +161,7 @@ func withAllocPolicy(policy AllocatePolicy) allocOption {
 }
 
 // get allocOption with segmentSealPolicies
-func withSegmentSealPolices(policies ...segmentSealPolicy) allocOption {
+func withSegmentSealPolices(policies ...SegmentSealPolicy) allocOption {
 	return allocFunc(func(manager *SegmentManager) {
 		// do override instead of append, to override default options
 		manager.segmentSealPolicies = policies
@@ -189,8 +189,8 @@ func defaultAllocatePolicy() AllocatePolicy {
 	return AllocatePolicyL1
 }
 
-func defaultSegmentSealPolicy() []segmentSealPolicy {
-	return []segmentSealPolicy{
+func defaultSegmentSealPolicy() []SegmentSealPolicy {
+	return []SegmentSealPolicy{
 		sealL1SegmentByBinlogFileNumber(Params.DataCoordCfg.SegmentMaxBinlogFileNumber.GetAsInt()),
 		sealL1SegmentByLifetime(Params.DataCoordCfg.SegmentMaxLifetime.GetAsDuration(time.Second)),
 		sealL1SegmentByCapacity(Params.DataCoordCfg.SegmentSealProportion.GetAsFloat()),
@@ -633,7 +633,8 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
 		}
 		// change shouldSeal to segment seal policy logic
 		for _, policy := range s.segmentSealPolicies {
-			if policy(info, ts) {
+			if shouldSeal, reason := policy.ShouldSeal(info, ts); shouldSeal {
+				log.Info("seal segment since policy matched", zap.Int64("segmentID", info.GetID()), zap.String("reason", reason))
 				if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
 					return err
 				}
diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go
index 42a0e9efc6ef..fefb7c62509d 100644
--- a/internal/datacoord/segment_manager_test.go
+++ b/internal/datacoord/segment_manager_test.go
@@ -75,7 +75,7 @@ func TestManagerOptions(t *testing.T) {
 		opt := withSegmentSealPolices(defaultSegmentSealPolicy()...)
 		assert.NotNil(t, opt)
 		// manual set nil
-		segmentManager.segmentSealPolicies = []segmentSealPolicy{}
+		segmentManager.segmentSealPolicies = []SegmentSealPolicy{}
 		opt.apply(segmentManager)
 		assert.True(t, len(segmentManager.segmentSealPolicies) > 0)
 	})
@@ -637,7 +637,7 @@ func TestTryToSealSegment(t *testing.T) {
 
 	// Not trigger seal
 	{
-		segmentManager.segmentSealPolicies = []segmentSealPolicy{sealL1SegmentByLifetime(2)}
+		segmentManager.segmentSealPolicies = []SegmentSealPolicy{sealL1SegmentByLifetime(2)}
 		segments := segmentManager.meta.segments.segments
 		assert.Equal(t, 1, len(segments))
 		for _, seg := range segments {
@@ -661,7 +661,7 @@ func TestTryToSealSegment(t *testing.T) {
 
 	// Trigger seal
 	{
-		segmentManager.segmentSealPolicies = []segmentSealPolicy{sealL1SegmentByBinlogFileNumber(2)}
+		segmentManager.segmentSealPolicies = []SegmentSealPolicy{sealL1SegmentByBinlogFileNumber(2)}
 		segments := segmentManager.meta.segments.segments
 		assert.Equal(t, 1, len(segments))
 		for _, seg := range segments {
diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go
index 475617c2edc7..760fbace1a4c 100644
--- a/internal/datacoord/task_scheduler_test.go
+++ b/internal/datacoord/task_scheduler_test.go
@@ -1580,11 +1580,8 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
 
 	waitTaskDoneFunc := func(sche *taskScheduler) {
 		for {
-			fmt.Println("wait for read lock")
 			sche.RLock()
-			fmt.Println("after read lock")
 			taskNum := len(sche.tasks)
-			fmt.Println("taskNum: ", taskNum)
 			sche.RUnlock()
 			if taskNum == 0 {
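
The heart of this patch is a small interface-plus-adapter refactor: the bare `segmentSealPolicy` function type becomes a `SegmentSealPolicy` interface whose `ShouldSeal` returns the seal decision together with a human-readable reason, and `segmentSealPolicyFunc` adapts plain functions to that interface the same way `http.HandlerFunc` adapts handlers. A minimal self-contained sketch of the pattern follows; `segmentStub`, `sealPolicy`, `sealPolicyFunc`, `sealByCapacity`, and the thresholds are simplified stand-ins for illustration, not the real `datacoord` types.

```go
package main

import (
	"fmt"
	"time"
)

// segmentStub is a simplified stand-in for datacoord's SegmentInfo.
type segmentStub struct {
	currRows int64
	maxRows  int64
}

// sealPolicy mirrors the shape of the SegmentSealPolicy interface in the patch.
type sealPolicy interface {
	ShouldSeal(seg *segmentStub, now time.Time) (bool, string)
}

// sealPolicyFunc is the func adapter: a bare function satisfies the
// interface by calling itself, just like http.HandlerFunc.
type sealPolicyFunc func(seg *segmentStub, now time.Time) (bool, string)

func (f sealPolicyFunc) ShouldSeal(seg *segmentStub, now time.Time) (bool, string) {
	return f(seg, now)
}

// sealByCapacity builds a capacity policy in the style of sealL1SegmentByCapacity.
func sealByCapacity(sizeFactor float64) sealPolicyFunc {
	return func(seg *segmentStub, _ time.Time) (bool, string) {
		full := float64(seg.currRows) >= sizeFactor*float64(seg.maxRows)
		return full, fmt.Sprintf("row count capacity full, current rows: %d, max rows: %d, seal factor: %f",
			seg.currRows, seg.maxRows, sizeFactor)
	}
}

func main() {
	// Manager-side loop: walk the policies and log the reason of the first
	// one that fires, analogous to what tryToSealSegment now does.
	policies := []sealPolicy{sealByCapacity(0.25)}
	seg := &segmentStub{currRows: 900, maxRows: 1000}
	for _, p := range policies {
		if shouldSeal, reason := p.ShouldSeal(seg, time.Now()); shouldSeal {
			fmt.Println("sealing segment:", reason)
			break
		}
	}
}
```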
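One design wrinkle worth noting: each policy formats its reason string eagerly, even when it answers false, so `tryToSealSegment` pays the `fmt.Sprintf` cost for every policy on every segment it inspects. If profiling ever showed that to matter, a variant returning a reason closure would defer the formatting to the single policy that actually triggers a seal. A sketch, reusing the stand-in types from the example above; `lazySealPolicyFunc` and `sealByCapacityLazy` are hypothetical names, not part of the patch:

```go
// lazySealPolicyFunc returns a reason producer instead of an eager string,
// so formatting only runs for the policy that actually fires.
type lazySealPolicyFunc func(seg *segmentStub, now time.Time) (bool, func() string)

func sealByCapacityLazy(sizeFactor float64) lazySealPolicyFunc {
	return func(seg *segmentStub, _ time.Time) (bool, func() string) {
		full := float64(seg.currRows) >= sizeFactor*float64(seg.maxRows)
		return full, func() string {
			return fmt.Sprintf("row count capacity full, current rows: %d, max rows: %d, seal factor: %f",
				seg.currRows, seg.maxRows, sizeFactor)
		}
	}
}
```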