Skip to content

Commit

Permalink
KAFKA-15121: Implement the alterOffsets method in the FileStreamSourc…
Browse files Browse the repository at this point in the history
…eConnector and the FileStreamSinkConnector (apache#13945)

Reviewers: Chris Egerton <chrise@aiven.io>
  • Loading branch information
yashmayya authored and Cerchie committed Jul 25, 2023
1 parent 92ed0eb commit 03d8f39
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 4 deletions.
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.file;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down Expand Up @@ -79,4 +80,11 @@ public void stop() {
// Exposes this connector's configuration definition by returning the shared CONFIG_DEF constant.
public ConfigDef config() {
return CONFIG_DEF;
}

/**
 * Accepts every requested offset alteration without inspecting it.
 *
 * @param connectorConfig the connector configuration; not consulted by this implementation
 * @param offsets the requested offsets keyed by topic partition; not inspected by this implementation
 * @return always {@code true}
 */
@Override
public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
// Nothing to do here since FileStreamSinkConnector does not manage offsets externally nor does it require any
// custom offset validation
return true;
}
}
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
Expand All @@ -31,6 +32,9 @@
import java.util.List;
import java.util.Map;

import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;

/**
* Very simple source connector that works with stdin or a file.
*/
Expand Down Expand Up @@ -101,4 +105,49 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
: ExactlyOnceSupport.UNSUPPORTED;
}

/**
 * Validates a request to alter this connector's source offsets.
 *
 * <p>Only the structure of the request is checked here: every partition must be a non-null map
 * containing the filename key, and every non-null offset must contain the position key mapped to a
 * non-negative {@code Long}. A null offset is accepted and stands for deleting that partition's offset.
 *
 * @param connectorConfig the connector configuration, used to determine whether a file is configured
 * @param offsets the requested offsets, keyed by source partition
 * @return {@code true} whenever validation completes without throwing
 * @throws ConnectException if no file is configured, or if any partition or offset fails validation
 */
@Override
public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
    // Without a configured file the connector reads from stdin, where offsets have no meaning.
    String configuredFile = new AbstractConfig(CONFIG_DEF, connectorConfig).getString(FILE_CONFIG);
    if (configuredFile == null || configuredFile.isEmpty()) {
        throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
                "This is because stdin is used for input and offsets are not tracked.");
    }

    // Only one source partition (the configured file) is in use at any time, but the request may
    // also carry partitions left over from earlier configurations of the connector.
    for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
        Map<String, ?> sourcePartition = entry.getKey();
        if (sourcePartition == null) {
            throw new ConnectException("Partition objects cannot be null");
        }
        if (!sourcePartition.containsKey(FILENAME_FIELD)) {
            throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
        }

        Map<String, ?> sourceOffset = entry.getValue();
        // A null offset is a valid request: it means "delete the offset for this partition".
        if (sourceOffset != null) {
            if (!sourceOffset.containsKey(POSITION_FIELD)) {
                throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
            }

            // The position is a location in the file's byte stream, so it must be a non-negative long.
            Object rawPosition = sourceOffset.get(POSITION_FIELD);
            if (!(rawPosition instanceof Long)) {
                throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset is expected to be a Long value");
            }

            long position = (Long) rawPosition;
            if (position < 0) {
                throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset should be a non-negative value");
            }
        }
    }

    // Whether the position is actually valid for the configured file is left for the task to check on startup.
    return true;
}
}
Expand Up @@ -20,20 +20,26 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FileStreamSourceConnectorTest {

private static final String SINGLE_TOPIC = "test";
Expand Down Expand Up @@ -147,4 +153,78 @@ public void testInvalidBatchSize() {
sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
}

// With no file configured the connector reads from stdin, so even a well-formed request must be rejected.
@Test
public void testAlterOffsetsStdin() {
    Map<String, ?> partition = Collections.singletonMap(FILENAME_FIELD, FILENAME);
    Map<String, ?> offset = Collections.singletonMap(POSITION_FIELD, 0L);
    Map<Map<String, ?>, Map<String, ?>> request = Collections.singletonMap(partition, offset);

    sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);

    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, request));
}

