Skip to content

Commit

Permalink
KAFKA-16093: Fix spurious REST-related warnings on Connect startup (a…
Browse files Browse the repository at this point in the history
…pache#15149)

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Greg Harris <greg.harris@aiven.io>
  • Loading branch information
C0urante authored and clolov committed Apr 5, 2024
1 parent f149e43 commit 0a18837
Show file tree
Hide file tree
Showing 22 changed files with 239 additions and 185 deletions.
5 changes: 5 additions & 0 deletions checkstyle/import-control.xml
Expand Up @@ -516,6 +516,9 @@
<allow pkg="kafka.server" />
<subpackage name="rest">
<allow pkg="javax.ws.rs" />
<allow pkg="javax.inject" />
<allow pkg="org.glassfish.jersey" />
<allow pkg="org.glassfish.hk2" />
</subpackage>
</subpackage>

Expand All @@ -530,6 +533,8 @@
<subpackage name="rest">
<allow pkg="org.eclipse.jetty" />
<allow pkg="javax.ws.rs" />
<allow pkg="javax.inject" />
<allow pkg="org.glassfish.hk2" />
<allow pkg="javax.servlet" />
<allow pkg="org.glassfish.jersey" />
<allow pkg="com.fasterxml.jackson" />
Expand Down
Expand Up @@ -22,7 +22,9 @@
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.glassfish.hk2.api.TypeLiteral;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -45,15 +47,28 @@ public void initializeInternalResources(Map<SourceAndTarget, Herder> herders) {
}

@Override
protected Collection<ConnectResource> regularResources() {
protected Collection<Class<?>> regularResources() {
return Arrays.asList(
new InternalMirrorResource(herders, restClient)
InternalMirrorResource.class
);
}

@Override
protected Collection<ConnectResource> adminResources() {
protected Collection<Class<?>> adminResources() {
return Collections.emptyList();
}

@Override
protected void configureRegularResources(ResourceConfig resourceConfig) {
resourceConfig.register(new Binder());
}

private class Binder extends AbstractBinder {
@Override
protected void configure() {
bind(herders).to(new TypeLiteral<Map<SourceAndTarget, Herder>>() { });
bind(restClient).to(RestClient.class);
}
}

}
Expand Up @@ -19,10 +19,12 @@
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
Expand All @@ -39,8 +41,13 @@ public class InternalMirrorResource extends InternalClusterResource {

private final Map<SourceAndTarget, Herder> herders;

public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
super(restClient);
@Inject
public InternalMirrorResource(
Map<SourceAndTarget, Herder> herders,
RestClient restClient,
RestRequestTimeout requestTimeout
) {
super(restClient, requestTimeout);
this.herders = herders;
}

Expand Down
Expand Up @@ -66,10 +66,10 @@
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
Expand Down Expand Up @@ -718,7 +718,7 @@ public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String,
.map(this::taskTransactionalId)
.collect(Collectors.toList());
FenceProducersOptions fencingOptions = new FenceProducersOptions()
.timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
.timeoutMs((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
return admin.fenceProducers(transactionalIds, fencingOptions).all().whenComplete((ignored, error) -> {
if (error == null)
log.debug("Finished fencing out {} task producers for source connector {}", numTasks, connName);
Expand Down Expand Up @@ -1195,7 +1195,7 @@ void sinkConnectorOffsets(String connName, Connector connector, Map<String, Stri
Admin admin = adminFactory.apply(adminConfig);
try {
ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
.timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
.timeoutMs((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().whenComplete((result, error) -> {
if (error != null) {
Expand Down Expand Up @@ -1299,7 +1299,7 @@ void modifySinkConnectorOffsets(String connName, Connector connector, Map<String
Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
executor.submit(plugins.withClassLoader(connectorLoader, () -> {
try {
Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
Timer timer = time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
boolean isReset = offsets == null;
SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
Class<? extends Connector> sinkConnectorClass = connector.getClass();
Expand Down Expand Up @@ -1530,7 +1530,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
ClassLoader connectorLoader, Callback<Message> cb) {
executor.submit(plugins.withClassLoader(connectorLoader, () -> {
try {
Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
Timer timer = time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
// This reads to the end of the offsets topic and can be a potentially time-consuming operation
offsetStore.start();
updateTimerAndCheckExpiry(timer, "Timed out while trying to read to the end of the offsets topic prior to modifying " +
Expand Down
Expand Up @@ -17,12 +17,12 @@
package org.apache.kafka.connect.runtime.rest;

import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource;
import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
import org.apache.kafka.connect.runtime.rest.resources.RootResource;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;

import java.util.Arrays;
Expand All @@ -45,25 +45,40 @@ public void initializeResources(Herder herder) {
}

@Override
protected Collection<ConnectResource> regularResources() {
protected Collection<Class<?>> regularResources() {
return Arrays.asList(
new RootResource(herder),
new ConnectorsResource(herder, config, restClient),
new InternalConnectResource(herder, restClient),
new ConnectorPluginsResource(herder)
RootResource.class,
ConnectorsResource.class,
InternalConnectResource.class,
ConnectorPluginsResource.class
);
}

@Override
protected Collection<ConnectResource> adminResources() {
protected Collection<Class<?>> adminResources() {
return Arrays.asList(
new LoggingResource(herder)
LoggingResource.class
);
}

@Override
protected void configureRegularResources(ResourceConfig resourceConfig) {
registerRestExtensions(herder, resourceConfig);
resourceConfig.register(new Binder());
}

private class Binder extends AbstractBinder {
@Override
protected void configure() {
bind(herder).to(Herder.class);
bind(restClient).to(RestClient.class);
bind(config).to(RestServerConfig.class);
}
}

@Override
protected void configureAdminResources(ResourceConfig resourceConfig) {
resourceConfig.register(new Binder());
}

}
Expand Up @@ -41,18 +41,11 @@ public class HerderRequestHandler {

private final RestClient restClient;

private volatile long requestTimeoutMs;
private final RestRequestTimeout requestTimeout;

public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
public HerderRequestHandler(RestClient restClient, RestRequestTimeout requestTimeout) {
this.restClient = restClient;
this.requestTimeoutMs = requestTimeoutMs;
}

public void requestTimeoutMs(long requestTimeoutMs) {
if (requestTimeoutMs < 1) {
throw new IllegalArgumentException("REST request timeout must be positive");
}
this.requestTimeoutMs = requestTimeoutMs;
this.requestTimeout = requestTimeout;
}

/**
Expand All @@ -64,7 +57,7 @@ public void requestTimeoutMs(long requestTimeoutMs) {
*/
public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
try {
return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
return cb.get(requestTimeout.timeoutMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw e.getCause();
} catch (StagedTimeoutException e) {
Expand Down
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest;

public interface RestRequestTimeout {

/**
* @return the current timeout that should be used for REST requests, in milliseconds
*/
long timeoutMs();

}

0 comments on commit 0a18837

Please sign in to comment.