Skip to content

Commit

Permalink
quota: delay throughput counter update in trieindex.throttle
Browse files Browse the repository at this point in the history
In the previous implementation, when throughput quota usage might be exhausted in a child rule,
the throughput counter was updated immediately after the check, which could lead to over-accounting
in parent quota configs.

This commit changes the implementation to only update the counter once the traffic is confirmed
to be within the quota.

At the same time, go-carbon also makes sure to report throttled data points for soft throughput quota
enforcement (i.e. with "none" dropping policy).
  • Loading branch information
bom-d-van committed May 18, 2022
1 parent e4ce171 commit 47f59ba
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions carbonserver/trie.go
Expand Up @@ -2077,13 +2077,13 @@ func (ti *trieIndex) getNodeFullPath(node *trieNode) string { // skipcq: SCC-U10

// skipcq: RVV-A0005
func (ti *trieIndex) throttle(ps *points.Points, inCache bool) bool {
throughput := int64(len(ps.Data))
if throughput == 0 {
dataLen := int64(len(ps.Data))
if dataLen == 0 {
// WHY: in theory, this should not happen. But in cases where
// go-carbon receives a metric without data points, it should
// still be counted as one data point, for throttling and
// throughput accounting.
throughput = 1
dataLen = 1
}

// WHY: hashes work much faster than the trie tree working.
Expand All @@ -2103,15 +2103,20 @@ func (ti *trieIndex) throttle(ps *points.Points, inCache bool) bool {
// over-consumption of quota, which itself is arguably as bad
// as full throttling.

tus := make([]*throughputUsagePerNamespace, 0, 8)

// TODO: should include in throughputs.depth and simplify the code here a bit.
// check root throughput capacity
if v := throughputs.load("/"); v != nil {
if v.quota().DroppingPolicy != QDPNone && !v.withinQuota(len(ps.Data), ti.resetFrequency) {
atomic.AddInt64(&v.quotaUsage().Throttled, throughput)
return true
if !v.withinQuota(dataLen, ti.resetFrequency) {
atomic.AddInt64(&v.quotaUsage().Throttled, dataLen)

if v.quota().DroppingPolicy != QDPNone {
return true
}
}

v.increase(len(ps.Data))
tus = append(tus, v)
}

for i, d := 0, 0; d <= throughputs.depth && i < len(ps.Metric); i++ {
Expand All @@ -2121,15 +2126,27 @@ func (ti *trieIndex) throttle(ps *points.Points, inCache bool) bool {

ns := ps.Metric[:i]
if v := throughputs.load(ns); v != nil {
if v.quota().DroppingPolicy != QDPNone && !v.withinQuota(len(ps.Data), ti.resetFrequency) {
atomic.AddInt64(&v.quotaUsage().Throttled, throughput)
return true
if !v.withinQuota(dataLen, ti.resetFrequency) {
atomic.AddInt64(&v.quotaUsage().Throttled, dataLen)

if v.quota().DroppingPolicy != QDPNone {
return true
}
}

v.increase(len(ps.Data))
tus = append(tus, v)
}
d++
}

// batch increasing the counter to avoid potential
// over-throttling/accounting in parent nodes as the data is
// later on dropped due to lower child quotas.
//
// TODO: add a test
for _, tu := range tus {
tu.increase(dataLen)
}
}

// quick pass if the metric is already seen in cache
Expand Down Expand Up @@ -2227,7 +2244,7 @@ mloop:
}

if meta.usage != nil {
atomic.AddInt64(&meta.usage.Throttled, throughput)
atomic.AddInt64(&meta.usage.Throttled, dataLen)
}

if quota, ok := meta.quota.Load().(*Quota); ok && quota != nil && quota.DroppingPolicy == QDPNone {
Expand Down

0 comments on commit 47f59ba

Please sign in to comment.