// A partition must be a non-null map that carries the filename key.
@Test
public void testAlterOffsetsIncorrectPartitionKey() {
    Map<String, ?> validOffset = Collections.singletonMap(POSITION_FIELD, 0L);

    // Partition keyed by something other than the filename field
    Map<String, ?> wrongKeyPartition = Collections.singletonMap("other_partition_key", FILENAME);
    Map<Map<String, ?>, Map<String, ?>> wrongKeyRequest = Collections.singletonMap(wrongKeyPartition, validOffset);
    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, wrongKeyRequest));

    // null partitions are invalid
    Map<Map<String, ?>, Map<String, ?>> nullPartitionRequest = Collections.singletonMap(null, validOffset);
    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, nullPartitionRequest));
}

// Several partitions may be altered at once; a null offset for one of them is accepted.
@Test
public void testAlterOffsetsMultiplePartitions() {
    Map<String, ?> currentFile = Collections.singletonMap(FILENAME_FIELD, FILENAME);
    Map<String, ?> otherFile = Collections.singletonMap(FILENAME_FIELD, "/someotherfilename");

    Map<Map<String, ?>, Map<String, ?>> request = new HashMap<>();
    request.put(currentFile, Collections.singletonMap(POSITION_FIELD, 0L));
    request.put(otherFile, null);

    boolean accepted = connector.alterOffsets(sourceProperties, request);
    assertTrue(accepted);
}

// A non-null offset that lacks the position key must be rejected.
@Test
public void testAlterOffsetsIncorrectOffsetKey() {
    Map<String, ?> partition = Collections.singletonMap(FILENAME_FIELD, FILENAME);
    Map<String, ?> wrongKeyOffset = Collections.singletonMap("other_offset_key", 0L);
    Map<Map<String, ?>, Map<String, ?>> request = Collections.singletonMap(partition, wrongKeyOffset);

    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, request));
}

// Only a non-negative Long is an acceptable position; every other value or type must be rejected.
@Test
public void testAlterOffsetsOffsetPositionValues() {
    // Builds a single-partition request whose offset carries the given position value.
    Function<Object, Map<Map<String, ?>, Map<String, ?>>> requestWithPosition = position -> Collections.singletonMap(
        Collections.singletonMap(FILENAME_FIELD, FILENAME),
        Collections.singletonMap(POSITION_FIELD, position)
    );

    // Wrong types, null, numeric strings, boxed Integers and a negative Long, in that order.
    Object[] rejectedPositions = {"nan", null, new Object(), 3.14, -420, "-420", 10, "10", -10L};
    for (Object rejected : rejectedPositions) {
        assertThrows(ConnectException.class,
            () -> connector.alterOffsets(sourceProperties, requestWithPosition.apply(rejected)));
    }

    assertTrue(connector.alterOffsets(sourceProperties, requestWithPosition.apply(10L)));
}

// Well-formed requests, including an empty one, are accepted without any exception.
@Test
public void testSuccessfulAlterOffsets() {
    Map<String, ?> partition = Collections.singletonMap(FILENAME_FIELD, FILENAME);
    Map<String, ?> offset = Collections.singletonMap(POSITION_FIELD, 0L);
    Map<Map<String, ?>, Map<String, ?>> validRequest = Collections.singletonMap(partition, offset);
    assertTrue(connector.alterOffsets(sourceProperties, validRequest));

    // An empty offsets map is also valid: for a reset operation it can mean that the offsets were
    // already reset earlier or that nothing has been committed yet.
    Map<Map<String, ?>, Map<String, ?>> emptyRequest = new HashMap<>();
    assertTrue(connector.alterOffsets(sourceProperties, emptyRequest));
}
}

0 comments on commit 03d8f39

Please sign in to comment.