Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15121: Implement the alterOffsets method in the FileStreamSourceConnector and the FileStreamSinkConnector #13945

Merged
merged 5 commits into from Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.file;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand Down Expand Up @@ -79,4 +80,11 @@ public void stop() {
public ConfigDef config() {
// Expose the connector's static configuration definition (CONFIG_DEF) as-is.
return CONFIG_DEF;
}

/**
 * Accepts any requested offset alteration without inspecting it.
 *
 * <p>FileStreamSinkConnector does not manage offsets externally nor does it require any custom offset
 * validation, so there is nothing to verify here.
 *
 * @param connectorConfig the connector configuration; unused
 * @param offsets the requested offsets keyed by topic partition; unused
 * @return {@code true} unconditionally
 */
@Override
public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
    return true;
}
}
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
Expand All @@ -31,6 +32,9 @@
import java.util.List;
import java.util.Map;

import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;

/**
* Very simple source connector that works with stdin or a file.
*/
Expand Down Expand Up @@ -101,4 +105,49 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
: ExactlyOnceSupport.UNSUPPORTED;
}

@Override
public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
AbstractConfig config = new AbstractConfig(CONFIG_DEF, connectorConfig);
String filename = config.getString(FILE_CONFIG);
if (filename == null || filename.isEmpty()) {
throw new ConnectException("Offsets cannot be modified if the '" + FILE_CONFIG + "' configuration is unspecified. " +
"This is because stdin is used for input and offsets are not tracked.");
}
C0urante marked this conversation as resolved.
Show resolved Hide resolved

// This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
// However, there could also be source partitions from previous configurations of the connector.
for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
Map<String, ?> partition = partitionOffset.getKey();
if (partition == null) {
throw new ConnectException("Partition objects cannot be null");
}

if (!partition.containsKey(FILENAME_FIELD)) {
throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
}

Map<String, ?> offset = partitionOffset.getValue();
// null offsets are allowed and represent a deletion of offsets for a partition
if (offset == null) {
continue;
}

if (!offset.containsKey(POSITION_FIELD)) {
throw new ConnectException("Offset objects should either be null or contain the key '" + POSITION_FIELD + "'");
}

// The 'position' in the offset represents the position in the file's byte stream and should be a non-negative long value
if (!(offset.get(POSITION_FIELD) instanceof Long)) {
throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset is expected to be a Long value");
}

long offsetPosition = (Long) offset.get(POSITION_FIELD);
if (offsetPosition < 0) {
throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset should be a non-negative value");
}
}

// Let the task check whether the actual value for the offset position is valid for the configured file on startup
return true;
}
}
Expand Up @@ -20,20 +20,26 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FileStreamSourceConnectorTest {

private static final String SINGLE_TOPIC = "test";
Expand Down Expand Up @@ -147,4 +153,78 @@ public void testInvalidBatchSize() {
sourceProperties.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, "abcd");
assertThrows(ConfigException.class, () -> new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, sourceProperties));
}

@Test
public void testAlterOffsetsStdin() {
    // An otherwise well-formed request: one file partition at position 0
    Map<String, ?> partition = Collections.singletonMap(FILENAME_FIELD, FILENAME);
    Map<String, ?> offset = Collections.singletonMap(POSITION_FIELD, 0L);
    Map<Map<String, ?>, Map<String, ?>> requestedOffsets = Collections.singletonMap(partition, offset);

    // Without a configured file the connector must refuse to alter offsets
    sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, requestedOffsets));
}

@Test
public void testAlterOffsetsIncorrectPartitionKey() {
    // The offset itself is valid in both cases; only the partition is at fault
    Map<String, ?> validOffset = Collections.singletonMap(POSITION_FIELD, 0L);

    // partitions keyed by anything other than the filename field are invalid
    Map<Map<String, ?>, Map<String, ?>> wrongPartitionKey = Collections.singletonMap(
            Collections.singletonMap("other_partition_key", FILENAME),
            validOffset
    );
    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, wrongPartitionKey));

    // null partitions are invalid
    Map<Map<String, ?>, Map<String, ?>> nullPartition = Collections.singletonMap(null, validOffset);
    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, nullPartition));
}

@Test
public void testAlterOffsetsMultiplePartitions() {
    Map<String, ?> configuredFile = Collections.singletonMap(FILENAME_FIELD, FILENAME);
    Map<String, ?> otherFile = Collections.singletonMap(FILENAME_FIELD, "/someotherfilename");

    // One partition gets a new position, the other has its offsets deleted (null offset)
    Map<Map<String, ?>, Map<String, ?>> requestedOffsets = new HashMap<>();
    requestedOffsets.put(configuredFile, Collections.singletonMap(POSITION_FIELD, 0L));
    requestedOffsets.put(otherFile, null);

    boolean accepted = connector.alterOffsets(sourceProperties, requestedOffsets);
    assertTrue(accepted);
}

@Test
public void testAlterOffsetsIncorrectOffsetKey() {
    // The partition is well-formed; the offset map uses a key other than the position field
    Map<String, ?> partition = Collections.singletonMap(FILENAME_FIELD, FILENAME);
    Map<String, ?> offsetWithWrongKey = Collections.singletonMap("other_offset_key", 0L);
    Map<Map<String, ?>, Map<String, ?>> requestedOffsets = Collections.singletonMap(partition, offsetWithWrongKey);

    assertThrows(ConnectException.class, () -> connector.alterOffsets(sourceProperties, requestedOffsets));
}

@Test
public void testAlterOffsetsOffsetPositionValues() {
yashmayya marked this conversation as resolved.
Show resolved Hide resolved
Function<Object, Boolean> alterOffsets = offset -> connector.alterOffsets(sourceProperties, Collections.singletonMap(
Collections.singletonMap(FILENAME_FIELD, FILENAME),
Collections.singletonMap(POSITION_FIELD, offset)
));

assertThrows(ConnectException.class, () -> alterOffsets.apply("nan"));
assertThrows(ConnectException.class, () -> alterOffsets.apply(null));
assertThrows(ConnectException.class, () -> alterOffsets.apply(new Object()));
assertThrows(ConnectException.class, () -> alterOffsets.apply(3.14));
assertThrows(ConnectException.class, () -> alterOffsets.apply(-420));
assertThrows(ConnectException.class, () -> alterOffsets.apply("-420"));
assertThrows(ConnectException.class, () -> alterOffsets.apply(10));
assertThrows(ConnectException.class, () -> alterOffsets.apply("10"));
assertThrows(ConnectException.class, () -> alterOffsets.apply(-10L));
assertTrue(() -> alterOffsets.apply(10L));
}

@Test
public void testSuccessfulAlterOffsets() {
    // Expect no exception to be thrown when a valid offsets map is passed
    Map<String, ?> partition = Collections.singletonMap(FILENAME_FIELD, FILENAME);
    Map<String, ?> offset = Collections.singletonMap(POSITION_FIELD, 0L);
    Map<Map<String, ?>, Map<String, ?>> validOffsets = Collections.singletonMap(partition, offset);
    assertTrue(connector.alterOffsets(sourceProperties, validOffsets));

    // An empty offsets map is treated as valid since it could indicate that the offsets were reset previously
    // or that no offsets have been committed yet (for a reset operation)
    Map<Map<String, ?>, Map<String, ?>> noOffsets = new HashMap<>();
    assertTrue(connector.alterOffsets(sourceProperties, noOffsets));
}
}