Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record max concurrency in MetricItem and metric logs #371

Merged
merged 10 commits into from Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/base/constant.go
Expand Up @@ -29,4 +29,6 @@ const (
DefaultIntervalMsTotal uint32 = 10000

DefaultStatisticMaxRt = int64(60000)

DefaultStatisticMinConcurrency = int32(0)
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
)
2 changes: 2 additions & 0 deletions core/base/stat.go
Expand Up @@ -35,6 +35,8 @@ const (
MetricEventError
// request execute rt, unit is millisecond
MetricEventRt
// the Concurrency statistic
MetricEventConcurrency
// hack for the number of event
MetricEventTotal
)
Expand Down
24 changes: 21 additions & 3 deletions core/stat/base/metric_bucket.go
Expand Up @@ -26,13 +26,15 @@ import (
// Note that all operations of the MetricBucket are required to be thread-safe.
type MetricBucket struct {
// Value of statistic
counter [base.MetricEventTotal]int64
minRt int64
counter [base.MetricEventTotal]int64
minRt int64
maxConcurrency int32
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
}

func NewMetricBucket() *MetricBucket {
mb := &MetricBucket{
minRt: base.DefaultStatisticMaxRt,
minRt: base.DefaultStatisticMaxRt,
maxConcurrency: base.DefaultStatisticMinConcurrency,
}
return mb
}
Expand All @@ -47,6 +49,10 @@ func (mb *MetricBucket) Add(event base.MetricEvent, count int64) {
mb.AddRt(count)
return
}
if event == base.MetricEventConcurrency {
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
mb.SetConcurrency(count)
return
}
mb.addCount(event, count)
}

Expand All @@ -68,6 +74,7 @@ func (mb *MetricBucket) reset() {
atomic.StoreInt64(&mb.counter[i], 0)
}
atomic.StoreInt64(&mb.minRt, base.DefaultStatisticMaxRt)
atomic.StoreInt32(&mb.maxConcurrency, base.DefaultStatisticMinConcurrency)
}

func (mb *MetricBucket) AddRt(rt int64) {
Expand All @@ -80,3 +87,14 @@ func (mb *MetricBucket) AddRt(rt int64) {
func (mb *MetricBucket) MinRt() int64 {
return atomic.LoadInt64(&mb.minRt)
}

func (mb *MetricBucket) SetConcurrency(concurrency int64) {
cc := int32(concurrency)
if cc > atomic.LoadInt32(&mb.maxConcurrency) {
atomic.StoreInt32(&mb.maxConcurrency, cc)
}
}

func (mb *MetricBucket) MaxConcurrency() int32 {
return atomic.LoadInt32(&mb.maxConcurrency)
}
21 changes: 14 additions & 7 deletions core/stat/base/metric_bucket_test.go
Expand Up @@ -27,25 +27,27 @@ func Test_metricBucket_MemSize(t *testing.T) {
mb := NewMetricBucket()
t.Log("mb:", mb)
size := unsafe.Sizeof(*mb)
if size != 48 {
if size != 64 {
t.Error("unexpect memory size of MetricBucket")
}
}

func Test_metricBucket_Normal(t *testing.T) {
mb := NewMetricBucket()

for i := 0; i < 100; i++ {
if i%5 == 0 {
for i := 0; i < 120; i++ {
if i%6 == 0 {
mb.Add(base.MetricEventPass, 1)
} else if i%5 == 1 {
} else if i%6 == 1 {
mb.Add(base.MetricEventBlock, 1)
} else if i%5 == 2 {
} else if i%6 == 2 {
mb.Add(base.MetricEventComplete, 1)
} else if i%5 == 3 {
} else if i%6 == 3 {
mb.Add(base.MetricEventError, 1)
} else if i%5 == 4 {
} else if i%6 == 4 {
mb.AddRt(100)
} else if i%6 == 5 {
mb.SetConcurrency(int64(i))
} else {
t.Error("unexpect idx")
}
Expand All @@ -66,6 +68,9 @@ func Test_metricBucket_Normal(t *testing.T) {
if mb.Get(base.MetricEventRt) != 20*100 {
t.Error("unexpect count MetricEventRt")
}
if mb.MaxConcurrency() != 119 {
t.Error("unexpect count MetricEventConcurrency")
}
}

func Test_metricBucket_Concurrent(t *testing.T) {
Expand Down Expand Up @@ -133,5 +138,7 @@ func Test_Reset(t *testing.T) {
mb.AddRt(100)
mb.reset()
rt := mb.MinRt()
mc := mb.MaxConcurrency()
assert.True(t, rt == base.DefaultStatisticMaxRt)
assert.True(t, mc == base.DefaultStatisticMinConcurrency)
}
27 changes: 27 additions & 0 deletions core/stat/base/sliding_window_metric.go
Expand Up @@ -162,6 +162,29 @@ func (m *SlidingWindowMetric) MinRT() float64 {
return float64(minRt)
}

func (m *SlidingWindowMetric) MaxConcurrency() int32 {
now := util.CurrentTimeMillis()
satisfiedBuckets := m.getSatisfiedBuckets(now)
maxConcurrency := base.DefaultStatisticMinConcurrency
for _, w := range satisfiedBuckets {
mb := w.Value.Load()
if mb == nil {
logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.MaxConcurrency()")
continue
}
counter, ok := mb.(*MetricBucket)
if !ok {
logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name())
continue
}
v := counter.MaxConcurrency()
if v > maxConcurrency {
maxConcurrency = v
}
}
return maxConcurrency
}

func (m *SlidingWindowMetric) AvgRT() float64 {
return float64(m.GetSum(base.MetricEventRt)) / float64(m.GetSum(base.MetricEventComplete))
}
Expand Down Expand Up @@ -214,6 +237,10 @@ func (m *SlidingWindowMetric) metricItemFromBuckets(ts uint64, ws []*BucketWrap)
item.BlockQps += uint64(mb.Get(base.MetricEventBlock))
item.ErrorQps += uint64(mb.Get(base.MetricEventError))
item.CompleteQps += uint64(mb.Get(base.MetricEventComplete))
mc := uint32(mb.MaxConcurrency())
if mc > item.Concurrency {
item.Concurrency = mc
}
allRt += mb.Get(base.MetricEventRt)
}
if item.CompleteQps > 0 {
Expand Down
5 changes: 5 additions & 0 deletions core/stat/base_node.go
Expand Up @@ -80,12 +80,17 @@ func (n *BaseStatNode) MinRT() float64 {
return float64(n.metric.MinRT())
}

func (n *BaseStatNode) MaxConcurrency() int32 {
return n.metric.MaxConcurrency()
}

func (n *BaseStatNode) CurrentConcurrency() int32 {
return atomic.LoadInt32(&(n.concurrency))
}

func (n *BaseStatNode) IncreaseConcurrency() {
atomic.AddInt32(&(n.concurrency), 1)
liqiangz marked this conversation as resolved.
Show resolved Hide resolved
n.AddCount(base.MetricEventConcurrency, int64(atomic.LoadInt32(&(n.concurrency))))
}

func (n *BaseStatNode) DecreaseConcurrency() {
Expand Down