From aafce3f11390fbad986eaa21cc81463a1fb8bd40 Mon Sep 17 00:00:00 2001 From: liqiangz Date: Fri, 8 Jan 2021 23:12:34 +0800 Subject: [PATCH] Record max concurrency in MetricItem and metric logs (#371) --- core/stat/base/bucket_leap_array.go | 65 +++++++++++++++++--- core/stat/base/bucket_leap_array_test.go | 9 +++ core/stat/base/metric_bucket.go | 22 ++++++- core/stat/base/metric_bucket_test.go | 21 ++++--- core/stat/base/sliding_window_metric.go | 27 ++++++++ core/stat/base/sliding_window_metric_test.go | 10 +++ core/stat/base_node.go | 10 ++- 7 files changed, 144 insertions(+), 20 deletions(-) diff --git a/core/stat/base/bucket_leap_array.go b/core/stat/base/bucket_leap_array.go index 5050d2cf..62c2df10 100644 --- a/core/stat/base/bucket_leap_array.go +++ b/core/stat/base/bucket_leap_array.go @@ -89,26 +89,46 @@ func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) { } func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) { + b := bla.currentBucketWithTime(now) + if b == nil { + return + } + b.Add(event, count) +} + +func (bla *BucketLeapArray) UpdateConcurrency(concurrency int32) { + bla.updateConcurrencyWithTime(util.CurrentTimeMillis(), concurrency) +} + +func (bla *BucketLeapArray) updateConcurrencyWithTime(now uint64, concurrency int32) { + b := bla.currentBucketWithTime(now) + if b == nil { + return + } + b.UpdateConcurrency(concurrency) +} + +func (bla *BucketLeapArray) currentBucketWithTime(now uint64) *MetricBucket { curBucket, err := bla.data.currentBucketOfTime(now, bla) if err != nil { - logging.Error(err, "Failed to get current bucket in BucketLeapArray.addCountWithTime()", "now", now) - return + logging.Error(err, "Failed to get current bucket in BucketLeapArray.currentBucketWithTime()", "now", now) + return nil } if curBucket == nil { - logging.Error(errors.New("current bucket is nil"), "Nil curBucket in BucketLeapArray.addCountWithTime()") - return + logging.Error(errors.New("current bucket is nil"), "Nil curBucket in BucketLeapArray.currentBucketWithTime()") + return nil } mb := curBucket.Value.Load() if mb == nil { - logging.Error(errors.New("nil bucket"), "Current bucket atomic Value is nil in BucketLeapArray.addCountWithTime()") - return + logging.Error(errors.New("nil bucket"), "Current bucket atomic Value is nil in BucketLeapArray.currentBucketWithTime()") + return nil } b, ok := mb.(*MetricBucket) if !ok { - logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.addCountWithTime()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) - return + logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.currentBucketWithTime()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) + return nil } - b.Add(event, count) + return b } // Read method, need to adapt upper application @@ -178,3 +198,30 @@ func (bla *BucketLeapArray) MinRt() int64 { } return ret } + +func (bla *BucketLeapArray) MaxConcurrency() int32 { + _, err := bla.data.CurrentBucket(bla) + if err != nil { + logging.Error(err, "Failed to get current bucket in BucketLeapArray.MaxConcurrency()") + } + + ret := int32(0) + + for _, v := range bla.data.Values() { + mb := v.Value.Load() + if mb == nil { + logging.Error(errors.New("current bucket is nil"), "Failed to load current bucket in BucketLeapArray.MaxConcurrency()") + continue + } + b, ok := mb.(*MetricBucket) + if !ok { + logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) + continue + } + mc := b.MaxConcurrency() + if ret < mc { + ret = mc + } + } + return ret +} diff --git a/core/stat/base/bucket_leap_array_test.go b/core/stat/base/bucket_leap_array_test.go index 43ffa522..4e50dc65 100644 --- a/core/stat/base/bucket_leap_array_test.go +++ b/core/stat/base/bucket_leap_array_test.go @@ -147,6 +147,15 @@ func TestAddCount(t *testing.T) { assert.True(t, passCount == 1) } +func TestUpdateConcurrency(t *testing.T) { + bla := NewBucketLeapArray(SampleCount, IntervalInMs) + bla.UpdateConcurrency(1) + bla.UpdateConcurrency(3) + bla.UpdateConcurrency(2) + mc := bla.MaxConcurrency() + assert.True(t, mc == 3) +} + func TestMinRt(t *testing.T) { t.Run("TestMinRt_Default", func(t *testing.T) { bla := NewBucketLeapArray(SampleCount, IntervalInMs) diff --git a/core/stat/base/metric_bucket.go b/core/stat/base/metric_bucket.go index f7ed20f9..2294f6c2 100644 --- a/core/stat/base/metric_bucket.go +++ b/core/stat/base/metric_bucket.go @@ -26,13 +26,15 @@ import ( // Note that all operations of the MetricBucket are required to be thread-safe. type MetricBucket struct { // Value of statistic - counter [base.MetricEventTotal]int64 - minRt int64 + counter [base.MetricEventTotal]int64 + minRt int64 + maxConcurrency int32 } func NewMetricBucket() *MetricBucket { mb := &MetricBucket{ - minRt: base.DefaultStatisticMaxRt, + minRt: base.DefaultStatisticMaxRt, + maxConcurrency: 0, } return mb } @@ -68,11 +70,13 @@ func (mb *MetricBucket) reset() { atomic.StoreInt64(&mb.counter[i], 0) } atomic.StoreInt64(&mb.minRt, base.DefaultStatisticMaxRt) + atomic.StoreInt32(&mb.maxConcurrency, int32(0)) } func (mb *MetricBucket) AddRt(rt int64) { mb.addCount(base.MetricEventRt, rt) if rt < atomic.LoadInt64(&mb.minRt) { + // Might not be accurate here. atomic.StoreInt64(&mb.minRt, rt) } } @@ -80,3 +84,15 @@ func (mb *MetricBucket) AddRt(rt int64) { func (mb *MetricBucket) MinRt() int64 { return atomic.LoadInt64(&mb.minRt) } + +func (mb *MetricBucket) UpdateConcurrency(concurrency int32) { + cc := concurrency + if cc > atomic.LoadInt32(&mb.maxConcurrency) { + // Might not be accurate here. + atomic.StoreInt32(&mb.maxConcurrency, cc) + } +} + +func (mb *MetricBucket) MaxConcurrency() int32 { + return atomic.LoadInt32(&mb.maxConcurrency) +} diff --git a/core/stat/base/metric_bucket_test.go b/core/stat/base/metric_bucket_test.go index 945e31a7..44e67408 100644 --- a/core/stat/base/metric_bucket_test.go +++ b/core/stat/base/metric_bucket_test.go @@ -27,7 +27,7 @@ func Test_metricBucket_MemSize(t *testing.T) { mb := NewMetricBucket() t.Log("mb:", mb) size := unsafe.Sizeof(*mb) - if size != 48 { + if size != 56 { t.Error("unexpect memory size of MetricBucket") } } @@ -35,17 +35,19 @@ func Test_metricBucket_MemSize(t *testing.T) { func Test_metricBucket_Normal(t *testing.T) { mb := NewMetricBucket() - for i := 0; i < 100; i++ { - if i%5 == 0 { + for i := 0; i < 120; i++ { + if i%6 == 0 { mb.Add(base.MetricEventPass, 1) - } else if i%5 == 1 { + } else if i%6 == 1 { mb.Add(base.MetricEventBlock, 1) - } else if i%5 == 2 { + } else if i%6 == 2 { mb.Add(base.MetricEventComplete, 1) - } else if i%5 == 3 { + } else if i%6 == 3 { mb.Add(base.MetricEventError, 1) - } else if i%5 == 4 { + } else if i%6 == 4 { mb.AddRt(100) + } else if i%6 == 5 { + mb.UpdateConcurrency(int32(i)) } else { t.Error("unexpect idx") } @@ -66,6 +68,9 @@ func Test_metricBucket_Normal(t *testing.T) { if mb.Get(base.MetricEventRt) != 20*100 { t.Error("unexpect count MetricEventRt") } + if mb.MaxConcurrency() != 119 { + t.Error("unexpect count MetricEventConcurrency") + } } func Test_metricBucket_Concurrent(t *testing.T) { @@ -133,5 +138,7 @@ func Test_Reset(t *testing.T) { mb.AddRt(100) mb.reset() rt := mb.MinRt() + mc := mb.MaxConcurrency() assert.True(t, rt == base.DefaultStatisticMaxRt) + assert.True(t, mc == int32(0)) } diff --git a/core/stat/base/sliding_window_metric.go b/core/stat/base/sliding_window_metric.go index a51b5a5f..937a8654 100644 --- a/core/stat/base/sliding_window_metric.go +++ b/core/stat/base/sliding_window_metric.go @@ -162,6 +162,29 @@ func (m *SlidingWindowMetric) MinRT() float64 { return float64(minRt) } +func (m *SlidingWindowMetric) MaxConcurrency() int32 { + now := util.CurrentTimeMillis() + satisfiedBuckets := m.getSatisfiedBuckets(now) + maxConcurrency := int32(0) + for _, w := range satisfiedBuckets { + mb := w.Value.Load() + if mb == nil { + logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.MaxConcurrency()") + continue + } + counter, ok := mb.(*MetricBucket) + if !ok { + logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) + continue + } + v := counter.MaxConcurrency() + if v > maxConcurrency { + maxConcurrency = v + } + } + return maxConcurrency +} + func (m *SlidingWindowMetric) AvgRT() float64 { return float64(m.GetSum(base.MetricEventRt)) / float64(m.GetSum(base.MetricEventComplete)) } @@ -214,6 +237,10 @@ func (m *SlidingWindowMetric) metricItemFromBuckets(ts uint64, ws []*BucketWrap) item.BlockQps += uint64(mb.Get(base.MetricEventBlock)) item.ErrorQps += uint64(mb.Get(base.MetricEventError)) item.CompleteQps += uint64(mb.Get(base.MetricEventComplete)) + mc := uint32(mb.MaxConcurrency()) + if mc > item.Concurrency { + item.Concurrency = mc + } allRt += mb.Get(base.MetricEventRt) } if item.CompleteQps > 0 { diff --git a/core/stat/base/sliding_window_metric_test.go b/core/stat/base/sliding_window_metric_test.go index b06088c7..1666da9c 100644 --- a/core/stat/base/sliding_window_metric_test.go +++ b/core/stat/base/sliding_window_metric_test.go @@ -225,6 +225,16 @@ func TestMinRT(t *testing.T) { assert.True(t, util.Float64Equals(minRt, float64(base.DefaultStatisticMaxRt))) } +func TestMaxConcurrency(t *testing.T) { + got, err := NewSlidingWindowMetric(4, 2000, NewBucketLeapArray(SampleCount, IntervalInMs)) + assert.True(t, err == nil && got != nil) + got.real.UpdateConcurrency(1) + got.real.UpdateConcurrency(3) + got.real.UpdateConcurrency(2) + mc := got.MaxConcurrency() + assert.True(t, mc == int32(3)) +} + func TestAvgRT(t *testing.T) { got, err := NewSlidingWindowMetric(4, 2000, NewBucketLeapArray(SampleCount, IntervalInMs)) assert.True(t, err == nil && got != nil) diff --git a/core/stat/base_node.go b/core/stat/base_node.go index 255be66d..9ef59b77 100644 --- a/core/stat/base_node.go +++ b/core/stat/base_node.go @@ -68,6 +68,10 @@ func (n *BaseStatNode) AddCount(event base.MetricEvent, count int64) { n.arr.AddCount(event, count) } +func (n *BaseStatNode) UpdateConcurrency(concurrency int32) { + n.arr.UpdateConcurrency(concurrency) +} + func (n *BaseStatNode) AvgRT() float64 { complete := n.metric.GetSum(base.MetricEventComplete) if complete <= 0 { @@ -80,12 +84,16 @@ func (n *BaseStatNode) MinRT() float64 { return float64(n.metric.MinRT()) } +func (n *BaseStatNode) MaxConcurrency() int32 { + return n.metric.MaxConcurrency() +} + func (n *BaseStatNode) CurrentConcurrency() int32 { return atomic.LoadInt32(&(n.concurrency)) } func (n *BaseStatNode) IncreaseConcurrency() { - atomic.AddInt32(&(n.concurrency), 1) + n.UpdateConcurrency(atomic.AddInt32(&(n.concurrency), 1)) } func (n *BaseStatNode) DecreaseConcurrency() {