From e1ee2f33549f95b90773ef406cd8bf21b99aeee3 Mon Sep 17 00:00:00 2001
From: Konstantine Karantasis
Date: Thu, 6 Jul 2017 22:13:50 -0700
Subject: [PATCH 1/3] KAFKA-5567: Connect sink worker should commit offsets of original topic partitions.

---
 .../kafka/connect/runtime/WorkerSinkTask.java | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 0c8fcf686720..711af87946b9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -67,6 +67,7 @@ class WorkerSinkTask extends WorkerTask {
     private final List<SinkRecord> messageBatch;
     private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
     private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
+    private final Map<TopicPartition, OffsetAndMetadata> origOffsets;
     private RuntimeException rebalanceException;
     private long nextCommit;
     private int commitSeqno;
@@ -95,6 +96,7 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.time = time;
         this.messageBatch = new ArrayList<>();
         this.currentOffsets = new HashMap<>();
+        this.origOffsets = new HashMap<>();
         this.pausedForRedelivery = false;
         this.rebalanceException = null;
         this.nextCommit = time.milliseconds() +
@@ -419,13 +421,14 @@ private KafkaConsumer<byte[], byte[]> createConsumer() {
     }
 
     private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
+        origOffsets.clear();
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
             log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}",
                     this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
             SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
             SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
             Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
-            SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
+            SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
                     keyAndSchema.schema(), keyAndSchema.value(),
                     valueAndSchema.schema(), valueAndSchema.value(),
                     msg.offset(),
@@ -433,9 +436,13 @@ private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
                     msg.timestampType());
             log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                     this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
-            record = transformationChain.apply(record);
-            if (record != null) {
-                messageBatch.add(record);
+            SinkRecord transRecord = transformationChain.apply(origRecord);
+            if (transRecord != null) {
+                origOffsets.put(
+                        new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()),
+                        new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
+                );
+                messageBatch.add(transRecord);
             } else {
                 log.trace("{} Transformations returned null, so dropping record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                         this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
@@ -459,9 +466,7 @@ private void deliverMessages() {
             // Since we reuse the messageBatch buffer, ensure we give the task its own copy
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             task.put(new ArrayList<>(messageBatch));
-            for (SinkRecord record : messageBatch)
-                currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
-                        new OffsetAndMetadata(record.kafkaOffset() + 1));
+            currentOffsets.putAll(origOffsets);
             messageBatch.clear();
             // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
             // the task had not explicitly paused
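Background for the change above (an illustrative sketch, not part of the patch series): the offsets bug only shows up when a Single Message Transform rewrites a record's topic or partition, because deliverMessages() used to derive commit offsets from the transformed record. The class below, PrefixRouter, and its "renamed_" prefix are made-up names; it is merely similar in spirit to the built-in RegexRouter SMT.

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Illustrative only: routes every record to a renamed topic. After this transform
// runs, record.topic() no longer names the topic the consumer actually read from,
// which is why WorkerSinkTask now tracks offsets from the pre-transform record.
public class PrefixRouter<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Only the topic changes; key, value, schemas and timestamp are preserved.
        return record.newRecord("renamed_" + record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public void close() {
    }
}

With such a transform in place, keying commit offsets off the transformed record would target partitions the consumer does not own (e.g. a "renamed_" topic-partition), which is exactly what the origOffsets map introduced by this patch avoids.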
From 039138c48c1b4067e64b063338c03d6e8a472e3b Mon Sep 17 00:00:00 2001
From: Konstantine Karantasis
Date: Wed, 19 Jul 2017 14:13:51 -0700
Subject: [PATCH 2/3] Commit all the original offsets even if a record is filtered out.

---
 .../org/apache/kafka/connect/runtime/WorkerSinkTask.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 711af87946b9..5f2b39f6ff4f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -437,11 +437,11 @@ private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
             log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                     this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
             SinkRecord transRecord = transformationChain.apply(origRecord);
+            origOffsets.put(
+                    new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()),
+                    new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
+            );
             if (transRecord != null) {
-                origOffsets.put(
-                        new TopicPartition(origRecord.topic(), origRecord.kafkaPartition()),
-                        new OffsetAndMetadata(origRecord.kafkaOffset() + 1)
-                );
                 messageBatch.add(transRecord);
             } else {
                 log.trace("{} Transformations returned null, so dropping record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
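The second patch moves the origOffsets bookkeeping in front of the null check so that records dropped by the transformation chain still advance the offset to be committed. A standalone sketch of the intended arithmetic follows; it is not part of the patch, and the class name, topic "t" and offsets 10-12 are hypothetical.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class FilteredOffsetSketch {
    public static void main(String[] args) {
        // Mimics the bookkeeping in convertMessages(): an offset is recorded for
        // every consumed record, whether or not the transformation chain keeps it.
        Map<TopicPartition, OffsetAndMetadata> origOffsets = new HashMap<>();
        for (long offset : new long[]{10L, 11L, 12L}) {
            boolean dropped = offset == 11L; // stands in for transformationChain.apply(...) returning null
            origOffsets.put(new TopicPartition("t", 0), new OffsetAndMetadata(offset + 1));
            if (!dropped) {
                // only surviving records would be added to messageBatch and handed to SinkTask.put()
            }
        }
        // Prints 13: the next offset to consume, past all three consumed records including the filtered one.
        System.out.println(origOffsets.get(new TopicPartition("t", 0)).offset());
    }
}

Without this ordering, a poll whose trailing records are all filtered out would leave the committed position behind them, and those records would be consumed and dropped again after every rebalance or restart.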
From 2b506cda9173eb140754008aa1d71905ac9533e3 Mon Sep 17 00:00:00 2001
From: Konstantine Karantasis
Date: Wed, 16 Aug 2017 13:11:14 -0700
Subject: [PATCH 3/3] Add test for transforms that mutate records topic.

---
 .../connect/runtime/WorkerSinkTaskTest.java   | 68 ++++++++++++++++++-
 1 file changed, 67 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 3ab6e0615676..a6806884004c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -825,6 +825,57 @@ public Void answer() throws Throwable {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testDeliveryWithMutatingTransform() throws Exception {
+        expectInitializeTask();
+
+        expectPollInitialAssignment();
+
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1, "newtopic_");
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(offsets);
+        EasyMock.expectLastCall().andReturn(offsets);
+
+        final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
+        consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback));
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                callback.getValue().onComplete(offsets, null);
+                return null;
+            }
+        });
+
+        expectConsumerPoll(0);
+        sinkTask.put(Collections.<SinkRecord>emptyList());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+
+        workerTask.iteration(); // initial assignment
+
+        workerTask.iteration(); // first record delivered
+
+        sinkTaskContext.getValue().requestCommit();
+        assertTrue(sinkTaskContext.getValue().isCommitRequested());
+        assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+        workerTask.iteration(); // triggers the commit
+        assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
+        assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+        assertEquals(0, workerTask.commitFailures());
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testMissingTimestampPropagation() throws Exception {
         expectInitializeTask();
@@ -982,6 +1033,10 @@ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
     }
 
     private void expectConversionAndTransformation(final int numMessages) {
+        expectConversionAndTransformation(numMessages, null);
+    }
+
+    private void expectConversionAndTransformation(final int numMessages, final String topicPrefix) {
         EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
         EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
 
@@ -990,7 +1045,18 @@ private void expectConversionAndTransformation(final int numMessages) {
             .andAnswer(new IAnswer<SinkRecord>() {
                 @Override
                 public SinkRecord answer() {
-                    return recordCapture.getValue();
+                    SinkRecord origRecord = recordCapture.getValue();
+                    return topicPrefix != null && !topicPrefix.isEmpty()
+                           ? origRecord.newRecord(
+                                   topicPrefix + origRecord.topic(),
+                                   origRecord.kafkaPartition(),
+                                   origRecord.keySchema(),
+                                   origRecord.key(),
+                                   origRecord.valueSchema(),
+                                   origRecord.value(),
+                                   origRecord.timestamp()
+                           )
+                           : origRecord;
                 }
             }).times(numMessages);
     }
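The mocked transformation in this test stands in for a real topic-mutating SMT: expectConversionAndTransformation(1, "newtopic_") returns a copy of the captured record with a prefixed topic. For reference, a small sketch (not part of the patch; the class name and sample record values are made up) of the built-in RegexRouter producing the same kind of mutation:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.RegexRouter;

public class RegexRouterSketch {
    public static void main(String[] args) {
        // Configure RegexRouter to prefix every topic, mirroring the "newtopic_"
        // prefix that expectConversionAndTransformation() fakes in the test.
        RegexRouter<SinkRecord> router = new RegexRouter<>();
        Map<String, String> props = new HashMap<>();
        props.put("regex", "(.*)");
        props.put("replacement", "newtopic_$1");
        router.configure(props);

        SinkRecord consumed = new SinkRecord("test", 0, Schema.STRING_SCHEMA, "key",
                Schema.STRING_SCHEMA, "value", 42L);
        SinkRecord routed = router.apply(consumed);

        System.out.println(routed.topic());       // newtopic_test
        System.out.println(routed.kafkaOffset()); // still 42: partition and offset are untouched
        router.close();
    }
}

The test then asserts that the offsets committed by the worker are keyed by TOPIC_PARTITION and TOPIC_PARTITION2, i.e. the partitions that were actually consumed, not by any "newtopic_"-prefixed partition produced by the transform.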