Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record max concurrency in MetricItem and metric logs #371

Merged
merged 10 commits into from Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 56 additions & 9 deletions core/stat/base/bucket_leap_array.go
Expand Up @@ -89,26 +89,46 @@ func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) {
}

func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) {
b := bla.currentBucketWithTime(now)
if b == nil {
return
}
b.Add(event, count)
}

func (bla *BucketLeapArray) UpdateConcurrency(concurrency int32) {
bla.updateConcurrencyWithTime(util.CurrentTimeMillis(), concurrency)
}

func (bla *BucketLeapArray) updateConcurrencyWithTime(now uint64, concurrency int32) {
b := bla.currentBucketWithTime(now)
if b == nil {
return
}
b.UpdateConcurrency(concurrency)
}

func (bla *BucketLeapArray) currentBucketWithTime(now uint64) *MetricBucket {
curBucket, err := bla.data.currentBucketOfTime(now, bla)
if err != nil {
logging.Error(err, "Failed to get current bucket in BucketLeapArray.addCountWithTime()", "now", now)
return
logging.Error(err, "Failed to get current bucket in BucketLeapArray.currentBucketWithTime()", "now", now)
return nil
}
if curBucket == nil {
logging.Error(errors.New("current bucket is nil"), "Nil curBucket in BucketLeapArray.addCountWithTime()")
return
logging.Error(errors.New("current bucket is nil"), "Nil curBucket in BucketLeapArray.currentBucketWithTime()")
return nil
}
mb := curBucket.Value.Load()
if mb == nil {
logging.Error(errors.New("nil bucket"), "Current bucket atomic Value is nil in BucketLeapArray.addCountWithTime()")
return
logging.Error(errors.New("nil bucket"), "Current bucket atomic Value is nil in BucketLeapArray.currentBucketWithTime()")
return nil
}
b, ok := mb.(*MetricBucket)
if !ok {
logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.addCountWithTime()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
return
logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.currentBucketWithTime()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
return nil
}
b.Add(event, count)
return b
}

// Read method, need to adapt upper application
Expand Down Expand Up @@ -178,3 +198,30 @@ func (bla *BucketLeapArray) MinRt() int64 {
}
return ret
}

func (bla *BucketLeapArray) MaxConcurrency() int32 {
_, err := bla.data.CurrentBucket(bla)
if err != nil {
logging.Error(err, "Failed to get current bucket in BucketLeapArray.MaxConcurrency()")
}

ret := int32(0)

for _, v := range bla.data.Values() {
mb := v.Value.Load()
if mb == nil {
logging.Error(errors.New("current bucket is nil"), "Failed to load current bucket in BucketLeapArray.MaxConcurrency()")
continue
}
b, ok := mb.(*MetricBucket)
if !ok {
logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
continue
}
mc := b.MaxConcurrency()
if ret < mc {
ret = mc
}
}
return ret
}
9 changes: 9 additions & 0 deletions core/stat/base/bucket_leap_array_test.go
Expand Up @@ -147,6 +147,15 @@ func TestAddCount(t *testing.T) {
assert.True(t, passCount == 1)
}

func TestUpdateConcurrency(t *testing.T) {
bla := NewBucketLeapArray(SampleCount, IntervalInMs)
bla.UpdateConcurrency(1)
bla.UpdateConcurrency(3)
bla.UpdateConcurrency(2)
mc := bla.MaxConcurrency()
assert.True(t, mc == 3)
}

