Skip to content

Commit

Permalink
Record max concurrency in MetricItem and metric logs (#371)
Browse files Browse the repository at this point in the history
  • Loading branch information
liqiangz committed Jan 8, 2021
1 parent 6cd99cb commit aafce3f
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 20 deletions.
65 changes: 56 additions & 9 deletions core/stat/base/bucket_leap_array.go
Expand Up @@ -89,26 +89,46 @@ func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) {
}

func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) {
b := bla.currentBucketWithTime(now)
if b == nil {
return
}
b.Add(event, count)
}

func (bla *BucketLeapArray) UpdateConcurrency(concurrency int32) {
bla.updateConcurrencyWithTime(util.CurrentTimeMillis(), concurrency)
}

func (bla *BucketLeapArray) updateConcurrencyWithTime(now uint64, concurrency int32) {
b := bla.currentBucketWithTime(now)
if b == nil {
return
}
b.UpdateConcurrency(concurrency)
}

func (bla *BucketLeapArray) currentBucketWithTime(now uint64) *MetricBucket {
curBucket, err := bla.data.currentBucketOfTime(now, bla)
if err != nil {
logging.Error(err, "Failed to get current bucket in BucketLeapArray.addCountWithTime()", "now", now)
return
logging.Error(err, "Failed to get current bucket in BucketLeapArray.currentBucketWithTime()", "now", now)
return nil
}
if curBucket == nil {
logging.Error(errors.New("current bucket is nil"), "Nil curBucket in BucketLeapArray.addCountWithTime()")
return
logging.Error(errors.New("current bucket is nil"), "Nil curBucket in BucketLeapArray.currentBucketWithTime()")
return nil
}
mb := curBucket.Value.Load()
if mb == nil {
logging.Error(errors.New("nil bucket"), "Current bucket atomic Value is nil in BucketLeapArray.addCountWithTime()")
return
logging.Error(errors.New("nil bucket"), "Current bucket atomic Value is nil in BucketLeapArray.currentBucketWithTime()")
return nil
}
b, ok := mb.(*MetricBucket)
if !ok {
logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.addCountWithTime()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
return
logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.currentBucketWithTime()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
return nil
}
b.Add(event, count)
return b
}

// Read method, need to adapt upper application
Expand Down Expand Up @@ -178,3 +198,30 @@ func (bla *BucketLeapArray) MinRt() int64 {
}
return ret
}

func (bla *BucketLeapArray) MaxConcurrency() int32 {
_, err := bla.data.CurrentBucket(bla)
if err != nil {
logging.Error(err, "Failed to get current bucket in BucketLeapArray.MaxConcurrency()")
}

ret := int32(0)

for _, v := range bla.data.Values() {
mb := v.Value.Load()
if mb == nil {
logging.Error(errors.New("current bucket is nil"), "Failed to load current bucket in BucketLeapArray.MaxConcurrency()")
continue
}
b, ok := mb.(*MetricBucket)
if !ok {
logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
continue
}
mc := b.MaxConcurrency()
if ret < mc {
ret = mc
}
}
return ret
}
9 changes: 9 additions & 0 deletions core/stat/base/bucket_leap_array_test.go
Expand Up @@ -147,6 +147,15 @@ func TestAddCount(t *testing.T) {
assert.True(t, passCount == 1)
}

func TestUpdateConcurrency(t *testing.T) {
bla := NewBucketLeapArray(SampleCount, IntervalInMs)
bla.UpdateConcurrency(1)
bla.UpdateConcurrency(3)
bla.UpdateConcurrency(2)
mc := bla.MaxConcurrency()
assert.True(t, mc == 3)
}

