diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4fc539d60670..377ce40fd2ca 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -544,6 +544,7 @@
+
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
index 9627571482bc..d3726fd1dfc6 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
@@ -16,8 +16,11 @@
*/
package org.apache.kafka.connect.sink;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.Connector;
+import java.util.Map;
+
/**
* SinkConnectors implement the Connector interface to send Kafka data to another system.
*/
@@ -39,4 +42,33 @@ protected SinkConnectorContext context() {
return (SinkConnectorContext) context;
}
+ /**
+ * Invoked when users request to manually alter/reset the offsets for this connector via the Connect worker's REST
+ * API. Connectors that manage offsets externally can propagate offset changes to their external system in this
+ * method. Connectors may also validate these offsets if, for example, an offset is out of range for what can be
+ * feasibly written to the external system.
+ *
+ * Connectors that neither manage offsets externally nor require custom offset validation need not implement this
+ * method beyond simply returning {@code true}.
+ *
+ * User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected in the offsets
+ * for this connector's consumer group.
+ *
+ * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+ * {@link #start(Map) start} method is invoked.
+ *
+ * @param connectorConfig the configuration of the connector
+ * @param offsets a map from topic partition to offset, containing the offsets that the user has requested to
+ * alter/reset. For any topic partitions whose offsets are being reset instead of altered, their
+ * corresponding value in the map will be {@code null}.
+ * @return whether this method has been overridden by the connector; the default implementation returns
+ * {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
+ * {@code true}
+ * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
+ * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
+ * reset for any other reason (for example, they have failed custom validation logic specific to this connector)
+ */
+ public boolean alterOffsets(Map connectorConfig, Map offsets) {
+ return false;
+ }
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
index c06279df4211..eaaf56566c86 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -71,4 +71,35 @@ public ExactlyOnceSupport exactlyOnceSupport(Map connectorConfig
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map connectorConfig) {
return ConnectorTransactionBoundaries.UNSUPPORTED;
}
+
+ /**
+ * Invoked when users request to manually alter/reset the offsets for this connector via the Connect worker's REST
+ * API. Connectors that manage offsets externally can propagate offset changes to their external system in this
+ * method. Connectors may also validate these offsets to ensure that the source partitions and source offsets are
+ * in a format that is recognizable to them.
+ *
+ * Connectors that neither manage offsets externally nor require custom offset validation need not implement this
+ * method beyond simply returning {@code true}.
+ *
+ * User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected in the offsets
+ * returned by any {@link org.apache.kafka.connect.storage.OffsetStorageReader OffsetStorageReader instances}
+ * provided to this connector and its tasks.
+ *
+ * Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+ * {@link #start(Map) start} method is invoked.
+ *
+ * @param connectorConfig the configuration of the connector
+ * @param offsets a map from source partition to source offset, containing the offsets that the user has requested
+ * to alter/reset. For any source partitions whose offsets are being reset instead of altered, their
+ * corresponding source offset value in the map will be {@code null}
+ * @return whether this method has been overridden by the connector; the default implementation returns
+ * {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
+ * {@code true}
+ * @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
+ * @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
+ * reset for any other reason (for example, they have failed custom validation logic specific to this connector)
+ */
+ public boolean alterOffsets(Map connectorConfig, Map