Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14368: Connect offset write REST API #13465

Merged
merged 12 commits into from May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Expand Up @@ -544,6 +544,7 @@
<allow class="javax.ws.rs.core.Response" />
<allow pkg="com.fasterxml.jackson.core.type" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.eclipse.jetty.client"/>
</subpackage>
</subpackage>

Expand Down
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.kafka.connect.sink;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.Connector;

import java.util.Map;

/**
* SinkConnectors implement the Connector interface to send Kafka data to another system.
*/
Expand All @@ -39,4 +42,33 @@ protected SinkConnectorContext context() {
return (SinkConnectorContext) context;
}

/**
* Invoked when users request to manually alter/reset the offsets for this connector via the Connect worker's REST
* API. Connectors that manage offsets externally can propagate offset changes to their external system in this
* method. Connectors may also validate these offsets if, for example, an offset is out of range for what can be
* feasibly written to the external system.
* <p>
* Connectors that neither manage offsets externally nor require custom offset validation need not implement this
* method beyond simply returning {@code true}.
* <p>
* User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected in the offsets
* for this connector's consumer group.
* <p>
* Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
* {@link #start(Map) start} method is invoked.
*
* @param connectorConfig the configuration of the connector
* @param offsets a map from topic partition to offset, containing the offsets that the user has requested to
* alter/reset. For any topic partitions whose offsets are being reset instead of altered, their
* corresponding value in the map will be {@code null}.
* @return whether this method has been overridden by the connector; the default implementation returns
* {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
* {@code true}
* @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
* @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
* reset for any other reason (for example, they have failed custom validation logic specific to this connector)
*/
public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
return false;
}
}
Expand Up @@ -71,4 +71,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
return ConnectorTransactionBoundaries.UNSUPPORTED;
}

/**
* Invoked when users request to manually alter/reset the offsets for this connector via the Connect worker's REST
* API. Connectors that manage offsets externally can propagate offset changes to their external system in this
* method. Connectors may also validate these offsets to ensure that the source partitions and source offsets are
* in a format that is recognizable to them.
* <p>
* Connectors that neither manage offsets externally nor require custom offset validation need not implement this
* method beyond simply returning {@code true}.
* <p>
* User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected in the offsets
* returned by any {@link org.apache.kafka.connect.storage.OffsetStorageReader OffsetStorageReader instances}
* provided to this connector and its tasks.
* <p>
* Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
* {@link #start(Map) start} method is invoked.
*
* @param connectorConfig the configuration of the connector
* @param offsets a map from source partition to source offset, containing the offsets that the user has requested
* to alter/reset. For any source partitions whose offsets are being reset instead of altered, their
* corresponding source offset value in the map will be {@code null}
* @return whether this method has been overridden by the connector; the default implementation returns
* {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
* {@code true}
* @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
* @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
* reset for any other reason (for example, they have failed custom validation logic specific to this connector)
*/
public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
return false;
}
}
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
Expand Down Expand Up @@ -302,6 +303,14 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
*/
void connectorOffsets(String connName, Callback<ConnectorOffsets> cb);

/**
* Alter a connector's offsets.
* @param connName the name of the connector whose offsets are to be altered
* @param offsets a mapping from partitions to offsets that need to be written
* @param cb callback to invoke upon completion
*/
void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);

enum ConfigReloadAction {
NONE,
RESTART
Expand Down