Emit latency metric for forwarder requests #750
LGTM
if err = post(); err == nil {
	atomic.AddUint64(&hfh.messagesSent, 1)
	hfh.lastSuccessfulSend.Store(clock.Now(ctx).UnixNano())
	statser.TimingDuration("http.forwarder.post_duration", clock.Since(ctx, startTime), gostatsd.Tags{"status:success"})
These are per-host timers, so they will cause a huge cardinality explosion.
For things like this we accumulate data in a variable (making sure to be thread safe), and emit a single value for what we actually want when we flush. See the emitMetrics function, and the data it consumes.
Right, I didn't consider that the internalStatser added a host tag.
But the metrics reported from emitMetrics are counters and this is latency - should we aggregate it?
For the cardinality explosion, I reckon we can aggregate these on our side and get the host dimension nuked out.
I've got 3 primary concerns here:
- I have always tried to keep "Atlassianisms" out of gostatsd (there's some in cmd/tester though!). Saying "Atlassian can use our metrics sink to deal with a non-Atlassian problem" goes against DFTC. Changes to gostatsd should be useful for all, not just for us.
- Sending data through the generic aggregation pipeline burns CPU. Stripping data out later also burns CPU, all along the pipeline. I've always tried to make gostatsd efficient, and not do unnecessary work. Compare an atomic increment against sending multiple values through a pipeline to eventually be sorted and have numerous calculations performed on them.
- With all my projects, I try to emit telemetry to answer a specific question. Here, the question seems under-specified: "emit latency". But what is that?
- "What is the latency" is a question that requires logging every value, because it's a question without aggregation.
- "What is the max latency" can be generated with a timer, or tracking it in a max variable we flush. The former burns CPU, the latter gives us the ability to decide what we actually want, and only what we want.
- "What is the average latency" can be generated with a timer, or tracking a sum/count which we flush (as two values - we might already have a count though). Again, the former burns CPU, the latter gives us flexibility. We can look at long term averages by doing the math on the backend if we want.
@@ -299,6 +302,8 @@ func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
	statser.Report("http.forwarder.sent", &hfh.messagesSent, nil)
	statser.Report("http.forwarder.retried", &hfh.messagesRetried, nil)
	statser.Report("http.forwarder.dropped", &hfh.messagesDropped, nil)
	statser.TimingMS("http.forwarder.post_latency.max", float64(hfh.postLatencyMax.Load()), nil)
	statser.TimingMS("http.forwarder.post_latency.avg", float64(hfh.postLatencyTotal)/float64(hfh.messagesSent), nil)
- will messagesSent ever be 0?
- hfh.postLatencyTotal never gets reset to 0, so the result will be incorrect.
postLatency := clock.Since(ctx, startTime)
atomic.AddInt64(&hfh.postLatencyTotal, postLatency.Milliseconds())
if postLatency.Milliseconds() > hfh.postLatencyMax.Load() {
This is a race condition - consider if there are two goroutines running here (which is entirely possible, HttpForwarderHandlerV2.Run will launch goroutines for this, on line 333).
Assuming:
- postLatencyMax = 10
- Goroutine 1 has postLatency = 30
- Goroutine 2 has postLatency = 20
Both enter the if statement
- Goroutine 1 stores its data (max = 30)
- Goroutine 2 stores its data (max = 20)
This can be done via a "simple" loop, instead of a single assignment:
for old := maxAccum.Load(); old < latency; old = maxAccum.Load() {
	if maxAccum.CompareAndSwap(old, latency) {
		break
	}
}
The CompareAndSwap is an atomic operation which does:
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) {
	if *addr == old {
		*addr = new
		return true
	} else {
		return false
	}
}
Note that this is done atomically, the read and write are guaranteed to occur "at the same time", they can't conflict with other threads.
Now what will happen is:
- 1 will load the old value of 10 (note: as soon as you read it, it becomes "old", because it can change)
- 2 will load the old value of 10
- 1 will compare the old value of 10 to the latency of 30 and decide to update
- 2 will compare the old value of 10 to the latency of 20 and decide to update
- 1 will execute a CompareAndSwap, which compares the stored value of 10 to the old value of 10, sees they're the same, stores the new value 30, and returns true
- 1 exits the loop
- 2 will execute a CompareAndSwap, which compares the stored value of 30 to the old value of 10, sees they're different, and returns false
- 2 will read the old value again and get 30
- 2 will compare its latency of 20 to the old value of 30
- 2 exits the loop
Or...
- 1 will load the old value of 10
- 2 will load the old value of 10
- 1 will compare the old value of 10 to the latency of 30 and decide to update
- 2 will compare the old value of 10 to the latency of 20 and decide to update
- 2 will execute a CompareAndSwap, which compares the stored value of 10 to the old value of 10, sees they're the same, stores the new value 20, and returns true
- 2 exits the loop
- 1 will execute a CompareAndSwap, which compares the stored value of 20 to the old value of 10, sees they're different, and returns false
- 1 will load the old value again, and get 20
- 1 will compare its latency of 30 to the old value of 20, and try to update it
- 1 will execute a CompareAndSwap, which compares the stored value of 20 to the old value of 20, sees they're the same, stores the new value of 30, and returns true
- 1 exits the loop
This can be more generally thought of as the following sequence:
- Load some data
- Compute a value based on that data (this can be an "expensive" operation)
- Check if the value has changed and update it
- If it failed, retry
- If it succeeded, you're done
Or in code:
func AtomicFunction(
	value *atomic.Int64,
	fn func(old int64) int64,
) {
	for {
		old := value.Load()                 // Load some data
		new := fn(old)                      // Compute a value based on the data
		if value.CompareAndSwap(old, new) { // Check if the value has changed and update it
			return // If it succeeded, great
		}
		// If it didn't, start again
	}
}
func atomicMax(value *atomic.Int64, new int64) {
	AtomicFunction(value, func(old int64) int64 {
		// This can do more or less anything, and we don't have to think about the atomic stuff, it's just inputs -> outputs.
		// Note that it must be inputs -> outputs, it can't have side-effects, because it may be run multiple times.
		return max(old, new)
	})
}
@@ -299,6 +302,8 @@ func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
	statser.Report("http.forwarder.sent", &hfh.messagesSent, nil)
	statser.Report("http.forwarder.retried", &hfh.messagesRetried, nil)
	statser.Report("http.forwarder.dropped", &hfh.messagesDropped, nil)
	statser.TimingMS("http.forwarder.post_latency.max", float64(hfh.postLatencyMax.Load()), nil)
This is a tricky decision on what to emit here. There's going to be several values sent in the aggregator flush interval (remember the forwarder flushes a lot faster than the aggregator).
Right now, we're getting the lifetime maximum, since we don't reset the max. That may be acceptable?
If we reset the max:
- If we emit a timer, then we're back in the same boat of "all the cardinality".
- If we emit a gauge, then only the last forwarder flush interval will be recorded.
If we don't reset the max:
- If we emit a timer, then we're back in the same boat of "all the cardinality".
- If we emit a gauge, then we get the process lifetime maximum (which may be fine)
@@ -299,6 +302,8 @@ func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
	statser.Report("http.forwarder.sent", &hfh.messagesSent, nil)
	statser.Report("http.forwarder.retried", &hfh.messagesRetried, nil)
	statser.Report("http.forwarder.dropped", &hfh.messagesDropped, nil)
	statser.TimingMS("http.forwarder.post_latency.max", float64(hfh.postLatencyMax.Load()), nil)
	statser.TimingMS("http.forwarder.post_latency.avg", float64(hfh.postLatencyTotal)/float64(hfh.messagesSent), nil)
hfh.messagesSent is atomic - it was written before the atomic types existed, hence the comment about atomic in the struct. As such, you must use atomic.LoadUint64().
However... Report won't just report - if we're in forwarder mode, it will also reset to 0:
func (is *InternalStatser) Report(name string, value *uint64, tags gostatsd.Tags) {
	if is.forwarderMode {
		val := float64(atomic.SwapUint64(value, 0)) // <-- swap
This is a case of side-effects causing problems. We can restructure this like:
func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
	messagesInvalid := float64(atomic.SwapUint64(&hfh.messagesInvalid, 0))
	messagesCreated := float64(atomic.SwapUint64(&hfh.messagesCreated, 0))
	messagesSent := float64(atomic.SwapUint64(&hfh.messagesSent, 0))
	messagesRetried := float64(atomic.SwapUint64(&hfh.messagesRetried, 0))
	messagesDropped := float64(atomic.SwapUint64(&hfh.messagesDropped, 0))

	statser.Count("http.forwarder.invalid", messagesInvalid, nil)
	statser.Count("http.forwarder.created", messagesCreated, nil)
	statser.Count("http.forwarder.sent", messagesSent, nil)
	statser.Count("http.forwarder.retried", messagesRetried, nil)
	statser.Count("http.forwarder.dropped", messagesDropped, nil)
}
Note that we can only do this behavior here because we know we're in a forwarder. If we were somewhere outside of a forwarder, we would have to know if we should use swap or load.
Now that we have our data in a variable, we can then do:
postLatencyTotal := float64(atomic.SwapInt64(&hfh.postLatencyTotal, 0))
statser.???("http.forwarder.post_latency.avg", postLatencyTotal/messagesSent, nil)
(See previous comment about which metric type to use)
However... averages aren't super helpful; it's more useful to emit a counter:
statser.Count("http.forwarder.post_latency.sum", postLatencyTotal, nil)
What this means is on the backend we can query:
Sum = data('http.forwarder.post_latency.sum.count', rollup='sum')
Total = data('http.forwarder.sent.count', rollup='sum')
(Sum/Total).publish('Average latency')
This lets us see the average of any time frame, because we can perform a sum across both the latency and the sent, and get meaningful data.
messagesSent uint64 // atomic - messages successfully sent
messagesRetried uint64 // atomic - retries (first send is not a retry, final failure is not a retry)
messagesDropped uint64 // atomic - final failure
postLatencyTotal int64 // atomic - total of the time taken to send messages in a flush interval
We should make this atomic.Int64 as well. This code pre-dates the atomic package, hence why we don't have it.
postLatencyMax := hfh.postLatencyMax.Swap(0)
postLatencyTotal := hfh.postLatencyTotal.Swap(0)
statser.Gauge("http.forwarder.post_latency.max", float64(postLatencyMax), nil)
statser.Count("http.forwarder.post_latency.sum", float64(postLatencyTotal), nil)
can we just use Report here, since what it does is exactly reset it to 0 and send a counter?
The Report function takes a *uint64 and uses atomic functions, as it was written before we had the atomic package. Since the atomic package is better (less error prone due to actual types), it's better to use that. There's an argument to say we should have Report take an atomic.Uint64 and refactor everything for it, but...
The Report function isn't great - it sounds like it reports a metric, but it doesn't; it also does a reset, and that's not at all clear. I would advocate removing it entirely, if we were doing a refactor.
@tiedotguy could you take a look as well?
postLatency := clock.Since(ctx, startTime).Milliseconds()
hfh.postLatencyTotal.Add(postLatency)
for old := hfh.postLatencyMax.Load(); old < postLatency; old = hfh.postLatencyMax.Load() {
This is a loop that maybe could use a comment, since if you're not familiar with concurrency patterns, it's not immediately obvious what's going on.
The change will generally need a bump to the CHANGELOG.md - can you add a comment when you do that? You can do both in a separate PR, or both in this PR - either is ok.
Will do
Added changes to emit latency metrics for forwarded batches of metrics.
Two new latency metrics are added:
- http.forwarder.post_latency.sum: the total of the time taken to forward a batch in the flush interval. This can be used to find the average latency of forwarder requests by dividing by the number of messages sent.
- http.forwarder.post_latency.max: the maximum time taken to forward a batch in the flush interval.