Rename property to `commitRecovered`.
garyrussell committed Mar 11, 2019
1 parent 6ac6d24 commit dffed19
Showing 4 changed files with 34 additions and 71 deletions.
@@ -53,7 +53,7 @@ public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcess

private final FailedRecordTracker failureTracker;

private boolean processInTransaction;
private boolean commitRecovered;

private KafkaTemplate<K, V> kafkaTemplate;

@@ -116,27 +116,30 @@ public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,

@Override
public boolean isProcessInTransaction() {
return this.processInTransaction;
return this.commitRecovered;
}

/**
* Set to true to run the {@link #process(List, Consumer, Exception, boolean)}
* method in a transaction. Requires a {@link KafkaTemplate}.
* @param processInTransaction true to process in a transaction.
* Set to true and the container will run the
* {@link #process(List, Consumer, Exception, boolean)} method in a transaction and,
* if a record is skipped and recovered, we will send its offset to the transaction.
* Requires a {@link KafkaTemplate}.
* @param commitRecovered true to process in a transaction.
* @since 2.2.5
* @see #isProcessInTransaction()
* @see #process(List, Consumer, Exception, boolean)
* @see #setKafkaTemplate(KafkaTemplate)
*/
public void setProcessInTransaction(boolean processInTransaction) {
this.processInTransaction = processInTransaction;
public void setCommitRecovered(boolean commitRecovered) {
this.commitRecovered = commitRecovered;
}

/**
* Set a {@link KafkaTemplate} to use to send the offset of a recovered record
* to a transaction.
* @param kafkaTemplate the template
* @since 2.2.5
* @see #setProcessInTransaction(boolean)
* @see #setCommitRecovered(boolean)
*/
public void setKafkaTemplate(KafkaTemplate<K, V> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
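
The javadoc added above describes what the renamed flag enables: when retries are exhausted, the skipped record is handed to the recoverer and its offset is sent to the current transaction, so the failed record is committed along with that transaction and is not redelivered. A rough sketch of that idea (illustrative only, not the processor's actual implementation; `recovered` and `template` are assumed to be in scope, and a Kafka transaction must be active on the calling thread):

import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.core.KafkaTemplate;

final class CommitRecoveredSketch {

	// Illustrative only: commit the offset of a skipped (recovered) record as part of
	// the current Kafka transaction, so it is not redelivered after the rollback.
	static <K, V> void commitRecoveredOffset(ConsumerRecord<K, V> recovered, KafkaTemplate<K, V> template) {
		// Requires an active Kafka transaction on this thread (e.g. inside executeInTransaction()).
		template.sendOffsetsToTransaction(
				Collections.singletonMap(
						new TopicPartition(recovered.topic(), recovered.partition()),
						new OffsetAndMetadata(recovered.offset() + 1))); // next offset to consume
	}

	private CommitRecoveredSketch() {
	}

}
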
@@ -633,14 +633,14 @@ private Object findDeserializerClass(Map<String, Object> props, boolean isValue)
}
}

private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscribingConsumer) {
private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> consumer) {
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
if (this.containerProperties.getTopicPattern() != null) {
subscribingConsumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
}
else {
subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
}
}
else {
@@ -652,7 +652,7 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
topicPartition.getPosition()));
}
subscribingConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}
}

@@ -755,7 +755,7 @@ public void run() {
try {
pollAndInvoke();
}
catch (@SuppressWarnings("unused") WakeupException e) {
catch (WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
@@ -874,7 +874,7 @@ public void wrapUp() {
try {
this.consumer.unsubscribe();
}
catch (@SuppressWarnings("unused") WakeupException e) {
catch (WakeupException e) {
// No-op. Continue process
}
}
@@ -960,7 +960,7 @@ private void processAck(ConsumerRecord<K, V> record) {
try {
ackImmediate(record);
}
catch (@SuppressWarnings("unused") WakeupException e) {
catch (WakeupException e) {
// ignore - not polling
}
}
@@ -1010,7 +1010,6 @@ private void invokeBatchListener(final ConsumerRecords<K, V> records) {
@SuppressWarnings({ UNCHECKED, RAW_TYPES })
private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records,
final List<ConsumerRecord<K, V>> recordList) {

try {
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

@@ -1033,34 +1032,15 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
this.logger.error("Transaction rolled back", e);
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse =
(AfterRollbackProcessor<K, V>) getAfterRollbackProcessor();
if (afterRollbackProcessorToUse.isProcessInTransaction() && this.transactionTemplate != null) {
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
}

});
if (recordList == null) {
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
}
else {
batchAfterRollback(records, recordList, e, afterRollbackProcessorToUse);
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
}
}
}

private void batchAfterRollback(final ConsumerRecords<K, V> records,
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {

if (recordList == null) {
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
}
else {
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
}
}

private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
return StreamSupport.stream(records.spliterator(), false)
.collect(Collectors.toList());
@@ -1100,7 +1080,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
throw er;
}
}
catch (@SuppressWarnings("unused") InterruptedException e) {
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
@@ -1181,7 +1161,7 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
* Invoke the listener with each record in a separate transaction.
* @param records the records.
*/
@SuppressWarnings(RAW_TYPES)
@SuppressWarnings({ UNCHECKED, RAW_TYPES })
private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
@@ -1212,40 +1192,20 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
}
catch (RuntimeException e) {
this.logger.error("Transaction rolled back", e);
recordAfterRollback(iterator, record, e);
List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
unprocessed.add(record);
while (iterator.hasNext()) {
unprocessed.add(iterator.next());
}
((AfterRollbackProcessor<K, V>) getAfterRollbackProcessor())
.process(unprocessed, this.consumer, e, true);
}
finally {
TransactionSupport.clearTransactionIdSuffix();
}
}
}

private void recordAfterRollback(Iterator<ConsumerRecord<K, V>> iterator, final ConsumerRecord<K, V> record,
RuntimeException e) {

List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
unprocessed.add(record);
while (iterator.hasNext()) {
unprocessed.add(iterator.next());
}
@SuppressWarnings(UNCHECKED)
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse =
(AfterRollbackProcessor<K, V>) getAfterRollbackProcessor();
if (afterRollbackProcessorToUse.isProcessInTransaction() && this.transactionTemplate != null) {
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer, e, true);
}

});
}
else {
afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true);
}
}

private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
while (iterator.hasNext()) {
@@ -1583,7 +1543,7 @@ private void commitIfNecessary() {
this.consumer.commitAsync(commits, this.commitCallback);
}
}
catch (@SuppressWarnings("unused") WakeupException e) {
catch (WakeupException e) {
// ignore - not polling
this.logger.debug("Woken up during commit");
}
@@ -541,7 +541,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
};
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
spy(new DefaultAfterRollbackProcessor<>(recoverer, 3));
afterRollbackProcessor.setProcessInTransaction(true);
afterRollbackProcessor.setCommitRecovered(true);
afterRollbackProcessor.setKafkaTemplate(dlTemplate);
container.setAfterRollbackProcessor(afterRollbackProcessor);
final CountDownLatch stopLatch = new CountDownLatch(1);
@@ -584,7 +584,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
assertThat(headers.get("baz")).isEqualTo("qux".getBytes());
pf.destroy();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
verify(afterRollbackProcessor, times(3)).isProcessInTransaction();
verify(afterRollbackProcessor, times(3)).isCommitRecovered();
verify(afterRollbackProcessor, times(3)).process(any(), any(), any(), anyBoolean());
verify(afterRollbackProcessor).clearThreadState();
verify(dlTemplate).send(any(ProducerRecord.class));
2 changes: 1 addition & 1 deletion src/reference/asciidoc/kafka.adoc
@@ -3043,7 +3043,7 @@ See also <<dead-letters>>.

Starting with version 2.2.5, the `DefaultAfterRollbackProcessor` can be invoked in a new transaction (started after the failed transaction rolls back).
Then, if you are using the `DeadLetterPublishingRecoverer` to publish a failed record, the processor will send the recovered record's offset in the original topic/partition to the transaction.
To enable this feature, set the `processInTransaction` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`.
To enable this feature, set the `commitRecovered` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`.
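
For example, based on the test updated in this commit, the configuration looks roughly like this (a sketch only; `dlTemplate` is assumed to be a `KafkaTemplate<Object, Object>` used for dead-letter publishing, `container` an existing listener container, and the recoverer and processor classes come from `org.springframework.kafka.listener`):

// Recover a failed record by publishing it to a dead-letter topic.
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlTemplate);

// Give up after 3 delivery attempts, then hand the record to the recoverer.
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
		new DefaultAfterRollbackProcessor<>(recoverer, 3);

// New in 2.2.5: run in a transaction and send the recovered record's offset to it.
afterRollbackProcessor.setCommitRecovered(true);
afterRollbackProcessor.setKafkaTemplate(dlTemplate);

container.setAfterRollbackProcessor(afterRollbackProcessor);
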

[[dead-letters]]
===== Publishing Dead-letter Records
