Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14784: Connect offset reset REST API #13818

Merged
merged 7 commits into from Jun 23, 2023
Merged
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Expand Up @@ -169,7 +169,7 @@
files="(RequestResponse|WorkerSinkTask)Test.java"/>

<suppress checks="JavaNCSS"
files="DistributedHerderTest.java"/>
files="(DistributedHerder|Worker)Test.java"/>

<!-- Raft -->
<suppress checks="NPathComplexity"
Expand Down
Expand Up @@ -54,19 +54,26 @@ protected SinkConnectorContext context() {
* User requests to alter/reset offsets will be handled by the Connect runtime and will be reflected in the offsets
* for this connector's consumer group.
* <p>
* Note that altering / resetting offsets is expected to be an idempotent operation and this method should be able
* to handle being called more than once with the same arguments (which could occur if a user retries the request
* due to a failure in altering the consumer group offsets, for example).
* <p>
* Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
* {@link #start(Map) start} method is invoked.
*
* @param connectorConfig the configuration of the connector
* @param offsets a map from topic partition to offset, containing the offsets that the user has requested to
* alter/reset. For any topic partitions whose offsets are being reset instead of altered, their
* corresponding value in the map will be {@code null}.
* corresponding value in the map will be {@code null}. This map may be empty, but never null. An
* empty offsets map could indicate that the offsets were reset previously or that no offsets have
* been committed yet.
* @return whether this method has been overridden by the connector; the default implementation returns
* {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
* {@code true}
* @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
* @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
* reset for any other reason (for example, they have failed custom validation logic specific to this connector)
* @since 3.6
*/
public boolean alterOffsets(Map<String, String> connectorConfig, Map<TopicPartition, Long> offsets) {
return false;
Expand Down
Expand Up @@ -85,19 +85,26 @@ public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String,
* returned by any {@link org.apache.kafka.connect.storage.OffsetStorageReader OffsetStorageReader instances}
* provided to this connector and its tasks.
* <p>
* Note that altering / resetting offsets is expected to be an idempotent operation and this method should be able
* to handle being called more than once with the same arguments (which could occur if a user retries the request
* due to a failure in writing the new offsets to the offsets store, for example).
* <p>
* Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
* {@link #start(Map) start} method is invoked.
*
* @param connectorConfig the configuration of the connector
* @param offsets a map from source partition to source offset, containing the offsets that the user has requested
* to alter/reset. For any source partitions whose offsets are being reset instead of altered, their
* corresponding source offset value in the map will be {@code null}
* corresponding source offset value in the map will be {@code null}. This map may be empty, but
* never null. An empty offsets map could indicate that the offsets were reset previously or that no
* offsets have been committed yet.
* @return whether this method has been overridden by the connector; the default implementation returns
* {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
* {@code true}
* @throws UnsupportedOperationException if it is impossible to alter/reset the offsets for this connector
* @throws org.apache.kafka.connect.errors.ConnectException if the offsets for this connector cannot be
* reset for any other reason (for example, they have failed custom validation logic specific to this connector)
* @since 3.6
*/
public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
return false;
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
Expand Down Expand Up @@ -902,4 +903,26 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
cb.onCompletion(t, null);
}
}

@Override
public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
if (offsets == null || offsets.isEmpty()) {
callback.onCompletion(new ConnectException("The offsets to be altered may not be null or empty"), null);
return;
}
modifyConnectorOffsets(connName, offsets, callback);
}

@Override
public void resetConnectorOffsets(String connName, Callback<Message> callback) {
modifyConnectorOffsets(connName, null, callback);
}

/**
* Service external requests to alter or reset connector offsets.
* @param connName the name of the connector whose offsets are to be modified
* @param offsets the offsets to be written; this should be {@code null} for offsets reset requests
* @param cb callback to invoke upon completion
*/
protected abstract void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
}
Expand Up @@ -311,6 +311,13 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
*/
void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);

/**
* Reset a connector's offsets.
* @param connName the name of the connector whose offsets are to be reset
* @param cb callback to invoke upon completion
*/
void resetConnectorOffsets(String connName, Callback<Message> cb);

enum ConfigReloadAction {
NONE,
RESTART
Expand Down