
[FLINK-33398][runtime] Support switching from batch to stream mode for one input stream operator #23521

Open · wants to merge 14 commits into base: master from FLIP-327

Conversation

@Sxnan (Contributor) commented Oct 13, 2023

What is the purpose of the change

This PR adds support for switching from batch to stream mode to improve performance when processing backlog data with a one-input stream operator.

Brief change log

  • Introduce RecordAttributes to notify downstream operators whether the incoming records are backlog data (a hedged sketch follows this list).
  • Propagate the RecordAttributes at runtime.
  • Automatically sort the backlog data for one-input operators.
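
For illustration, here is a minimal sketch (not this PR's actual code) of how a one-input operator could consume these attributes, assuming the RecordAttributes#isBacklog flag and the processRecordAttributes hook proposed in FLIP-327; the class, helper names, and exact package locations are assumptions:

    // Sketch only: all names below are hypothetical.
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    public class BacklogAwareOperatorSketch<IN, OUT> extends AbstractStreamOperator<OUT>
            implements OneInputStreamOperator<IN, OUT> {

        private boolean isBacklog = false;

        @Override
        public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
            boolean newIsBacklog = recordAttributes.isBacklog();
            if (isBacklog && !newIsBacklog) {
                // Backlog phase ended: drain whatever was buffered/sorted in "batch" mode.
                flushBufferedRecords();
            }
            isBacklog = newIsBacklog;
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            if (isBacklog) {
                bufferForBatchStyleProcessing(element); // e.g. collect and sort by key
            } else {
                processImmediately(element); // normal per-record streaming path
            }
        }

        private void bufferForBatchStyleProcessing(StreamRecord<IN> element) { /* hypothetical */ }

        private void flushBufferedRecords() { /* hypothetical */ }

        private void processImmediately(StreamRecord<IN> element) { /* hypothetical */ }
    }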

Verifying this change

This change added tests and can be verified as follows:

  • Added integration test for keyed aggregation and keyed windowed aggregation with backlog data.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot (Collaborator) commented Oct 13, 2023

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@Sxnan Sxnan marked this pull request as ready for review October 16, 2023 03:48
@Sxnan Sxnan force-pushed the FLIP-327 branch 2 times, most recently from 36a0166 to edd6648 Compare October 16, 2023 06:37

@Sxnan (Contributor, Author) commented Oct 17, 2023

@flinkbot run azure

@yunfengzhou-hub (Contributor) left a comment

Thanks for the PR. I left some comments below.

@Sxnan Sxnan force-pushed the FLIP-327 branch 4 times, most recently from df7e9f5 to 756f929 Compare October 24, 2023 10:11

@yunfengzhou-hub (Contributor) left a comment

Thanks for the update. I left some comments below.

Besides, this PR does not cover all the changes proposed in FLIP-327. According to offline discussions, we need more PRs to support state cache and built-in multi-input operators. Therefore, it might be better to create child tickets for FLINK-33202 that cover the whole implementation plan, and assign this PR to one of these child tickets.

@Sxnan Sxnan changed the title [FLINK-33202][runtime] Support switching from batch to stream mode to improve throughput when processing backlog data [FLINK-33398][runtime] Support switching from batch to stream mode for one input stream operator Oct 30, 2023
@Sxnan Sxnan force-pushed the FLIP-327 branch 4 times, most recently from d83c464 to a4013fc Compare November 7, 2023 06:35

@Sxnan (Contributor, Author) commented Nov 7, 2023

@yunfengzhou-hub Thanks for the review! I updated the PR. Can you have another look?

@yunfengzhou-hub (Contributor) left a comment

Thanks for the update. I left some minor comments below. @xintongsong, could you please take a look at this PR?

@Sxnan (Contributor, Author) commented Nov 14, 2023

@xintongsong Could you help review this PR?

@SmirAlex (Contributor)

Hi @Sxnan! Sorry, I know I'm not a reviewer, but I happened to be testing functionality from this PR recently and found what I believe is a bug. It concerns the logic of RecordAttributesValve.
In my test there was very little backlog data at the source, and the source had parallelism 4 (actually, any value > 1 triggers the case). Because the backlog phase was very short, the time interval between sending RecordAttributes(isBacklog=true) and RecordAttributes(isBacklog=false) was also very short. In addition, due to the high parallelism, one source subtask could send RecordAttributes(isBacklog=false) even before another subtask sent RecordAttributes(isBacklog=true). As a result, a race condition occurred in RecordAttributesValve#inputRecordAttributes: backlogChannelsCnt was incremented and decremented concurrently, so it never reached numInputChannels, and no RecordAttributes was emitted from RecordAttributesValve at all.

I suggest keeping separate counters for RecordAttributes(isBacklog=true) and RecordAttributes(isBacklog=false), so that the race condition I mentioned above cannot affect the result. Something like this:

    // Count backlog and non-backlog announcements separately so that one does not
    // cancel out the other when channels switch at different times.
    if (recordAttributes.isBacklog()) {
        backlogChannelsCnt += 1;
        if (backlogChannelsCnt != numInputChannels) {
            return;
        }
        backlogChannelsCnt = 0;
    } else {
        nonBacklogChannelsCnt += 1;
        if (nonBacklogChannelsCnt != numInputChannels) {
            return;
        }
        nonBacklogChannelsCnt = 0;
    }

    // Only emit when the combined backlog status actually changes.
    if (lastOutputAttributes == null
            || lastOutputAttributes.isBacklog() != recordAttributes.isBacklog()) {
        if (lastOutputAttributes != null && !lastOutputAttributes.isBacklog()) {
            LOG.warn(
                    "Switching from non-backlog to backlog is currently not supported. Backlog status remains.");
            return;
        }
        lastOutputAttributes = recordAttributes;
        output.emitRecordAttributes(recordAttributes);
    }

WDYT? Also, it would be good to have a test for the aforementioned case.
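
For illustration, a rough sketch of such a test (names like RecordAttributesValve#inputRecordAttributes, RecordAttributesBuilder, and the CollectingDataOutput test double are assumed from this PR and may not match the final API; imports omitted):

    // Sketch only: two channels switch backlog status at different times.
    @Test
    void testChannelsSwitchBacklogStatusOutOfOrder() throws Exception {
        final RecordAttributesValve valve = new RecordAttributesValve(2);
        final CollectingDataOutput<Object> output = new CollectingDataOutput<>(); // hypothetical test double

        final RecordAttributes backlog =
                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
        final RecordAttributes nonBacklog =
                new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();

        valve.inputRecordAttributes(backlog, 0, output);    // channel 0 enters backlog
        valve.inputRecordAttributes(nonBacklog, 0, output); // ...and leaves it before channel 1 reports anything
        valve.inputRecordAttributes(backlog, 1, output);    // channel 1 enters backlog late
        valve.inputRecordAttributes(nonBacklog, 1, output); // now every channel reports non-backlog

        // With separate counters, a RecordAttributes(isBacklog=false) must eventually be emitted.
        assertThat(output.getEvents()).contains(nonBacklog);
    }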

@Sxnan (Contributor, Author) commented Nov 16, 2023

(Quoted @SmirAlex's comment above in full.)

Hi @SmirAlex. Thanks for trying this out! The RecordAttributesValve combines the RecordAttributes from the different input channels of the same input. The input is considered to be in the backlog state if and only if all of its input channels report backlog = true; otherwise, some non-backlog records would be treated as backlog records.

Back to your case, where there is very little backlog data and the parallelism is greater than 1: it is possible for one input channel to switch to the non-backlog state before other input channels have switched to the backlog state. When that happens, we simply process those records as if they were non-backlog data.
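
In other words (illustration only, not the valve's actual code), the combined status is the logical AND over the latest status reported by each channel:

    // The input is in backlog state only if every channel's latest RecordAttributes
    // reported isBacklog = true; a single non-backlog channel makes the input non-backlog.
    static boolean combinedIsBacklog(boolean[] latestIsBacklogPerChannel) {
        for (boolean channelIsBacklog : latestIsBacklogPerChannel) {
            if (!channelIsBacklog) {
                return false;
            }
        }
        return true;
    }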

@SmirAlex (Contributor)

Thanks for the response, @Sxnan. I got your points. BTW, the reason delivery guarantees for RecordAttributes elements to downstream operators were important to me is that I was testing the possibility of using them to properly implement a processing-time temporal join. I described my thoughts about this in the FLIP-326 discussion thread. Thus, if it is eventually decided to reuse the RecordAttributes logic to solve FLINK-19830, the problem I described will arise one way or another.

Also, I should mention that there is another workaround that guarantees delivery of RecordAttributes(isBacklog=false) (though not of isBacklog=true) while still preventing non-backlog records from being treated as backlog records. The pseudocode could look like this:

// Emit non-backlog attributes as soon as every channel has reported isBacklog=false,
// even if no backlog attributes were emitted before.
if (lastOutputAttributes == null && nonBacklogChannelsCnt == numInputChannels) {
    nonBacklogChannelsCnt = 0;
    output.emitRecordAttributes(RecordAttributes(isBacklog=false));
}

The downside is that a RecordAttributes(isBacklog=false) can be emitted without a preceding RecordAttributes(isBacklog=true), but that is probably acceptable. Otherwise, this approach worked fine for me.

@Sxnan Sxnan force-pushed the FLIP-327 branch 3 times, most recently from 91c334d to ffd611b Compare November 22, 2023 14:28
Comment on lines +81 to +83
if (newKey == null) {
currentWatermark = maxWatermarkDuringBacklog;
}

What does newKey == null mean? And why do we need to update the current watermark under that condition?

Comment on lines +399 to +401
public KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> getEventTimeTimersQueue() {
return eventTimeTimersQueue;
}

It feels hacky to expose an internal queue from one component and use it to construct another component. If the queue is meant to be shared, I think the correct way is to create the queue outside the two components and pass it in as a constructor argument.
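
A minimal sketch of the suggested wiring, with hypothetical stand-in types (this shows the shape of the idea, not the actual Flink classes): the caller creates the shared queue once and injects it into both components, so neither component needs a getter that exposes its internals.

    import java.util.PriorityQueue;

    // Hypothetical stand-ins for the two timer-service components that share one queue.
    final class StreamTimeServiceSketch {
        private final PriorityQueue<Long> eventTimeTimers;

        StreamTimeServiceSketch(PriorityQueue<Long> eventTimeTimers) {
            this.eventTimeTimers = eventTimeTimers;
        }
    }

    final class BacklogTimeServiceSketch {
        private final PriorityQueue<Long> eventTimeTimers;

        BacklogTimeServiceSketch(PriorityQueue<Long> eventTimeTimers) {
            this.eventTimeTimers = eventTimeTimers;
        }
    }

    final class TimerServiceWiring {
        static void wire() {
            // The caller owns the shared queue and passes the same instance to both constructors.
            PriorityQueue<Long> sharedEventTimeTimers = new PriorityQueue<>();
            new StreamTimeServiceSketch(sharedEventTimeTimers);
            new BacklogTimeServiceSketch(sharedEventTimeTimers);
        }
    }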

* the timer service will be the maximum watermark during backlog processing.
*/
@Internal
public class InternalBacklogAwareTimerServiceImpl<K, N> implements InternalTimerService<N> {

TBH, I'm not a big fan of inheritance here. The problem is that, by directly accessing protected fields of the superclass from the subclasses, the contract between the classes becomes obscure. Since subclasses can see the internals of the superclass, changes to the superclass can easily break things the subclasses depend on, and such issues are very hard to avoid.

In many cases, an inheritance-based approach can be replaced by an equivalent approach based on composition, which has clearer contracts between classes (see the sketch after the list below). Admittedly, that's not always possible and sometimes gets costly.

For the classes below, I haven't spent enough time to explore whether it's possible to switch to a composition-based approach. Do you think it's feasible?

  • BacklogTimeService
  • BatchExecutionInternalTimeService
  • InternalBacklogAwareTimerServiceManagerImpl
  • InternalTimeServiceManagerImpl
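
For reference, a purely illustrative sketch of a composition-based variant, with hypothetical names and deliberately simplified backlog semantics; the point is only that the wrapper talks to the wrapped service through a public contract instead of reaching into protected fields:

    // Illustration only: delegation instead of subclassing.
    interface TimeServiceContract {
        void advanceWatermark(long watermark) throws Exception;
    }

    final class BacklogAwareTimeServiceSketch implements TimeServiceContract {
        private final TimeServiceContract delegate; // used only via its public contract
        private long maxWatermarkDuringBacklog = Long.MIN_VALUE;
        private boolean backlog = true;

        BacklogAwareTimeServiceSketch(TimeServiceContract delegate) {
            this.delegate = delegate;
        }

        @Override
        public void advanceWatermark(long watermark) throws Exception {
            if (backlog) {
                // During backlog, only remember the maximum watermark; timers fire later.
                maxWatermarkDuringBacklog = Math.max(maxWatermarkDuringBacklog, watermark);
            } else {
                delegate.advanceWatermark(watermark);
            }
        }

        void endBacklog() throws Exception {
            backlog = false;
            // Forward the maximum watermark seen during backlog exactly once.
            delegate.advanceWatermark(maxWatermarkDuringBacklog);
        }
    }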

Comment on lines +421 to +423
if (Duration.ZERO.equals(
configuration.get(
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG))) {

Same here for deciding whether to enable the mixed mode.

Comment on lines +101 to +102
override def processRecordAttributes(recordAttributes: RecordAttributes): Unit =
super.processRecordAttributes(recordAttributes)

I'm not familiar with Scala. What is this for? Shouldn't the method from the superclass be called anyway if we don't override it?

@Sxnan (Contributor, Author) commented Dec 13, 2023

After an offline discussion with @xintongsong, we will split this PR into two. This PR will focus on optimizing the one-input operator during backlog processing; the other PR will focus on propagating the RecordAttributes through the job graph.

This PR should be merged after the other one.
