Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -600,10 +600,11 @@ private void processDisconnection(List<ClientResponse> responses, String nodeId,
connectionStates.disconnected(nodeId, now);
apiVersions.remove(nodeId);
nodesNeedingApiVersionsFetch.remove(nodeId);
switch (disconnectState.state()) {
switch (disconnectState.value()) {
case AUTHENTICATION_FAILED:
connectionStates.authenticationFailed(nodeId, now, disconnectState.exception());
log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage());
ChannelState.AuthenticationFailed authFailedState = disconnectState.asAuthenticationFailed();
connectionStates.authenticationFailed(nodeId, now, authFailedState.exception());
log.error("Connection to node {} failed authentication due to: {}", nodeId, authFailedState.exception().getMessage());
break;
case AUTHENTICATE:
// This warning applies to older brokers which dont provide feedback on authentication failures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.common.errors.AuthenticationException;

import java.util.Objects;

/**
* States for KafkaChannel:
* <ul>
Expand All @@ -37,8 +39,8 @@
* to a send failure.</li>
* <li>AUTHENTICATION_FAILED: Channels are moved to this state if the requested SASL mechanism is not
* enabled in the broker or when brokers with versions 1.0.0 and above provide an error response
* during SASL authentication. {@link #exception()} gives the reason provided by the broker for
* authentication failure.</li>
* during SASL authentication. {@link AuthenticationFailed#exception()} gives the reason provided by the broker
* for authentication failure.</li>
* <li>LOCAL_CLOSE: Channels are moved to LOCAL_CLOSE state if close() is initiated locally.</li>
* </ul>
* If the remote endpoint closes a channel, the state of the channel reflects the state the channel
Expand All @@ -53,7 +55,7 @@
* <li>Security misconfiguration with older broker: NOT_CONNECTED => AUTHENTICATE, disconnected in AUTHENTICATE state</li>
* </ul>
*/
public class ChannelState {
public abstract class ChannelState {
public enum State {
NOT_CONNECTED,
AUTHENTICATE,
Expand All @@ -62,31 +64,62 @@ public enum State {
FAILED_SEND,
AUTHENTICATION_FAILED,
LOCAL_CLOSE
};
// AUTHENTICATION_FAILED has a custom exception. For other states,
// create a reusable `ChannelState` instance per-state.
public static final ChannelState NOT_CONNECTED = new ChannelState(State.NOT_CONNECTED);
public static final ChannelState AUTHENTICATE = new ChannelState(State.AUTHENTICATE);
public static final ChannelState READY = new ChannelState(State.READY);
public static final ChannelState EXPIRED = new ChannelState(State.EXPIRED);
public static final ChannelState FAILED_SEND = new ChannelState(State.FAILED_SEND);
public static final ChannelState LOCAL_CLOSE = new ChannelState(State.LOCAL_CLOSE);

private final State state;
private final AuthenticationException exception;
public ChannelState(State state) {
this(state, null);
}
public ChannelState(State state, AuthenticationException exception) {
this.state = state;
this.exception = exception;

public static final ChannelState NOT_CONNECTED = new BasicChannelState(State.NOT_CONNECTED);
public static final ChannelState AUTHENTICATE = new BasicChannelState(State.AUTHENTICATE);
public static final ChannelState READY = new BasicChannelState(State.READY);
public static final ChannelState EXPIRED = new BasicChannelState(State.EXPIRED);
public static final ChannelState FAILED_SEND = new BasicChannelState(State.FAILED_SEND);
public static final ChannelState LOCAL_CLOSE = new BasicChannelState(State.LOCAL_CLOSE);

private final State value;

ChannelState(State value) {
this.value = value;
}

public State value() {
return value;
}

/**
* This should only be called after checking that `value == State.AUTHENTICATION_FAILED`.
* @throws ClassCastException if value != State.AUTHENTICATION_FAILED.
*/
public AuthenticationFailed asAuthenticationFailed() {
return (AuthenticationFailed) this;
}

public State state() {
return state;
@Override
public String toString() {
return "ChannelState(" + value.toString() + ")";
}

public AuthenticationException exception() {
return exception;
private static class BasicChannelState extends ChannelState {
BasicChannelState(State state) {
super(state);
}
}

public static class AuthenticationFailed extends ChannelState {

private final AuthenticationException exception;

public AuthenticationFailed(AuthenticationException exception) {
super(State.AUTHENTICATION_FAILED);
Objects.requireNonNull(exception, "exception should not be null");
this.exception = exception;
}

public AuthenticationException exception() {
return exception;
}

@Override
public String toString() {
return "AuthenticationFailed(exception=" + exception + ")";
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void prepare() throws IOException {
case SASL_AUTHENTICATION_FAILED:
case ILLEGAL_SASL_STATE:
case UNSUPPORTED_SASL_MECHANISM:
state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e);
state = new ChannelState.AuthenticationFailed(e);
break;
default:
// Other errors are handled as network exceptions in Selector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,6 @@ public static void waitForChannelClose(Selector selector, String node, ChannelSt
}
}
assertTrue("Channel was not closed by timeout", closed);
assertEquals(channelState, selector.disconnected().get(node).state());
assertEquals(channelState, selector.disconnected().get(node).value());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testEndpointIdentificationNoReverseLookup() throws Exception {
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
}

/**
Expand Down Expand Up @@ -230,7 +230,7 @@ public void testInvalidEndpointIdentification() throws Exception {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
}

/**
Expand Down Expand Up @@ -295,7 +295,7 @@ public void testListenerConfigOverride() throws Exception {
sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
createSelector(sslClientConfigs);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
selector.close();
server.close();

Expand Down Expand Up @@ -323,7 +323,7 @@ public void testClientAuthenticationRequiredUntrustedProvided() throws Exception
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
}

/**
Expand All @@ -343,7 +343,7 @@ public void testClientAuthenticationRequiredNotProvided() throws Exception {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
}

/**
Expand Down Expand Up @@ -495,7 +495,7 @@ public void testInvalidKeyPassword() throws Exception {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
}

/**
Expand All @@ -512,7 +512,7 @@ public void testUnsupportedTLSVersion() throws Exception {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
}

/**
Expand All @@ -530,7 +530,7 @@ public void testUnsupportedCiphers() throws Exception {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
selector.send(request.toSend(node1, header));
// This test uses a non-SASL PLAINTEXT client in order to do manual handshake.
// So the channel is in READY state.
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value());
selector.close();

// Test good connection still works
Expand All @@ -487,7 +487,7 @@ public void testInvalidSaslPacket() throws Exception {
byte[] bytes = new byte[1024];
random.nextBytes(bytes);
selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes)));
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value());
selector.close();

// Test good connection still works
Expand All @@ -498,7 +498,7 @@ public void testInvalidSaslPacket() throws Exception {
createClientConnection(SecurityProtocol.PLAINTEXT, node2);
random.nextBytes(bytes);
selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes)));
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.value());
selector.close();

// Test good connection still works
Expand Down Expand Up @@ -526,7 +526,7 @@ public void testInvalidApiVersionsRequestSequence() throws Exception {
ApiVersionsRequest request = createApiVersionsRequestV0();
RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS, request.version(), "someclient", 2);
selector.send(request.toSend(node1, versionsHeader));
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value());
selector.close();

// Test good connection still works
Expand All @@ -553,7 +553,7 @@ public void testPacketSizeTooBig() throws Exception {
buffer.put(new byte[buffer.capacity() - 4]);
buffer.rewind();
selector.send(new NetworkSend(node1, buffer));
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value());
selector.close();

// Test good connection still works
Expand All @@ -567,7 +567,7 @@ public void testPacketSizeTooBig() throws Exception {
buffer.put(new byte[buffer.capacity() - 4]);
buffer.rewind();
selector.send(new NetworkSend(node2, buffer));
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.value());
selector.close();

// Test good connection still works
Expand All @@ -592,7 +592,7 @@ public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception {
RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA, metadataRequest1.version(),
"someclient", 1);
selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state());
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value());
selector.close();

// Test good connection still works
Expand All @@ -606,7 +606,7 @@ public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception {
RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA,
metadataRequest2.version(), "someclient", 2);
selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.value());
selector.close();

// Test good connection still works
Expand Down Expand Up @@ -912,7 +912,7 @@ private void verifySaslAuthenticateHeaderInteropWithFailure(boolean enableHeader
// Without SASL_AUTHENTICATE headers, disconnect state is ChannelState.AUTHENTICATE which is
// a hint that channel was closed during authentication, unlike ChannelState.AUTHENTICATE_FAILED
// which is an actual authentication failure reported by the broker.
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value());
}

private void createServer(SecurityProtocol securityProtocol, String saslMechanism,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import joptsimple._
import kafka.common.Config
import kafka.common.InvalidConfigException
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
import kafka.utils.{CommandLineUtils, ZkUtils}
import kafka.utils.Implicits._
import org.apache.kafka.common.security.JaasUtils
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package kafka.server

import java.net.{URLEncoder, URLDecoder}
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock

Expand Down