Skip to content

Conversation

@dawidwys
Copy link
Contributor

@dawidwys dawidwys commented Oct 1, 2020

What is the purpose of the change

This PR implements sorting inputs for multi input operators/tasks. It is based on the work in #13521
First I sort all inputs individually and after that I do a sorting merge of all the inputs to make sure that operator sees all incoming records with the same key from all inputs.

Brief change log

For a more thorough explanation see the commits messages. Some highlights:

  • Some of the commits refactor the TwoInputStreamProcessor/MultiInputStreamProcessor etc. to reduce code duplication
  • Some of the commits extract the instantiation logic out of the processor and moves it to the Task. It makes it a) easier to instantiate in tests b) we need to pass less arguments to the ctor from the Task
  • One of the commits implements proper AvailabilityProvider handling logic, which did not work before.

Verifying this change

Added tests in:

  • MultiInputSortingDataInputsTest
  • SortedDataInputITCase
  • SortingBoundedInputITCase.java

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 1, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 6270bf0 (Thu Oct 01 14:35:15 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 1, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@dawidwys
Copy link
Contributor Author

dawidwys commented Oct 7, 2020

@flinkbot run azure

@dawidwys dawidwys force-pushed the FLINK-19473 branch 2 times, most recently from 0adec04 to 0678c76 Compare October 7, 2020 14:04
Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like most of the commits here. Well done @dawidwys with the refactoring/cleanups/deduplicating the code.

I think my biggest concerns are about the bug fix commit hidden in this PR.

Note, I didn't review the MultiInputSortingDataInputs code itself, I hope @aljoscha will be able to do the most work here. Please let me know if you would like me to take a look at something there after all.

Comment on lines 90 to 91
@SuppressWarnings("unchecked")
MultiInputSortingDataInputs<Object> multiInputs = new MultiInputSortingDataInputs<Object>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I remember someone complaining about unchecked warnings 🙈 and Object? 😈 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I must admit I was a bit harsh on this one 😅

@aljoscha
Copy link
Contributor

aljoscha commented Oct 8, 2020

Just one comment regarding commits tags. I mostly use [minor] when it's minor refactoring changes because [hotfix] to me implies a fix for sth that was broken. Lately I have also seen Stephan do commits with just [refactor] ..., that's also an option.

Copy link
Contributor

@aljoscha aljoscha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall the changes look very good! I had one comment before about commit tags and I had some inline comments about exception messages and some possible code changes.

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* An entry class for creating coupled, sorting inputs. The inputs are sorted independently and afterwards
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget if we mentioned this in the one-input implementations but we should mention that this does sorting by key and timestamp (in that order). It's not some generic sorting utility.

public CompletableFuture<Void> prepareSnapshot(
ChannelStateWriter channelStateWriter,
long checkpointId) {
throw new UnsupportedOperationException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, not sure I caught this on the one-input version but we should give some information about why it's not supported.

*/
private class SortingPhaseDataOutput implements PushingAsyncDataInput.DataOutput<Object> {

int currentIdx;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it maybe be better to have one SortingPhaseDataOutput per indexed input. It could also have "hard links" to the things it needs like key selector, sorter, etc.

And then we also wouldn't need this mutable field.

public void close() throws IOException {
IOException ex = null;
try {
inputs[idx].close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using array accesses for inputs and sorters, could we just get them once in the constructor and store in fields? This is related to the question below about using one SortingPhaseDataOutput per input.

Copy link
Contributor

@aljoscha aljoscha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks very good! +1 to merge once CI is green.

I had two nitpicks about indentation and Javadocs.

* serialized key is constant, or {@link VariableLengthByteKeyComparator} otherwise.
*
* <p>Watermarks, stream statuses, nor latency markers are propagated downstream as they do not make
* sense with buffered records. The input emits a MAX_WATERMARK after all records.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It emits the maximum watermark it has seen, MAX_WATERMARK could be interpreted as Long.MAX_VALUE.

.mapToObj(
idx -> {
try {
KeyAndValueSerializer<Object> keyAndValueSerializer = new KeyAndValueSerializer<>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation seems off here. (Another point for having an automatic formatting tool... 😅)

@dawidwys
Copy link
Contributor Author

dawidwys commented Oct 8, 2020

@pnowojski I'd like to double check with you, are you fine with the current state of the PR?

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM % a couple of nits

…cessors

I replaced the OperatorChain in StreamOneInputProcessor and other InputProcessors with a BoundedMultiInput. Moroever I made the OperatorChain implement BoundedMultiInput interface. It makes it easier to instantiate StreamOneInputProcessor in tests without the need to instantiate a whole OperatorChain.
I implement a MultiInputSortingDataInputs which is kind of a factory for
multiple, related inputs. In case of sorting inputs of Two/Multiple
input operators not only the independent inputs should be sorted, but
also records across different inputs. Therefore the produced inputs
share a common context to synchronize which input should emit records
next.

The coordination is done via inputs Availability. Only one of the
inputs, with the current smallest element, is available at a time.
Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one optional nit besides, LGTM (assuming azure will be green).

Thanks for the changes @dawidwys.

Comment on lines 177 to 180
inputSelectionHandler.nextSelection();
} else {
secondInputStatus = processor2.processInput();
checkFinished(secondInputStatus);
inputSelectionHandler.nextSelection();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitty nit: move inputSelectionHandler.nextSelection(); outside of the if/else?

@dawidwys dawidwys closed this in 77e4e3b Oct 12, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants