
Commit 9ac33b7
fix: DataNode might OOM by estimating based on MemorySize
See also: milvus-io#34136

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn committed Jun 26, 2024
1 parent f8bf690 commit 9ac33b7
Showing 4 changed files with 77 additions and 29 deletions.
3 changes: 1 addition & 2 deletions configs/milvus.yaml
@@ -543,8 +543,6 @@ dataNode:
    # if this parameter <= 0, will set it as the maximum number of CPUs that can be executing
    # suggest to set it bigger on large collection numbers to avoid blocking
    workPoolSize: -1
-   # specify the size of global work pool for channel checkpoint updating
-   # if this parameter <= 0, will set it as 10
    updateChannelCheckpointMaxParallel: 10
    updateChannelCheckpointInterval: 60 # the interval duration(in seconds) for datanode to update channel checkpoint of each channel
    updateChannelCheckpointRPCTimeout: 20 # timeout in seconds for UpdateChannelCheckpoint RPC call
@@ -556,6 +554,7 @@ dataNode:
    readBufferSizeInMB: 16 # The data block size (in MB) read from chunk manager by the datanode during import.
  compaction:
    levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode
+   levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.
  gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop
  ip: # if not specified, use the first unicastable address
  port: 21124
61 changes: 35 additions & 26 deletions internal/datanode/compaction/l0_compactor.go
@@ -133,24 +133,19 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error)
	}

	var (
-		totalSize      int64
-		totalDeltalogs = make(map[int64][]string)
+		memorySize     int64
+		totalDeltalogs = []string{}
	)
	for _, s := range l0Segments {
-		paths := []string{}
		for _, d := range s.GetDeltalogs() {
			for _, l := range d.GetBinlogs() {
-				paths = append(paths, l.GetLogPath())
-				totalSize += l.GetMemorySize()
+				totalDeltalogs = append(totalDeltalogs, l.GetLogPath())
+				memorySize += l.GetMemorySize()
			}
		}
-		if len(paths) > 0 {
-			totalDeltalogs[s.GetSegmentID()] = paths
-		}
	}

-	batchSize := getMaxBatchSize(totalSize)
-	resultSegments, err := t.process(ctx, batchSize, targetSegments, lo.Values(totalDeltalogs)...)
+	resultSegments, err := t.process(ctx, memorySize, targetSegments, totalDeltalogs)
	if err != nil {
		return nil, err
	}
@@ -170,15 +165,22 @@ func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error)
	return result, nil
}

-// batch size means segment count
-func getMaxBatchSize(totalSize int64) int {
-	max := 1
-	memLimit := float64(hardware.GetFreeMemoryCount()) * paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
-	if memLimit > float64(totalSize) {
-		max = int(memLimit / float64(totalSize))
+// BatchSize refers to the L1/L2 segments count that in one batch, batchSize controls the expansion ratio
+// of deltadata in memory.
+func getMaxBatchSize(baseMemSize, memLimit float64) int {
+	batchSize := 1
+	if memLimit > baseMemSize {
+		batchSize = int(memLimit / baseMemSize)
	}

+	maxSizeLimit := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.GetAsInt()
+	// Set batch size to maxSizeLimit if it is larger than maxSizeLimit.
+	// When maxSizeLimit <= 0, it means no limit.
+	if maxSizeLimit > 0 && batchSize > maxSizeLimit {
+		return maxSizeLimit
+	}
+
-	return max
+	return batchSize
}

func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
@@ -317,18 +319,15 @@ func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaDa
	return retMap
}

-func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
+func (t *LevelZeroCompactionTask) process(ctx context.Context, l0MemSize int64, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
	defer span.End()

-	results := make([]*datapb.CompactionSegment, 0)
-	batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
-	log := log.Ctx(ctx).With(
-		zap.Int64("planID", t.plan.GetPlanID()),
-		zap.Int("max conc segment counts", batchSize),
-		zap.Int("total segment counts", len(targetSegments)),
-		zap.Int("total batch", batch),
-	)
+	ratio := paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
+	memLimit := float64(hardware.GetFreeMemoryCount()) * ratio
+	if float64(l0MemSize) > memLimit {
+		return nil, errors.Newf("L0 compaction failed, not enough memory, request memory size: %v, memory limit: %v", l0MemSize, memLimit)
+	}

	log.Info("L0 compaction process start")
	allDelta, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
@@ -337,6 +336,16 @@ func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
		return nil, err
	}

+	batchSize := getMaxBatchSize(float64(allDelta.Size()), memLimit)
+	batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
+	log := log.Ctx(ctx).With(
+		zap.Int64("planID", t.plan.GetPlanID()),
+		zap.Int("max conc segment counts", batchSize),
+		zap.Int("total segment counts", len(targetSegments)),
+		zap.Int("total batch", batch),
+	)
+
+	results := make([]*datapb.CompactionSegment, 0)
	for i := 0; i < batch; i++ {
		left, right := i*batchSize, (i+1)*batchSize
		if right >= len(targetSegments) {
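For a sense of how the new sizing rule behaves, here is a minimal, self-contained Go sketch of the same arithmetic as getMaxBatchSize above. The standalone helper, its signature, and the concrete numbers are illustrative assumptions, not part of the commit; the committed function reads the cap from paramtable and derives the limit from free memory and levelZeroBatchMemoryRatio, as shown in the diff.

package main

import "fmt"

// maxBatchSize mirrors the batching rule introduced above: grow the batch while the
// memory limit covers the loaded L0 delta size, then clamp it by the configured cap.
// maxSizeLimit stands in for dataNode.compaction.levelZeroMaxBatchSize (<= 0 means no limit).
func maxBatchSize(baseMemSize, memLimit float64, maxSizeLimit int) int {
	batchSize := 1
	if memLimit > baseMemSize {
		batchSize = int(memLimit / baseMemSize)
	}
	if maxSizeLimit > 0 && batchSize > maxSizeLimit {
		return maxSizeLimit
	}
	return batchSize
}

func main() {
	// Illustrative numbers in MiB: 8 GiB free memory * 0.05 ratio ≈ 409.6 MiB limit.
	fmt.Println(maxBatchSize(512, 409.6, -1)) // 1 – delta data exceeds the limit, fall back to one segment per batch
	fmt.Println(maxBatchSize(64, 409.6, -1))  // 6 – limit / base = 6.4, truncated to 6
	fmt.Println(maxBatchSize(64, 409.6, 4))   // 4 – clamped by levelZeroMaxBatchSize
}

In effect, process() now rejects the plan outright if the total L0 delta size already exceeds the memory limit, and otherwise batches the target L1/L2 segments so that the in-memory expansion of the delta data stays within both the ratio-derived limit and the new levelZeroMaxBatchSize cap.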
30 changes: 30 additions & 0 deletions internal/datanode/compaction/l0_compactor_test.go
@@ -79,6 +79,36 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
	s.dBlob = blob.GetValue()
}

+func (s *LevelZeroCompactionTaskSuite) TestGetMaxBatchSize() {
+	tests := []struct {
+		baseMem        float64
+		memLimit       float64
+		batchSizeLimit string
+
+		expected    int
+		description string
+	}{
+		{10, 100, "-1", 10, "no limitation on maxBatchSize"},
+		{10, 100, "0", 10, "no limitation on maxBatchSize v2"},
+		{10, 100, "11", 10, "maxBatchSize == 11"},
+		{10, 100, "1", 1, "maxBatchSize == 1"},
+		{10, 12, "-1", 1, "no limitation on maxBatchSize"},
+		{10, 12, "100", 1, "maxBatchSize == 100"},
+	}
+
+	maxSizeK := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.Key
+	defer paramtable.Get().Reset(maxSizeK)
+	for _, test := range tests {
+		s.Run(test.description, func() {
+			paramtable.Get().Save(maxSizeK, test.batchSizeLimit)
+			defer paramtable.Get().Reset(maxSizeK)
+
+			actual := getMaxBatchSize(test.baseMem, test.memLimit)
+			s.Equal(test.expected, actual)
+		})
+	}
+}
+

func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() {
	plan := &datapb.CompactionPlan{
		PlanID: 19530,
12 changes: 11 additions & 1 deletion pkg/util/paramtable/component_param.go
@@ -3743,7 +3743,8 @@ type dataNodeConfig struct {
	ReadBufferSizeInMB ParamItem `refreshable:"true"`

	// Compaction
-	L0BatchMemoryRatio ParamItem `refreshable:"true"`
+	L0BatchMemoryRatio       ParamItem `refreshable:"true"`
+	L0CompactionMaxBatchSize ParamItem `refreshable:"true"`

	GracefulStopTimeout ParamItem `refreshable:"true"`

@@ -4052,6 +4053,15 @@ if this parameter <= 0, will set it as 10`,
	}
	p.L0BatchMemoryRatio.Init(base.mgr)

+	p.L0CompactionMaxBatchSize = ParamItem{
+		Key:          "dataNode.compaction.levelZeroMaxBatchSize",
+		Version:      "2.4.5",
+		Doc:          "Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1.",
+		DefaultValue: "-1",
+		Export:       true,
+	}
+	p.L0CompactionMaxBatchSize.Init(base.mgr)
+
	p.GracefulStopTimeout = ParamItem{
		Key:     "dataNode.gracefulStopTimeout",
		Version: "2.3.7",
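Because L0CompactionMaxBatchSize is declared refreshable, it can also be changed at runtime through paramtable, the same way the added test drives it with Save/Reset. A minimal sketch follows; the import path, the paramtable.Init() call, and the use inside a main() are assumptions for illustration, while the Key/Save/Reset/GetAsInt calls are taken from the diff above.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init() // assumption: initialize the global param table before use

	key := paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.Key // "dataNode.compaction.levelZeroMaxBatchSize"
	paramtable.Get().Save(key, "8")                                  // cap each L0 compaction batch at 8 L1/L2 segments
	defer paramtable.Get().Reset(key)                                // restore the default (-1, no limit)

	fmt.Println(paramtable.Get().DataNodeCfg.L0CompactionMaxBatchSize.GetAsInt()) // 8
}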
