
[FLINK-20398][e2e] Migrate test_batch_sql.sh to Java e2e tests framework #24471

Open · wants to merge 2 commits into master
Conversation

affo
Contributor

@affo affo commented Mar 8, 2024

What is the purpose of the change

Migrate test_batch_sql.sh to the Java end-to-end test framework.

Brief change log

  • implement BatchSQLTest porting test_batch_sql.sh
  • fix issue in getting job ID in FlinkDistribution
  • remove test_batch_sql.sh script
  • remove test_batch_sql.sh invocations from run-nightly-tests.sh

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests for end-to-end batch mode SQL query execution

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? NA

@flinkbot
Collaborator

flinkbot commented Mar 8, 2024

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@affo
Contributor Author

affo commented Mar 11, 2024

@wuchong @XComp

Hi guys, here is the PR for https://issues.apache.org/jira/browse/FLINK-20398.

I decided to go for LocalStandaloneFlinkResourceFactory as it is already used and part of flink-end-to-end-tests-common. Other options were:

  • MiniCluster
  • FlinkContainers (testcontainers-based)

Just tell me if you would rather see one of those implementations for some reason 👍

Important concern:
This test used to be part of run-nightly-tests.sh,
now I think it would run differently 🤔

Should it still run nightly?

Contributor

@XComp XComp left a comment

Thanks @affo .

This test used to be part of run-nightly-tests.sh,

The Java e2e tests are also triggered in the nightly run (see ./flink-end-to-end-tests/run-nightly-tests.sh:259).

@affo
Contributor Author

affo commented Mar 18, 2024

@XComp thank you for your review, gonna address your feedback today (as I had a week off)

@affo
Contributor Author

affo commented Mar 19, 2024

@XComp

It required quite an effort honestly, but here we are with the JUnit5 version of what I had before 👍

This also allowed me not to start a separate jar, but to directly include the code in the test and run it against the MiniCluster obtained 👍

Thank you for your detailed review


UPDATE

I am still investigating why the test fails in CI, as I cannot reproduce that locally...
I tried to use Java 8 for compiling and running, but I actually hit another error 😓


UPDATE

rebased and force-pushed, now CI is ok 👍

@affo
Contributor Author

affo commented Mar 25, 2024

@XComp everything should be ok now 👍

Contributor

@XComp XComp left a comment

Good job. I left a few nitty comments. But it looks good overall already. 👍

@EnumSource(
value = BatchShuffleMode.class,
names = {
"ALL_EXCHANGES_BLOCKING",
Contributor

Suggested change
"ALL_EXCHANGES_BLOCKING",
"ALL_EXCHANGES_PIPELINED",
"ALL_EXCHANGES_BLOCKING",

Does it make sense to add the pipelined mode as well? (just thinking out loud, I don't have much knowledge of this part of the code).

Contributor Author

Not an expert either, but I tried and I get an IllegalState:

At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'. Note that for DataSet jobs which do not recognize the aforementioned shuffle mode, the ExecutionMode needs to be BATCH_FORCED to force BLOCKING shuffle
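For reference, the setting the error message asks for is a one-line entry in the Flink configuration file (flink-conf.yaml); ALL_EXCHANGES_HYBRID_FULL and ALL_EXCHANGES_HYBRID_SELECTIVE are the alternative values it mentions:

```
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
```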

Contributor

I think it makes sense to add a comment that only ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, and ALL_EXCHANGES_HYBRID_SELECTIVE are supported by the adaptive batch scheduler.

@XComp
Contributor

XComp commented Apr 3, 2024

fyi: I will be off for the rest of April and, therefore, wouldn't be able to finalize this PR. You might want to reach out to other committers or expect a delay in my responses.

@affo
Contributor Author

affo commented Apr 11, 2024

@XComp Glad for your vacation!
Finally I also addressed the deprecation warnings and went through the implementation of a custom connector through DynamicTableSource.

It turned out to be quite tough, probably because it is not that common, or because these new APIs are not that well documented yet.

I wanted to use TableEnvironment.fromValues; however, I could not use it, as the test was hanging...
I want to understand why and, if needed, file an issue for that.

@morazow morazow left a comment

Thanks @affo !

I have added a couple of questions.

Contributor

@XComp XComp left a comment

Looks good to me 👍 The test runs locally and in CI as well.

$ ./mvnw -Prun-end-to-end-tests -pl flink-end-to-end-tests/flink-batch-sql-test verify -Dfast

I guess it's ready to be merged. I have a few minor things/questions, though.

@affo
Contributor Author

affo commented May 23, 2024

@XComp Hello!

Final touches done and your comments are addressed 👍
I added the capability for FromElementsSource to accept an ElementsSupplier at construction time.

The problem that forced me to implement a Serializable extension of RowData was that FromElementsSource had a field List<OUT> elements, where OUT can also be non-serializable (which is the case for RowData), so the operator could not be serialized when the job was starting.

I made it accept an ElementsSupplier extends Serializable so that it is clear that the supplier must be serializable.

In my use case, I simply preserved the previous implementation using Row (which is serializable) and convert it to RowData on get. No class now has RowData fields that would prevent its serialization.
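The serialization issue described above can be sketched in plain Java. Everything below is a simplified stand-in, not Flink's actual API: `RowData`, `Row`, `ElementsSupplier`, and the two supplier classes only mirror the shape of the problem (a non-serializable element type held in a field) and the fix (a Serializable supplier that converts on `get()`).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SupplierDemo {

    // Stand-in for Flink's RowData: deliberately NOT Serializable.
    static class RowData {
        final Object[] fields;
        RowData(Object... fields) { this.fields = fields; }
    }

    // Stand-in for Flink's Row: Serializable.
    static class Row implements Serializable {
        final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
    }

    // The pattern from the PR: the supplier interface itself extends
    // Serializable, making the contract explicit.
    interface ElementsSupplier<T> extends Serializable {
        T get(int offset);
        int numElements();
    }

    // Anti-pattern: holding RowData in a field makes the holder unserializable.
    static class BadSupplier implements ElementsSupplier<RowData> {
        private final List<RowData> elements;
        BadSupplier(List<RowData> elements) { this.elements = elements; }
        public RowData get(int offset) { return elements.get(offset); }
        public int numElements() { return elements.size(); }
    }

    // The fix: store serializable Rows, convert to RowData only on get().
    static class RowToRowDataSupplier implements ElementsSupplier<RowData> {
        private final List<Row> rows;
        RowToRowDataSupplier(List<Row> rows) { this.rows = rows; }
        public RowData get(int offset) { return new RowData(rows.get(offset).fields); }
        public int numElements() { return rows.size(); }
    }

    // True iff Java serialization of o succeeds (roughly what happens when
    // an operator is shipped to the cluster).
    static boolean serializes(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<RowData> rowData = new ArrayList<>();
        rowData.add(new RowData(1, "a"));
        List<Row> rows = new ArrayList<>();
        rows.add(new Row(1, "a"));

        System.out.println(serializes(new BadSupplier(rowData)));       // false
        System.out.println(serializes(new RowToRowDataSupplier(rows))); // true
    }
}
```

The conversion cost on `get()` is paid per access, which is fine for a test source with few elements.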

Contributor

@XComp XComp left a comment

LGTM 👍 Thanks for your contribution. One minor thing on the InternalGenerator; feel free to reject my proposal.

Contributor

@XComp XComp left a comment

Thanks for keeping up with me here. The PR looks good overall modulo CI. 👍 Let's wait for CI to pass and we should be able to merge the change.

@XComp
Contributor

XComp commented May 29, 2024

CI test failure is unrelated: FLINK-34513

@XComp
Contributor

XComp commented May 29, 2024

@flinkbot run azure

@XComp
Contributor

XComp commented May 30, 2024

I'm not gonna wait for another CI round. Looks like the CI bot didn't pick up the rerun command. Anyway, I verified that the test ran (see logs).

@XComp
Contributor

XComp commented May 30, 2024

One final thing: I wasn't able to do it myself somehow. Can you change the commit message prefix from [refactor] to [FLINK-20398]? "refactor" isn't a prefix the Flink community usually uses.

@affo
Contributor Author

affo commented May 30, 2024

@XComp done!

Don't worry in any case, I loved the review process. This is my first contribution and this is part of learning for next ones 🤝

Contributor

@JingGe JingGe left a comment

Thanks @affo for taking care of it. I just left some comments. PTAL.

@EnumSource(
value = BatchShuffleMode.class,
names = {
"ALL_EXCHANGES_BLOCKING",
Contributor

I think it makes sense to add a comment that only ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, and ALL_EXCHANGES_HYBRID_SELECTIVE are supported by the adaptive batch scheduler.

int keyIndex = 0;
long ms = 0;
while (ms < durationMs) {
elements.add(createRow(keyIndex++, ms, offsetMs));
Contributor

The new implementation will consume more memory than the old one, which generated rows iteratively on the fly. This could be a potential issue for large-data-volume batch tests.

Contributor Author

I think it makes sense to add comment that only ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, and ALL_EXCHANGES_HYBRID_SELECTIVE are supported by the adaptive batch scheduler.

Definitely 👍

The new implementation will consume more memory than the old one which will generate row iteratively on the fly. This could be a potential issue for large data volume batch tests.

Yep, now all records are generated up front and then used during execution, while before records were generated on the fly.
It is still possible to have such an implementation; however, let me add some context:

The PR started as a port from bash to Java.
The first pass was easy, but included the use of many deprecated methods.
With @XComp we opted for improving that in a second commit.
While solving the deprecation warnings, I decided to re-use the FromElementsSource already implemented in the test utils. However, that source is meant to be fault-tolerant, so it requires being able to get any produced record by offset, hence the List of created elements is necessary.

Truth be told, no fault-tolerance mechanism is required for this test. I could have another implementation without that strict requirement that uses records on the fly and forgets them.

@JingGe @XComp should I proceed? Thank you!
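The on-the-fly alternative discussed above could look roughly like this in plain Java. This is a sketch only: `Iterator` stands in for a Flink source, `long[]` stands in for the result of the PR's `createRow(...)`, and names like `OnTheFlyGenerator` and `countRows` are illustrative, not anything from the PR.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class OnTheFlyGenerator implements Iterator<long[]> {

    // Lazily generates one row per step instead of materializing all rows
    // in a List up front, so memory use stays constant regardless of
    // durationMs.
    private final long durationMs;
    private final long stepMs;
    private long ms = 0;
    private int keyIndex = 0;

    OnTheFlyGenerator(long durationMs, long stepMs) {
        this.durationMs = durationMs;
        this.stepMs = stepMs;
    }

    @Override
    public boolean hasNext() {
        return ms < durationMs;
    }

    @Override
    public long[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Stand-in for createRow(keyIndex, ms, offsetMs) in the PR.
        long[] row = {keyIndex++, ms};
        ms += stepMs;
        return row;
    }

    // Drains the generator and counts rows; only one row exists at a time.
    public static int countRows(long durationMs, long stepMs) {
        OnTheFlyGenerator g = new OnTheFlyGenerator(durationMs, stepMs);
        int n = 0;
        while (g.hasNext()) {
            g.next();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(countRows(1000, 100)); // 10: one row per 100 ms over 1 s
    }
}
```

The trade-off against the materialized List is exactly the fault-tolerance point above: a forgetful generator cannot replay an arbitrary record by offset.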

Contributor

@JingGe JingGe Jun 3, 2024

Thanks for the reply. Commonly, batch processing does not rely on offsets. Could you help me understand why the source should be fault-tolerant and require getting records by offset for batch?

Contributor Author

@JingGe yeah, nothing strictly related to this case.

The FromElementsSource is actually generic; it can be and is used in the streaming case.

Here I am using it in batch table mode.

I am just reusing it because it is possible 👍

Contributor Author

In other words, I could have another implementation of the bounded source without any fault-tolerance guarantee 👍

Contributor Author

@JingGe @XComp

Got it, the number of records is not huge, that's why I did not mention that 👍
However, I understand your concerns as well 👍

At this point I would write another generator as part of this PR.

However, I would provide it as part of test-utils rather than confine it to batch, as other tests could benefit from it.

What do you guys think?

Contributor

Sounds great! Please feel free to create a follow up ticket and contribute the new generator with a new PR.

Contributor

Got it, the number of records is not huge, that's why I did not mention that 👍

True, that's a valid point. I didn't check the number of elements as part of my last comment. I leave the decision up to you whether it's done in a new PR or as part of this PR.

Contributor Author

@JingGe @XComp Thank you for the feedback!

@XComp I would merge this as it is.

In the background I was already working on something similar; I will create another issue for adding a test source for batch tests and for the Table API.

Contributor

@JingGe any objections? The refactoring should be ok considering that the amount of data involved is quite low. The actual migration from bash to Java is also done in a separate commit which enables us to revert if we feel it's necessary. WDYT?
