-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test #7896
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
| new Thread( | ||
| () -> { | ||
| try { | ||
| env.execute(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tillrohrmann the test currently fails (after it actually succeeded) because the job isn't cancelled and generates exceptions in the log (after Kinesalite was terminated). I could not quite figure out how I can accommodate both the verification and execution within the main method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This issue is resolved by stopping the flink cluster before Kinesalite in the script. But please see below for need for System.exit(0) to bypass the CLI job check. Is there a better way to fork the job and have main run the test logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check for exceptions in the log is a piece of questionable design that is hopefully improved in the future. I think so far, we added certain exceptions that can safely be ignored to a white-list in the script function that checks the logs for exceptions. So you could add the Kinesalite exception there to not have it fail your test. If the exception is of a very generic type ... this is where the questionable design part becomes problematic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is no longer an issue. The exceptions are avoided with graceful shutdown, terminating the job prior to Kinesalite.
...aming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
Outdated
Show resolved
Hide resolved
e1c4ed4 to
0811e2e
Compare
...aming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
Outdated
Show resolved
Hide resolved
...aming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
Outdated
Show resolved
Hide resolved
...-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
Outdated
Show resolved
Hide resolved
58994a3 to
20eeb11
Compare
b9af7ee to
f57c396
Compare
| executeException.set(e); | ||
| } | ||
| }); | ||
| executeThread.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to also join on this thread somewhere later, maybe before we enter the readMessages part (assuming the generator will not block, if it does join can still happen somewhere later).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is possible before the loop, we can also check the exception status just once right there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thread that runs the job does not terminate (I don't have a reference to the job and cannot cancel it). The job needs to start after the streams are created and run in parallel to the validation logic. Once the expected results are in, the driver exits and the job/cluster is terminated from the script, prior to terminating Kinesalite.
|
@StefanRRichter could you approve the PR or let me know if anything else should be done here? |
77999cd to
05672ba
Compare
|
I recently opened a PR containing a prototype of a java-based E2E framework here. It may make sense to delay this PR until mine has been approved. |
|
@StefanRRichter thanks for taking a look. It's good to learn that we have an effort to provide a Java based e2e framework. It should simplify new tests and make existing ones more maintainable. This Kinesis test was under discussion for a while and it is a current gap in coverage. I would like to merge it and also backport it to 1.8.x. Happy to make changes to adopt the new framework when it is ready, but we should not block on it. As for separating test logic and program, that's more or less already the case. The pipeline code, the test driver and the Kinesis client (in the future "resource") live in separate classes. Should be easy to port to the new framework when it is ready. |
|
Test looks good to me, if @zentol is ok with delaying the port to the new framework I would approve and merge. |
|
I won't block this PR. |
StefanRRichter
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
What is the purpose of the change
This PR adds an end to end test for Kinesis consumer and producer, similar to what we already have for Kafka. The test is based on Kinesalite (running as docker container).
Brief change log
Verifying this change
This change adds a test and can be verified as follows:
The test can be run as single test via:
Does this pull request potentially affect one of the following parts:
This change is test code only.
Documentation