allow passing freshness checker after an idle threshold #11345
Conversation
Codecov Report
@@ Coverage Diff @@
## master #11345 +/- ##
============================================
- Coverage 68.48% 62.90% -5.59%
+ Complexity 6535 1077 -5458
============================================
Files 2233 2301 +68
Lines 119952 123666 +3714
Branches 18191 18798 +607
============================================
- Hits 82148 77787 -4361
- Misses 31964 40356 +8392
+ Partials 5840 5523 -317
Flags with carried forward coverage won't be shown.
... and 736 files with indirect coverage changes
Looks like the only test failure is a flake. cc @Jackie-Jiang
Mostly good
 * This depends on the user configured "idle.timeout.millis" stream config.
 * - the total idle time which only resets when we consume something.
 *
 * This is not a running timer. It only advances as long as we keep calling "markIdle".
Why not just track the start time, and compute from nowMs when reading the idle time? If the start time is 0, return 0.
I thought about this a lot, so tell me if my logic doesn't make sense.
- the user can request the idle time for the stream or the idle time for consumption
- meaning you have to be able to answer `now - streamIdleStart` or `now - consumeIdleStart`, so you have to track either both start times, or one start time plus the difference between the two
Tracking both start times felt like easier logic to reason about. I had a version of the code where we tracked the difference, and it was really confusing.
Yes, tracking both start times sounds good. My question is about whether we should calculate idle time at marking time or reading time. It would be more accurate if we calculate idle time at reading time
I was mostly following the current convention, but you're right that it makes less sense with a separate class. I've changed the logic to calculate idleness on read rather than at "mark idle" time.
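The resolution above (track both start times, derive idleness at read time) might look roughly like this minimal sketch. Only `markIdle`, `markStreamNotIdle`, and `_streamIdleStartTimeMs` appear in this thread; the other names and the nowMs-parameter style are assumptions for illustration, not the PR's actual code:

```java
// Minimal sketch of an IdleTimer that tracks two start times and computes
// idle durations at read time. Illustrative only, not the actual Pinot code.
class IdleTimer {
  // volatile: written by the consuming thread, read by the health-check thread
  private volatile long _streamIdleStartTimeMs = 0;
  private volatile long _consumeIdleStartTimeMs = 0;

  // Called whenever a fetch returns no data; only records the first idle moment.
  public void markIdle(long nowMs) {
    if (_streamIdleStartTimeMs == 0) {
      _streamIdleStartTimeMs = nowMs;
    }
    if (_consumeIdleStartTimeMs == 0) {
      _consumeIdleStartTimeMs = nowMs;
    }
  }

  // Called when the stream consumer is recreated: resets only the stream timer.
  public void markStreamNotIdle() {
    _streamIdleStartTimeMs = 0;
  }

  // Called when we actually consume data: resets both timers.
  public void markNotIdle() {
    _streamIdleStartTimeMs = 0;
    _consumeIdleStartTimeMs = 0;
  }

  // Idle time is derived at read time; a start time of 0 means "not idle".
  public long getStreamIdleTimeMs(long nowMs) {
    return _streamIdleStartTimeMs == 0 ? 0 : nowMs - _streamIdleStartTimeMs;
  }

  public long getConsumeIdleTimeMs(long nowMs) {
    return _consumeIdleStartTimeMs == 0 ? 0 : nowMs - _consumeIdleStartTimeMs;
  }
}
```

Note the two timers can diverge: recreating the consumer resets the stream timer while the consume timer keeps running until data actually arrives.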
/**
 * The IdleTimer is responsible for keeping track of 2 different idle times:
 * - The stream idle time which resets every time we remake the stream consumer.
I don't fully follow the difference between these 2 idle times. Currently it seems we always update and reset them together.
Oof, bad bug on my part. When we recreate the stream consumer, we're supposed to call markStreamNotIdle. I see in our logs we're basically resetting the stream consumer every time.
One is supposed to track how long the stream has been idle and reset every time we recreate the consumer. The other continues to run even if the stream is reset until we consume data.
I redeployed this internally, and I'm seeing the log volume come down a lot. Thanks for catching!
IIUC, stream idle means consumer idle (reset when we recreate the consumer), consume idle means no event (to me that is closer to stream idle). Should we consider renaming them? IMHO the current names are a little bit confusing.
Fair, I've renamed the variables to be much more indicative of what they're actually tracking.
 */
public class IdleTimer {

  private long _streamIdleStartTimeMs = 0;
If we are going to access them from a different thread, we need to make them thread safe (either volatile or synchronized).
done
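For the record, the two options the reviewer mentions might look like this minimal sketch (illustrative class and field names, not the PR's code):

```java
// Two ways to make an idle-start field safe to read from another thread.
class VolatileTimer {
  // Option 1: volatile guarantees cross-thread visibility of each write.
  private volatile long _streamIdleStartTimeMs = 0;

  long getStartMs() { return _streamIdleStartTimeMs; }
  void markIdle(long nowMs) { _streamIdleStartTimeMs = nowMs; }
}

class SynchronizedTimer {
  // Option 2: synchronized accessors give visibility plus mutual exclusion,
  // which matters if a method ever updates multiple fields together.
  private long _streamIdleStartTimeMs = 0;

  synchronized long getStartMs() { return _streamIdleStartTimeMs; }
  synchronized void markIdle(long nowMs) { _streamIdleStartTimeMs = nowMs; }
}
```

Since each field here is written and read independently, volatile is the lighter-weight choice.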
@@ -76,15 +90,36 @@ protected boolean isSegmentCaughtUp(String segmentName, LLRealtimeSegmentDataMan
    StreamPartitionMsgOffset currentOffset = rtSegmentDataManager.getCurrentOffset();
    StreamPartitionMsgOffset latestStreamOffset = rtSegmentDataManager.fetchLatestStreamOffset(5000);
    if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) {
-     _logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}."
+     _logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}. "
Not introduced in this PR, but does isOffsetCaughtUp() never work for Kafka?
Currently, not unless you change the code yourself. The initial version of this checker effectively said: if it's a LongMessageOffset, check that currentOffset <= latestOffset - 1. But there was contention about having that special case in here. I worked around it internally by updating our ingestion plugin to do the -1 whenever someone requested the latest offset criteria.
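One plausible reading of the -1 workaround described above, sketched under the assumption that offsets are plain longs (hypothetical helper, not Pinot's actual isOffsetCaughtUp):

```java
// Sketch of the Kafka special case discussed above (illustrative only).
// Kafka's "latest" offset is the offset the *next* produced message will get,
// so a consumer that has read everything sits one behind it. Comparing against
// latestOffset - 1 therefore lets a fully drained partition count as caught up.
class KafkaOffsetCheck {
  static boolean isCaughtUp(long currentOffset, long latestOffset) {
    return currentOffset >= latestOffset - 1;
  }
}
```

Without the -1, a consumer on a quiet partition can never equal the "latest" offset and the check never passes.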
    // If the earliest available kafka offset is later than the current offset, then for the purposes of
    // this check, we are caught up. This can happen if the segment has not consumed data for some time,
    // and that data has since been retentioned.
    StreamPartitionMsgOffset earliestStreamOffset = rtSegmentDataManager.fetchEarliestStreamOffset(5000);
Do we want to count this as pass? If the consumer works as expected, it should be able to skip to the next available offset
That's fair. And even if it doesn't for some reason, the idleness check would catch it for us. I left the earliest offset fetch in for logging purposes.
…te; do not pass freshness checker at earliest offset
I've tested this again on 2 clusters internally. Still seeing the same behavior.
…te; do not pass freshness checker at earliest offset (apache#267)

### Notify
cc stripe-private-oss-forks/pinot-reviewers
r? priyen

### Summary
- important fix to mark the stream as not idle after it's recreated. this led to a ton of kproxy traffic and logs
- remove the "earliest" offset check. jackie said the consumer should be able to progress past this on its own. so we'll just rely on the idleness check instead

### Motivation
this is in response to OSS feedback, apache#11345

### Testing
deployed all hosts in QA and restarted. all tables are still healthy, and log volume/kproxy traffic is back down

### Rollout/monitoring/revert plan
(Squashed by Merge Queue - Original PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/267)
…pache#295)

### Notify
cc stripe-private-oss-forks/pinot-reviewers

### Summary
Gist for pre merge diffs: https://git.corp.stripe.com/gist/5d1c5798dac53c9ca2cf791e153ca406
Gist for post merge diffs: https://git.corp.stripe.com/gist/d14940e109b4ea6c9552a1775a2872d3
Gist for diff of diffs: https://git.corp.stripe.com/gist/0228792a51c0920b2e59725c44b721de
Gist for merge conflict resolution: https://git.corp.stripe.com/gist/15826b47e566a1673db8a1ffdeec2eda

For upstream pulls, please list all the PRs of interest and risk. I needed to add 1 more commit because a duplicate method didn't show up as a conflict.

- apache#11184 upstream commit for Priyen's new tenants endpoint
- apache#10770 UI now actually supports multiple options
- apache#11301 now there's a metric for broker queries missing segments. but we already fail these queries
- apache#11151 MSE now returns the correct type on SUM/MAX/MIN
- apache#11128 OSS added an api to rebalance a whole tenant! it looks a bit complicated. likely needs some testing
- apache#11273 API to get server pool from the server
- apache#11328 this changes some CASE/WHEN semantics during upgrade. We only use case/when in RDP and auth rates queries. **todo** let's check with startree if this is ok or not
- apache#11341 new `/query` API uses MSE by default
- apache#11349 fixes "query time" in logs when broker response is empty
- apache#11363 and apache#11513 better exception formatting in the UI
- apache#11380 table shutdowns should generally be safer (can't query/add/remove segments)
- apache#11345 pulling in the freshness checker fixes we have already
- apache#11421 logging fix we contributed
- apache#11369 this upgrades hadoop, but also we shade it. we need to test this on prod sandbox
- apache#11443 fix we upstreamed to throttling metrics
- apache#11467 SQL between is now inclusive. this was only broken if it was in the select clause like `CASE WHEN X between 0 and 1`. we don't allow that form in decibel, https://livegrep.corp.stripe.com/view/stripe-internal/zoolander/src/scala/com/stripe/decibel/pinot/PinotJobConfig.scala#L115
- apache#11476 this is some fix for changing partition count on kafka topics. we don't allow this internally, but maybe this gets us closer
- apache#11532 upstream of handling empty parquet files
- apache#11515 upsert snapshot size metric, we don't use snapshots yet for RDP
- apache#11506 makes broker response public, might be useful for findata since they use java client
- apache#11553 we don't use protos or support nulls yet, but we will

### Motivation
- this is the PR the 1.0.0 release is based on
- This gets us to < 730 lines of difference between us and OSS!

### Testing
For upstream pulls: https://docs.google.com/document/d/1Zj6lBimVlCDBCycBI6IuTEuI5xwtYUYeDDOMpkwNHhk/edit#heading=h.g37jl5jhx4j5

### Rollout/monitoring/revert plan
(Merge-committed by Merge Queue - Original PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/295)
This is a small feature addition to allow servers to become healthy via realtimeFreshnessIdleTimeoutMs if the stream is not making any progress. There's also an addition to the freshness checker to mark itself healthy if the current stream offset is now out of retention. By default, this changes nothing since the default 0ms config has no effect.

This has been an issue for us in low volume QA environments because we have servers wait 45 minutes to catch up before marking themselves as healthy. But there are occasions where servers have been down for too long and offsets are retentioned. Or kafka just never gives us back the requested offsets (we believe these are control messages from transactional publishes).
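The pass condition the description implies might be sketched like this; the helper and parameter names are assumptions for illustration (only `realtimeFreshnessIdleTimeoutMs` comes from this PR), not the checker's actual code:

```java
// Sketch of the freshness-check decision described above (illustrative only):
// a segment passes either by being fresh enough, or, when the idle timeout is
// configured (> 0), by having been idle for at least that long.
class FreshnessCheck {
  static boolean passes(long freshnessMs, long minFreshnessMs,
                        long idleTimeMs, long realtimeFreshnessIdleTimeoutMs) {
    if (freshnessMs <= minFreshnessMs) {
      return true;  // caught up: data is fresh enough
    }
    // The default timeout of 0 disables the idle escape hatch entirely.
    return realtimeFreshnessIdleTimeoutMs > 0
        && idleTimeMs >= realtimeFreshnessIdleTimeoutMs;
  }
}
```

This is why the default changes nothing: with a 0ms timeout, only the freshness branch can ever return true.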
We're running this on all QA clusters with no issue.