[FLINK-33810][Runtime] Propagate RecordAttributes that contains isProcessingBacklog status #23919

Closed
wants to merge 9 commits into from

Sxnan (Contributor) commented Dec 13, 2023

What is the purpose of the change

This PR introduces RecordAttributes, which carry information about whether the records are backlog data. RecordAttributes propagate through the job graph along with the data at runtime.

Brief change log

  • Introduce RecordAttributes to notify the downstream whether the records are backlog data.
  • Propagate the RecordAttributes at runtime.
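The two change-log items can be illustrated with a minimal sketch. It assumes a simplified model in which RecordAttributes are interleaved with ordinary records in the same stream. `RecordSketch`, `RecordAttributesSketch` and `PassThroughOperatorSketch` are hypothetical names, not Flink's runtime classes. The sketch only shows an operator that updates its mode from the latest RecordAttributes and forwards that element downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT Flink's actual runtime classes.
interface StreamElementSketch {}

final class RecordSketch implements StreamElementSketch {
    final String value;
    RecordSketch(String value) { this.value = value; }
}

final class RecordAttributesSketch implements StreamElementSketch {
    private final boolean backlog;
    RecordAttributesSketch(boolean backlog) { this.backlog = backlog; }
    boolean isBacklog() { return backlog; }
}

/** Pass-through operator that tracks the backlog status and forwards attributes downstream. */
final class PassThroughOperatorSketch {
    // Starts in non-backlog mode before any RecordAttributes arrives.
    private boolean backlog = false;
    // Collects everything this operator emits, in order.
    private final List<StreamElementSketch> output = new ArrayList<>();

    void processElement(StreamElementSketch element) {
        if (element instanceof RecordAttributesSketch) {
            RecordAttributesSketch attrs = (RecordAttributesSketch) element;
            // Switch mode according to the latest attributes...
            backlog = attrs.isBacklog();
            // ...and propagate them so downstream operators see the same status.
            output.add(attrs);
        } else {
            // Ordinary records are forwarded unchanged.
            output.add(element);
        }
    }

    boolean isBacklog() { return backlog; }
    List<StreamElementSketch> output() { return output; }
}
```

Emitting the attributes into the same output as the records keeps them ordered relative to the data, so a downstream operator switches mode at the same point in the stream.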

Verifying this change

  • Unit tests are added to each affected component.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializer: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs


xintongsong (Contributor) left a comment

@Sxnan, thanks for addressing my comments in PR#23521. The code quality looks quite good to me. Nice work, very impressive.

I think I have one last question regarding the initial status before receiving the first RecordAttributes. Currently, this seems undefined, or depends on the operators. Shall we explicitly define the initial status? Would it be a problem if we don't? (E.g., any consistency issues?)

Comment on lines 68 to 109
```java
/** If any of the input channels is backlog, the combined RecordAttributes is backlog. */
private boolean combineIsBacklog(
        RecordAttributes lastRecordAttributes, RecordAttributes recordAttributes) {
    if (lastRecordAttributes == null
            || lastRecordAttributes.isBacklog() != recordAttributes.isBacklog()) {
        if (lastRecordAttributes != null && recordAttributes.isBacklog()) {
            nonBacklogChannelsCnt -= 1;
        }
        if (!recordAttributes.isBacklog()) {
            nonBacklogChannelsCnt += 1;
        }
    }

    return nonBacklogChannelsCnt < numInputChannels;
}
```
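A small harness around the quoted logic makes its start-up behavior easy to trace. The method body is copied as-is. The wrapper class `ReviewedCombinerSketch`, the per-channel map of last-seen attributes, and the `Attrs` type are hypothetical stand-ins, not Flink's `RecordAttributes`. With more than one input channel, a channel that has not reported yet does not count as non-backlog. The combined result therefore stays backlog until every channel has reported `isBacklog=false`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal attributes type; not Flink's RecordAttributes.
final class Attrs {
    private final boolean backlog;
    Attrs(boolean backlog) { this.backlog = backlog; }
    boolean isBacklog() { return backlog; }
}

/** Wraps the quoted combineIsBacklog logic in a small, testable class. */
final class ReviewedCombinerSketch {
    private final int numInputChannels;
    private int nonBacklogChannelsCnt = 0;
    // Last attributes seen per channel; a missing entry means nothing received yet.
    private final Map<Integer, Attrs> lastPerChannel = new HashMap<>();

    ReviewedCombinerSketch(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    /** Returns the combined isBacklog after {@code channel} reports {@code attrs}. */
    boolean onAttributes(int channel, Attrs attrs) {
        boolean combined = combineIsBacklog(lastPerChannel.get(channel), attrs);
        lastPerChannel.put(channel, attrs);
        return combined;
    }

    // Same logic as the reviewed method.
    private boolean combineIsBacklog(Attrs lastRecordAttributes, Attrs recordAttributes) {
        if (lastRecordAttributes == null
                || lastRecordAttributes.isBacklog() != recordAttributes.isBacklog()) {
            if (lastRecordAttributes != null && recordAttributes.isBacklog()) {
                nonBacklogChannelsCnt -= 1;
            }
            if (!recordAttributes.isBacklog()) {
                nonBacklogChannelsCnt += 1;
            }
        }
        return nonBacklogChannelsCnt < numInputChannels;
    }
}
```

For example, on a two-channel input the first `isBacklog=false` report still yields a combined `true`. The combined status becomes `false` only once the second channel also reports `false`.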
xintongsong (Contributor):

I'm trying to understand the behavior during job initialization. According to this method, the behavior would be as follows:

  • As soon as the first RecordAttributes is received, regardless of whether its isBacklog is true or false, the combiner will emit a RecordAttributes with isBacklog being true to the downstream, unless there's only one input channel. Is that correct?

  • Then the question is: what happens before the first RecordAttributes is received? What is the initial status, and how should the operators behave? Could operators be initialized in one mode (e.g., non-backlog) and then have to switch to another mode (e.g., backlog) before receiving any records? Or, even worse, could different operators be initialized in inconsistent modes?

Sxnan (Contributor Author):

Thanks for the review!

I updated the RecordAttributesCombiner to address the initialization issue. The isBacklog of each input channel has three possible states: undefined, isBacklog=true, and isBacklog=false. The isBacklog of the input is determined as follows:

  • if any input channel has isBacklog=true, the input has isBacklog=true
  • otherwise, if any input channel is undefined, the isBacklog status of the input is unchanged
  • otherwise (all channels are defined and have isBacklog=false), the input has isBacklog=false
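The three-state rule can be sketched as follows. This is a hypothetical illustration, not the RecordAttributesCombiner code merged in this PR. `ThreeStateCombinerSketch` and `onAttributes` are made-up names, and a `null` Boolean stands in for the undefined state. The initial combined status is assumed to be non-backlog, consistent with operators being initialized in non-backlog mode.

```java
/**
 * Hypothetical sketch of the three-state rule; not the merged RecordAttributesCombiner.
 * Per-channel status: null = undefined, TRUE = backlog, FALSE = non-backlog.
 */
final class ThreeStateCombinerSketch {
    private final Boolean[] channelStatus;
    // Assumption: the combined status starts as non-backlog.
    private boolean combinedIsBacklog = false;

    ThreeStateCombinerSketch(int numInputChannels) {
        // All entries start as null, i.e. undefined.
        channelStatus = new Boolean[numInputChannels];
    }

    /** Records the latest isBacklog of {@code channel} and returns the combined status. */
    boolean onAttributes(int channel, boolean isBacklog) {
        channelStatus[channel] = isBacklog;
        boolean anyBacklog = false;
        boolean anyUndefined = false;
        for (Boolean status : channelStatus) {
            if (status == null) {
                anyUndefined = true;
            } else if (status) {
                anyBacklog = true;
            }
        }
        if (anyBacklog) {
            // Rule 1: any backlog channel makes the input backlog.
            combinedIsBacklog = true;
        } else if (!anyUndefined) {
            // Rule 3: all channels defined and non-backlog.
            combinedIsBacklog = false;
        }
        // Rule 2: otherwise some channel is undefined, so keep the previous status.
        return combinedIsBacklog;
    }
}
```

The sketch rescans all channels on every update for readability. A combiner on the per-record path would more likely maintain counters, as the earlier combineIsBacklog snippet does, so that each update is O(1).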

Currently, all operators are initialized in non-backlog mode. I agree that, ideally, we should determine the initial status before receiving the first RecordAttributes, so that we don't have to initialize an operator in non-backlog mode and then immediately switch it to backlog mode before it processes any records. However, it turns out that this is non-trivial, and I don't think it should block this PR. I'd prefer to keep the current PR simple and address the problem in the future. WDYT?

xintongsong (Contributor):

Sounds good to me. Could you please open a JIRA ticket for this future work, so that we don't lose track of it?

Sxnan (Contributor Author):

The JIRA ticket is created.

xintongsong (Contributor) left a comment

LGTM. Merging.