func TestMinRt(t *testing.T) {
t.Run("TestMinRt_Default", func(t *testing.T) {
bla := NewBucketLeapArray(SampleCount, IntervalInMs)
Expand Down
20 changes: 17 additions & 3 deletions core/stat/base/metric_bucket.go
Expand Up @@ -26,13 +26,15 @@ import (
// Note that all operations of the MetricBucket are required to be thread-safe.
type MetricBucket struct {
// Value of statistic
counter [base.MetricEventTotal]int64
minRt int64
counter [base.MetricEventTotal]int64
minRt int64
maxConcurrency int32
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
}

func NewMetricBucket() *MetricBucket {
mb := &MetricBucket{
minRt: base.DefaultStatisticMaxRt,
minRt: base.DefaultStatisticMaxRt,
maxConcurrency: int32(0),
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
}
return mb
}
Expand Down Expand Up @@ -68,6 +70,7 @@ func (mb *MetricBucket) reset() {
atomic.StoreInt64(&mb.counter[i], 0)
}
atomic.StoreInt64(&mb.minRt, base.DefaultStatisticMaxRt)
atomic.StoreInt32(&mb.maxConcurrency, int32(0))
}

func (mb *MetricBucket) AddRt(rt int64) {
Expand All @@ -80,3 +83,14 @@ func (mb *MetricBucket) AddRt(rt int64) {
func (mb *MetricBucket) MinRt() int64 {
return atomic.LoadInt64(&mb.minRt)
}

func (mb *MetricBucket) UpdateConcurrency(concurrency int32) {
cc := concurrency
if cc > atomic.LoadInt32(&mb.maxConcurrency) {
atomic.StoreInt32(&mb.maxConcurrency, cc)
}
}

func (mb *MetricBucket) MaxConcurrency() int32 {
return atomic.LoadInt32(&mb.maxConcurrency)
}
21 changes: 14 additions & 7 deletions core/stat/base/metric_bucket_test.go
Expand Up @@ -27,25 +27,27 @@ func Test_metricBucket_MemSize(t *testing.T) {
mb := NewMetricBucket()
t.Log("mb:", mb)
size := unsafe.Sizeof(*mb)
if size != 48 {
if size != 56 {
t.Error("unexpect memory size of MetricBucket")
}
}

func Test_metricBucket_Normal(t *testing.T) {
mb := NewMetricBucket()

for i := 0; i < 100; i++ {
if i%5 == 0 {
for i := 0; i < 120; i++ {
if i%6 == 0 {
mb.Add(base.MetricEventPass, 1)
} else if i%5 == 1 {
} else if i%6 == 1 {
mb.Add(base.MetricEventBlock, 1)
} else if i%5 == 2 {
} else if i%6 == 2 {
mb.Add(base.MetricEventComplete, 1)
} else if i%5 == 3 {
} else if i%6 == 3 {
mb.Add(base.MetricEventError, 1)
} else if i%5 == 4 {
} else if i%6 == 4 {
mb.AddRt(100)
} else if i%6 == 5 {
mb.UpdateConcurrency(int32(i))
} else {
t.Error("unexpect idx")
}
Expand All @@ -66,6 +68,9 @@ func Test_metricBucket_Normal(t *testing.T) {
if mb.Get(base.MetricEventRt) != 20*100 {
t.Error("unexpect count MetricEventRt")
}
if mb.MaxConcurrency() != 119 {
t.Error("unexpect count MetricEventConcurrency")
}
}

func Test_metricBucket_Concurrent(t *testing.T) {
Expand Down Expand Up @@ -133,5 +138,7 @@ func Test_Reset(t *testing.T) {
mb.AddRt(100)
mb.reset()
rt := mb.MinRt()
mc := mb.MaxConcurrency()
assert.True(t, rt == base.DefaultStatisticMaxRt)
assert.True(t, mc == int32(0))
}
27 changes: 27 additions & 0 deletions core/stat/base/sliding_window_metric.go
Expand Up @@ -162,6 +162,29 @@ func (m *SlidingWindowMetric) MinRT() float64 {
return float64(minRt)
}

func (m *SlidingWindowMetric) MaxConcurrency() int32 {
now := util.CurrentTimeMillis()
satisfiedBuckets := m.getSatisfiedBuckets(now)
maxConcurrency := int32(0)
for _, w := range satisfiedBuckets {
mb := w.Value.Load()
if mb == nil {
logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.MaxConcurrency()")
continue
}
counter, ok := mb.(*MetricBucket)
if !ok {
logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
continue
}
v := counter.MaxConcurrency()
if v > maxConcurrency {
maxConcurrency = v
}
}
return maxConcurrency
}

func (m *SlidingWindowMetric) AvgRT() float64 {
return float64(m.GetSum(base.MetricEventRt)) / float64(m.GetSum(base.MetricEventComplete))
}
Expand Down Expand Up @@ -214,6 +237,10 @@ func (m *SlidingWindowMetric) metricItemFromBuckets(ts uint64, ws []*BucketWrap)
item.BlockQps += uint64(mb.Get(base.MetricEventBlock))
item.ErrorQps += uint64(mb.Get(base.MetricEventError))
item.CompleteQps += uint64(mb.Get(base.MetricEventComplete))
mc := uint32(mb.MaxConcurrency())
if mc > item.Concurrency {
item.Concurrency = mc
}
allRt += mb.Get(base.MetricEventRt)
}
if item.CompleteQps > 0 {
Expand Down
10 changes: 10 additions & 0 deletions core/stat/base/sliding_window_metric_test.go
Expand Up @@ -225,6 +225,16 @@ func TestMinRT(t *testing.T) {
assert.True(t, util.Float64Equals(minRt, float64(base.DefaultStatisticMaxRt)))
}

func TestMaxConcurrency(t *testing.T) {
got, err := NewSlidingWindowMetric(4, 2000, NewBucketLeapArray(SampleCount, IntervalInMs))
assert.True(t, err == nil && got != nil)
got.real.UpdateConcurrency(1)
got.real.UpdateConcurrency(3)
got.real.UpdateConcurrency(2)
mc := got.MaxConcurrency()
assert.True(t, mc == int32(3))
}

func TestAvgRT(t *testing.T) {
got, err := NewSlidingWindowMetric(4, 2000, NewBucketLeapArray(SampleCount, IntervalInMs))
assert.True(t, err == nil && got != nil)
Expand Down
9 changes: 9 additions & 0 deletions core/stat/base_node.go
Expand Up @@ -68,6 +68,10 @@ func (n *BaseStatNode) AddCount(event base.MetricEvent, count int64) {
n.arr.AddCount(event, count)
}

func (n *BaseStatNode) UpdateConcurrency(concurrency int32) {
n.arr.UpdateConcurrency(concurrency)
}

func (n *BaseStatNode) AvgRT() float64 {
complete := n.metric.GetSum(base.MetricEventComplete)
if complete <= 0 {
Expand All @@ -80,12 +84,17 @@ func (n *BaseStatNode) MinRT() float64 {
return float64(n.metric.MinRT())
}

func (n *BaseStatNode) MaxConcurrency() int32 {
return n.metric.MaxConcurrency()
}

func (n *BaseStatNode) CurrentConcurrency() int32 {
return atomic.LoadInt32(&(n.concurrency))
}

func (n *BaseStatNode) IncreaseConcurrency() {
atomic.AddInt32(&(n.concurrency), 1)
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
n.UpdateConcurrency(atomic.LoadInt32(&(n.concurrency)))
}

func (n *BaseStatNode) DecreaseConcurrency() {
Expand Down