Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14784: Connect offset reset REST API #13818

Merged
merged 7 commits into from Jun 23, 2023
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Expand Up @@ -169,7 +169,7 @@
files="(RequestResponse|WorkerSinkTask)Test.java"/>

<suppress checks="JavaNCSS"
files="DistributedHerderTest.java"/>
files="(DistributedHerder|Worker)Test.java"/>

<!-- Raft -->
<suppress checks="NPathComplexity"
Expand Down
Expand Up @@ -60,7 +60,9 @@ protected SinkConnectorContext context() {
* @param connectorConfig the configuration of the connector
* @param offsets a map from topic partition to offset, containing the offsets that the user has requested to
* alter/reset. For any topic partitions whose offsets are being reset instead of altered, their
* corresponding value in the map will be {@code null}.
* corresponding value in the map will be {@code null}. This map may be empty, but never null. An
* empty offsets map could indicate that the offsets were reset previously or that no offsets have
* been committed yet.
* @return whether this method has been overridden by the connector; the default implementation returns
* {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
* {@code true}
Expand Down
Expand Up @@ -91,7 +91,9 @@ public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String,
* @param connectorConfig the configuration of the connector
* @param offsets a map from source partition to source offset, containing the offsets that the user has requested
* to alter/reset. For any source partitions whose offsets are being reset instead of altered, their
* corresponding source offset value in the map will be {@code null}
* corresponding source offset value in the map will be {@code null}. This map may be empty, but
* never null. An empty offsets map could indicate that the offsets were reset previously or that no
* offsets have been committed yet.
* @return whether this method has been overridden by the connector; the default implementation returns
* {@code false}, and all other implementations (that do not unconditionally throw exceptions) should return
* {@code true}
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
Expand Down Expand Up @@ -902,4 +903,22 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
cb.onCompletion(t, null);
}
}

@Override
public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
modifyConnectorOffsets(connName, offsets, callback);
}

@Override
public void resetConnectorOffsets(String connName, Callback<Message> callback) {
modifyConnectorOffsets(connName, null, callback);
}

/**
* Service external requests to alter or reset connector offsets.
* @param connName the name of the connector whose offsets are to be modified
* @param offsets the offsets to be written; this should be {@code null} for offsets reset requests
* @param cb callback to invoke upon completion
*/
protected abstract void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
}
Expand Up @@ -311,6 +311,13 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba
*/
void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);

/**
* Reset a connector's offsets.
* @param connName the name of the connector whose offsets are to be reset
* @param cb callback to invoke upon completion
*/
void resetConnectorOffsets(String connName, Callback<Message> cb);

enum ConfigReloadAction {
NONE,
RESTART
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -1523,60 +1523,68 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
}

@Override
public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
log.trace("Submitting alter offsets request for connector '{}'", connName);
protected void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
boolean isReset = offsets == null;
log.trace("Submitting {} offsets request for connector '{}'", isReset ? "reset" : "alter", connName);

addRequest(() -> {
if (!alterConnectorOffsetsChecks(connName, callback)) {
if (!modifyConnectorOffsetsChecks(connName, callback)) {
return null;
}
// At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
// At this point, we should be the leader (the call to modifyConnectorOffsetsChecks makes sure of that) and can safely run
// a zombie fencing request
if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once support enabled.", connName);
log.debug("Performing a round of zombie fencing before modifying offsets for source connector {} with exactly-once support enabled.", connName);
doFenceZombieSourceTasks(connName, (error, ignored) -> {
if (error != null) {
log.error("Failed to perform zombie fencing for source connector prior to altering offsets", error);
callback.onCompletion(new ConnectException("Failed to perform zombie fencing for source connector prior to altering offsets",
error), null);
log.error("Failed to perform zombie fencing for source connector prior to modifying offsets", error);
callback.onCompletion(new ConnectException("Failed to perform zombie fencing for source connector prior to modifying offsets", error), null);
} else {
log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
// We need to ensure that we perform the necessary checks again before proceeding to actually altering the connector offsets since
log.debug("Successfully completed zombie fencing for source connector {}; proceeding to modify offsets.", connName);
// We need to ensure that we perform the necessary checks again before proceeding to actually altering / resetting the connector offsets since
// zombie fencing is done asynchronously and the conditions could have changed since the previous check
addRequest(() -> {
if (alterConnectorOffsetsChecks(connName, callback)) {
worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
if (modifyConnectorOffsetsChecks(connName, callback)) {
if (isReset) {
worker.resetConnectorOffsets(connName, configState.connectorConfig(connName), callback);
} else {
worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
}
}
return null;
}, forwardErrorCallback(callback));
}
});
} else {
worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
if (isReset) {
worker.resetConnectorOffsets(connName, configState.connectorConfig(connName), callback);
} else {
worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, callback);
}
}
return null;
}, forwardErrorCallback(callback));
}