func TestMinRt(t *testing.T) {
t.Run("TestMinRt_Default", func(t *testing.T) {
bla := NewBucketLeapArray(SampleCount, IntervalInMs)
Expand Down
22 changes: 19 additions & 3 deletions core/stat/base/metric_bucket.go
Expand Up @@ -26,13 +26,15 @@ import (
// Note that all operations of the MetricBucket are required to be thread-safe.
type MetricBucket struct {
// Value of statistic
counter [base.MetricEventTotal]int64
minRt int64
counter [base.MetricEventTotal]int64
minRt int64
maxConcurrency int32
}

func NewMetricBucket() *MetricBucket {
mb := &MetricBucket{
minRt: base.DefaultStatisticMaxRt,
minRt: base.DefaultStatisticMaxRt,
maxConcurrency: 0,
}
return mb
}
Expand Down Expand Up @@ -68,15 +70,29 @@ func (mb *MetricBucket) reset() {
atomic.StoreInt64(&mb.counter[i], 0)
}
atomic.StoreInt64(&mb.minRt, base.DefaultStatisticMaxRt)
atomic.StoreInt32(&mb.maxConcurrency, int32(0))
}

func (mb *MetricBucket) AddRt(rt int64) {
mb.addCount(base.MetricEventRt, rt)
if rt < atomic.LoadInt64(&mb.minRt) {
// Might not be accurate here.
atomic.StoreInt64(&mb.minRt, rt)
}
}

func (mb *MetricBucket) MinRt() int64 {
return atomic.LoadInt64(&mb.minRt)
}

func (mb *MetricBucket) UpdateConcurrency(concurrency int32) {
cc := concurrency
if cc > atomic.LoadInt32(&mb.maxConcurrency) {
// Might not be accurate here.
atomic.StoreInt32(&mb.maxConcurrency, cc)
}
}

func (mb *MetricBucket) MaxConcurrency() int32 {
return atomic.LoadInt32(&mb.maxConcurrency)
}
21 changes: 14 additions & 7 deletions core/stat/base/metric_bucket_test.go
Expand Up @@ -27,25 +27,27 @@ func Test_metricBucket_MemSize(t *testing.T) {
mb := NewMetricBucket()
t.Log("mb:", mb)
size := unsafe.Sizeof(*mb)
if size != 48 {
if size != 56 {
t.Error("unexpect memory size of MetricBucket")
}
}

func Test_metricBucket_Normal(t *testing.T) {
mb := NewMetricBucket()

for i := 0; i < 100; i++ {
if i%5 == 0 {
for i := 0; i < 120; i++ {
if i%6 == 0 {
mb.Add(base.MetricEventPass, 1)
} else if i%5 == 1 {
} else if i%6 == 1 {
mb.Add(base.MetricEventBlock, 1)
} else if i%5 == 2 {
} else if i%6 == 2 {
mb.Add(base.MetricEventComplete, 1)
} else if i%5 == 3 {
} else if i%6 == 3 {
mb.Add(base.MetricEventError, 1)
} else if i%5 == 4 {
} else if i%6 == 4 {
mb.AddRt(100)
} else if i%6 == 5 {
mb.UpdateConcurrency(int32(i))
} else {
t.Error("unexpect idx")
}
Expand All @@ -66,6 +68,9 @@ func Test_metricBucket_Normal(t *testing.T) {
if mb.Get(base.MetricEventRt) != 20*100 {
t.Error("unexpect count MetricEventRt")
}
if mb.MaxConcurrency() != 119 {
t.Error("unexpect count MetricEventConcurrency")
}
}

func Test_metricBucket_Concurrent(t *testing.T) {
Expand Down Expand Up @@ -133,5 +138,7 @@ func Test_Reset(t *testing.T) {
mb.AddRt(100)
mb.reset()
rt := mb.MinRt()
mc := mb.MaxConcurrency()
assert.True(t, rt == base.DefaultStatisticMaxRt)
assert.True(t, mc == int32(0))
}
27 changes: 27 additions & 0 deletions core/stat/base/sliding_window_metric.go
Expand Up @@ -162,6 +162,29 @@ func (m *SlidingWindowMetric) MinRT() float64 {
return float64(minRt)
}

func (m *SlidingWindowMetric) MaxConcurrency() int32 {
now := util.CurrentTimeMillis()
satisfiedBuckets := m.getSatisfiedBuckets(now)
maxConcurrency := int32(0)
for _, w := range satisfiedBuckets {
mb := w.Value.Load()
if mb == nil {
logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.MaxConcurrency()")
continue
}
counter, ok := mb.(*MetricBucket)
if !ok {
logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
continue
}
v := counter.MaxConcurrency()
if v > maxConcurrency {
maxConcurrency = v
}
}
return maxConcurrency
}

func (m *SlidingWindowMetric) AvgRT() float64 {
return float64(m.GetSum(base.MetricEventRt)) / float64(m.GetSum(base.MetricEventComplete))
}
Expand Down Expand Up @@ -214,6 +237,10 @@ func (m *SlidingWindowMetric) metricItemFromBuckets(ts uint64, ws []*BucketWrap)
item.BlockQps += uint64(mb.Get(base.MetricEventBlock))
item.ErrorQps += uint64(mb.Get(base.MetricEventError))
item.CompleteQps += uint64(mb.Get(base.MetricEventComplete))
mc := uint32(mb.MaxConcurrency())
if mc > item.Concurrency {
item.Concurrency = mc
}
allRt += mb.Get(base.MetricEventRt)
}
if item.CompleteQps > 0 {
Expand Down
10 changes: 10 additions & 0 deletions core/stat/base/sliding_window_metric_test.go
Expand Up @@ -225,6 +225,16 @@ func TestMinRT(t *testing.T) {
assert.True(t, util.Float64Equals(minRt, float64(base.DefaultStatisticMaxRt)))
}

func TestMaxConcurrency(t *testing.T) {
got, err := NewSlidingWindowMetric(4, 2000, NewBucketLeapArray(SampleCount, IntervalInMs))
assert.True(t, err == nil && got != nil)
got.real.UpdateConcurrency(1)
got.real.UpdateConcurrency(3)
got.real.UpdateConcurrency(2)
mc := got.MaxConcurrency()
assert.True(t, mc == int32(3))
}

func TestAvgRT(t *testing.T) {
got, err := NewSlidingWindowMetric(4, 2000, NewBucketLeapArray(SampleCount, IntervalInMs))
assert.True(t, err == nil && got != nil)
Expand Down
10 changes: 9 additions & 1 deletion core/stat/base_node.go
Expand Up @@ -68,6 +68,10 @@ func (n *BaseStatNode) AddCount(event base.MetricEvent, count int64) {
n.arr.AddCount(event, count)
}

func (n *BaseStatNode) UpdateConcurrency(concurrency int32) {
n.arr.UpdateConcurrency(concurrency)
}

func (n *BaseStatNode) AvgRT() float64 {
complete := n.metric.GetSum(base.MetricEventComplete)
if complete <= 0 {
Expand All @@ -80,12 +84,16 @@ func (n *BaseStatNode) MinRT() float64 {
return float64(n.metric.MinRT())
}

func (n *BaseStatNode) MaxConcurrency() int32 {
return n.metric.MaxConcurrency()
}

func (n *BaseStatNode) CurrentConcurrency() int32 {
return atomic.LoadInt32(&(n.concurrency))
}

func (n *BaseStatNode) IncreaseConcurrency() {
atomic.AddInt32(&(n.concurrency), 1)
n.UpdateConcurrency(atomic.AddInt32(&(n.concurrency), 1))
}

func (n *BaseStatNode) DecreaseConcurrency() {
Expand Down

0 comments on commit aafce3f

Please sign in to comment.