diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index fbe9cfff1dea..68ee27cb9397 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.file;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -79,4 +80,11 @@ public void stop() {
     public ConfigDef config() {
         return CONFIG_DEF;
     }
+
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
+        // Nothing to do here since FileStreamSinkConnector does not manage offsets externally nor does it require any
+        // custom offset validation
+        return true;
+    }
 }
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index eef6da2b91da..13193f8f5012 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.slf4j.Logger;
@@ -31,6 +32,9 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+
 /**
  * Very simple source connector that works with stdin or a file.
  */
@@ -101,4 +105,49 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
                 : ExactlyOnceSupport.UNSUPPORTED;
     }
 
+    @Override
+    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
+        String filename = config.getString(FILE_CONFIG);
+        if (filename == null || filename.isEmpty()) {
+            throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
+                    "This is because stdin is used for input and offsets are not tracked.");
+        }
+
+        // This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
+        // However, there could also be source partitions from previous configurations of the connector.
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
+            Map<String, ?> partition = partitionOffset.getKey();
+            if (partition == null) {
+                throw new ConnectException("Partition objects cannot be null");
+            }
+
+            if (!partition.containsKey(FILENAME_FIELD)) {
+                throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
+            }
+
+            Map<String, ?> offset = partitionOffset.getValue();
+            // null offsets are allowed and represent a deletion of offsets for a partition
+            if (offset == null) {
+                continue;
+            }
+
+            if (!offset.containsKey(POSITION_FIELD)) {
+                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
+            }
+
+            // The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
+            if (!(offset.get(POSITION_FIELD) instanceof Long)) {
+                throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset is expected to be a Long value");
+            }
+
+            long offsetPosition = (Long) offset.get(POSITION_FIELD);
+            if (offsetPosition < 0) {
+                throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset should be a non-negative value");
+            }
+        }
+
+        // Let the task check whether the actual value for the offset position is valid for the configured file on startup
+        return true;
+    }
 }
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
index d3b0265bc89c..185faa80eb34 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java
@@ -20,20 +20,26 @@
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
 import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class FileStreamSourceConnectorTest {
 
     private static final String SINGLE_TOPIC = "test";
@@ -147,4 +153,78 @@ public void testInvalidBatchSize() {
         sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
         assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
     }
+
+    @Test
+    public void testAlterOffsetsStdin() {
+        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0L)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectPartitionKey() {
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap("other_partition_key", FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0L)
+        )));
+
+        // null partitions are invalid
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                null,
+                Collections.singletonMap(POSITION_FIELD, 0L)
+        )));
+    }
+
+    @Test
+    public void testAlterOffsetsMultiplePartitions() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, FILENAME), Collections.singletonMap(POSITION_FIELD, 0L));
+        offsets.put(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"), null);
+        assertTrue(connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsIncorrectOffsetKey() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap("other_offset_key", 0L)
+        );
+        assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, offsets));
+    }
+
+    @Test
+    public void testAlterOffsetsOffsetPositionValues() {
+        Function<Object, Boolean> alterOffsets = offset -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, offset)
+        ));
+
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("nan"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(null));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(new Object()));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(3.14));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(-420));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("-420"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(10));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply("10"));
+        assertThrows(ConnectException.class, () -> alterOffsets.apply(-10L));
+        assertTrue(() -> alterOffsets.apply(10L));
+    }
+
+    @Test
+    public void testSuccessfulAlterOffsets() {
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap(FILENAME_FIELD, FILENAME),
+                Collections.singletonMap(POSITION_FIELD, 0L)
+        );
+
+        // Expect no exception to be thrown when a valid offsets map is passed. An empty offsets map is treated as valid
+        // since it could indicate that the offsets were reset previously or that no offsets have been committed yet
+        // (for a reset operation)
+        assertTrue(connector.alterOffsets(sourceProperties, offsets));
+        assertTrue(connector.alterOffsets(sourceProperties, new HashMap<>()));
+    }
 }
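For illustration only (not part of the patch): a minimal sketch of how a caller, such as the Connect runtime handling an offsets-alteration request, might exercise the new alterOffsets hook on the source connector. The file path, topic name, and byte position below are hypothetical values, and the "filename"/"position" literals mirror the FILENAME_FIELD and POSITION_FIELD keys used by FileStreamSourceTask.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.file.FileStreamSourceConnector;

public class AlterOffsetsSketch {
    public static void main(String[] args) {
        FileStreamSourceConnector connector = new FileStreamSourceConnector();

        // Hypothetical connector configuration; a concrete "file" must be set, otherwise
        // alterOffsets throws because stdin-based input does not track offsets.
        Map<String, String> config = new HashMap<>();
        config.put(FileStreamSourceConnector.FILE_CONFIG, "/tmp/input.txt");
        config.put(FileStreamSourceConnector.TOPIC_CONFIG, "file-lines");

        // Source partition -> offset map in the shape validated by alterOffsets: the partition
        // identifies the file, the offset holds a non-negative Long byte position.
        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
                Collections.singletonMap("filename", "/tmp/input.txt"),
                Collections.singletonMap("position", 1024L)
        );

        // Returns true when the proposed offsets pass the connector's validation.
        System.out.println("Offsets accepted: " + connector.alterOffsets(config, offsets));
    }
}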