Skip to content

Commit

Permalink
KAFKA-12278; Ensure exposed api versions are consistent within listen…
Browse files Browse the repository at this point in the history
…er (#10666)

Previously all APIs were accessible on every listener exposed by the broker, but
with KIP-500, that is no longer true.  We now have more complex requirements for
API accessibility.

For example, the KIP-500 controller exposes some APIs which are not exposed by
brokers, such as BrokerHeartbeatRequest, and does not expose most client APIs,
such as JoinGroupRequest, etc.  Similarly, the KIP-500 broker does not implement
some APIs that the ZK-based broker does, such as LeaderAndIsrRequest and
UpdateFeaturesRequest.

All of this means that we need more sophistication in how we expose APIs and
keep them consistent with the ApiVersions API. Up until now, we have been
working around this using the controllerOnly flag inside ApiKeys, but this is
not rich enough to support all of the cases listed above.  This PR introduces a
new "listeners" field to the request schema definitions.  This field is an array
of strings which indicate the listener types in which the API should be exposed.
We currently support "zkBroker", "broker", and "controller".  ("broker"
indicates the KIP-500 broker, whereas zkBroker indicates the old broker).

This PR also creates ApiVersionManager to encapsulate the creation of the
ApiVersionsResponse based on the listener type.  Additionally, it modifies
SocketServer to check the listener type of received requests before forwarding
them to the request handler.

Finally, this PR also fixes a bug in the handling of the ApiVersionsResponse
prior to authentication. Previously a static response was sent, which means that
changes to features would not get reflected. This also meant that the logic to
ensure that only the intersection of version ranges supported by the controller
would get exposed did not work. I think this is important because some clients
rely on the initial pre-authenticated ApiVersions response rather than doing a
second round after authentication as the Java client does.

One final cleanup note: I have removed the expectation that envelope requests
are only allowed on "privileged" listeners.  This made sense initially because
we expected to use forwarding before the KIP-500 controller was available. That
is not the case anymore and we expect the Envelope API to only be exposed on the
controller listener. I have nevertheless preserved the existing workarounds to
allow verification of the forwarding behavior in integration testing.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
  • Loading branch information
hachikuji authored and cmccabe committed Feb 19, 2021
1 parent b50a78b commit 698319b
Show file tree
Hide file tree
Showing 113 changed files with 1,091 additions and 585 deletions.
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
Expand Up @@ -50,6 +50,7 @@

<subpackage name="common">
<allow class="org.apache.kafka.clients.consumer.ConsumerRecord" exact-match="true" />
<allow class="org.apache.kafka.common.message.ApiMessageType" exact-match="true" />
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.annotation" />
Expand Down Expand Up @@ -108,6 +109,7 @@
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.security" />
<allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />
</subpackage>

<subpackage name="resource">
Expand Down
Expand Up @@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
Expand All @@ -27,7 +24,10 @@
import org.apache.kafka.common.utils.Utils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -62,7 +62,7 @@ public static NodeApiVersions create() {
*/
public static NodeApiVersions create(Collection<ApiVersion> overrides) {
List<ApiVersion> apiVersions = new LinkedList<>(overrides);
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
boolean exists = false;
for (ApiVersion apiVersion : apiVersions) {
if (apiVersion.apiKey() == apiKey.id) {
Expand Down Expand Up @@ -170,7 +170,7 @@ public String toString(boolean lineBreaks) {

// Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
// which may happen when the remote is too old.
for (ApiKeys apiKey : ApiKeys.brokerApis()) {
for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
Expand All @@ -40,6 +41,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

public class ChannelBuilders {
private static final Logger log = LoggerFactory.getLogger(ChannelBuilders.class);
Expand Down Expand Up @@ -77,7 +79,7 @@ public static ChannelBuilder clientChannelBuilder(
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
saslHandshakeRequestEnable, null, null, time, logContext);
saslHandshakeRequestEnable, null, null, time, logContext, null);
}

/**
Expand All @@ -89,6 +91,7 @@ public static ChannelBuilder clientChannelBuilder(
* @param tokenCache Delegation token cache
* @param time the time instance
* @param logContext the log context instance
* @param apiVersionSupplier supplier for ApiVersions responses sent prior to authentication
*
* @return the configured `ChannelBuilder`
*/
Expand All @@ -99,10 +102,11 @@ public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
CredentialCache credentialCache,
DelegationTokenCache tokenCache,
Time time,
LogContext logContext) {
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName,
isInterBrokerListener, null, true, credentialCache,
tokenCache, time, logContext);
tokenCache, time, logContext, apiVersionSupplier);
}

private static ChannelBuilder create(SecurityProtocol securityProtocol,
Expand All @@ -116,7 +120,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
CredentialCache credentialCache,
DelegationTokenCache tokenCache,
Time time,
LogContext logContext) {
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
Map<String, Object> configs = channelBuilderConfigs(config, listenerName);

ChannelBuilder channelBuilder;
Expand Down Expand Up @@ -174,7 +179,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
tokenCache,
sslClientAuthOverride,
time,
logContext);
logContext,
apiVersionSupplier);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder(listenerName);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
private final DelegationTokenCache tokenCache;
private final Map<String, LoginManager> loginManagers;
private final Map<String, Subject> subjects;
private final Supplier<ApiVersionsResponse> apiVersionSupplier;

private SslFactory sslFactory;
private Map<String, ?> configs;
Expand All @@ -108,7 +110,8 @@ public SaslChannelBuilder(Mode mode,
DelegationTokenCache tokenCache,
String sslClientAuthOverride,
Time time,
LogContext logContext) {
LogContext logContext,
Supplier<ApiVersionsResponse> apiVersionSupplier) {
this.mode = mode;
this.jaasContexts = jaasContexts;
this.loginManagers = new HashMap<>(jaasContexts.size());
Expand All @@ -126,6 +129,11 @@ public SaslChannelBuilder(Mode mode,
this.time = time;
this.logContext = logContext;
this.log = logContext.logger(getClass());
this.apiVersionSupplier = apiVersionSupplier;

if (mode == Mode.SERVER && apiVersionSupplier == null) {
throw new IllegalArgumentException("Server channel builder must provide an ApiVersionResponse supplier");
}
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -266,7 +274,7 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> config
ChannelMetadataRegistry metadataRegistry) {
return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects,
kerberosShortNamer, listenerName, securityProtocol, transportLayer,
connectionsMaxReauthMsByMechanism, metadataRegistry, time);
connectionsMaxReauthMsByMechanism, metadataRegistry, time, apiVersionSupplier);
}

// Visible to override for testing
Expand Down
71 changes: 47 additions & 24 deletions clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
Expand Up @@ -21,7 +21,10 @@
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -90,19 +93,28 @@ public enum ApiKeys {
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, true),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false),
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, false);
BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false),
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);

