[FLINK-31363] Do not checkpoint a KafkaCommittable if the transaction was empty #15

Closed · wants to merge 5 commits

@@ -17,8 +17,11 @@

package org.apache.flink.connector.kafka.sink;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.errors.ProducerFencedException;
@@ -33,6 +36,7 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.Future;

import static org.apache.flink.util.Preconditions.checkState;

@@ -49,6 +53,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {

@Nullable private String transactionalId;
private volatile boolean inTransaction;
private volatile boolean hasRecordsInTransaction;
private volatile boolean closed;

public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
@@ -67,6 +72,14 @@ private static Properties withTransactionalId(
return props;
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
if (inTransaction) {
hasRecordsInTransaction = true;

Review comment:

Should this boolean be set in the callback, in the successful send scenario?

@tzulitai (Contributor, Author), Mar 28, 2023:

hmm, that's a good point. I think the question to ask is: is it incorrect to set this flag (to allow a KafkaCommittable to be generated for the txn at pre-commit time) preemptively, instead of only setting it when data has actually been successfully written?

I think the answer is that it is not incorrect, so it is ok to leave this as is. Reasoning is as follows:

  • At pre-commit time, when performing the flush, if some data failed to be flushed, the pre-commit will fail, so a KafkaCommittable will not be checkpointed for the txn anyway. In this scenario, the hasRecordsInTransaction flag is irrelevant no matter its value.

  • If all records are correctly flushed, then good; a KafkaCommittable should be generated for the txn. We're good here because we've already preemptively set the hasRecordsInTransaction flag.

@tzulitai (Contributor, Author):

Speaking of which, it might give a more complete picture of this interaction after I rebase this PR branch on top of the latest changes (to include the fix that adds checkAsyncExceptions).
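
For context, a minimal sketch of the pre-commit sequence discussed in this thread. It assumes Flink drives flush before prepareCommit and that the checkAsyncExceptions fix referenced above rethrows any error captured by the producer's send callback; the method and names below are illustrative only, not the PR's exact code.

// Minimal sketch, not part of the PR: why setting hasRecordsInTransaction preemptively
// in send() is safe. Assumes the framework flushes before pre-commit and that a
// checkAsyncExceptions-style helper rethrows any error recorded by the send callback.
void preCommitSketch(KafkaWriter<Integer> writer) throws IOException, InterruptedException {
    writer.flush(false);
    // If any record failed to be written, the flush (or checkAsyncExceptions) throws here,
    // the pre-commit fails, and no KafkaCommittable is checkpointed for the transaction,
    // so the value of hasRecordsInTransaction never matters in that case.
    Collection<KafkaCommittable> committables = writer.prepareCommit();
    // Reached only if every record was flushed successfully; the preemptively set
    // hasRecordsInTransaction flag then correctly marks the transaction as non-empty.
}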

}
return super.send(record, callback);
}

@Override
public void flush() {
super.flush();
@@ -86,6 +99,7 @@ public void abortTransaction() throws ProducerFencedException {
LOG.debug("abortTransaction {}", transactionalId);
checkState(inTransaction, "Transaction was not started");
inTransaction = false;
hasRecordsInTransaction = false;
super.abortTransaction();
}

@@ -94,13 +108,18 @@ public void commitTransaction() throws ProducerFencedException {
LOG.debug("commitTransaction {}", transactionalId);
checkState(inTransaction, "Transaction was not started");
inTransaction = false;
hasRecordsInTransaction = false;
super.commitTransaction();
}

public boolean isInTransaction() {
return inTransaction;
}

public boolean hasRecordsInTransaction() {
return hasRecordsInTransaction;
}

@Override
public void close() {
closed = true;
@@ -302,8 +321,18 @@ public void resumeTransaction(long producerId, short epoch) {
transitionTransactionManagerStateTo(transactionManager, "READY");

transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");

// the transactionStarted flag in Kafka's TransactionManager controls whether
// an EndTxnRequest will actually be sent to Kafka for a commit
// or abort API call. This flag is set only after the first send (i.e.
// only if data is actually written to some partition).
// In checkpoints, we only ever store metadata of pre-committed
// transactions that actually have records; therefore, on restore
// when we create recovery producers to resume transactions and commit
// them, we should always set this flag.
setField(transactionManager, "transactionStarted", true);
this.inTransaction = true;
this.hasRecordsInTransaction = true;
}
}

@@ -210,13 +210,22 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException {

@Override
public Collection<KafkaCommittable> prepareCommit() {
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
return Collections.emptyList();
}

// only return a KafkaCommittable if data has actually been written to the current transaction
if (currentProducer.hasRecordsInTransaction()) {
final List<KafkaCommittable> committables =
Collections.singletonList(
KafkaCommittable.of(currentProducer, producerPool::add));
LOG.debug("Committing {} committables.", committables);
return committables;
}

// otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
currentProducer.commitTransaction();
producerPool.add(currentProducer);
return Collections.emptyList();
}

Expand Down
@@ -63,16 +63,15 @@ class FlinkKafkaInternalProducerITCase {
private static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();

private static final String TRANSACTION_PREFIX = "test-transaction-";

@Test
void testInitTransactionId() {
final String topic = "test-init-transactions";
final String transactionIdPrefix = "testInitTransactionId-";
try (FlinkKafkaInternalProducer<String, String> reuse =
new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
int numTransactions = 20;
for (int i = 1; i <= numTransactions; i++) {
reuse.initTransactionId(TRANSACTION_PREFIX + i);
reuse.initTransactionId(transactionIdPrefix + i);
reuse.beginTransaction();
reuse.send(new ProducerRecord<>(topic, "test-value-" + i));
if (i % 2 == 0) {
@@ -81,12 +80,58 @@ void testInitTransactionId() {
reuse.flush();
reuse.abortTransaction();
}
assertNumTransactions(i);
assertNumTransactions(i, transactionIdPrefix);
assertThat(readRecords(topic).count()).isEqualTo(i / 2);
}
}
}

@Test
void testCommitResumedTransaction() {
final String topic = "test-commit-resumed-transaction";
final String transactionIdPrefix = "testCommitResumedTransaction-";
final String transactionalId = transactionIdPrefix + "id";

KafkaCommittable snapshottedCommittable;
try (FlinkKafkaInternalProducer<String, String> producer =
new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>(topic, "test-value"));
producer.flush();
snapshottedCommittable = KafkaCommittable.of(producer, ignored -> {});
}

try (FlinkKafkaInternalProducer<String, String> resumedProducer =
new FlinkKafkaInternalProducer<>(getProperties(), transactionalId)) {
resumedProducer.resumeTransaction(
snapshottedCommittable.getProducerId(), snapshottedCommittable.getEpoch());
resumedProducer.commitTransaction();
}

assertNumTransactions(1, transactionIdPrefix);
assertThat(readRecords(topic).count()).isEqualTo(1);
}

@Test
void testCommitResumedEmptyTransactionShouldFail() {
KafkaCommittable snapshottedCommittable;
try (FlinkKafkaInternalProducer<String, String> producer =
new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
producer.initTransactions();
producer.beginTransaction();
snapshottedCommittable = KafkaCommittable.of(producer, ignored -> {});
}

try (FlinkKafkaInternalProducer<String, String> resumedProducer =
new FlinkKafkaInternalProducer<>(getProperties(), "dummy")) {
resumedProducer.resumeTransaction(
snapshottedCommittable.getProducerId(), snapshottedCommittable.getEpoch());

assertThatThrownBy(resumedProducer::commitTransaction);

Review comment:

nit: Can we also check the exception type, using isInstanceOf(<exception type>.class)?
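
For illustration, a sketch of what that nit might look like; the concrete exception type is left unspecified in the comment, so KafkaException below is only a placeholder assumption.

// Placeholder sketch for the nit above; the exact expected exception type is an
// assumption here, not something stated in the review.
assertThatThrownBy(resumedProducer::commitTransaction)
        .isInstanceOf(org.apache.kafka.common.KafkaException.class);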

}
}

@ParameterizedTest
@MethodSource("provideTransactionsFinalizer")
void testResetInnerTransactionIfFinalizingTransactionFailed(
@@ -131,10 +176,10 @@ private static Properties getProperties() {
FlinkKafkaInternalProducer::abortTransaction);
}

private void assertNumTransactions(int numTransactions) {
private void assertNumTransactions(int numTransactions, String transactionIdPrefix) {
List<KafkaTransactionLog.TransactionRecord> transactions =
new KafkaTransactionLog(getProperties())
.getTransactions(id -> id.startsWith(TRANSACTION_PREFIX));
.getTransactions(id -> id.startsWith(transactionIdPrefix));
assertThat(
transactions.stream()
.map(KafkaTransactionLog.TransactionRecord::getTransactionId)

@@ -317,6 +317,7 @@ void usePoolForTransactional() throws Exception {
getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
assertThat(writer.getProducerPool()).hasSize(0);

writer.write(1, SINK_WRITER_CONTEXT);
writer.flush(false);
Collection<KafkaCommittable> committables0 = writer.prepareCommit();
writer.snapshotState(1);
@@ -336,6 +337,7 @@ void usePoolForTransactional() throws Exception {
committable.getProducer().get().close();
assertThat(writer.getProducerPool()).hasSize(1);

writer.write(1, SINK_WRITER_CONTEXT);
writer.flush(false);
Collection<KafkaCommittable> committables1 = writer.prepareCommit();
writer.snapshotState(2);
@@ -349,6 +351,30 @@
}
}

/**
* Tests that when pre-commit is attempted on an empty transaction, the writer does not emit
* a KafkaCommittable, but instead immediately commits the empty transaction and recycles the
* producer.
*/
@Test
void prepareCommitForEmptyTransaction() throws Exception {
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(
getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) {
assertThat(writer.getProducerPool()).hasSize(0);

// no data written to current transaction
writer.flush(false);
Collection<KafkaCommittable> emptyCommittables = writer.prepareCommit();

assertThat(emptyCommittables).hasSize(0);
assertThat(writer.getProducerPool()).hasSize(1);
final FlinkKafkaInternalProducer<?, ?> recycledProducer =
writer.getProducerPool().pop();
assertThat(recycledProducer.isInTransaction()).isFalse();
}
}

/**
* Tests that open transactions are automatically aborted on close such that successive writes
* succeed.