Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10455][Kafka Tx] Close transactional producers in case of failure and termination #7107

Merged
merged 1 commit into from Nov 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -45,6 +45,7 @@
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.NetUtils;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
Expand Down Expand Up @@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception {
}
// make sure we propagate pending errors
checkErroneous();
pendingTransactions().forEach(transaction ->
IOUtils.closeQuietly(transaction.getValue().producer)
);
}

// ------------------- Logic for handling checkpoint flushing -------------------------- //
Expand Down Expand Up @@ -714,8 +718,11 @@ protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011
protected void commit(KafkaTransactionState transaction) {
switch (semantic) {
case EXACTLY_ONCE:
transaction.producer.commitTransaction();
recycleTransactionalProducer(transaction.producer);
try {
transaction.producer.commitTransaction();
} finally {
recycleTransactionalProducer(transaction.producer);
}
break;
case AT_LEAST_ONCE:
case NONE:
Expand Down
Expand Up @@ -38,20 +38,24 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.FlinkRuntimeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Clock;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -149,6 +153,12 @@ protected TXN currentTransaction() {
return currentTransactionHolder == null ? null : currentTransactionHolder.handle;
}

/**
 * Exposes the transactions currently held in {@code pendingCommitTransactions}.
 *
 * <p>Every element pairs a key of that map with the {@code handle} of the holder stored
 * under it, so callers see the transaction handle itself rather than the holder wrapper.
 * NOTE(review): the {@code Long} key looks like the checkpoint id - confirm against the
 * code that populates the map.
 *
 * <p>No copy of the map is made; its entries are read when the returned stream is consumed.
 *
 * @return a stream of (key, transaction handle) entries, never {@code null}
 */
@Nonnull
protected Stream<Map.Entry<Long, TXN>> pendingTransactions() {
	return pendingCommitTransactions
		.entrySet()
		.stream()
		.<Map.Entry<Long, TXN>>map(pending ->
			new AbstractMap.SimpleEntry<>(pending.getKey(), pending.getValue().handle));
}

// ------ methods that should be implemented in child class to support two phase commit algorithm ------

/**
Expand Down Expand Up @@ -256,6 +266,7 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {

Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;

while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Expand All @@ -269,12 +280,23 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

logWarningIfTimeoutAlmostReached(pendingTransaction);
commit(pendingTransaction.handle);
try {
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}

LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

pendingTransactionIterator.remove();
}

if (firstError != null) {
throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
firstError);
}
}

@Override
Expand Down