From ec7fb9ab536945041ff522209671c87f815f7372 Mon Sep 17 00:00:00 2001
From: Yash Mayya
Date: Tue, 4 Jul 2023 16:18:53 +0530
Subject: [PATCH 1/2] KAFKA-15145: Don't re-process records filtered out by
 SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask

---
 .../runtime/AbstractWorkerSourceTask.java     |  1 +
 .../runtime/AbstractWorkerSourceTaskTest.java | 47 ++++++++++++++++---
 2 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index eff4d8493260..6d5446d9c213 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -397,6 +397,7 @@ boolean sendRecords() {
             if (producerRecord == null || retryWithToleranceOperator.failed()) {
                 counter.skipRecord();
                 recordDropped(preTransformRecord);
+                processed++;
                 continue;
             }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index d77796716f1f..2adb8ee8dd2a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -16,21 +16,19 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
-import java.util.Arrays;
 import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -66,6 +64,7 @@
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +72,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
@@ -90,6 +90,8 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -99,7 +101,6 @@
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
-import static org.junit.Assert.assertThrows;
 
 @SuppressWarnings("unchecked")
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         verifyTopicCreation();
     }
 
+    @Test
+    public void testSendRecordsRetriableException() {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+        expectTaskGetTopic();
+
+        when(transformationChain.apply(eq(record1))).thenReturn(null);
+        when(transformationChain.apply(eq(record2))).thenReturn(null);
+        when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // The producer throws a RetriableException the first time we try to send the third record
+        assertFalse(workerTask.sendRecords());
+
+        // The next attempt to send the third record should succeed
+        assertTrue(workerTask.sendRecords());
+
+        // Ensure that the first two records that were filtered out by the transformation chain
+        // aren't re-processed when we retry the call to sendRecords()
+        verify(transformationChain, times(4)).apply(any(SourceRecord.class));
+    }
+
     private void expectSendRecord(Headers headers) {
         if (headers != null)
             expectConvertHeadersAndKeyValue(headers, TOPIC);

From 00d9bf56dfb4dd04dcd8ef7023cfd5d831a0e702 Mon Sep 17 00:00:00 2001
From: Yash Mayya
Date: Fri, 7 Jul 2023 15:00:45 +0530
Subject: [PATCH 2/2] Make test verifications more explicit; add clarifying
 comment to test

---
 .../kafka/connect/runtime/AbstractWorkerSourceTaskTest.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index 2adb8ee8dd2a..9ac7d5cca283 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -675,6 +675,7 @@ public void testSendRecordsRetriableException() {
 
         workerTask.toSend = Arrays.asList(record1, record2, record3);
 
+        // The first two records are filtered out / dropped by the transformation chain; only the third record will be attempted to be sent.
         // The producer throws a RetriableException the first time we try to send the third record
         assertFalse(workerTask.sendRecords());
 
@@ -683,7 +684,9 @@ public void testSendRecordsRetriableException() {
 
         // Ensure that the first two records that were filtered out by the transformation chain
         // aren't re-processed when we retry the call to sendRecords()
-        verify(transformationChain, times(4)).apply(any(SourceRecord.class));
+        verify(transformationChain, times(1)).apply(eq(record1));
+        verify(transformationChain, times(1)).apply(eq(record2));
+        verify(transformationChain, times(2)).apply(eq(record3));
     }
 
     private void expectSendRecord(Headers headers) {