static {
for (ApiMessageType.ListenerType listenerType : ApiMessageType.ListenerType.values()) {
APIS_BY_LISTENER.put(listenerType, filterApisForListener(listenerType));
}
}

// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
Expand All @@ -120,9 +132,6 @@ public enum ApiKeys {
/** indicates the minimum required inter broker magic required to support the API */
public final byte minRequiredInterBrokerMagic;

/** indicates whether this is an API which is only exposed by the KIP-500 controller **/
public final boolean isControllerOnlyApi;

/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;

Expand All @@ -142,24 +151,17 @@ public enum ApiKeys {
this(messageType, clusterAction, RecordBatch.MAGIC_VALUE_V0, forwardable);
}

ApiKeys(ApiMessageType messageType, boolean clusterAction, byte minRequiredInterBrokerMagic, boolean forwardable) {
this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, false);
}

ApiKeys(
ApiMessageType messageType,
boolean clusterAction,
byte minRequiredInterBrokerMagic,
boolean forwardable,
boolean isControllerOnlyApi
boolean forwardable
) {
this.messageType = messageType;
this.id = messageType.apiKey();
this.name = messageType.name;
this.clusterAction = clusterAction;
this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
this.isControllerOnlyApi = isControllerOnlyApi;

this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
this.forwardable = forwardable;
}
Expand Down Expand Up @@ -195,6 +197,14 @@ public short oldestVersion() {
return messageType.lowestSupportedVersion();
}

public List<Short> allVersions() {
List<Short> versions = new ArrayList<>(latestVersion() - oldestVersion() + 1);
for (short version = oldestVersion(); version < latestVersion(); version++) {
versions.add(version);
}
return versions;
}

public boolean isVersionSupported(short apiVersion) {
return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
}
Expand All @@ -207,14 +217,18 @@ public short responseHeaderVersion(short apiVersion) {
return messageType.responseHeaderVersion(apiVersion);
}

public boolean inScope(ApiMessageType.ListenerType listener) {
return messageType.listeners().contains(listener);
}

private static String toHtml() {
final StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
b.append("<tr>");
b.append("<th>Name</th>\n");
b.append("<th>Key</th>\n");
b.append("</tr>");
for (ApiKeys key : ApiKeys.brokerApis()) {
for (ApiKeys key : zkBrokerApis()) {
b.append("<tr>\n");
b.append("<td>");
b.append("<a href=\"#The_Messages_" + key.name + "\">" + key.name + "</a>");
Expand Down Expand Up @@ -246,10 +260,19 @@ public void visit(Type field) {
return hasBuffer.get();
}

public static List<ApiKeys> brokerApis() {
return Arrays.stream(values())
.filter(api -> !api.isControllerOnlyApi)
public static EnumSet<ApiKeys> zkBrokerApis() {
return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
}

public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) {
return APIS_BY_LISTENER.get(listener);
}

private static EnumSet<ApiKeys> filterApisForListener(ApiMessageType.ListenerType listener) {
List<ApiKeys> controllerApis = Arrays.stream(ApiKeys.values())
.filter(apiKey -> apiKey.messageType.listeners().contains(listener))
.collect(Collectors.toList());
return EnumSet.copyOf(controllerApis);
}

}
Expand Up @@ -133,7 +133,7 @@ public static String toHtml() {
b.append("</pre>\n");
schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
}
for (ApiKeys key : ApiKeys.brokerApis()) {
for (ApiKeys key : ApiKeys.zkBrokerApis()) {
// Key
b.append("<h5>");
b.append("<a name=\"The_Messages_" + key.name + "\">");
Expand Down

0 comments on commit 698319b

Please sign in to comment.