[FLINK-6988] flink-connector-kafka-0.11 with exactly-once semantic #4239
Conversation
Force-pushed from 723a5be to 35ee552.
Force-pushed from 46b3b68 to 2cf5f3b.
 *
 * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
 * is constructed. That means that the client that submits the program needs to be able to
 * reach the Kafka brokers or ZooKeeper.</p>
This NOTE is no longer valid and can be removed.
Is it also true for 0.10?
Yes. I have a separate PR which cleans that up for all Kafka versions.
 *
 * <p>Details about approach (a):
 * Pre Kafka 0.11 producers only follow approach (a), allowing users to use the producer using the
 * DataStream.addSink() method.
"Pre Kafka 0.11 producers only follow approach (a)" is incorrect.
Kafka 0.10 also supports hybrid.
 */
private boolean logFailuresOnly;

private Semantic semantic;
nit: also include Javadoc for consistency.
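For illustration, the Javadoc could look something like this (the wording is an assumption, not taken from the PR):

/** Semantic guarantee this producer provides: EXACTLY_ONCE, AT_LEAST_ONCE or NONE. */
private Semantic semantic;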
/** Number of unacknowledged records. */
private final AtomicLong pendingRecords = new AtomicLong();

private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
nit: also include Javadoc for consistency.
 * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
 */
@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN extends Serializable>
I really like the idea of introducing this abstraction :)
Overall, though, I would like to see unit tests specifically for this TwoPhaseCommitSinkFunction class.
// was triggered) and because there can be concurrent overlapping checkpoints
// (a new one is started before the previous fully finished).
//
// ==> There should never be a case where we have no pending transaction here
Let's move this comment block to a Javadoc on the method.
Hmm, why do you think so? This is purely an implementation detail, nothing that should bother the user of this class.
Ok, makes sense. No strong objection here, can keep as is.
// checkpoint (temporary outage in the storage system) but
// could persist a successive checkpoint (the one notified here)
//
// - other (non Pravega sink) tasks could not persist their status during
Is it required to mention Pravega here?
 * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the producer
 * will use {@link Semantic.EXACTLY_ONCE} semantic.
 *
 * <p>Implementation note: This producer is a hybrid between a regular sink function (a)
Does this implementation really support the hybrid modes? As far as I can understand it, FlinkKafkaProducer011 only extends TwoPhaseCommitSinkFunction, which doesn't support the hybrid modes.
Yes, according to all of the tests it passes.
The (b) version works by passing an instance of FlinkKafkaProducer011 as a SinkFunction in the KafkaStreamSink<IN> extends StreamSink<IN> class. Under the hood, this StreamSink checks whether the SinkFunction actually implements the various versions of the checkpointing interfaces, and in that way it calls the appropriate methods on FlinkKafkaProducer011.
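A minimal sketch of that dispatch pattern (an illustration, not the actual Flink source; the class name KafkaStreamSinkSketch is made up):

import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class KafkaStreamSinkSketch<IN> {

	private final SinkFunction<IN> sinkFunction;

	public KafkaStreamSinkSketch(SinkFunction<IN> sinkFunction) {
		this.sinkFunction = sinkFunction;
	}

	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// Forward the checkpoint call only if the wrapped function opted in
		// by implementing the checkpointing interface.
		if (sinkFunction instanceof CheckpointedFunction) {
			((CheckpointedFunction) sinkFunction).snapshotState(context);
		}
	}
}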
// for the reasons discussed in the 'notifyCheckpointComplete()' method.

pendingCommitTransactionsState = context.getOperatorStateStore().getSerializableListState("pendingCommitTransactions");
pendingTransactionsState = context.getOperatorStateStore().getSerializableListState("pendingTransactions");
getSerializableListState is deprecated and its usage is discouraged. I would recommend that implementations may also pass in either the TypeInformation or their own TypeSerializer for the transaction state holder.
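A hedged sketch of that recommendation, using an explicit ListStateDescriptor instead of the deprecated call (MyTransaction and the state name are illustrative assumptions, not the PR's actual names):

import java.io.Serializable;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;

class StateInitSketch {

	static class MyTransaction implements Serializable {}

	private ListState<MyTransaction> pendingTransactionsState;

	public void initializeState(FunctionInitializationContext context) throws Exception {
		// An explicit descriptor with TypeInformation replaces the deprecated
		// getSerializableListState(name), which falls back to Java serialization.
		ListStateDescriptor<MyTransaction> descriptor = new ListStateDescriptor<>(
			"pendingTransactions", TypeInformation.of(MyTransaction.class));
		pendingTransactionsState = context.getOperatorStateStore().getListState(descriptor);
	}
}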
Thanks a lot for opening a pull request for this very important feature @pnowojski. Some notable comments so far:
Regarding how I would proceed with this big contribution:
One other comment regarding the commits: What do you think?
I have added tests for TwoPhaseCommitFunction and opened a new PR #4368 for that - however, please check the responses to your comments here before moving to that new PR.
Force-pushed from a7a05e5 to 3e635d4.
Please add an entry to the …
Now that the prerequisite PRs are merged, we can rebase this :)
Force-pushed from cc48a21 to 35cc64f.
How does the exactly-once sink handle a large gap between …?
I think there is no way to handle it other than to increase the timeout to some very large value. Or is there?
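For reference, the timeout being discussed maps to a standard Kafka producer property; a minimal sketch (the one-hour value is an illustrative assumption):

import java.util.Properties;

class TimeoutConfigSketch {
	static Properties producerConfig() {
		Properties props = new Properties();
		// Kafka aborts any transaction that stays open longer than this, so for
		// an exactly-once sink it must comfortably exceed the maximum expected
		// gap between a checkpoint and its completion notification.
		props.put("transaction.timeout.ms", "3600000"); // 1 hour, illustrative
		return props;
	}
}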
public void initializeState(FunctionInitializationContext context) throws Exception {
	availableTransactionalIds.clear();
	for (int i = 0; i < kafkaProducersPoolSize; i++) {
		availableTransactionalIds.add(UUID.randomUUID().toString());
Probably better to reuse stored ids rather than creating new ones each time. I am thinking of a case where a task goes into a crash loop, dying even before the first commit.
I think that makes sense, but I guess it's mostly because the current code isn't differentiating between used and unused transaction ids in the state. If we differentiate that, it would be possible to reuse stored ids.
Piotr, what do you think?
That's a valid issue; however, on its own this solution would not be enough. It would not work for a case where we first (1) scale down, then (2) scale up. On event (2), we would need to create new transactional ids, but we wouldn't know from which id we can start.
However, I think we can deduce the starting point for new IDs using getUnionListState to track globally what the next available transactional id is.
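A hedged sketch of that getUnionListState idea (the field and state names are illustrative assumptions):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;

class TransactionalIdSketch {

	private ListState<Long> nextTransactionalIdState;

	public void initializeState(FunctionInitializationContext context) throws Exception {
		// Union list state: on restore every subtask receives *all* entries,
		// so even after scaling up each subtask can compute a globally safe
		// starting point for freshly generated transactional ids.
		nextTransactionalIdState = context.getOperatorStateStore().getUnionListState(
			new ListStateDescriptor<>("nextTransactionalId", BasicTypeInfo.LONG_TYPE_INFO));

		long nextFreeId = 0;
		for (Long storedId : nextTransactionalIdState.get()) {
			nextFreeId = Math.max(nextFreeId, storedId);
		}
		// New transactional ids can now be derived from nextFreeId upwards.
	}
}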
we wouldn't know from which id we can start.
Not sure if you need a 'start id'. You can just abort all of them whether there are any open transactions or not (in fact, if you open a new producer with the id, Kafka aborts any that are open). This is mainly for clarification; will leave it to you guys to decide on specifics.
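A sketch of that point using the plain Kafka 0.11 client API (the bootstrap address and method name abortLingeringTransaction are illustrative assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

class AbortSketch {
	static void abortLingeringTransaction(String transactionalId) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092"); // illustrative assumption
		props.put("transactional.id", transactionalId);
		props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
		try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
			// initTransactions() bumps the epoch and aborts any unfinished
			// transaction previously opened with this transactional.id.
			producer.initTransactions();
		}
	}
}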
Thanks for the follow-up revision @pnowojski.
I think the latest approach we're going for seems sane.
I've only checked the code on the producer side. I'm assuming that the other code (consumer, table sink / sources) is mostly identical to the other versions. From what I see, I think this is almost mergeable, minus some comments I had left inline.
 * <li>{@link #NONE}</li>
 */
public enum Semantic {
	/**
nit: empty line before comment block.
 * <li>increase size of {@link FlinkKafkaProducer}s pool</li>
 */
EXACTLY_ONCE,
/**
nit: empty line before comment block.
 * to be acknowledged by the Kafka producer on a checkpoint.
 */
AT_LEAST_ONCE,
/**
nit: empty line before comment block.
/**
 * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
 */
public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
Could you briefly describe the reason for the number 5? Why not use numConcurrentCheckpoints + 1 (as we discussed offline)?
As I remember, the reason was that it is not easy/not possible at the moment to get this information in the operator. It should be follow-up work. Regardless of this, the code of this operator would look the same (because we have no guarantee that notifyCheckpointComplete always reaches us on time).
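A back-of-the-envelope reading of that sizing argument (the numbers here are illustrative assumptions, not from the PR):

class PoolSizeSketch {
	public static void main(String[] args) {
		// Each checkpoint that was snapshotted but whose notifyCheckpointComplete()
		// has not yet arrived holds one pre-committed KafkaProducer, and one more
		// producer is needed for the currently open transaction.
		int maxConcurrentCheckpoints = 4; // illustrative assumption
		int minPoolSize = maxConcurrentCheckpoints + 1;
		System.out.println("Need at least " + minPoolSize + " producers in the pool");
		// DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5 matches this bound for up to four
		// overlapping checkpoints plus late completion notifications.
	}
}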
/**
 * Pool of transactional ids backed up in state.
 */
private ListState<String> transactionalIdsState;
We can probably make this transient, also for documentation purposes.
	availableTransactionalIds.add(UUID.randomUUID().toString());
}

super.initializeState(context);
Could we initialize the base TwoPhaseCommitSink first?
nope :(
	this.kafkaProducer = kafkaProducer;
}

// TODO: is this used anywhere?
I think this variant is used when using writeToKafkaWithTimestamps.
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);

private final KafkaProducer<K, V> kafkaProducer;
@Nullable
nit: empty line before this field annotation.
if (!(producerId >= 0 && epoch >= 0)) {
	throw new IllegalStateException(String.format("Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch));
}
LOG.info("Attempting to resume transaction with producerId [%s] and epoch [%s]", producerId, epoch);
Use {} instead of [%s]s in the logger call.
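The suggested fix would look something like this (a sketch; SLF4J substitutes {} placeholders, while %s is only understood by String.format):

LOG.info("Attempting to resume transaction with producerId [{}] and epoch [{}]", producerId, epoch);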
public void resumeTransaction(long producerId, short epoch) {
	if (!(producerId >= 0 && epoch >= 0)) {
		throw new IllegalStateException(String.format("Incorrect values for producerId [%s] and epoch [%s]", producerId, epoch));
Can use Preconditions.checkState(...) here.
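A sketch of that suggestion, using Flink's org.apache.flink.util.Preconditions (the rest of the method body is elided):

import static org.apache.flink.util.Preconditions.checkState;

public void resumeTransaction(long producerId, short epoch) {
	// checkState(...) throws IllegalStateException with the formatted message,
	// replacing the hand-written if/throw block.
	checkState(producerId >= 0 && epoch >= 0,
		"Incorrect values for producerId %s and epoch %s", producerId, epoch);
	// ... rest of the method unchanged ...
}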
Force-pushed from 5ced4ea to dfb7e24.
I did a first high-level review of the code. I think it's good so far! Before we can merge this, however, we need a few more things around it:
Also, this has interplay with #4616, but we can resolve that by merging them in any order and fixing up the later changes when merging.
Force-pushed from cbfc50d to e2d477f.
What were the bugs that you fixed?
Bugs in tests (the ones you can see in the fixup commits).
We are really looking forward to this 👍
Force-pushed from ed98a07 to 32ff813.
@aljoscha rebased on the latest master and integrated your changes.
Merged! 😃 Could you please close this PR?
Thanks :)
The first four commits are from #4557 and #4561.