Skip to content

Commit

Permalink
enhance: Remove useless ops when there is no write (milvus-io#34767)
Browse files Browse the repository at this point in the history
Related to milvus-io#33235

THe querynode pipeline will make map & call ProcessInsert when there is
no write messages. So querynodes will have high CPU usage even when
there is no workload.

This PR check msg length before composing data struct and calling method

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Jul 19, 2024
1 parent d978ae4 commit 324b2a2
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
13 changes: 6 additions & 7 deletions internal/querynodev2/pipeline/delete_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ func (dNode *deleteNode) Operate(in Msg) Msg {
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Dec()
nodeMsg := in.(*deleteNodeMsg)

// partition id = > DeleteData
deleteDatas := make(map[UniqueID]*delegator.DeleteData)
if len(nodeMsg.deleteMsgs) > 0 {
// partition id = > DeleteData
deleteDatas := make(map[UniqueID]*delegator.DeleteData)

for _, msg := range nodeMsg.deleteMsgs {
dNode.addDeleteData(deleteDatas, msg)
}

if len(deleteDatas) > 0 {
for _, msg := range nodeMsg.deleteMsgs {
dNode.addDeleteData(deleteDatas, msg)
}
// do Delete, use ts range max as ts
dNode.delegator.ProcessDelete(lo.Values(deleteDatas), nodeMsg.timeRange.timestampMax)
}
Expand Down
32 changes: 17 additions & 15 deletions internal/querynodev2/pipeline/insert_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,25 @@ func (iNode *insertNode) Operate(in Msg) Msg {
metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Dec()
nodeMsg := in.(*insertNodeMsg)

sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool {
return nodeMsg.insertMsgs[i].BeginTs() < nodeMsg.insertMsgs[j].BeginTs()
})

insertDatas := make(map[UniqueID]*delegator.InsertData)
collection := iNode.manager.Collection.Get(iNode.collectionID)
if collection == nil {
log.Error("insertNode with collection not exist", zap.Int64("collection", iNode.collectionID))
panic("insertNode with collection not exist")
}
if len(nodeMsg.insertMsgs) > 0 {
sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool {
return nodeMsg.insertMsgs[i].BeginTs() < nodeMsg.insertMsgs[j].BeginTs()
})

insertDatas := make(map[UniqueID]*delegator.InsertData)
collection := iNode.manager.Collection.Get(iNode.collectionID)
if collection == nil {
log.Error("insertNode with collection not exist", zap.Int64("collection", iNode.collectionID))
panic("insertNode with collection not exist")
}

// get InsertData and merge datas of same segment
for _, msg := range nodeMsg.insertMsgs {
iNode.addInsertData(insertDatas, msg, collection)
}
// get InsertData and merge datas of same segment
for _, msg := range nodeMsg.insertMsgs {
iNode.addInsertData(insertDatas, msg, collection)
}

iNode.delegator.ProcessInsert(insertDatas)
iNode.delegator.ProcessInsert(insertDatas)
}

metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Inc()

Expand Down

0 comments on commit 324b2a2

Please sign in to comment.