New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-20732][connector/pulsar] Introduction of Pulsar Sink #17452
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 43ffad7 (Mon Oct 11 15:34:58 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There is still a lot of room for improvement in this part of PulsarSink's state saving and recovery, so please help! |
@flinkbot run azure |
Thanks for opening this. I will have a look in the second half of this week. |
@AHeise How are you doing? Can you find the time to help me with the review? I am very much looking forward to it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did the first pass and it seems the sink is somewhat unfinished. Currently, the PulsarWriter does not recover the state, please also add a test for the recovery. The current way of transaction management is hard to understand without a docstring in the Sink.
You can always look at the KafkaSink for some inspiration for tests or guidance on how we deal with transactions. Unfortunately, I am not very familiar with pulsars transaction management.
package org.apache.flink.connector.pulsar.common.schema; | ||
|
||
/** Exception designates the incompatibility between pulsar and flink type. */ | ||
public class IncompatibleSchemaException extends RuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please mark as @Internal
super(message, e); | ||
} | ||
|
||
public IncompatibleSchemaException(String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ctor is unused?
|
||
private final Configuration configuration; | ||
|
||
public PulsarSink( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constructor can probably be package-private
this.deliveryGuarantee = deliveryGuarantee; | ||
this.topicSelector = topicSelector; | ||
this.serializationSchema = serializationSchema; | ||
this.partitionSelector = partitionSelector; | ||
this.configuration = configuration; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually use Preconditions.checkNotNull()
for to ensure ctor parameters are not null
import java.util.Optional; | ||
|
||
/** | ||
* a pulsar Sink implement. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add more information about the capabilities of the sink? You can have a look at the KafkaSink for a reference :)
return pulsarClient; | ||
} | ||
|
||
protected Producer<?> getProducer(String topic) throws PulsarClientException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why protected?
closer.close(); | ||
} | ||
|
||
public void producerFlush() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private?
CompletableFuture<MessageId> messageIdFuture = messageBuilder.sendAsync(); | ||
pendingRecords.incrementAndGet(); | ||
futures.add(messageIdFuture); | ||
messageIdFuture.whenComplete(sendCallback); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take look at the AsyncSinkWriter
how it uses the mailbox to enqueue asynchronous tasks. By using the mailbox you can get rid of all the manual synchronization because the mailbox executes the writer and you callbacks in the same thread.
}; | ||
} | ||
|
||
protected BiConsumer<MessageId, Throwable> initializeSendCallback( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protected?
private void acknowledgeMessage() { | ||
synchronized (pendingRecords) { | ||
if (pendingRecords.decrementAndGet() == 0L) { | ||
pendingRecords.notifyAll(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain this block? In general, synchronization should not be necessary if everything is executed in the mailbox.
Thanks @fapaul for all the review. We'll update the PR in the following times. |
8e3d80b
to
41ed8df
Compare
ef5f521
to
c8820e3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fapaul All the sink codes for Pulsar should be ready. Let's kick off the second review.
...connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
Show resolved
Hide resolved
...or-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
Show resolved
Hide resolved
bbf72ae
to
59f46a3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this PR is a bit oversize, I'll divide it to multiple reviews~ This part is for
- pom
- PulsarConfigBuilder and PulsarConfigValidator (config related)
- support for PULSAR_REQUEST_TIMEOUT
* | ||
* @return a Pulsar sink builder. | ||
*/ | ||
@SuppressWarnings("java:S4977") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind provide more context why we have this @SuppressWarnings annotation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used to pass the SonarLint
. Sonar thought this method shouldn't cover other type annotation. https://jira.sonarsource.com/browse/SONARJAVA-2961
@@ -36,12 +36,14 @@ under the License. | |||
<packaging>jar</packaging> | |||
|
|||
<properties> | |||
<pulsar.version>2.8.0</pulsar.version> | |||
<pulsar.version>2.9.1</pulsar.version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Once sink and source are completed, we can document the pulsar versions
<commons-lang3.version>3.11</commons-lang3.version> | ||
<grpc.version>1.33.0</grpc.version> | ||
<pulsar-commons-lang3.version>3.11</pulsar-commons-lang3.version> | ||
<pulsar-zookeeper.version>3.6.3</pulsar-zookeeper.version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are Netty and zookeepers used in test only ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. They are required only for Pulsar Broker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a heads up, Pulsar is trying to remove the hard binding of ZK.
Later release should allow standalone pulsar running without ZK.
<dependency> | ||
<groupId>org.apache.pulsar</groupId> | ||
<artifactId>pulsar-client-all</artifactId> | ||
<version>${pulsar.version}</version> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>com.sun.activation</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why we need to exclude these dependencies ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These dependencies are just used for annotation which should be working on the broker side. It's not required on the client side.
<version>${grpc.version}</version> | ||
<version>${pulsar-grpc.version}</version> | ||
<type>pom</type> | ||
<scope>import</scope> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use import
scope to align grpc version to whatever pulsar's grpc version is, is my understanding correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep.
<dependency> | ||
<groupId>io.netty</groupId> | ||
<artifactId>netty-bom</artifactId> | ||
<version>${pulsar-netty.version}</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For dependencies, wondering have we tested there is no obvious runtime class issues ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have checked the dependencies in flink-connector-pulsar
, we only use netty in tests. There is no compiled dependency for netty.
// requestTimeoutMs don't have a setter method on ClientBuilder. We have to use low level | ||
// setter method instead. So we put this at the beginning of the builder. | ||
Integer requestTimeoutMs = configuration.get(PULSAR_REQUEST_TIMEOUT_MS); | ||
builder.loadConf(singletonMap("requestTimeoutMs", requestTimeoutMs)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: just a reminder to add this to doc (if not already ~
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been added to the document. It was generated by flink-docs
.
* </ul> | ||
*/ | ||
@Internal | ||
public class PulsarConfigValidator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class looks ok to me. Would be nice to have some basic test case for it ~
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This review includes non transaction code path:
- Builder
- Writer
} | ||
|
||
/** | ||
* Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment should be sink I guess?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. This is my mistake.
if (ex != null) { | ||
mailboxExecutor.execute( | ||
() -> { | ||
throw new FlinkRuntimeException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering how will this Flink runtime exception propagate ? It's in the mailbox executor, I think it will continue but not stop the job ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope. The mailbox executes the writer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it means that if the mailbox executor throws a FlinkRuntimeException, the writer will not exit and continues , is that correct ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Read this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This round:
- Writer and metadata listener
- Schema related classes
The schema part is similar to source, so I ran through it quickly ~
LOG.debug("Start committing the transaction {} for topic {}", txnID, topic); | ||
try { | ||
coordinatorClient.commit(txnID); | ||
} catch (TransactionNotFoundException | CoordinatorNotFoundException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For these 2 exceptions, why do we rethrow it ? Which class is the final handlers of these two exceptions ? Curious because I saw this is the only place we catch these 2 exceptions. I guess these 2 are nonretrayble transaction failures and should be treated to stop pipeline ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the committers and writers are handled in org.apache.flink.streaming.runtime.operators.sink.SinkOperator
and share the same thread. Throwing the exception here would crash the pipeline and restart the application.
TransactionNotFoundException
should be treated as an internal failure cause we can't find the transaction which was created in the writer. This often occurs when the timeout for the transaction is smaller than the checkpoint interval.
CoordinatorNotFoundException
only happens when the client doesn't enable the transaction or the broker doesn't enable the transaction.
So we just throw these two transactions which are non-retryable.
* DeliveryGuarantee#NONE} and {@link DeliveryGuarantee#AT_LEAST_ONCE}. So we couldn't create | ||
* the Pulsar client immediately. | ||
*/ | ||
private TransactionCoordinatorClient transactionCoordinatorClient() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not used, will it be used in the future ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a mistake from my side. This method should be used in the commit
method.
message = new RawMessage<>(bytes); | ||
} | ||
|
||
Long eventTime = sinkContext.timestamp(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we put the event time in when we do the serialization, wondering what is the design considerations here? Asking because I feel like adding event time can happen in a different step
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should give connector users the ability to set the event time. This is just a default implementation that can be overridden.
|
||
/** | ||
* Return all the available topic partitions. We would recalculate the partitions if the topic | ||
* metadata hsa been changed. Otherwise, we would return the cached result for better |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo hsa in the comment
*/ | ||
public List<String> availableTopics() { | ||
if (availableTopics.isEmpty() | ||
&& (!partitionedTopics.isEmpty() || !topicMetadata.isEmpty())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just out of curiousity, we evaluate availableTopics when we call it (evaluate: retrieving data from topicMetadata) in this method instead of in updateTopicMetadata(), is there any special reason to do so ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should point out your mistake. We only evaluate the availableTopics
when it needs updating. This is a cached result, which shouldn't be created every time we call this method.
Putting the evaluate logic here is just my design flavor. XD
@@ -97,16 +94,14 @@ | |||
* PulsarSourceBuilder}. | |||
*/ | |||
PulsarSource( | |||
Configuration configuration, | |||
SourceConfiguration sourceConfiguration, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For source related changes, I guess the changes to the source is related to sink because we want to align the naming conventions and hierarchy , but maybe consider putting in a separate commit ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Since the code is still in progress. We can separate the commits after everything is done.
<commons-lang3.version>3.11</commons-lang3.version> | ||
<grpc.version>1.33.0</grpc.version> | ||
<pulsar-commons-lang3.version>3.11</pulsar-commons-lang3.version> | ||
<pulsar-zookeeper.version>3.6.3</pulsar-zookeeper.version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a heads up, Pulsar is trying to remove the hard binding of ZK.
Later release should allow standalone pulsar running without ZK.
} | ||
|
||
/** | ||
* Get a option related config value. We would return default config value which define in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: which is defined
0a409ce
to
067d456
Compare
Tks for your detailed review. It seems like most of your question is just because you aren't familiar with Pulsar. I'll explain the flush behavior and message delayer in the review comments. First of all, let me explain the mechanism for Pulsar to abort the potential lingering transactions. Writing message with transactionPulsar isn't like Kafka in writing messages with a transaction, the pending transactions will not block the messages written in other transactions. The transaction is just a small mark ( The message, written in a specified Pulsar transaction, wouldn't be seen by the consumer until this transaction was committed. That means we don't need to abort the potential lingering transactions.
Message delayerDelayed message delivery would confuse someone like you by its naming. People would think that this behavior would happen on the client-side ( But this is wrong. The message with delayed configuration would be sent to Pulsar async. We didn't cache the message and sent it when meet the delay time. This is handled by Pulsar Bookie on the server-side. Test this feature is just like testing Pulsar. I don't think we need the test on this feature. |
b14f97d
to
8cc9d45
Compare
byte[] serialize(IN element, PulsarSinkContext sinkContext); | ||
|
||
/** Setting message metadata which would rarely be used for normal users. */ | ||
default void metadata(MetadataBuilder<IN> metadataBuilder, PulsarSinkContext sinkContext) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to pass <IN> element
as an argument in the metadata(...)
method? Maybe there will be use cases where users want to calculate metadata from the element. Adding element here might give users more "food" to cook XD. WDYT~
eb7a4fd
to
a4bb0c4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PulsarSerializationSchema looks mostly good, great work. I left some last minor cleanup comments.
* @param sinkContext context to provide extra information. | ||
*/ | ||
PulsarMessage serialize( | ||
IN element, PulsarMessageBuilder builder, PulsarSinkContext sinkContext); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can remove the PulsarMessageBuilder
and only mention in the docstring how to construct the PulsarMessage
.
public PulsarMessageBuilder value(byte[] value) { | ||
this.schema = Schema.BYTES; | ||
this.value = value; | ||
return this; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Maybe it is easier to only offer the other builder method value(Schema<IN> schema, IN value)
and move the Schema.BYTES
delegation to the PulsarSerializationSchemaWrapper
and PulsarSchemaWrapper
.
Otherwise you should probably add some checks that only one of the value methods is called.
} | ||
|
||
public PulsarMessage build() { | ||
return new PulsarMessage( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are some of the fields required? I'd recommend to either make them part of the ctor of PulsarMessageBuilder
or check that they are not null in build
.
private final byte[] orderingKey; | ||
private final String key; | ||
private final long eventTime; | ||
private final Schema<?> schema; | ||
private final Object value; | ||
private final Map<String, String> properties; | ||
private final Long sequenceId; | ||
private final List<String> replicationClusters; | ||
private final boolean disableReplication; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add @Nullable
to all optional fields + constructor + getter where applicable
private final String key; | ||
private final long eventTime; | ||
private final Schema<?> schema; | ||
private final Object value; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use a generic type i.e. PulsarMessage<OUT>
? It probably has the same type as the Schema<?>
.
a4bb0c4
to
26013d5
Compare
You are right. Using a generic type of |
26013d5
to
38e9475
Compare
….9.1 1. Bump the pulsar-client-all version in pom file. 2. Exclude useless dependencies for pulsar-client-all. 3. Bump the Pulsar docker version. 4. Change the dependencies to pass the tests. 5. Drop PulsarTransactionUtils and fix compile issues in tests. 6. Add bouncycastle to Pulsar e2e tests.
38e9475
to
05023f3
Compare
… for Pulsar source and sink. 1. Define new PulsarConfiguration for common config class. 2. Define new PulsarConfigValidator for common config validation. 3. Merge SourceConfiguration and Configuration into one class. 4. Change source config options' description and regenerate the docs. 5. Fix the compile error in tests. 6. Drop Configuration in constructor parameters for all the source classes.
…ned Pulsar topics.
… matching ProducerConfigurationData.
…r better records serialization.
…st tools based on PulsarStandalone. 1. Drop some unused fields in test classes. 2. Fix the checkstyle issues for source test. 3. Fix violations for Pulsar connector according to the flink-architecture-tests. 4. Create a standalone Pulsar for test. 5. Add new methods to PulsarRuntimeOperator. 6. Fix the bug in PulsarContainerRuntime, support running tests in E2E environment. 7. Create PulsarContainerTestEnvironment for supporting E2E tests. 8. Add a lot of comments for Pulsar testing tools. 9. Drop mocked Pulsar service, use standalone Pulsar instead.
05023f3
to
a24b9c4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great effort 👍
The test failures is unrelated and caused by https://issues.apache.org/jira/browse/FLINK-26174 I rebased the branch locally and all pulsar tests + archUnit still pass |
What is the purpose of the change
This pull request completes the development of Pulsar based on the new Sink, which will enable smoother data interaction between Flink and Pulsar.
Brief change log
Introduction Pulsar Sink Connector :
Sink
,SinkWriter
,Committer
, and other interfaces are implemented.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes)Documentation