enhance: Pre-allocate insert buffer capacity for writebuffer
See also milvus-io#33561

This PR:
- Adds a new param item for insert buffer chunk size
- Pre-allocates capacity for each insert buffer, preventing frequent `growslice` calls (a short illustrative sketch follows the change summary below)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia committed Jun 11, 2024
1 parent 2b7ee19 commit eeaca07
Showing 9 changed files with 107 additions and 37 deletions.
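
As a point of reference for the change description above, the following is a minimal, self-contained sketch of the effect being targeted: appending to a slice created without capacity forces the runtime to regrow the backing array repeatedly (`growslice`), while pre-allocating the capacity up front avoids that. The row count and element type are illustrative only and are not taken from the commit.

package main

import "fmt"

func main() {
	const rows = 100000

	// Without pre-allocation: the backing array is reallocated
	// (growslice) every time capacity is exhausted.
	grown := make([]int64, 0)
	reallocs := 0
	for i := 0; i < rows; i++ {
		if len(grown) == cap(grown) {
			reallocs++
		}
		grown = append(grown, int64(i))
	}

	// With pre-allocation: one allocation up front, no regrowth.
	prealloc := make([]int64, 0, rows)
	for i := 0; i < rows; i++ {
		prealloc = append(prealloc, int64(i))
	}

	fmt.Printf("without cap: %d reallocations\n", reallocs)
	fmt.Printf("with cap:    cap=%d, no regrowth needed\n", cap(prealloc))
}

Running the sketch prints a non-zero reallocation count for the first loop and none for the pre-allocated one.
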
2 changes: 1 addition & 1 deletion internal/datanode/metacache/bloom_filter_set_test.go
@@ -46,7 +46,7 @@ func (s *BloomFilterSetSuite) GetFieldData(ids []int64) storage.FieldData {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, len(ids))
s.Require().NoError(err)

for _, id := range ids {
2 changes: 1 addition & 1 deletion internal/datanode/syncmgr/storage_serializer_test.go
@@ -160,7 +160,7 @@ func (s *StorageV1SerializerSuite) getBfs() *metacache.BloomFilterSet {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
2 changes: 1 addition & 1 deletion internal/datanode/syncmgr/storage_v2_serializer_test.go
@@ -179,7 +179,7 @@ func (s *StorageV2SerializerSuite) getBfs() *metacache.BloomFilterSet {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
4 changes: 2 additions & 2 deletions internal/datanode/syncmgr/task_test.go
@@ -171,7 +171,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
@@ -299,7 +299,7 @@ func (s *SyncTaskSuite) TestCompactToNull() {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/syncmgr/taskv2_test.go
@@ -203,7 +203,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
86 changes: 76 additions & 10 deletions internal/datanode/writebuffer/insert_buffer.go
@@ -74,6 +74,16 @@ type InsertBuffer struct {
BufferBase
collSchema *schemapb.CollectionSchema

estRow int64
chunkSize int64
// buffer *storage.InsertData
buffers []*InsertBufferChunk
}

// InsertBufferChunk represents a pre-allocated insert data chunk and its statistics.
type InsertBufferChunk struct {
BufferBase

buffer *storage.InsertData
}

@@ -87,40 +97,96 @@ func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
if estSize == 0 {
return nil, errors.New("Invalid schema")
}
buffer, err := storage.NewInsertData(sch)
if err != nil {
return nil, err
}

sizeLimit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
chunkSize := paramtable.Get().DataNodeCfg.InsertBufferChunkSize.GetAsInt64()
// use sizeLimit when chunkSize is not valid
if chunkSize <= 0 || chunkSize > sizeLimit {
log.Warn("invalidate chunk size, use insert buffer size", zap.Int64("chunkSize", chunkSize), zap.Int64("insertBufferSize", sizeLimit))
chunkSize = sizeLimit
}

return &InsertBuffer{
estRow := chunkSize / int64(estSize)
ib := &InsertBuffer{
BufferBase: BufferBase{
rowLimit: noLimit,
sizeLimit: sizeLimit,
TimestampFrom: math.MaxUint64,
TimestampTo: 0,
},
collSchema: sch,
buffer: buffer,
}, nil
estRow: estRow,
chunkSize: chunkSize,
buffers: make([]*InsertBufferChunk, 0, sizeLimit/chunkSize),
}
err = ib.nextBatch()
if err != nil {
return nil, err
}

return ib, nil
}

func (ib *InsertBuffer) nextBatch() error {
buffer, err := storage.NewInsertDataWithCap(ib.collSchema, int(ib.estRow))
if err != nil {
return err
}
ib.buffers = append(ib.buffers, &InsertBufferChunk{
BufferBase: BufferBase{
rowLimit: ib.estRow,
sizeLimit: ib.chunkSize,
TimestampFrom: math.MaxUint64,
TimestampTo: 0,
},
buffer: buffer,
})
return nil
}

func (ib *InsertBuffer) currentBuffer() *InsertBufferChunk {
idx := len(ib.buffers) - 1
if idx < 0 || ib.buffers[idx].IsFull() {
ib.nextBatch()
idx++
}
return ib.buffers[idx]
}

func (ib *InsertBuffer) buffer(inData *storage.InsertData, tr TimeRange, startPos, endPos *msgpb.MsgPosition) {
buffer := ib.currentBuffer()
storage.MergeInsertData(buffer.buffer, inData)
buffer.UpdateStatistics(int64(inData.GetRowNum()), int64(inData.GetMemorySize()), tr, startPos, endPos)
}

func (ib *InsertBuffer) Yield() *storage.InsertData {
if ib.IsEmpty() {
return nil
}

return ib.buffer
// avoid copy when there is only one buffer
if len(ib.buffers) == 1 {
return ib.currentBuffer().buffer
}
// no error expected; buffers were created from the same schema earlier
result, _ := storage.NewInsertDataWithCap(ib.collSchema, int(ib.rows))
for _, chunk := range ib.buffers {
storage.MergeInsertData(result, chunk.buffer)
}
// set buffers to nil so that the fragmented buffers can get GCed
ib.buffers = nil
return result
}

func (ib *InsertBuffer) Buffer(inData *inData, startPos, endPos *msgpb.MsgPosition) int64 {
bufferedSize := int64(0)
for idx, data := range inData.data {
storage.MergeInsertData(ib.buffer, data)
tsData := inData.tsField[idx]
tr := ib.getTimestampRange(tsData)
ib.buffer(data, tr, startPos, endPos)

// update buffer size
ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), ib.getTimestampRange(tsData), startPos, endPos)
ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), tr, startPos, endPos)
bufferedSize += int64(data.GetMemorySize())
}
return bufferedSize
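
To make the sizing logic in the hunk above concrete, here is a small worked sketch of how `estRow` (rows pre-allocated per chunk) and the capacity hint for the chunk slice fall out of the configured sizes. The concrete numbers (a 64 MiB flush buffer, a 16 MiB chunk size, an estimated 1 KiB per row) are assumptions for illustration; the real values come from `paramtable` (`DataNodeCfg.FlushInsertBufferSize` / `InsertBufferChunkSize`) and the estimated row size of the collection schema.

package main

import "fmt"

func main() {
	// Illustrative values only.
	sizeLimit := int64(64 << 20) // assumed flush insert buffer size: 64 MiB
	chunkSize := int64(16 << 20) // assumed insert buffer chunk size: 16 MiB
	estSize := 1024              // assumed estimated bytes per row

	// Mirrors the guard in NewInsertBuffer: fall back to the full
	// buffer size when the chunk size is not usable.
	if chunkSize <= 0 || chunkSize > sizeLimit {
		chunkSize = sizeLimit
	}

	estRow := chunkSize / int64(estSize) // rows pre-allocated per chunk
	chunkSlots := sizeLimit / chunkSize  // capacity hint for the buffers slice

	fmt.Println("rows per chunk:", estRow)     // 16384
	fmt.Println("chunk slots:   ", chunkSlots) // 4
}
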
42 changes: 23 additions & 19 deletions internal/storage/insert_data.go
@@ -50,20 +50,24 @@ type InsertData struct {
}

func NewInsertData(schema *schemapb.CollectionSchema) (*InsertData, error) {
return NewInsertDataWithCap(schema, 0)
}

func NewInsertDataWithCap(schema *schemapb.CollectionSchema, cap int) (*InsertData, error) {
if schema == nil {
return nil, fmt.Errorf("Nil input schema")
return nil, merr.WrapErrParameterMissing("collection schema")
}

idata := &InsertData{
Data: make(map[FieldID]FieldData),
}

for _, fSchema := range schema.Fields {
fieldData, err := NewFieldData(fSchema.DataType, fSchema)
for _, field := range schema.GetFields() {
fieldData, err := NewFieldData(field.DataType, field, cap)
if err != nil {
return nil, err
}
idata.Data[fSchema.FieldID] = fieldData
idata.Data[field.FieldID] = fieldData
}
return idata, nil
}
@@ -147,7 +151,7 @@ type FieldData interface {
GetDataType() schemapb.DataType
}

func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema) (FieldData, error) {
func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, cap int) (FieldData, error) {
typeParams := fieldSchema.GetTypeParams()
switch dataType {
case schemapb.DataType_Float16Vector:
@@ -156,7 +160,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
return nil, err
}
return &Float16VectorFieldData{
Data: make([]byte, 0),
Data: make([]byte, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_BFloat16Vector:
@@ -165,7 +169,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
return nil, err
}
return &BFloat16VectorFieldData{
Data: make([]byte, 0),
Data: make([]byte, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_FloatVector:
@@ -174,7 +178,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
return nil, err
}
return &FloatVectorFieldData{
Data: make([]float32, 0),
Data: make([]float32, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_BinaryVector:
@@ -183,56 +187,56 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
return nil, err
}
return &BinaryVectorFieldData{
Data: make([]byte, 0),
Data: make([]byte, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_SparseFloatVector:
return &SparseFloatVectorFieldData{}, nil
case schemapb.DataType_Bool:
return &BoolFieldData{
Data: make([]bool, 0),
Data: make([]bool, 0, cap),
}, nil

case schemapb.DataType_Int8:
return &Int8FieldData{
Data: make([]int8, 0),
Data: make([]int8, 0, cap),
}, nil

case schemapb.DataType_Int16:
return &Int16FieldData{
Data: make([]int16, 0),
Data: make([]int16, 0, cap),
}, nil

case schemapb.DataType_Int32:
return &Int32FieldData{
Data: make([]int32, 0),
Data: make([]int32, 0, cap),
}, nil

case schemapb.DataType_Int64:
return &Int64FieldData{
Data: make([]int64, 0),
Data: make([]int64, 0, cap),
}, nil
case schemapb.DataType_Float:
return &FloatFieldData{
Data: make([]float32, 0),
Data: make([]float32, 0, cap),
}, nil

case schemapb.DataType_Double:
return &DoubleFieldData{
Data: make([]float64, 0),
Data: make([]float64, 0, cap),
}, nil
case schemapb.DataType_JSON:
return &JSONFieldData{
Data: make([][]byte, 0),
Data: make([][]byte, 0, cap),
}, nil
case schemapb.DataType_Array:
return &ArrayFieldData{
Data: make([]*schemapb.ScalarField, 0),
Data: make([]*schemapb.ScalarField, 0, cap),
ElementType: fieldSchema.GetElementType(),
}, nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
return &StringFieldData{
Data: make([]string, 0),
Data: make([]string, 0, cap),
DataType: dataType,
}, nil
default:
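
A hedged usage sketch of the new constructor introduced above: building an `InsertData` with a row-count capacity hint so that every field's backing slice is sized once up front. Only the `NewInsertDataWithCap(schema, cap)` signature comes from the diff; the import paths, field ID, and the single-field schema are assumptions made for the example.

package example

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)

// buildBuffer pre-sizes every field buffer for an expected batch of rows.
func buildBuffer(expectedRows int) (*storage.InsertData, error) {
	// Minimal schema with a single int64 primary key; illustrative only.
	schema := &schemapb.CollectionSchema{
		Name: "demo",
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, Name: "ID", IsPrimaryKey: true, DataType: schemapb.DataType_Int64},
		},
	}

	// Each field's Data slice is created with capacity expectedRows,
	// so appending that many rows should not trigger growslice.
	data, err := storage.NewInsertDataWithCap(schema, expectedRows)
	if err != nil {
		return nil, fmt.Errorf("create insert data: %w", err)
	}
	return data, nil
}
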
2 changes: 1 addition & 1 deletion internal/util/importutilv2/binlog/field_reader.go
@@ -40,7 +40,7 @@ func newFieldReader(ctx context.Context, cm storage.ChunkManager, fieldSchema *s
}

func (r *fieldReader) Next() (storage.FieldData, error) {
fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema)
fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema, 0)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion internal/util/importutilv2/binlog/reader_test.go
@@ -282,7 +282,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
expectInsertData, err := storage.NewInsertData(schema)
suite.NoError(err)
for _, field := range schema.GetFields() {
expectInsertData.Data[field.GetFieldID()], err = storage.NewFieldData(field.GetDataType(), field)
expectInsertData.Data[field.GetFieldID()], err = storage.NewFieldData(field.GetDataType(), field, suite.numRows)
suite.NoError(err)
}
OUTER:
