diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index a16eb1f52b05..e031a3c5ae66 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -543,8 +543,6 @@ dataNode:
     # if this parameter <= 0, will set it as the maximum number of CPUs that can be executing
     # suggest to set it bigger on large collection numbers to avoid blocking
     workPoolSize: -1
-    # specify the size of global work pool for channel checkpoint updating
-    # if this parameter <= 0, will set it as 10
     updateChannelCheckpointMaxParallel: 10
     updateChannelCheckpointInterval: 60 # the interval duration(in seconds) for datanode to update channel checkpoint of each channel
     updateChannelCheckpointRPCTimeout: 20 # timeout in seconds for UpdateChannelCheckpoint RPC call
@@ -556,6 +554,7 @@ dataNode:
     readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
   compaction:
     levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
+    levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
  gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop
  ip: # if not specified, use the first unicastable address
  port: 21124
diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go
index 7cbd3487c827..97b4fce5fb96 100644
--- a/internal/datanode/compaction/l0_compactor.go
+++ b/internal/datanode/compaction/l0_compactor.go
@@ -133,24 +133,19 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error
 	}
 
 	var (
-		totalSize      int64
-		totalDeltalogs = make(map[int64][]string)
+		memorySize     int64
+		totalDeltalogs = []string{}
 	)
 	for _, s := range l0Segments {
-		paths := []string{}
 		for _, d := range s.GetDeltalogs() {
 			for _, l := range d.GetBinlogs() {
-				paths = append(paths, l.GetLogPath())
-				totalSize += l.GetMemorySize()
+				totalDeltalogs = append(totalDeltalogs, l.GetLogPath())
+				memorySize += l.GetMemorySize()
 			}
 		}
-		if len(paths) > 0 {
-			totalDeltalogs[s.GetSegmentID()] = paths
-		}
 	}
 
-	batchSize := getMaxBatchSize(totalSize)
-	resultSegments, err := t.process(ctx, batchSize, targetSegments, lo.Values(totalDeltalogs)...)
+	resultSegments, err := t.process(ctx, memorySize, targetSegments, totalDeltalogs)
 	if err != nil {
 		return nil, err
 	}
@@ -170,15 +165,22 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error
 	return result, nil
 }
 
-// batch size means segment count
-func getMaxBatchSize(totalSize int64) int {
-	max := 1
-	memLimit := float64(hardware.GetFreeMemoryCount()) * paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
-	if memLimit > float64(totalSize) {
-		max = int(memLimit / float64(totalSize))
+// BatchSize refers to the L1/L2 segments count that in one batch, batchSize controls the expansion ratio
+// of deltadata in memory.
+func getMaxBatchSize(baseMemSize, memLimit float64) int {
+	batchSize := 1
+	if memLimit > baseMemSize {
+		batchSize = int(memLimit / baseMemSize)
+	}
+
+	maxSizeLimit := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.GetAsInt()
+	// Set batch size to maxSizeLimit if it is larger than maxSizeLimit.
+	// When maxSizeLimit <= 0, it means no limit.
+	if maxSizeLimit > 0 && batchSize > maxSizeLimit {
+		return maxSizeLimit
 	}
-	return max
+	return batchSize
 }
 
 func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
@@ -317,18 +319,15 @@ func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaDa
 	return retMap
 }
 
-func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
+func (t *LevelZeroCompactionTask) process(ctx context.Context, l0MemSize int64, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
 	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
 	defer span.End()
 
-	results := make([]*datapb.CompactionSegment, 0)
-	batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
-	log := log.Ctx(ctx).With(
-		zap.Int64("planID", t.plan.GetPlanID()),
-		zap.Int("max conc segment counts", batchSize),
-		zap.Int("total segment counts", len(targetSegments)),
-		zap.Int("total batch", batch),
-	)
+	ratio := paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
+	memLimit := float64(hardware.GetFreeMemoryCount()) * ratio
+	if float64(l0MemSize) > memLimit {
+		return nil, errors.Newf("L0 compaction failed, not enough memory, request memory size: %v, memory limit: %v", l0MemSize, memLimit)
+	}
 
 	log.Info("L0 compaction process start")
 	allDelta, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
@@ -337,6 +336,16 @@ func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
 		return nil, err
 	}
 
+	batchSize := getMaxBatchSize(float64(allDelta.Size()), memLimit)
+	batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
+	log := log.Ctx(ctx).With(
+		zap.Int64("planID", t.plan.GetPlanID()),
+		zap.Int("max conc segment counts", batchSize),
+		zap.Int("total segment counts", len(targetSegments)),
+		zap.Int("total batch", batch),
+	)
+
+	results := make([]*datapb.CompactionSegment, 0)
 	for i := 0; i < batch; i++ {
 		left, right := i*batchSize, (i+1)*batchSize
 		if right >= len(targetSegments) {
diff --git a/internal/datanode/compaction/l0_compactor_test.go b/internal/datanode/compaction/l0_compactor_test.go
index 040b4a7eb3ba..d96bf50a8ea2 100644
--- a/internal/datanode/compaction/l0_compactor_test.go
+++ b/internal/datanode/compaction/l0_compactor_test.go
@@ -79,6 +79,36 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
 	s.dBlob = blob.GetValue()
 }
 
+func (s *LevelZeroCompactionTaskSuite) TestGetMaxBatchSize() {
+	tests := []struct {
+		baseMem        float64
+		memLimit       float64
+		batchSizeLimit string
+
+		expected    int
+		description string
+	}{
+		{10, 100, "-1", 10, "no limitation on maxBatchSize"},
+		{10, 100, "0", 10, "no limitation on maxBatchSize v2"},
+		{10, 100, "11", 10, "maxBatchSize == 11"},
+		{10, 100, "1", 1, "maxBatchSize == 1"},
+		{10, 12, "-1", 1, "no limitation on maxBatchSize"},
+		{10, 12, "100", 1, "maxBatchSize == 100"},
+	}
+
+	maxSizeK := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.Key
+	defer paramtable.Get().Reset(maxSizeK)
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			paramtable.Get().Save(maxSizeK, test.batchSizeLimit)
+			defer paramtable.Get().Reset(maxSizeK)
+
+			actual := getMaxBatchSize(test.baseMem, test.memLimit)
+			s.Equal(test.expected, actual)
+		})
+	}
+}
+
 func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() {
 	plan := &datapb.CompactionPlan{
 		PlanID: 19530,
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 90ee8c5a5ccc..f90418f194e7 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -3743,7 +3743,8 @@ type dataNodeConfig struct {
 	ReadBufferSizeInMB ParamItem `refreshable:"true"`
 
 	// Compaction
-	L0BatchMemoryRatio ParamItem `refreshable:"true"`
+	L0BatchMemoryRatio       ParamItem `refreshable:"true"`
+	L0CompactionMaxBatchSize ParamItem `refreshable:"true"`
 
 	GracefulStopTimeout ParamItem `refreshable:"true"`
 
@@ -4052,6 +4053,15 @@ if this parameter <= 0, will set it as 10`,
 	}
 	p.L0BatchMemoryRatio.Init(base.mgr)
 
+	p.L0CompactionMaxBatchSize = ParamItem{
+		Key:          "dataNode.compaction.levelZeroMaxBatchSize",
+		Version:      "2.4.5",
+		Doc:          "Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.",
+		DefaultValue: "-1",
+		Export:       true,
+	}
+	p.L0CompactionMaxBatchSize.Init(base.mgr)
+
 	p.GracefulStopTimeout = ParamItem{
 		Key:     "dataNode.gracefulStopTimeout",
 		Version: "2.3.7",
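
Reviewer note (not part of the patch): besides adding the levelZeroMaxBatchSize cap, this change moves the batch-size computation from Compact into process, after loadDelta, so it is based on the actual in-memory size of the decoded delta data (allDelta.Size()) rather than on the binlogs' MemorySize metadata. It also changes the overload behavior: where the old getMaxBatchSize silently fell back to a batch size of 1, process now fails fast with an error when the L0 delta data alone exceeds memLimit (free memory * levelZeroBatchMemoryRatio).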
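As a further reviewer aid, here is a minimal standalone sketch of the batching arithmetic the patch introduces. It is not part of the patch: the paramtable lookup is replaced by an explicit maxSizeLimit argument, and all the numbers in main are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// Standalone sketch of the patched getMaxBatchSize: the
// DataNodeCfg.L0CompactionMaxBatchSize lookup is replaced by an explicit
// maxSizeLimit argument. baseMemSize is the in-memory size of the loaded
// L0 delta data; memLimit is free memory scaled by levelZeroBatchMemoryRatio.
func getMaxBatchSize(baseMemSize, memLimit float64, maxSizeLimit int) int {
	batchSize := 1
	if memLimit > baseMemSize {
		batchSize = int(memLimit / baseMemSize)
	}
	// A non-positive limit (the -1 default) means "no cap".
	if maxSizeLimit > 0 && batchSize > maxSizeLimit {
		return maxSizeLimit
	}
	return batchSize
}

func main() {
	// Hypothetical numbers: 100 target L1/L2 segments, 10 MB of loaded delta
	// data, a 120 MB memory budget, and levelZeroMaxBatchSize set to 8.
	// Uncapped, the batch size would be 120/10 = 12; the knob caps it at 8.
	batchSize := getMaxBatchSize(10, 120, 8)
	batch := int(math.Ceil(float64(100) / float64(batchSize)))
	fmt.Printf("batchSize=%d batches=%d\n", batchSize, batch) // batchSize=8 batches=13
}
```

The cap matters because batchSize controls how many L1/L2 segments share one pass over the loaded delta data, and therefore how far that data fans out in memory; bounding it trades a few extra batches for a predictable peak footprint.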