
Modify lag rate calculations #1022

Closed
wants to merge 6 commits

Conversation

shanson7
Collaborator

Related to #985

We are seeing (as yet undiagnosed) pauses of a few seconds in our read instances. When this happens, the lag jumps a bit and the priority jumps even more. The existing code harshly penalizes short pauses. This change uses a moving window to determine the rate of kafka growth and estimate "seconds behind".
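A minimal sketch of that idea, using hypothetical type and field names (this is not the actual metrictank code): keep a window of offset measurements per partition, derive the production rate from the oldest and newest high water marks, and divide the current lag by that rate.

package lag

import "time"

type offsetMeasurement struct {
	ts            time.Time
	highWaterMark int64 // newest offset kafka has for the partition
	readOffset    int64 // offset this instance has consumed up to
}

type window struct {
	measurements []offsetMeasurement // oldest first, bounded length
}

// productionRate estimates how fast kafka receives messages (msg/s) by
// comparing the oldest and newest high water marks in the window.
func (w *window) productionRate() float64 {
	if len(w.measurements) < 2 {
		return 0
	}
	oldest := w.measurements[0]
	newest := w.measurements[len(w.measurements)-1]
	elapsed := newest.ts.Sub(oldest.ts).Seconds()
	if elapsed <= 0 {
		return 0
	}
	return float64(newest.highWaterMark-oldest.highWaterMark) / elapsed
}

// secondsBehind estimates how long ago the message we are currently
// processing was produced into the partition.
func (w *window) secondsBehind() float64 {
	rate := w.productionRate()
	if rate <= 0 {
		return 0
	}
	newest := w.measurements[len(w.measurements)-1]
	return float64(newest.highWaterMark-newest.readOffset) / rate
}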

@shanson7 shanson7 changed the title from "503 fix" to "Modify lag rate calculations" on Aug 28, 2018
@shanson7
Collaborator Author

We have been running this for about 5 weeks and it's solved our issues with spurious 503's.

-	if min < 0 || m < min {
-		min = m
+	lag := int(m.highWaterMark - m.readOffset)
+	if min < 0 || lag < min {
Contributor

tiny optimization: at this point we know that len(l.measurements) > 0 so we could initialize min with the lag of the first measurement and then iterate over l.measurements[1:]. This would save one loop and this if condition would be simplified because it only needs to be lag < min instead of min < 0 || lag < min.
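A sketch of what that suggestion might look like, reusing the field names from the excerpt above (the surrounding function is assumed):

// assumes len(l.measurements) > 0, so min can start from the first measurement
min := int(l.measurements[0].highWaterMark - l.measurements[0].readOffset)
for _, m := range l.measurements[1:] {
	lag := int(m.highWaterMark - m.readOffset)
	if lag < min {
		min = lag
	}
}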


	// If there are no longer any incoming messages, use our read offsets to compute rate
	if high == low {
		high, low = latestLag.readOffset, earliestLag.readOffset
Contributor

What's the reason why we wouldn't just always use the .readOffset, instead of only using it if the .highWaterMarks are equal?
If I understand this correctly then it will now calculate the rate based on how fast Kafka receives new messages in this partition, unless it receives no new messages, then it will calculate the rate based on how fast MT consumes them, right? It seems a little surprising to me that the caller of .Rate() doesn't know which of those two rates it will actually get returned...

Collaborator Author

The goal is to estimate the interval between now and the time this message was put into kafka. Using the rate of growth of the partition offsets in kafka is better for this than our read offset, since various things can impact our ingest rate (GC, Prune locks, etc.). The old implementation relied on the read offset and was susceptible to over-correction due to short pauses in ingest.

As for the failover...it could be better, and I'm open to suggestions. I think that @Dieterbe was looking into a more complete refactor that split the computations from the usage.

I'm most interested in coming up with the most effective heuristic for estimating eligibility in cluster processing. If that means that a well-known fallback behavior is used, then so be it.

Contributor

Ok, makes sense 👍

// to just keep using the last computed rate, and wait for the next
// measurement to compute a new rate based on the new baseline
return
if high < low {
Contributor

could be else if.

Collaborator Author

It cannot, because high/low are modified in the first branch.
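Condensed from the excerpts above (paraphrased, bodies omitted), the structure is roughly:

if high == low {
	// no new messages were produced; fall back to the read offsets
	high, low = latestLag.readOffset, earliestLag.readOffset
}
if high < low {
	// cannot be "else if": high and low may have just been reassigned in the
	// branch above, and the reassigned values still need this check
}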

Contributor

@replay replay left a comment

LGTM with minor suggestions for improvements

@shanson7
Collaborator Author

Hey, does this look good to take? This helped us immensely with stopping short pauses from causing 503s.

@replay
Contributor

replay commented Oct 17, 2018

Do you mean the latest commit? yeah, that makes sense, thanks

@Dieterbe
Contributor

dieter [5:07 PM]
i have two concerns:

  1. much of the code is crufty (intermingled with offset manager leveldb stuff which we no longer use, 2 loops that can go out of sync vs just 1 loop, code is hard to follow: state management intermingled with score computation etc)
  2. unit tests:

The LagMonitor unit tests are lacking (both master and bloomberg/503_fix):

  • it's very unclear what kind of scenario or behavior they're testing, there's lots of numbers but ambiguity about where those numbers are coming from or what they represent
  • I think much of it is testing for specific implementation details, rather than desired behavior (which would be along the lines of "with these inputs, we want a score in that range")

I propose we instead test for various scenarios wherein each scenario is a list of events (inputs and expected score)

scenarios:

  • startup: predict time-to-in-sync
  • rate of production goes up/down but lag stays consistent
  • rate of production goes up/down but ingest goes down
  • rate of production goes up/down but ingest goes up slightly or up by the same amount as production goes up
  • rate of consumption goes up/down ->
  • short pause of a few seconds with quick recovery

each line could be:

wallclock time, current offset, and newest (highwatermark-1), expected priority, and a comment describing what's going on

I want to see these tests fail with the current implementation on master, and then succeed with the new implementation.

i would have preferred to address 1 before introducing a logical change such as yours, but i suppose i'm ok with doing it the other way around. I do feel more strongly that we should get our unit test story straight

dieter [5:25 PM]
i can help, of course

shanson [5:59 PM]
I’ll be honest, I haven’t really had much time to work on MT stuff this last month and probably won’t be able to for another month
Right now we are maintaining the fork (not too much hassle, fewer rollouts when I’m working on other things) because without it we were getting consistent 503s
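A rough sketch of how one such scenario table could be expressed (step and runScenario are hypothetical names, not the existing test code; the expected priorities are illustrative):

package lag

import "testing"

// step describes one measurement in a scenario, per the format proposed above.
type step struct {
	ts       int64  // wallclock time in seconds
	offset   int64  // current read offset
	newest   int64  // highWaterMark - 1
	expected int    // expected priority after this measurement
	desc     string // what is going on at this point
}

func TestShortPauseWithQuickRecovery(t *testing.T) {
	steps := []step{
		{0, 1000, 1000, 0, "in sync"},
		{1, 1000, 2000, 1, "ingest pauses for one second, production continues"},
		{2, 3000, 3000, 0, "quick recovery, caught up again"},
	}
	runScenario(t, steps)
}

// runScenario would feed each step into a LagMonitor and assert the priority
// against step.expected; the implementation is omitted in this sketch.
func runScenario(t *testing.T, steps []step) {
	t.Helper()
	// feed steps into the monitor and compare reported priority with expected
}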

@shanson7
Collaborator Author

I'm going to try to get to this this week. I can start from scratch if need be (there are already some conflicts). The problem with adding unit tests that exercise example scenarios (failing before, passing after) is that the refactor you are asking for would require refactoring those tests as well. So maybe @Dieterbe could help lay out the expected structure and I could get to work on that.

The current division of responsibilities on master:

lagLogger:

  • Tracks sliding window of lag
  • Calculates the minimum lag

rateLogger:

  • Tracks last (valid) rate of ingest
  • Returns said rate

LagMonitor:

  • Entry point to the other 2 classes
  • Map of partition -> lag and map partition -> rate
  • storeOffset (for rate)
  • storeLag (for lag)
  • Metric -> logic around computed lag, derived from min lag in window and last rate of ingest
  • Explain -> basically a dump of the partition Metric

The current division of responsibilities on PR branch:

lagLogger:

  • One per partition
  • tracks window of offsets, timestamps
  • Calculates the minimum lag
  • Calculates an estimated rate of kafka production, falls back to ingest (if no new messages produced to kafka)

LagMonitor

  • Entry point
  • Map of partition -> lagLogger
  • storeOffsets (lag is computed from the offsets)
  • Metric -> logic around computed lag, derived from min lag in window and avg rate
  • Explain -> basically a dump of the partition Metric

So, the basic change here is to use the average rate over a window rather than a single snapshotted rate, and to use the rate of production rather than the rate of ingest.

lag = l = number of messages behind
rate of ingest = ri = messages per second MT processes
rate of production = rp = messages per second Kafka received

l / ri = time (in seconds) until we reach the current newest offset
l / rp = time (in seconds) our position is from current newest offset

Switching from ri to rp means priority is more accurately tracking seconds behind realtime, rather than seconds until we get to current head.
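For example (illustrative numbers): with l = 6000 messages, rp = 2000 msg/s, and ri = 1000 msg/s, l / rp = 3 seconds (the message we are processing was produced 3 seconds ago), while l / ri = 6 seconds (at our ingest speed it would take 6 seconds to reach kafka's current head).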

@Dieterbe
Contributor

Dieterbe commented Dec 28, 2018

much of the code is crufty (intermingled with offset manager leveldb stuff which we no longer use, 2 loops that can go out of sync vs just 1 loop, code is hard to follow: state management intermingled with score computation etc)

I am working on getting this fixed. #1110 is merged. i'll need to have a look at #1113, and #1171 should probably also go in first too.
when the code is cleaner, your changeset will be easier to follow. it'll require rebasing and fixing merge conflicts, but the extra clarity will be worth it I think

The problem with adding unit tests that better test example scenarios that fail then pass is that the refactor you are asking for would require refactoring the tests as well.

i'll help with this too.

@Dieterbe
Contributor

Dieterbe commented Jan 3, 2019

Suggestion. not sure if it's such a good idea and if it is, not sure if it should be part of this PR.
but.. Should we transition the code from being organized as the "monitor thing that uses 2 different kinds of loggers" to the "1 thing to track per-partition status, and 1 thing to track global status across all partitions". it seems it may make more sense and make it easier to understand what's going on.

This would involve:

  • rename lagLogger to PartitionStatus or something, and lagMonitor to KafkaStatus
  • split up LagMonitor.Metric() (and rename it) so we have PartitionStatus.Priority() which computes the priority for 1 partition and KafkaStatus.Priority() which gets the max from all the PartitionStatus ones
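A minimal sketch of that split, with hypothetical signatures (this is a shape suggestion, not existing code):

package lag

// PartitionStatus would track the offset/lag window for a single partition
// (what lagLogger does today).
type PartitionStatus struct {
	// window of offset measurements, omitted here
}

// Priority computes the priority for this one partition,
// e.g. min lag divided by the estimated production rate.
func (p *PartitionStatus) Priority() int {
	// computation omitted in this sketch
	return 0
}

// KafkaStatus would hold the per-partition statuses (what LagMonitor does today).
type KafkaStatus struct {
	partitions map[int32]*PartitionStatus
}

// Priority returns the worst (highest) priority across all partitions.
func (k *KafkaStatus) Priority() int {
	max := 0
	for _, p := range k.partitions {
		if prio := p.Priority(); prio > max {
			max = prio
		}
	}
	return max
}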

@shanson7
Collaborator Author

Rebased my changes onto the latest master. In essence, LagMonitor.Metric() now returns an estimate of how long ago the messages we are currently processing were initially put into the kafka partition. To that end, it might be beneficial to rename some of the components/function names but I think the implementations are what we want.

@@ -132,7 +124,7 @@ func TestLagMonitor(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		advance(i, int64(i*2), int64(i))
 	}
-	So(mon.Metric(), ShouldEqual, 98) // min-lag(98) / rate (1) = 98
+	So(mon.Metric(), ShouldEqual, 49) // min-lag(98) / input rate (2) = 49
Contributor

this is interesting.
when ingest rate != input rate, this number is off.
when does this happen?

  • when an instance can't keep up with production (input > ingest), lag is underestimated, but perhaps the lag growth that would happen in this case would suffice to make the reported lag big enough.
  • when an instance is backfilling (ingest > input) because it's starting up, lag is overestimated, but it's ok because when it becomes ready its ingest rate will start to match the input rate, so the effect is that it becomes ready slightly later.
  • when an instance is backfilling (ingest > input) because it's recovering from an earlier slowdown. same here, would be marked as ready a bit later than usual.

This tradeoff seems fine.

Contributor

@Dieterbe Dieterbe Feb 7, 2019

I suppose "off" is debatable here.
it's not "off" when we think of lag as "how long ago was this produced" as opposed to "at which point will we have caught up" (assuming a constant rate of input)

Collaborator Author

lag is underestimated

Not really. Lag is well defined as the number of messages this instance's head is behind kafka's head.

Metric (which is really priority / normalization of lag) is what changes. I've redefined it to (normally) mean "How many seconds behind real-time are we (for the worst partition)?". The previous definition was "How many seconds until we catch up?". The former definition is (IMO) more useful for determining when to stop querying an instance.

Collaborator Author

BTW, this is what I was referring to when I said the names could use some cleaning up / clarification

@Dieterbe
Contributor

Dieterbe commented Feb 7, 2019

@shanson7 I intend to merge #1218, which is a cleaned-up version of this branch.

@Dieterbe
Contributor

Dieterbe commented Feb 7, 2019

#1218 now merged instead.
Thanks for your work Sean

@Dieterbe Dieterbe closed this Feb 7, 2019
@Dieterbe Dieterbe added this to the vnext milestone Feb 11, 2019
@shanson7 shanson7 deleted the 503_fix branch March 6, 2019 14:27