Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34252][table] Fix lastRecordTime tracking in WatermarkAssignerOperator #24211

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

dchristle
Copy link
Contributor

@dchristle dchristle commented Jan 29, 2024

What is the purpose of the change

In the current implementation, the lastRecordTime variable, which tracks the time of the last received data element, is updated only when the WatermarkStatus transitions from IDLE to ACTIVE. However, it is not updated when WatermarkStatus is ACTIVE, which means even under continuous data flow, the condition (currentTime - lastRecordTime > idleTimeout) will eventually always become true, and the WatermarkStatus will erroneously be marked IDLE.

I believe this bug technically causes incorrect outputs since downstream watermarks advance earlier than they otherwise would. The incorrect state doesn't last forever, though, since when the WatermarkStatus is in in the IDLE state, the next processElement will cause a WatermarkStatus.ACTIVE to be emitted.

The new unit test illustrates the flip-flop behavior before the fix:

[ERROR] org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow -- Time elapsed: 0.013 s <<< FAILURE!
java.lang.AssertionError:

Expecting
  [WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE)]
not to contain
  [WatermarkStatus(IDLE)]
but found
  [WatermarkStatus(IDLE)]

Brief change log

  • Update lastRecordTime in table WatermarkAssignerOperator on each record to prevent the stream from incorrectly being marked idle

Verifying this change

This change added tests and can be verified as follows:

  • Added test that validates the WatermarkStatus is not set to idle when records are sent more frequently than the idleTimeout

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

…Operator to prevent erroneous idle WatermarkStatus
@dchristle dchristle changed the title [FLINK-34252] [table] Fix lastRecordTime tracking in WatermarkAssignerOperator [FLINK-34252][table] Fix lastRecordTime tracking in WatermarkAssignerOperator Jan 29, 2024
@flinkbot
Copy link
Collaborator

flinkbot commented Jan 29, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui 1996fanrui self-assigned this Feb 18, 2024
Copy link
Member

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dchristle for the fix!

LGTM.

Hi @dawidwys , I guess this bug is introduced by FLINK-22881 [1][2]). Would you mind helping review this PR as well? Thanks a lot~

[1] #16221
[2] 5382cf5

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for the bug report and the fix! I've spotted one issue.