/**
* This method performs a few checks for alter connector offsets request and completes the callback exceptionally
* if any check fails.
* @param connName the name of the connector whose offsets are to be altered
* This method performs a few checks for external requests to modify (alter or reset) connector offsets and
* completes the callback exceptionally if any check fails.
* @param connName the name of the connector whose offsets are to be modified
* @param callback callback to invoke upon completion
* @return true if all the checks passed, false otherwise
*/
private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> callback) {
private boolean modifyConnectorOffsetsChecks(String connName, Callback<Message> callback) {
if (checkRebalanceNeeded(callback)) {
return false;
}

if (!isLeader()) {
callback.onCompletion(new NotLeaderException("Only the leader can process alter offsets requests", leaderUrl()), null);
callback.onCompletion(new NotLeaderException("Only the leader can process external offsets modification requests", leaderUrl()), null);
return false;
}

if (!refreshConfigSnapshot(workerSyncTimeoutMs)) {
throw new ConnectException("Failed to read to end of config topic before altering connector offsets");
throw new ConnectException("Failed to read to end of config topic before modifying connector offsets");
}

if (!configState.contains(connName)) {
Expand All @@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String connName, Callback<Message> c
// If the target state for the connector is stopped, its task count is 0, and there is no rebalance pending (checked above),
// we can be sure that the tasks have at least been attempted to be stopped (or cancelled if they took too long to stop).
// Zombie tasks are handled by a round of zombie fencing for exactly once source connectors. Zombie sink tasks are handled
// naturally because requests to alter consumer group offsets will fail if there are still active members in the group.
// naturally because requests to alter consumer group offsets / delete consumer groups will fail if there are still active members
// in the group.
if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. This " +
"can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
callback.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " +
C0urante marked this conversation as resolved.
Show resolved Hide resolved
"This can be done for the specified connector by issuing a 'PUT' request to the '/connectors/" + connName + "/stop' endpoint"), null);
return false;
}
return true;
Expand Down
Expand Up @@ -367,6 +367,18 @@ public Response alterConnectorOffsets(final @Parameter(hidden = true) @QueryPara
return Response.ok().entity(msg).build();
}

@DELETE
@Path("/{connector}/offsets")
@Operation(summary = "Reset the offsets for the specified connector")
public Response resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers, final @PathParam("connector") String connector) throws Throwable {
FutureCallback<Message> cb = new FutureCallback<>();
herder.resetConnectorOffsets(connector, cb);
Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null,
new TypeReference<Message>() { }, new IdentityTranslator<>(), forward);
return Response.ok().entity(msg).build();
}

// Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig
// object. Throw BadRequestException on mismatch, otherwise put connectorName in config
private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {
Expand Down
Expand Up @@ -374,19 +374,38 @@ public synchronized void connectorOffsets(String connName, Callback<ConnectorOff
}

@Override
public synchronized void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
protected synchronized void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
if (!modifyConnectorOffsetsChecks(connName, cb)) {
return;
}

if (offsets == null) {
worker.resetConnectorOffsets(connName, configState.connectorConfig(connName), cb);
} else {
worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb);
}
}

/**
* This method performs a few checks for external requests to modify (alter or reset) connector offsets and
* completes the callback exceptionally if any check fails.
* @param connName the name of the connector whose offsets are to be modified
* @param cb callback to invoke upon completion
* @return true if all the checks passed, false otherwise
*/
private boolean modifyConnectorOffsetsChecks(String connName, Callback<Message> cb) {
if (!configState.contains(connName)) {
cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
return;
return false;
}

if (configState.targetState(connName) != TargetState.STOPPED || configState.taskCount(connName) != 0) {
cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be altered. " +
"This can be done for the specified connector by issuing a PUT request to the /connectors/" + connName + "/stop endpoint"), null);
return;
cb.onCompletion(new BadRequestException("Connectors must be in the STOPPED state before their offsets can be modified externally. " +
C0urante marked this conversation as resolved.
Show resolved Hide resolved
"This can be done for the specified connector by issuing a 'PUT' request to the '/connectors/" + connName + "/stop' endpoint"), null);
return false;
}

worker.alterConnectorOffsets(connName, configState.connectorConfig(connName), offsets, cb);
return true;
}

private void startConnector(String connName, Callback<TargetState> onStart) {
Expand Down
Expand Up @@ -73,8 +73,8 @@ public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map<TopicP
* and then parse them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
* valued offsets.
*
* @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
* @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
* @param partitionOffsets the partitions to offset map that needs to be validated and parsed; may not be null or empty
* @return the parsed mapping from {@link TopicPartition}s to their corresponding {@link Long} valued offsets; may not be null or empty
*
* @throws BadRequestException if the provided offsets aren't in the expected format
*/
Expand Down