From 47f59ba8adb87521eaceb5bb2e13530ce87648a9 Mon Sep 17 00:00:00 2001
From: bom-d-van
Date: Wed, 18 May 2022 12:42:39 +0200
Subject: [PATCH] quota: delay throughput counter update in trieindex.throttle

In the previous implementation, the throughput counter was updated
immediately after each check, even though the data points might still be
dropped by an exhausted child rule further down. This could lead to
over-accounting against parent quota configs.

This commit changes the implementation to only update the counters once
the traffic is confirmed to be within quota.

At the same time, go-carbon also makes sure to report throttled data
points for soft throughput quota enforcement (i.e. with the "none"
dropping policy).
---
 carbonserver/trie.go | 41 +++++++++++++++++++++++++++++------------
 1 file changed, 29 insertions(+), 12 deletions(-)

diff --git a/carbonserver/trie.go b/carbonserver/trie.go
index 7152f6bd3..ca7c82c4f 100644
--- a/carbonserver/trie.go
+++ b/carbonserver/trie.go
@@ -2077,13 +2077,13 @@ func (ti *trieIndex) getNodeFullPath(node *trieNode) string {
 // skipcq: SCC-U10
 // skipcq: RVV-A0005
 func (ti *trieIndex) throttle(ps *points.Points, inCache bool) bool {
-	throughput := int64(len(ps.Data))
-	if throughput == 0 {
+	dataLen := int64(len(ps.Data))
+	if dataLen == 0 {
 		// WHY: in theory, this should not happen. But in cases where
 		// go-carbon receives a metric without data points, it should
 		// still be counted as one data point, for throttling and
 		// throughput accounting.
-		throughput = 1
+		dataLen = 1
 	}
 
 	// WHY: hashes work much faster than the trie tree working.
@@ -2103,15 +2103,20 @@ func (ti *trieIndex) throttle(ps *points.Points, inCache bool) bool {
 		// over-consumption of quota, which itself is arguably as bad
 		// as full throttling.
 
+		tus := make([]*throughputUsagePerNamespace, 0, 8)
+
 		// TODO: should include in throughputs.depth and simplify the code here a bit.
 		// check root throughput capacity
 		if v := throughputs.load("/"); v != nil {
-			if v.quota().DroppingPolicy != QDPNone && !v.withinQuota(len(ps.Data), ti.resetFrequency) {
-				atomic.AddInt64(&v.quotaUsage().Throttled, throughput)
-				return true
+			if !v.withinQuota(dataLen, ti.resetFrequency) {
+				atomic.AddInt64(&v.quotaUsage().Throttled, dataLen)
+
+				if v.quota().DroppingPolicy != QDPNone {
+					return true
+				}
 			}
 
-			v.increase(len(ps.Data))
+			tus = append(tus, v)
 		}
 
 		for i, d := 0, 0; d <= throughputs.depth && i < len(ps.Metric); i++ {
@@ -2121,15 +2126,27 @@ func (ti *trieIndex) throttle(ps *points.Points, inCache bool) bool {
 			ns := ps.Metric[:i]
 			if v := throughputs.load(ns); v != nil {
-				if v.quota().DroppingPolicy != QDPNone && !v.withinQuota(len(ps.Data), ti.resetFrequency) {
-					atomic.AddInt64(&v.quotaUsage().Throttled, throughput)
-					return true
+				if !v.withinQuota(dataLen, ti.resetFrequency) {
+					atomic.AddInt64(&v.quotaUsage().Throttled, dataLen)
+
+					if v.quota().DroppingPolicy != QDPNone {
+						return true
+					}
 				}
 
-				v.increase(len(ps.Data))
+				tus = append(tus, v)
 			}
 
 			d++
 		}
+
+		// batch increasing the counter to avoid potential
+		// over-throttling/accounting in parent nodes as the data is
+		// later on dropped due to lower child quotas.
+		//
+		// TODO: add a test
+		for _, tu := range tus {
+			tu.increase(dataLen)
+		}
 	}
 
 	// quick pass if the metric is already seen in cache
@@ -2227,7 +2244,7 @@ mloop:
 	}
 
 	if meta.usage != nil {
-		atomic.AddInt64(&meta.usage.Throttled, throughput)
+		atomic.AddInt64(&meta.usage.Throttled, dataLen)
 	}
 
 	if quota, ok := meta.quota.Load().(*Quota); ok && quota != nil && quota.DroppingPolicy == QDPNone {
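
The core idea of the change, stripped of the trie, cache, and reset-frequency
machinery, can be sketched as below: check every matching quota level first,
and only charge the usage counters once no level has triggered a hard drop.
This is a minimal, self-contained Go illustration; quotaNode, withinQuota,
and throttle here are hypothetical simplifications, not the actual
carbonserver types used in the patch.

package main

import "fmt"

// quotaNode is a simplified, hypothetical stand-in for go-carbon's
// per-namespace throughput tracker (throughputUsagePerNamespace).
type quotaNode struct {
	name  string
	limit int64
	used  int64
	drop  bool // true = hard enforcement; false = the "none" dropping policy
}

func (q *quotaNode) withinQuota(n int64) bool { return q.used+n <= q.limit }

// throttle checks every matching quota level from root to leaf first, and
// only charges the usage counters once no level has triggered a hard drop,
// so a rejection in a child never consumes parent quota.
func throttle(levels []*quotaNode, n int64) bool {
	passed := make([]*quotaNode, 0, len(levels))
	for _, q := range levels {
		if !q.withinQuota(n) {
			// Report the violation even under soft enforcement,
			// mirroring the Throttled counter update in the patch.
			fmt.Printf("over quota at %q\n", q.name)
			if q.drop {
				return true // hard policy: drop, parents stay untouched
			}
		}
		passed = append(passed, q)
	}
	// Batch-increase after all checks: the delayed update this patch adds.
	for _, q := range passed {
		q.used += n
	}
	return false
}

func main() {
	root := &quotaNode{name: "/", limit: 100}
	child := &quotaNode{name: "sys.cpu", limit: 1, drop: true}

	fmt.Println(throttle([]*quotaNode{root, child}, 5)) // true: child drops the points
	fmt.Println(root.used)                              // 0: root is not over-accounted
}

With the pre-patch ordering, root.used would already have been increased
before the child rejected the points, which is exactly the parent
over-accounting the commit message describes.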