Comment on lines +103 to 109
if (idleTimeout > 0) {
if (currentStatus.equals(WatermarkStatus.IDLE)) {
// mark the channel active
emitWatermarkStatus(WatermarkStatus.ACTIVE);
}
lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can not call getProcessingTimeService().getCurrentProcessingTime() per every record, that's a too costly operation.

I think we could use a trick to count emitted records here, in the processElement. Then in WatermarkAssignerOperator#onProcessingTime you could periodically check if the processed elements count has changed and update lastRecordTime there if it did. That would loose us a little bit of accuracy, but not much. For example if processing timer is triggered ~5x more frequently than idleTimeout, the average accuracy lost would be only ~10%, which is negligible.

A couple of extra complications that I see are:

  • This operator registers a processing time timer only if periodic watermarks are enabled (watermarkInterval > 0). The code would have to be adapted to register and fire timers also if watermarkInterval == 0 && idleTimeout > 0.
  • If both idleTimeout > 0 && watermarkInterval > 0, we might need to somehow handle two timers frequencies:
    • I guess we could register two different ProcessingTimeCallback with two different frequencies.
    • Or we can ignore the problem and if both idleTimeout > 0 && watermarkInterval > 0, we could just have a single timer with watermarkInterval latency. This option is probably simpler, and might be good enough as usually (almost always?) watermarkInterval << idleTimeout.

Or have I missed something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pnowojski for this valuable comment!

We can not call getProcessingTimeService().getCurrentProcessingTime() per every record, that's a too costly operation.

Good catch, I didn't notice it before.

I think we could use a trick to count emitted records here, in the processElement. Then in WatermarkAssignerOperator#onProcessingTime you could periodically check if the processed elements count has changed and update lastRecordTime there if it did. That would loose us a little bit of accuracy, but not much. For example if processing timer is triggered ~5x more frequently than idleTimeout, the average accuracy lost would be only ~10%, which is negligible.

The solution make sense to me.

If both idleTimeout > 0 && watermarkInterval > 0, we might need to somehow handle two timers frequencies:
I guess we could register two different ProcessingTimeCallback with two different frequencies.
Or we can ignore the problem and if both idleTimeout > 0 && watermarkInterval > 0, we could just have a single timer with watermarkInterval latency. This option is probably simpler, and might be good enough as usually (almost always?) watermarkInterval << idleTimeout.

In general, watermarkInterval << idleTimeout, but it isn't always right. Users can set them randomly, we cannot ensure how user set them.

A workaround solution is we only register a timer with min(idleTimeout, watermarkInterval) latency.

For watermarkInterval == 0 && idleTimeout > 0.

We can register a timer with idleTimeout latency.

WDYT?

Copy link
Contributor

@pnowojski pnowojski Feb 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A workaround solution is we only register a timer with min(idleTimeout, watermarkInterval) latency.

Does idleTimeout < watermarkInterval have any sense? Shouldn't this be disallowed even? But if you have doubts, we can support that case here.

Given that when timer is registered for value <> watermarkInterval, and in that case we need some special handling in the timer callback, whether it's time to emit watermark or not, maybe it makes more sense to calculate some more fancy timer interval? For example:

if min(watermarkInterval, idleTimeout) * 5  < max(watermarkInterval, idleTimeout)
  return min(watermarkInterval, idleTimeout)
else 
  return max(min(watermarkInterval, idleTimeout)/5, 1)

That would ensure our accuracy is roughly speaking always good enough? And the code in the callback to determine whether it's time to check idleness and/or emit watermark would basically remain the same.

We can register a timer with idleTimeout latency.

Yes, that's what I had in mind :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does idleTimeout < watermarkInterval have any sense? Shouldn't this be disallowed even? But if you have doubts, we can support that case here.

Sorry, I mean we didn't forbid it, so we cannot assume it.

From your comment, I think we can limit idleTimeout > watermarkInterval.

Copy link
Contributor Author

@dchristle dchristle Feb 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pnowojski and @1996fanrui for the review. The approach to avoid getProcessingTimeService().getCurrentProcessingTime() each record seems sound to me and should be straightforward to implement.

For watermarkInterval == 0 && idleTimeout > 0

Do we know if watermarkInterval == 0 is a valid case where we are sure we need to periodically emit WatermarkStatus? My interpretation when first reading this code was that a zero value is a sentinel indicating the user isn't using watermarks. If that's the case, does the concept of "idleness" make sense anymore? Perhaps we don't emit anything for watermarkInterval == 0.

A mailing list thread discusses some confusion in the docs on the meaning of an auto-watermark interval set to zero -- whether it means emitting a watermark each record, or not emitting them at all.

A reply by @wuchong writes that the zero value:

just disable periodic watermark emission, it doesn't mean the watermark will never be emitted.

So, there may be watermarks from elsewhere when watermarkInterval == 0. But does that mean we need to support idleness detection here?

The current code would work in the case of a zero watermark interval, but only because of the last part of processElement where advanceWatermark() is called. That code is commented with:

// eagerly emit watermark to avoid period timer not called (this often happens when cpu load is high)

It seems like the intent was to ensure advancement under high load, rather than support the watermarkInterval == 0 case. Without this eager advancement, the code would never emit Watermark, since no timers would be scheduled & we'd never try to detect idleness.

@pnowojski
Copy link
Contributor

Do we know if watermarkInterval == 0 is a valid case where we are sure we need to periodically emit WatermarkStatus? My interpretation when first reading this code was that a zero value is a sentinel indicating the user isn't using watermarks. If that's the case, does the concept of "idleness" make sense anymore? Perhaps we don't emit anything for watermarkInterval == 0.

Yes, that's a valid combination with idleness in at least a couple of cases:

  • watermark is emitted in onEvent, for example for every record OR if there is already some special record in the source that represents a watermark
  • given Source/SourceReader handles emission of watermarks on it's own, but doesn't handle idleness
    In both cases watermarkInterval = 0 and idleTimeout > 0 could be a desired configuration.

It seems like the intent was to ensure advancement under high load, rather than support the watermarkInterval == 0 case. Without this eager advancement, the code would never emit Watermark, since no timers would be scheduled & we'd never try to detect idleness.

That's a good catch @dchristle!

This safety net for an overloaded subtask thread I think might no longer be needed. In the past the timer thread trying to emit periodic watermark and the subtask/source thread would try to compete to acquire the same lock. AFAIK the original authors were worried about lock starvation (I'm not even sure if this was really needed, as AFAIK kernels do not guarantee fairness, but guarantee that threads won't be starved on locks acquisition). Currently the synchronisation mechanism is very different, and I would say there is no real need for this extra code path.

So if it makes things simpler I would be fine removing this safety net.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants