Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16093: Fix spurious REST-related warnings on Connect startup #15149

Merged
merged 5 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,9 @@
<allow pkg="kafka.server" />
<subpackage name="rest">
<allow pkg="javax.ws.rs" />
<allow pkg="javax.inject" />
<allow pkg="org.glassfish.jersey" />
<allow pkg="org.glassfish.hk2" />
</subpackage>
</subpackage>

Expand All @@ -530,6 +533,8 @@
<subpackage name="rest">
<allow pkg="org.eclipse.jetty" />
<allow pkg="javax.ws.rs" />
<allow pkg="javax.inject" />
<allow pkg="org.glassfish.hk2" />
<allow pkg="javax.servlet" />
<allow pkg="org.glassfish.jersey" />
<allow pkg="com.fasterxml.jackson" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.glassfish.hk2.api.TypeLiteral;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -45,15 +47,28 @@ public void initializeInternalResources(Map<SourceAndTarget, Herder> herders) {
}

@Override
protected Collection<ConnectResource> regularResources() {
protected Collection<Class<?>> regularResources() {
return Arrays.asList(
new InternalMirrorResource(herders, restClient)
InternalMirrorResource.class
);
}

@Override
protected Collection<ConnectResource> adminResources() {
protected Collection<Class<?>> adminResources() {
return Collections.emptyList();
}

@Override
protected void configureRegularResources(ResourceConfig resourceConfig) {
resourceConfig.register(new Binder());
}

private class Binder extends AbstractBinder {
@Override
protected void configure() {
bind(herders).to(new TypeLiteral<Map<SourceAndTarget, Herder>>() { });
bind(restClient).to(RestClient.class);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
Expand All @@ -39,8 +41,13 @@ public class InternalMirrorResource extends InternalClusterResource {

private final Map<SourceAndTarget, Herder> herders;

public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
super(restClient);
@Inject
public InternalMirrorResource(
Map<SourceAndTarget, Herder> herders,
RestClient restClient,
RestRequestTimeout requestTimeout
) {
super(restClient, requestTimeout);
this.herders = herders;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
Expand Down Expand Up @@ -718,7 +718,7 @@ public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String,
.map(this::taskTransactionalId)
.collect(Collectors.toList());
FenceProducersOptions fencingOptions = new FenceProducersOptions()
.timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
.timeoutMs((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
return admin.fenceProducers(transactionalIds, fencingOptions).all().whenComplete((ignored, error) -> {
if (error == null)
log.debug("Finished fencing out {} task producers for source connector {}", numTasks, connName);
Expand Down Expand Up @@ -1195,7 +1195,7 @@ void sinkConnectorOffsets(String connName, Connector connector, Map<String, Stri
Admin admin = adminFactory.apply(adminConfig);
try {
ListConsumerGroupOffsetsOptions listOffsetsOptions = new ListConsumerGroupOffsetsOptions()
.timeoutMs((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
.timeoutMs((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().whenComplete((result, error) -> {
if (error != null) {
Expand Down Expand Up @@ -1299,7 +1299,7 @@ void modifySinkConnectorOffsets(String connName, Connector connector, Map<String
Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
executor.submit(plugins.withClassLoader(connectorLoader, () -> {
try {
Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
Timer timer = time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
boolean isReset = offsets == null;
SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
Class<? extends Connector> sinkConnectorClass = connector.getClass();
Expand Down Expand Up @@ -1530,7 +1530,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
ClassLoader connectorLoader, Callback<Message> cb) {
executor.submit(plugins.withClassLoader(connectorLoader, () -> {
try {
Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
Timer timer = time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
// This reads to the end of the offsets topic and can be a potentially time-consuming operation
offsetStore.start();
updateTimerAndCheckExpiry(timer, "Timed out while trying to read to the end of the offsets topic prior to modifying " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.kafka.connect.runtime.rest;

import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource;
import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
import org.apache.kafka.connect.runtime.rest.resources.RootResource;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;

import java.util.Arrays;
Expand All @@ -45,25 +45,40 @@ public void initializeResources(Herder herder) {
}

@Override
protected Collection<ConnectResource> regularResources() {
protected Collection<Class<?>> regularResources() {
return Arrays.asList(
new RootResource(herder),
new ConnectorsResource(herder, config, restClient),
new InternalConnectResource(herder, restClient),
new ConnectorPluginsResource(herder)
RootResource.class,
ConnectorsResource.class,
InternalConnectResource.class,
ConnectorPluginsResource.class
);
}

@Override
protected Collection<ConnectResource> adminResources() {
protected Collection<Class<?>> adminResources() {
return Arrays.asList(
new LoggingResource(herder)
LoggingResource.class
);
}

@Override
protected void configureRegularResources(ResourceConfig resourceConfig) {
registerRestExtensions(herder, resourceConfig);
resourceConfig.register(new Binder());
}

private class Binder extends AbstractBinder {
@Override
protected void configure() {
bind(herder).to(Herder.class);
bind(restClient).to(RestClient.class);
bind(config).to(RestServerConfig.class);
}
}

Comment on lines +70 to +78
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could probably move the class definition to the bottom so that all configureXXXResources method are together

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true, but not a blocker.

@Override
protected void configureAdminResources(ResourceConfig resourceConfig) {
resourceConfig.register(new Binder());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,11 @@ public class HerderRequestHandler {

private final RestClient restClient;

private volatile long requestTimeoutMs;
private final RestRequestTimeout requestTimeout;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field requestTimeoutMs was made volatile as part of here. I don't think we need it now, but just wanted to understand why was it added over there (you can skip the explanation if it's too complex :) )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is final, it wouldn't make sense to mark it volatile.

The underlying mutable value that it returns from timeoutMs() is still marked volatile in the only non-test implementation here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't intend to mark the final field as volatile. I wanted to know why the field was marked as volatile in the old PR since you had made the change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, gotcha. The intent was to make it thread-safe since it's likely that writes and reads to/of that value will take place on different threads.


public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
public HerderRequestHandler(RestClient restClient, RestRequestTimeout requestTimeout) {
this.restClient = restClient;
this.requestTimeoutMs = requestTimeoutMs;
}

public void requestTimeoutMs(long requestTimeoutMs) {
if (requestTimeoutMs < 1) {
throw new IllegalArgumentException("REST request timeout must be positive");
}
Comment on lines -52 to -54
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this validation was needed in the past as well, but now I don't see it being present. Do you think it's needed?

Copy link
Contributor Author

@C0urante C0urante Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it for two reasons:

  1. This code path is only ever used in testing code
  2. It's unclear that the validation would be safer than allowing negative values. At least on my machine, negative values don't cause any issues with, e.g., ConvertingFutureCallback::get.

this.requestTimeoutMs = requestTimeoutMs;
this.requestTimeout = requestTimeout;
}

/**
Expand All @@ -64,7 +57,7 @@ public void requestTimeoutMs(long requestTimeoutMs) {
*/
public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
try {
return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
return cb.get(requestTimeout.timeoutMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw e.getCause();
} catch (StagedTimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest;

public interface RestRequestTimeout {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Mark this interface as @FunctionalInterface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see some benefit to this, but it's also not a blocker.

It's also debatable, since this interface may change in the future and we don't want to imply that it can only have exactly one abstract method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Makes sense.


/**
* @return the current timeout that should be used for REST requests, in milliseconds
*/
long timeoutMs();

}