Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add first V3 cluster APIs (+ V3 plumbing). #624

Merged
merged 3 commits into from Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -65,6 +65,13 @@ public class KafkaRestConfig extends RestConfig {
+ " hostname is used";
public static final String HOST_NAME_DEFAULT = "";

public static final String ADVERTISED_LISTENERS_CONFIG = "advertised.listeners";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not spent any time thinking about the advantages of including absolute URLs in responses, but for your consideration, want to note that requiring this additional config seems like a material downside.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The configuration is not required. See UrlFactoryImpl for how the absolute URLs are generated. This configuration is to allow for generating URLs using a LB.

protected static final String ADVERTISED_LISTENERS_DOC =
"List of advertised listeners. Used when generating absolute URLs in responses. Protocols"
+ " http and https are supported. Each listener must include the protocol, hostname, and"
+ " port. For example: http://myhost:8080, https://0.0.0.0:8081";
protected static final String ADVERTISED_LISTENERS_DEFAULT = "";

public static final String CONSUMER_MAX_THREADS_CONFIG = "consumer.threads";
private static final String CONSUMER_MAX_THREADS_DOC =
"The maximum number of threads to run consumer requests on."
Expand Down Expand Up @@ -342,6 +349,13 @@ protected static ConfigDef baseKafkaRestConfigDef() {
HOST_NAME_DEFAULT,
Importance.MEDIUM, HOST_NAME_DOC
)
.define(
ADVERTISED_LISTENERS_CONFIG,
Type.LIST,
ADVERTISED_LISTENERS_DEFAULT,
Importance.MEDIUM,
ADVERTISED_LISTENERS_DOC
)
.define(
CONSUMER_MAX_THREADS_CONFIG,
Type.INT,
Expand Down
Expand Up @@ -20,6 +20,9 @@

public class Versions {

// Constants for version 3
public static final String JSON_API = "application/vnd.api+json";

// Constants for version 2
public static final String KAFKA_V2_JSON = "application/vnd.kafka.v2+json";
// This is set < 1 because it is only the most-specific type if there isn't an embedded data type.
Expand Down
Expand Up @@ -30,7 +30,7 @@ public class DefaultKafkaRestContext implements KafkaRestContext {
private ProducerPool producerPool;
private KafkaConsumerManager kafkaConsumerManager;
private AdminClientWrapper adminClientWrapper;

private AdminClient admin;

public DefaultKafkaRestContext(
KafkaRestConfig config,
Expand Down Expand Up @@ -87,12 +87,19 @@ public KafkaConsumerManager getKafkaConsumerManager() {
@Override
public AdminClientWrapper getAdminClientWrapper() {
if (adminClientWrapper == null) {
adminClientWrapper = new AdminClientWrapper(config,
AdminClient.create(AdminClientWrapper.adminProperties(config)));
adminClientWrapper = new AdminClientWrapper(config, getAdmin());
}
return adminClientWrapper;
}

@Override
public AdminClient getAdmin() {
if (admin == null) {
admin = AdminClient.create(AdminClientWrapper.adminProperties(config));
}
return admin;
}

@Override
public void shutdown() {
if (kafkaConsumerManager != null) {
Expand Down
Expand Up @@ -15,12 +15,16 @@

package io.confluent.kafkarest;

import io.confluent.kafkarest.backends.BackendsModule;
import io.confluent.kafkarest.config.ConfigModule;
import io.confluent.kafkarest.controllers.ControllersModule;
import io.confluent.kafkarest.extension.ContextInvocationHandler;
import io.confluent.kafkarest.extension.InstantConverterProvider;
import io.confluent.kafkarest.extension.KafkaRestCleanupFilter;
import io.confluent.kafkarest.extension.KafkaRestContextProvider;
import io.confluent.kafkarest.extension.RestResourceExtension;
import io.confluent.kafkarest.resources.ResourcesFeature;
import io.confluent.kafkarest.response.ResponseModule;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfigException;
Expand Down Expand Up @@ -93,7 +97,13 @@ protected void setupInjectedResources(
new Class[]{KafkaRestContext.class},
contextInvocationHandler
);

config.register(new BackendsModule(context));
config.register(new ConfigModule(appConfig));
config.register(new ControllersModule());
config.register(new ResourcesFeature(context));
config.register(new ResponseModule());

config.register(KafkaRestCleanupFilter.class);
config.register(InstantConverterProvider.class);

Expand Down
Expand Up @@ -16,6 +16,7 @@
package io.confluent.kafkarest;

import io.confluent.kafkarest.v2.KafkaConsumerManager;
import org.apache.kafka.clients.admin.Admin;

public interface KafkaRestContext {
public KafkaRestConfig getConfig();
Expand All @@ -35,5 +36,7 @@ public interface KafkaRestContext {

public AdminClientWrapper getAdminClientWrapper();

Admin getAdmin();

void shutdown();
}
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest.backends;

import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.backends.kafka.KafkaModule;
import java.util.Objects;
import org.glassfish.hk2.utilities.binding.AbstractBinder;

/**
* A module to configure access to external dependencies.
*/
public final class BackendsModule extends AbstractBinder {

private final KafkaRestContext context;

public BackendsModule(KafkaRestContext context) {
this.context = Objects.requireNonNull(context);
}

@Override
protected void configure() {
install(new KafkaModule(context));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this registered via BackendsModule? what's the advantage of the extra level?

(note: i may well ask some stupid questions due to ignorance of the frameworks etc., this may well be one of them).

}
}
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest.backends.kafka;

import io.confluent.kafkarest.KafkaRestContext;
import java.util.Objects;
import org.apache.kafka.clients.admin.Admin;
import org.glassfish.hk2.utilities.binding.AbstractBinder;

/**
* A module to configure access to Kafka.
*
* <p>Right now this module does little but delegate to {@link KafkaRestContext}, since access to
* Kafka is currently being configured there. It's the author's intention to move such logic here,
* and eliminate {@code KafkaRestContext}, once dependence injection is properly used elsewhere.</p>
*/
public final class KafkaModule extends AbstractBinder {

private final KafkaRestContext context;

public KafkaModule(KafkaRestContext context) {
this.context = Objects.requireNonNull(context);
}

@Override
protected void configure() {
// Reuse the AdminClient being constructed in KafkaRestContext.
bind(context.getAdmin()).to(Admin.class);
}
}
@@ -0,0 +1,108 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest.config;

import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.rest.RestConfig;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Objects;
import javax.inject.Qualifier;
import org.glassfish.hk2.api.AnnotationLiteral;
import org.glassfish.hk2.api.TypeLiteral;
import org.glassfish.hk2.utilities.binding.AbstractBinder;

/**
* A module to populate the injector with the configurations passed to this application.
*
* <p>In addition to {@link KafkaRestConfig}, which contains all configurations, individual
* configurations are also exposed, on a need-to-know basis.</p>
*/
public final class ConfigModule extends AbstractBinder {

private final KafkaRestConfig config;

public ConfigModule(KafkaRestConfig config) {
this.config = Objects.requireNonNull(config);
}

@Override
protected void configure() {
bind(config).to(KafkaRestConfig.class);

// Keep this list alphabetically sorted.
bind(config.getList(KafkaRestConfig.ADVERTISED_LISTENERS_CONFIG))
.qualifiedBy(new AdvertisedListenersConfigImpl())
.to(new TypeLiteral<List<String>>() { });

bind(config.getString(KafkaRestConfig.HOST_NAME_CONFIG))
.qualifiedBy(new HostNameConfigImpl())
.to(String.class);

bind(config.getList(RestConfig.LISTENERS_CONFIG))
.qualifiedBy(new ListenersConfigImpl())
.to(new TypeLiteral<List<String>>() { });

bind(config.getInt(RestConfig.PORT_CONFIG))
.qualifiedBy(new PortConfigImpl())
.to(Integer.class);
}

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER })
public @interface AdvertisedListenersConfig {
}

private static final class AdvertisedListenersConfigImpl
extends AnnotationLiteral<AdvertisedListenersConfig> implements AdvertisedListenersConfig {
}

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER })
@Deprecated
public @interface HostNameConfig {
}

private static final class HostNameConfigImpl
extends AnnotationLiteral<HostNameConfig> implements HostNameConfig {
}

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER })
public @interface ListenersConfig {
}

private static final class ListenersConfigImpl
extends AnnotationLiteral<ListenersConfig> implements ListenersConfig {
}

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER })
@Deprecated
public @interface PortConfig {
}

private static final class PortConfigImpl
extends AnnotationLiteral<PortConfig> implements PortConfig {
}
}
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafkarest.controllers;

import io.confluent.kafkarest.entities.Cluster;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* A service to manage Kafka {@link Cluster Clusters}.
*/
public interface ClusterManager {

/**
* Returns the list of Kafka {@link Cluster Clusters} known.
*
* <p>Right now only one cluster is known, namely, the cluster to which this application is
* connected to. Therefore, this method will always return at most 1 cluster.</p>
*/
CompletableFuture<List<Cluster>> listClusters();

/**
* Returns the Kafka {@link Cluster} with the given {@code clusterId}, or empty if no such cluster
* is known.
*
* <p>See {@link #listClusters()} for caveats about known clusters.</p>
*/
CompletableFuture<Optional<Cluster>> getCluster(String clusterId);
}