New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-29830][Connector/Pulsar] Create the topic with schema before consuming messages in PulsarSinkITCase #21252
Conversation
The CI failed in e2e tests. But it's not related to Pulsar. So I didn't trigger the CI again. |
…onsuming messages in PulsarSinkITCase.
79cf1b3
to
dca0e2e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @syhily for this PR. I have a few comments. Please find them below.
...src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
Show resolved
Hide resolved
...src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
Show resolved
Hide resolved
this.desiredCounts = messageCounts; | ||
this.consumedRecords = Collections.synchronizedList(new ArrayList<>(messageCounts)); | ||
this.deadline = new AtomicLong(timeout.toMillis() + System.currentTimeMillis()); | ||
this.executor = Executors.newSingleThreadExecutor(); | ||
ConsumerBuilder<String> consumerBuilder = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConsumerBuilder<String> consumerBuilder = | |
final ConsumerBuilder<String> consumerBuilder = |
nit
if (exception != null) { | ||
LOG.error("Error in consuming messages from Pulsar."); | ||
LOG.error("", exception); | ||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like there is a non-optimal separation of concerns here: we shouldn't log in a method that checks the state but rather have the logging be done in the calling method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. I think you are right.
.subscriptionType(Exclusive) | ||
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); | ||
this.consumer = sneakyClient(consumerBuilder::subscribe); | ||
this.throwableException = new AtomicReference<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about working with CompletableFutures
here? It feels more natural to use rather than utilizing an AtomicReference
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's ok to use CompletableFuture
with better readability.
.subscriptionMode(Durable) | ||
.subscriptionType(Exclusive) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.subscriptionMode(Durable) | |
.subscriptionType(Exclusive) | |
.subscriptionMode(SubscriptionMode.Durable) | |
.subscriptionType(SubscriptionType.Exclusive) |
nit: The enum usage here is not obvious because the enum identifiers are not uppercased. Could we replace the static import for these with a qualified access to improve readability?
private final int desiredCounts; | ||
// This is a thread-safe list. | ||
private final List<String> consumedRecords; | ||
private final AtomicLong deadline; | ||
private final ExecutorService executor; | ||
private final Consumer<String> consumer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Consumer is only accessed in the constructor. I don't see a necessity to create a dedicated field for that one. Or am I missing something in this aspect? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Consumer
is created and used in ControlSource
with an SharedReference
. Because we may use it both in a Flink testing instance and the testing code for consuming the messages from Pulsar and judge if we can stop this application.
It's quite ok to remove it. But I prefer to keep it with a close method. I'll add it in next commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test_ci_connect_2
passed. So I believe the necessary part of tests related to this patch pass.
Merging...
Style concern, if it's not enforced by the spotless plugin, varies from person to person. I believe who writes the most part of the code has the right to write code in his/her style.
Patching onto style is welcome, but it's not a blocker to the current patch.
Merge since the functionality part somehow blocks #21249.
…onsuming messages in PulsarSinkITCase. (apache#21252)
…onsuming messages in PulsarSinkITCase. (apache#21252)
Thanks for taking over @tisonkun. I was offline for a week anyway. So, good that you unblocked PR #21249 by merging this PR.
I agree with your view on that. Most of my comments were of cosmetic nature and not really blocking the PR. I created a follow-up issue FLINK-30109 to discuss the sneaky utility methods, though. But please keep in mind to also maintain the corresponding Jira issue properly. This Jira issue is still open. No backports are created. @syhily may you create the backport PRs to finalize FLINK-29830? |
…onsuming messages in PulsarSinkITCase. (apache#21252)
…onsuming messages in PulsarSinkITCase. (apache#21252)
…onsuming messages in PulsarSinkITCase. (#21252)
…onsuming messages in PulsarSinkITCase. (#21252)
…onsuming messages in PulsarSinkITCase. (apache#21252)
…onsuming messages in PulsarSinkITCase. (apache#21252)
…onsuming messages in PulsarSinkITCase. (apache#21252)
…onsuming messages in PulsarSinkITCase. (apache#21252)
…onsuming messages in PulsarSinkITCase. (apache#21252) (#4) Co-authored-by: Yufan Sheng <yufan@streamnative.io>
…onsuming messages in PulsarSinkITCase. (apache#21252) (#4) Co-authored-by: Yufan Sheng <yufan@streamnative.io>
See apache/flink#21252 for a similar fix
What is the purpose of the change
The tests in
PulsarSinkITCase
may failed with some error logs like below. This is caused by we may consume messages on a blank topic with no schema. Pulsar topic's schema is define by the first message sent to it. Or you can create schema on topic manually.In this test, we will manually create the topic schema before the test which fixes this race condition.
Brief change log
PulsarSinkITCase
.ControlSource
for better debugging.Verifying this change
This change is already covered by existing tests, such as:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation