From 32827f538a8fa9c08cd8c71879e90bea2ad06653 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 21 Jul 2023 15:30:59 +0800 Subject: [PATCH] add metrics for delegator insert/delete cost (#25733) Signed-off-by: Wei Liu --- .../querynodev2/delegator/delegator_data.go | 11 +++++++++ internal/querynodev2/pipeline/delete_node.go | 3 +++ internal/querynodev2/pipeline/filter_node.go | 2 ++ internal/querynodev2/pipeline/insert_node.go | 5 ++++ pkg/metrics/querynode_metrics.go | 23 +++++++++++++++++++ 5 files changed, 44 insertions(+) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index af9ea10b1e46..780a5006ae56 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -43,6 +44,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" + "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -97,6 +99,8 @@ func (sd *shardDelegator) newGrowing(segmentID int64, insertData *InsertData) se // ProcessInsert handles insert data in delegator. func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { + method := "ProcessInsert" + tr := timerecord.NewTimeRecorder(method) log := sd.getLogger(context.Background()) for segmentID, insertData := range insertRecords { growing := sd.segmentManager.GetGrowing(segmentID) @@ -126,12 +130,16 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { zap.Uint64("maxTimestamp", insertData.Timestamps[len(insertData.Timestamps)-1]), ) } + metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel). + Observe(float64(tr.ElapseSpan())) } // ProcessDelete handles delete data in delegator. // delegator puts deleteData into buffer first, // then dispatch data to segments acoording to the result of pkOracle. func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) { + method := "ProcessDelete" + tr := timerecord.NewTimeRecorder(method) // block load segment handle delete buffer sd.deleteMut.Lock() defer sd.deleteMut.Unlock() @@ -223,6 +231,9 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) { log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs)) sd.markSegmentOffline(offlineSegIDs...) } + + metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel). + Observe(float64(tr.ElapseSpan())) } // applyDelete handles delete record and apply them to corresponding workers. diff --git a/internal/querynodev2/pipeline/delete_node.go b/internal/querynodev2/pipeline/delete_node.go index 73339b733431..91c3ce9bce1b 100644 --- a/internal/querynodev2/pipeline/delete_node.go +++ b/internal/querynodev2/pipeline/delete_node.go @@ -26,6 +26,8 @@ import ( "github.com/milvus-io/milvus/internal/storage" base "github.com/milvus-io/milvus/internal/util/pipeline" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type deleteNode struct { @@ -61,6 +63,7 @@ func (dNode *deleteNode) addDeleteData(deleteDatas map[UniqueID]*delegator.Delet } func (dNode *deleteNode) Operate(in Msg) Msg { + metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Dec() nodeMsg := in.(*deleteNodeMsg) // partition id = > DeleteData diff --git a/internal/querynodev2/pipeline/filter_node.go b/internal/querynodev2/pipeline/filter_node.go index f95c82c2cb1b..9293e0be5deb 100644 --- a/internal/querynodev2/pipeline/filter_node.go +++ b/internal/querynodev2/pipeline/filter_node.go @@ -100,6 +100,8 @@ func (fNode *filterNode) Operate(in Msg) Msg { out.append(msg) } } + + metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Inc() return out } diff --git a/internal/querynodev2/pipeline/insert_node.go b/internal/querynodev2/pipeline/insert_node.go index 8f1391dcf6a0..8e7c060a6bce 100644 --- a/internal/querynodev2/pipeline/insert_node.go +++ b/internal/querynodev2/pipeline/insert_node.go @@ -28,6 +28,8 @@ import ( "github.com/milvus-io/milvus/internal/storage" base "github.com/milvus-io/milvus/internal/util/pipeline" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -85,6 +87,7 @@ func (iNode *insertNode) addInsertData(insertDatas map[UniqueID]*delegator.Inser // Insert task func (iNode *insertNode) Operate(in Msg) Msg { + metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Dec() nodeMsg := in.(*insertNodeMsg) sort.Slice(nodeMsg.insertMsgs, func(i, j int) bool { @@ -105,6 +108,8 @@ func (iNode *insertNode) Operate(in Msg) Msg { iNode.delegator.ProcessInsert(insertDatas) + metrics.QueryNodeWaitProcessingMsgCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Inc() + return &deleteNodeMsg{ deleteMsgs: nodeMsg.deleteMsgs, timeRange: nodeMsg.timeRange, diff --git a/pkg/metrics/querynode_metrics.go b/pkg/metrics/querynode_metrics.go index 30b1a920d11e..fc3ea7769de5 100644 --- a/pkg/metrics/querynode_metrics.go +++ b/pkg/metrics/querynode_metrics.go @@ -47,6 +47,29 @@ var ( collectionIDLabelName, }) + QueryNodeProcessCost = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "process_insert_or_delete_ms", + Help: "process insert or delete cost in ms", + Buckets: buckets, + }, []string{ + nodeIDLabelName, + msgTypeLabelName, + }) + + QueryNodeWaitProcessingMsgCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.QueryNodeRole, + Name: "wait_processing_msg_count", + Help: "count of wait processing msg", + }, []string{ + nodeIDLabelName, + msgTypeLabelName, + }) + QueryNodeConsumerMsgCount = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: milvusNamespace,