Emit latency metric for forwarder requests #750

Merged: 9 commits merged into master on Apr 17, 2025

Conversation

@alyssat2 (Contributor) commented on Apr 4, 2025:

Added changes to emit latency metrics for forwarded batches of metrics.

Two new latency metrics are added:

  • http.forwarder.post_latency.sum: the total time taken to forward batches during the flush interval. Dividing this by the number of messages sent gives the average latency of forwarder requests.
  • http.forwarder.post_latency.max: the maximum time taken to forward a batch in the flush interval.

@hstan (Member) previously approved these changes on Apr 4, 2025:

LGTM

if err = post(); err == nil {
	atomic.AddUint64(&hfh.messagesSent, 1)
	hfh.lastSuccessfulSend.Store(clock.Now(ctx).UnixNano())
	statser.TimingDuration("http.forwarder.post_duration", clock.Since(ctx, startTime), gostatsd.Tags{"status:success"})

Collaborator:

These are per-host timers; this will be a huge cardinality explosion.

For things like this we accumulate data in a variable (make sure to be thread safe), and emit a single value for what we actually want when we flush. See the emitMetrics function and the data it consumes.
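
For illustration, a minimal sketch of that accumulate-then-flush pattern (the names and the emit callback here are hypothetical, not gostatsd's actual fields):

import (
	"sync/atomic"
	"time"
)

// Sketch only: accumulate on the hot path, emit one aggregate at flush.
var latencyTotalNs atomic.Int64 // total latency accumulated between flushes

// Hot path: a single atomic add per request; nothing is emitted per value.
func recordLatency(d time.Duration) {
	latencyTotalNs.Add(d.Nanoseconds())
}

// Flush path: swap-and-reset the accumulator, then emit one value.
func flushLatency(emit func(name string, value float64)) {
	emit("http.forwarder.post_latency.sum", float64(latencyTotalNs.Swap(0)))
}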

Member:

Right, I didn't consider that the internalStatser added a host tag.
But the metrics reported from emitMetrics are counters, and this is a latency; should we aggregate it?

Member:

For the cardinality explosion, I reckon we can aggregate these on our side and get the host dimension nuked out.

Collaborator:

I've got 3 primary concerns here:

  1. I have always tried to keep "Atlassianisms" out of gostatsd (there's some in cmd/tester though!). Saying "Atlassian can use our metrics sink to deal with a non-Atlassian problem" goes against DFTC. We should only make changes to gostatsd which are useful for all, not just for us.
  2. Sending data through the generic aggregation pipeline burns CPU. Stripping data out later also burns CPU, all along the pipeline. I've always tried to make gostatsd efficient, and not do unnecessary work. Compare an atomic increment against sending multiple values through a pipeline to eventually be sorted and have numerous calculations performed on them.
  3. With all my projects, I try to emit telemetry to answer a specific question. Here, the question seems under-specified: "emit latency". But what is that?
  • "What is the latency" is a question that requires logging every value, because it's a question without aggregation.
  • "What is the max latency" can be generated with a timer, or tracking it in a max variable we flush. The former burns CPU, the latter gives us the ability to decide what we actually want, and only what we want.
  • "What is the average latency" can be generated with a timer, or tracking a sum/count which we flush (as two values - we might already have a count though). Again, the former burns CPU, the latter gives us flexibility. We can look at long term averages by doing the math on the backend if we want.

@@ -299,6 +302,8 @@ func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
statser.Report("http.forwarder.sent", &hfh.messagesSent, nil)
statser.Report("http.forwarder.retried", &hfh.messagesRetried, nil)
statser.Report("http.forwarder.dropped", &hfh.messagesDropped, nil)
statser.TimingMS("http.forwarder.post_latency.max", float64(hfh.postLatencyMax.Load()), nil)
statser.TimingMS("http.forwarder.post_latency.avg", float64(hfh.postLatencyTotal)/float64(hfh.messagesSent), nil)

Member:

  • will messagesSent ever be 0?
  • hfh.postLatencyTotal never gets reset to 0, so the result will be incorrect.


postLatency := clock.Since(ctx, startTime)
atomic.AddInt64(&hfh.postLatencyTotal, postLatency.Milliseconds())
if postLatency.Milliseconds() > hfh.postLatencyMax.Load() {

Collaborator:

This is a race condition: consider if there are two goroutines running here (which is entirely possible; HttpForwarderHandlerV2.Run will launch goroutines for this, on line 333).

Assuming:

  • postLatencyMax = 10
  • Goroutine 1 has postLatency = 30
  • Goroutine 2 has postLatency = 20

Both enter the if statement

  • Goroutine 1 stores its data (max = 30)
  • Goroutine 2 stores its data (max = 20)

This can be done via a "simple" loop, instead of a single assignment:

	for old := maxAccum.Load(); old < latency; old = maxAccum.Load() {
		if maxAccum.CompareAndSwap(old, latency) {
			break
		}
	}

The CompareAndSwap is an atomic operation which does:

func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) {
  if *addr == old {
    *addr = new
    return true
  } else {
    return false
  }
}

Note that this is done atomically: the read and write are guaranteed to occur "at the same time", so they can't conflict with other threads.

Now what will happen is:

  • 1 will load the old value of 10 (note: as soon as you read it, it becomes "old", because it can change)
  • 2 will load the old value of 10
  • 1 will compare the old value of 10 to the latency of 30 and decide to update
  • 2 will compare the old value of 10 to the latency of 20 and decide to update
  • 1 will execute a CompareAndSwap, which compares the stored value of 10 to the old value of 10, sees they're the same, stores the new value 30, and returns true
  • 1 exits the loop
  • 2 will execute a CompareAndSwap, which compares the stored value of 30 to the old value of 10, sees they're different, and returns false
  • 2 will read the old value again and get 30
  • 2 will compare its latency of 20 to the old value of 30
  • 2 exits the loop

Or...

  • 1 will load the old value of 10
  • 2 will load the old value of 10
  • 1 will compare the old value of 10 to the latency of 30 and decide to update
  • 2 will compare the old value of 10 to the latency of 20 and decide to update
  • 2 will execute a CompareAndSwap, which compares the stored value of 10 to the old value of 10, sees they're the same, stores the new value 20, and returns true
  • 2 exits the loop
  • 1 will execute a CompareAndSwap, which compares the stored value of 20 to the old value of 10, sees they're different, and returns false
  • 1 will load the old value again, and get 20
  • 1 will compare its latency of 30 to the old value of 20, and try to update it
  • 1 will execute a CompareAndSwap, which compares the stored value of 20 to the old value of 20, sees they're the same, stores the new value of 30, and returns true
  • 1 exits the loop

This can be more generally thought of as the following sequence:

  • Load some data
  • Compute a value based on that data (this can be an "expensive" operation)
  • Check if the value has changed and update it
  • If it failed, retry
  • If it succeeded, you're done

Or in code:


func AtomicFunction(
	value *atomic.Int64,
	fn func(old int64) int64,
) {
	for {
		old := value.Load()                 // Load some data
		new := fn(old)                      // Compute a value based on the data
		if value.CompareAndSwap(old, new) { // Check if the value has changed and update it
			return // If it succeeded, great
		}
		// If it didn't, start again
	}
}

func atomicMax(value *atomic.Int64, new int64) {
	AtomicFunction(value, func(old int64) int64 {
		// This can do more or less anything, and we don't have to think about the atomic stuff, it's just inputs -> outputs.
		// Note that it must be inputs -> outputs; it can't have side-effects, because it may be run multiple times.
		return max(old, new)
	})
}
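
A quick usage sketch for atomicMax (hypothetical harness, not part of the PR; assumes Go 1.21+ for the max builtin used above): several goroutines race to update the maximum, and the result is the same regardless of interleaving.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var maxLatency atomic.Int64 // starts at 0
	var wg sync.WaitGroup
	for _, latency := range []int64{10, 30, 20} {
		wg.Add(1)
		go func(l int64) {
			defer wg.Done()
			atomicMax(&maxLatency, l) // safe to call concurrently
		}(latency)
	}
	wg.Wait()
	fmt.Println(maxLatency.Load()) // always prints 30
}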

@@ -299,6 +302,8 @@ func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
statser.Report("http.forwarder.sent", &hfh.messagesSent, nil)
statser.Report("http.forwarder.retried", &hfh.messagesRetried, nil)
statser.Report("http.forwarder.dropped", &hfh.messagesDropped, nil)
statser.TimingMS("http.forwarder.post_latency.max", float64(hfh.postLatencyMax.Load()), nil)

Collaborator:

This is a tricky decision on what to emit here. There's going to be several values sent in the aggregator flush interval (remember the forwarder flushes a lot faster than the aggregator).

Right now, we're getting the lifetime maximum, since we don't reset the max. That may be acceptable?

If we reset the max:

  • If we emit a timer, then we're back in the same boat of "all the cardinality".
  • If we emit a gauge, then only the last forwarder flush interval will be recorded.

If we don't reset the max:

  • If we emit a timer, then we're back in the same boat of "all the cardinality".
  • If we emit a gauge, then we get the process lifetime maximum (which may be fine)

@@ -299,6 +302,8 @@ func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
statser.Report("http.forwarder.sent", &hfh.messagesSent, nil)
statser.Report("http.forwarder.retried", &hfh.messagesRetried, nil)
statser.Report("http.forwarder.dropped", &hfh.messagesDropped, nil)
statser.TimingMS("http.forwarder.post_latency.max", float64(hfh.postLatencyMax.Load()), nil)
statser.TimingMS("http.forwarder.post_latency.avg", float64(hfh.postLatencyTotal)/float64(hfh.messagesSent), nil)

Collaborator:

hfh.messagesSent is atomic - it was written before the typed atomics existed, hence the comment about atomic in the struct. As such, you must use atomic.LoadUint64().

However... Report won't just report. If we're in forwarder mode, it will also reset the value to 0:

func (is *InternalStatser) Report(name string, value *uint64, tags gostatsd.Tags) {
	if is.forwarderMode {
		val := float64(atomic.SwapUint64(value, 0))  // <-- swap

This is a case of side-effects causing problems. We can restructure this like:

func (hfh *HttpForwarderHandlerV2) emitMetrics(statser stats.Statser) {
	messagesInvalid := float64(atomic.SwapUint64(&hfh.messagesInvalid, 0))
	messagesCreated := float64(atomic.SwapUint64(&hfh.messagesCreated, 0))
	messagesSent := float64(atomic.SwapUint64(&hfh.messagesSent, 0))
	messagesRetried := float64(atomic.SwapUint64(&hfh.messagesRetried, 0))
	messagesDropped := float64(atomic.SwapUint64(&hfh.messagesDropped, 0))

	statser.Count("http.forwarder.invalid", messagesInvalid, nil)
	statser.Count("http.forwarder.created", messagesCreated, nil)
	statser.Count("http.forwarder.sent", messagesSent, nil)
	statser.Count("http.forwarder.retried", messagesRetried, nil)
	statser.Count("http.forwarder.dropped", messagesDropped, nil)
}

Note that we can only do this behavior here because we know we're in a forwarder. If we were somewhere outside of a forwarder, we would have to know if we should use swap or load.

Now that we have our data in a variable, we can then do:

	postLatencyTotal := float64(atomic.SwapInt64(&hfh.postLatencyTotal, 0))

	statser.???("http.forwarder.post_latency.avg", postLatencyTotal/messagesSent, nil)

(See previous comment about which metric type to use)

However... averages aren't super helpful, it's more useful to emit a counter:

	statser.Count("http.forwarder.post_latency.sum", float64(hfh.postLatencyTotal), nil)

What this means is on the backend we can query:

Sum = data('http.forwarder.post_latency.sum.count', rollup='sum')
Total = data('http.forwarder.sent.count', rollup='sum')
(Sum/Total).publish('Average latency')

This lets us see the average of any time frame, because we can perform a sum across both the latency and the sent, and get meaningful data.

messagesSent uint64 // atomic - messages successfully sent
messagesRetried uint64 // atomic - retries (first send is not a retry, final failure is not a retry)
messagesDropped uint64 // atomic - final failure
postLatencyTotal int64 // atomic - total of the time taken to send messages in a flush interval

Collaborator:

We should make this atomic.Int64 as well. This code pre-dates the typed atomics in sync/atomic, hence why we don't use them here.
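
For illustration, the migrated fields might look like this (a sketch, other fields omitted; with typed atomics, non-atomic access becomes a compile error, so the "// atomic" comments become unnecessary):

type HttpForwarderHandlerV2 struct {
	// ...
	messagesSent     atomic.Uint64 // messages successfully sent
	messagesRetried  atomic.Uint64 // retries (first send is not a retry, final failure is not a retry)
	messagesDropped  atomic.Uint64 // final failure
	postLatencyTotal atomic.Int64  // total of the time taken to send messages in a flush interval
	postLatencyMax   atomic.Int64  // maximum time taken to send a batch in a flush interval
}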

postLatencyMax := hfh.postLatencyMax.Swap(0)
postLatencyTotal := hfh.postLatencyTotal.Swap(0)
statser.Gauge("http.forwarder.post_latency.max", float64(postLatencyMax), nil)
statser.Count("http.forwarder.post_latency.sum", float64(postLatencyTotal), nil)

Member:

Can we just use Report here, since what it does is exactly reset it to 0 and send a counter?

Collaborator:

The Report function takes a *uint64 and uses the atomic functions, as it was written before we had the typed atomics. Since the typed atomics are better (less error-prone, due to actual types), it's better to use them. There's an argument to say we should have Report take an atomic.Uint64, and refactor everything for it, but...

The Report function isn't great - it sounds like it just reports a metric, but it also does a reset, and that's not at all clear. I would advocate removing it entirely if we were doing a refactor.
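
If we ever did that refactor, a hypothetical shape might be (sketch only; ReportAndReset is a made-up name chosen to make the side effect explicit, not an existing API):

// Hypothetical: the reset is visible in the name, and the typed atomic
// prevents accidental non-atomic access to the counter.
func (is *InternalStatser) ReportAndReset(name string, value *atomic.Uint64, tags gostatsd.Tags) {
	is.Count(name, float64(value.Swap(0)), tags)
}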

@hstan (Member) left a comment:

@tiedotguy could you take a look as well?


postLatency := clock.Since(ctx, startTime).Milliseconds()
hfh.postLatencyTotal.Add(postLatency)
for old := hfh.postLatencyMax.Load(); old < postLatency; old = hfh.postLatencyMax.Load() {

Collaborator:

This is a loop that maybe could use a comment, since if you're not familiar with concurrency patterns, it's not immediately obvious what's going on.
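
For example, a comment along these lines might do (suggested wording only, over the loop from the diff above):

// Atomically raise hfh.postLatencyMax to postLatency if it's larger.
// A plain compare-then-Store would race with other goroutines, so we
// re-read the current max and CompareAndSwap in a loop until either our
// value is stored or another goroutine has stored something larger.
for old := hfh.postLatencyMax.Load(); old < postLatency; old = hfh.postLatencyMax.Load() {
	if hfh.postLatencyMax.CompareAndSwap(old, postLatency) {
		break
	}
}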

The change will generally need a bump to CHANGELOG.md; can you add the comment when you do that? You can do both in a separate PR, or both in this PR - either is ok.

@alyssat2 (Contributor, Author):

Will do

@alyssat2 merged commit 816d57d into master on Apr 17, 2025 (5 checks passed).
@alyssat2 deleted the alyssat2/emit-latency-metric branch on April 17, 2025 at 00:44.