[FLINK-22902] Port KafkaSink to FLIP-143 #16676
Conversation
@flinkbot: Thanks a lot for your contribution to the Apache Flink project. Automated checks last ran on commit 483a1a6 (Sat Aug 28 13:11:08 UTC 2021). Mention the bot in a comment to re-run the automated checks. Please see the Pull Request Review Guide for a full explanation of the review process; the bot tracks review progress through labels, applied in the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Branch force-pushed from 22e7bb8 to 0f0ab58.
Reviewer:
Thank you very much for your contribution. The general structure looks good and I'm leaving a first impression of the production code.
Resolved review threads:
- ...rs/flink-connector-base/src/main/java/org/apache/flink/connector/base/DeliveryGuarantee.java
- .../src/main/java/org/apache/flink/streaming/connectors/kafka/sink/DefaultKafkaSinkContext.java (outdated)
- ...c/main/java/org/apache/flink/streaming/connectors/kafka/sink/FlinkKafkaInternalProducer.java (outdated)
- ...onnector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java
- ...onnector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java (outdated)
- ...onnector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java (outdated)
- ...onnector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSink.java (outdated)
- ...r-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaCommittable.java (outdated)
- ...or-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSinkITCase.java (outdated)
Branch force-pushed from b13e3be to 3b5f480.
Author: @AHeise thanks for your review. I have addressed all your comments, please have another look.
Author: @flinkbot run azure
Reviewer:
A more detailed round. I have not deeply looked into the IT but the structure looks good and the covered cases should be sufficient.
Resolved review threads:
- flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
- ...rs/flink-connector-base/src/main/java/org/apache/flink/connector/base/DeliveryGuarantee.java
- .../src/main/java/org/apache/flink/streaming/connectors/kafka/sink/DefaultKafkaSinkContext.java (outdated)
- .../src/main/java/org/apache/flink/streaming/connectors/kafka/sink/DefaultKafkaSinkContext.java
- ...c/main/java/org/apache/flink/streaming/connectors/kafka/sink/FlinkKafkaInternalProducer.java (outdated)
- ...nector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaWriter.java (outdated)
```java
class KafkaWriterState {
    private final int subtaskId;
    private final long transactionalIdOffset;
    private final String transactionalIdPrefix;
```
Reviewer:
By storing it here, do you effectively allow users to change the prefix even when resuming from checkpoint?
Author:
I have to store it here to abort transactions from previous runs. If the job is stopped and started with a new prefix, the new one is used for all newly created states.
Reviewer:
Okay nice. Do we want to expose it to the user that they may change the prefix or should we communicate that the prefix should remain stable? I'm assuming quite a few edge cases would not work well if a prefix is changed (think of lingering transactions opened before downscaling without recent checkpoint). So I would probably communicate that the prefix is supposed to be stable for now.
Author:
Yeah, the downscaling-before-checkpoint case is definitely a problem. I can update the doc string to hint that the prefix should remain stable.
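A doc-string hint along these lines could work (a sketch; the builder method name and body are assumed):

```java
/**
 * Sets the prefix for all transactional ids used by this sink.
 *
 * <p>Note: the prefix should remain stable across restarts of the application. Changing it
 * while transactions from a previous run may still be lingering (e.g. after downscaling
 * without a recent checkpoint) can leave those transactions unaborted.
 */
public KafkaSinkBuilder<IN> setTransactionalIdPrefix(String transactionalIdPrefix) {
    this.transactionalIdPrefix = transactionalIdPrefix;
    return this;
}
```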
Resolved review thread:
- ...r-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSinkBuilder.java
```java
 * Exposes information about how many records have been emitted overall and at the beginning of a
 * checkpoint.
 */
private static final class InfiniteIntegerSource
```
Reviewer:
Can you just use an `env.fromSequence(0, Long.MAX_VALUE)` with a chained map that implements this functionality? You are making our future lives harder :/
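For illustration, the suggested replacement might look like this sketch (the counter field is hypothetical):

```java
// Sketch of the reviewer's suggestion: a bounded sequence source with a
// chained map that takes over the record counting.
env.fromSequence(0, Long.MAX_VALUE)
        .map(new RichMapFunction<Long, Long>() {
            private long emittedRecords; // hypothetical counter

            @Override
            public Long map(Long value) {
                emittedRecords++;
                return value;
            }
        });
```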
Author:
I do not see how I can easily replace it because I am relying on the fact that the Source finishes after the first `checkpointCompleted` event.
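Roughly, the custom source relies on something like the following (a sketch with assumed names):

```java
// Sketch (assumed names): the source stops emitting once the first checkpoint
// completes, which a plain env.fromSequence pipeline cannot replicate.
@Override
public void notifyCheckpointComplete(long checkpointId) {
    running = false; // volatile flag checked in the source's run() loop
}
```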
Reviewer:
Ah correct. Let's keep it this way then.
Resolved review thread:
- ...-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaWriterITCase.java (outdated)
Branch force-pushed from e8c6ec0 to 5f63691.
Reviewer:
A few more nits.
```java
/**
 * Adapter between {@link Sink.InitContext} and {@link SerializationSchema.InitializationContext}.
 */
public class InitContextInitializationContextAdapter
```
Reviewer:
I see that this is implemented similarly to `RuntimeContextSerializationInitializationContextAdapter` but has the same flaws.

In general, when implementing a small class it should not rely on a big class being injected; that makes reuse and testing much harder. In this case, it would be much better to just pass the `UserCodeClassLoader` and the `MetricGroup` directly and have no dependency on the `InitContext`.

Further, the metric group should only be created when it's actually needed, so here I would rather use a `Supplier<MetricGroup>` (it doesn't make sense that the `InitContext` is passed to this adapter and then passed back to `mapMetricGroup`; the `InitContext` is by definition available at the call site).

So the signature should be

```java
public InitContextInitializationContextAdapter(
        Supplier<MetricGroup> metricGroupSupplier, UserCodeClassLoader userCodeClassLoader)
```

Finally, we should cache the result from the Supplier, most easily by using Guava's `Suppliers#memoize`. I'd probably wrap the ctor parameter before assigning it to the field.
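A minimal sketch of what that could look like, assuming Guava's `Suppliers.memoize` and the two methods on `SerializationSchema.InitializationContext`:

```java
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

import com.google.common.base.Suppliers;

import java.util.function.Supplier;

public class InitContextInitializationContextAdapter
        implements SerializationSchema.InitializationContext {

    private final Supplier<MetricGroup> metricGroupSupplier;
    private final UserCodeClassLoader userCodeClassLoader;

    public InitContextInitializationContextAdapter(
            Supplier<MetricGroup> metricGroupSupplier, UserCodeClassLoader userCodeClassLoader) {
        // Wrap the ctor parameter so the metric group is computed at most once.
        this.metricGroupSupplier = Suppliers.memoize(metricGroupSupplier::get);
        this.userCodeClassLoader = userCodeClassLoader;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return metricGroupSupplier.get();
    }

    @Override
    public UserCodeClassLoader getUserCodeClassLoader() {
        return userCodeClassLoader;
    }
}
```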
Reviewer:
I wonder if this should be in flink-connector-base instead.
Author:
> I wonder if this should be in flink-connector-base instead.

It can also live in flink-connector-base. I just refrained from putting it there because there is no sink-specific code yet and all these adapters are currently also in core.
```java
@@ -84,4 +92,32 @@ public static void waitUntilJobInitializationFinished(
                () -> clusterClient.requestJobResult(id).get(),
                userCodeClassloader);
    }

    public static File getMostRecentCompletedCheckpoint(File checkpointDir) throws IOException {
        return Files.find(checkpointDir.toPath(), 2, TestUtils::isCompletedCheckpoint)
```
Reviewer:
FYI there is a bug here https://issues.apache.org/jira/browse/FLINK-23647. But we would fix it with that ticket.
Author:
Which ticket do you mean by "that ticket"?
```java
private void testRecoveryWithAssertion(
        DeliveryGuarantee guarantee, java.util.function.Consumer<List<Long>> recordsAssertion)
        throws Exception {
```
Reviewer:
FYI, there is another pattern that can be used to implement such wrapping setup/cleanup code:

```java
AutoCloseableResult testRecovery(DeliveryGuarantee guarantee) {
    // execute common code
    result = /* fetch result */;
    AutoCloseable after = () -> { /* after code */ };
    return wrap(result, after);
}
```

You can then use the return value in a try-with-resources statement and add all your assertions in the block. It has huge benefits over your pattern when you have checked exceptions, and is often easier on the eye after the auto-formatter has gone over it.
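Usage might then look like this hypothetical snippet (`AutoCloseableResult`, its `get()`, and the assertion style are assumed names following the suggestion above):

```java
// Hypothetical usage: assertions live inside the try block, cleanup runs on close().
try (AutoCloseableResult<List<Long>> result = testRecovery(DeliveryGuarantee.EXACTLY_ONCE)) {
    assertThat(result.get(), contains(expectedRecords.toArray()));
}
```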
Author:
Thanks for the suggestion, I get the idea. I will try to apply it next time, or do you want the tests refactored?
```java
 *
 * @param transactionalIdPrefix prefix for the id
 * @param subtaskId describing the subtask which is opening the transaction
 * @param offset an always incrementing number usually capturing the number of checkpoints taken
```
Reviewer:
This could be confused with Kafka offset. Maybe use seq number?
Author:
I renamed it to `checkpointOffset` and I hope the docstring for the parameter explains enough to make it apparent that it has nothing to do with the partition offset.
```java
public static String buildTransactionalId(
        String transactionalIdPrefix, int subtaskId, long offset) {
```
Reviewer:
Here we could also have an instantiable `TransactionalIdFactory` with a constant transactionalIdPrefix and subtaskId. You could then have a pre-computed subtask prefix consisting of

```java
prefix = sb.append(transactionalIdPrefix)
        .append(TRANSACTIONAL_ID_DELIMITER)
        .append(subtaskId)
        .append(TRANSACTIONAL_ID_DELIMITER)
        .toString();
```

Then this method would just `return prefix + offset;`.
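For reference, a sketch of that suggestion (class shape and delimiter constant are assumed from the discussion):

```java
// Sketch of the suggested factory (names assumed from the discussion above).
final class TransactionalIdFactory {
    private static final String TRANSACTIONAL_ID_DELIMITER = "-";

    // Pre-computed "<transactionalIdPrefix>-<subtaskId>-" part of every id.
    private final String prefix;

    TransactionalIdFactory(String transactionalIdPrefix, int subtaskId) {
        this.prefix =
                transactionalIdPrefix
                        + TRANSACTIONAL_ID_DELIMITER
                        + subtaskId
                        + TRANSACTIONAL_ID_DELIMITER;
    }

    String buildTransactionalId(long offset) {
        return prefix + offset;
    }
}
```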
Author:
Hmm, it means we need to instantiate the factory basically for every transaction. What would be the benefit?
Resolved review thread:
- ...a/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/TransactionalIdFactory.java (outdated)
Reviewer:
LGTM. Thank you very much!
What is the purpose of the change
This commit introduces a new KafkaSink which is based on FLIP-143.
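For orientation, wiring up the new sink looks roughly like this sketch (builder method names follow the final connector and may differ from this PR's initial version; broker address, topic, and prefix are placeholders):

```java
// Sketch of building the new KafkaSink (addresses and names are placeholders).
KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("example-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("example-prefix") // keep stable across restarts
                .build();
```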
Brief change log
Besides adding the new KafkaSink, the PR has the following additional commits.
Verifying this change
The changes are covered by multiple unit tests and also integration tests against a real Kafka cluster.
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): (yes / no)

Documentation

We also plan to add documentation, which is tracked as part of https://issues.apache.org/jira/browse/FLINK-23664.