Skip to content

Commit

Permalink
KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka c…
Browse files Browse the repository at this point in the history
…lient retriable exceptions in AbstractWorkerSourceTask (#13955)

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Chris Egerton <chrise@aiven.io>
  • Loading branch information
yashmayya committed Jul 10, 2023
1 parent 6368d14 commit 9ee28d1
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ boolean sendRecords() {
if (producerRecord == null || retryWithToleranceOperator.failed()) {
counter.skipRecord();
recordDropped(preTransformRecord);
processed++;
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
*/
package org.apache.kafka.connect.runtime;

import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.Arrays;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
Expand Down Expand Up @@ -66,13 +64,15 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
Expand All @@ -90,6 +90,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -99,7 +101,6 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.junit.Assert.assertThrows;

@SuppressWarnings("unchecked")
@RunWith(MockitoJUnitRunner.StrictStubs.class)
Expand Down Expand Up @@ -651,6 +652,43 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
verifyTopicCreation();
}

@Test
public void testSendRecordsRetriableException() {
createWorkerTask();

SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);

expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
expectTaskGetTopic();

when(transformationChain.apply(eq(record1))).thenReturn(null);
when(transformationChain.apply(eq(record2))).thenReturn(null);
when(transformationChain.apply(eq(record3))).thenReturn(record3);

TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));

when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);

workerTask.toSend = Arrays.asList(record1, record2, record3);

// The first two records are filtered out / dropped by the transformation chain; only the third record will be attempted to be sent.
// The producer throws a RetriableException the first time we try to send the third record
assertFalse(workerTask.sendRecords());

// The next attempt to send the third record should succeed
assertTrue(workerTask.sendRecords());

// Ensure that the first two records that were filtered out by the transformation chain
// aren't re-processed when we retry the call to sendRecords()
verify(transformationChain, times(1)).apply(eq(record1));
verify(transformationChain, times(1)).apply(eq(record2));
verify(transformationChain, times(2)).apply(eq(record3));
}

private void expectSendRecord(Headers headers) {
if (headers != null)
expectConvertHeadersAndKeyValue(headers, TOPIC);
Expand Down

0 comments on commit 9ee28d1

Please sign in to comment.