From accbbbf7f938c7b31e2456a32ab7b6e63d5b8cf5 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 22 Sep 2017 10:36:52 +0100 Subject: [PATCH] MINOR: Move ChannelState.exception() to its own class Given that Java doesn't support pattern matching, the benefit is not as clear. Also implemented ChannelState.toString and removed some unused imports. --- .../apache/kafka/clients/NetworkClient.java | 7 +- .../kafka/common/network/ChannelState.java | 81 +++++++++++++------ .../kafka/common/network/KafkaChannel.java | 2 +- .../common/network/NetworkTestUtils.java | 2 +- .../common/network/SslTransportLayerTest.java | 16 ++-- .../authenticator/SaslAuthenticatorTest.java | 18 ++--- .../scala/kafka/admin/ConfigCommand.scala | 2 +- .../kafka/server/ClientQuotaManager.scala | 2 - 8 files changed, 81 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index f8da42c3c62d8..b2583473c3388 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -600,10 +600,11 @@ private void processDisconnection(List responses, String nodeId, connectionStates.disconnected(nodeId, now); apiVersions.remove(nodeId); nodesNeedingApiVersionsFetch.remove(nodeId); - switch (disconnectState.state()) { + switch (disconnectState.value()) { case AUTHENTICATION_FAILED: - connectionStates.authenticationFailed(nodeId, now, disconnectState.exception()); - log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage()); + ChannelState.AuthenticationFailed authFailedState = disconnectState.asAuthenticationFailed(); + connectionStates.authenticationFailed(nodeId, now, authFailedState.exception()); + log.error("Connection to node {} failed authentication due to: {}", nodeId, authFailedState.exception().getMessage()); break; case AUTHENTICATE: // This warning applies to older brokers which dont provide feedback on authentication failures diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java index 08ed1a04c94f9..d9530f7e8cf66 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java @@ -18,6 +18,8 @@ import org.apache.kafka.common.errors.AuthenticationException; +import java.util.Objects; + /** * States for KafkaChannel: * * If the remote endpoint closes a channel, the state of the channel reflects the state the channel @@ -53,7 +55,7 @@ *
  • Security misconfiguration with older broker: NOT_CONNECTED => AUTHENTICATE, disconnected in AUTHENTICATE state
  • * */ -public class ChannelState { +public abstract class ChannelState { public enum State { NOT_CONNECTED, AUTHENTICATE, @@ -62,31 +64,62 @@ public enum State { FAILED_SEND, AUTHENTICATION_FAILED, LOCAL_CLOSE - }; - // AUTHENTICATION_FAILED has a custom exception. For other states, - // create a reusable `ChannelState` instance per-state. - public static final ChannelState NOT_CONNECTED = new ChannelState(State.NOT_CONNECTED); - public static final ChannelState AUTHENTICATE = new ChannelState(State.AUTHENTICATE); - public static final ChannelState READY = new ChannelState(State.READY); - public static final ChannelState EXPIRED = new ChannelState(State.EXPIRED); - public static final ChannelState FAILED_SEND = new ChannelState(State.FAILED_SEND); - public static final ChannelState LOCAL_CLOSE = new ChannelState(State.LOCAL_CLOSE); - - private final State state; - private final AuthenticationException exception; - public ChannelState(State state) { - this(state, null); } - public ChannelState(State state, AuthenticationException exception) { - this.state = state; - this.exception = exception; + + public static final ChannelState NOT_CONNECTED = new BasicChannelState(State.NOT_CONNECTED); + public static final ChannelState AUTHENTICATE = new BasicChannelState(State.AUTHENTICATE); + public static final ChannelState READY = new BasicChannelState(State.READY); + public static final ChannelState EXPIRED = new BasicChannelState(State.EXPIRED); + public static final ChannelState FAILED_SEND = new BasicChannelState(State.FAILED_SEND); + public static final ChannelState LOCAL_CLOSE = new BasicChannelState(State.LOCAL_CLOSE); + + private final State value; + + ChannelState(State value) { + this.value = value; + } + + public State value() { + return value; + } + + /** + * This should only be called after checking that `value == State.AUTHENTICATION_FAILED`. + * @throws ClassCastException if value != State.AUTHENTICATION_FAILED. + */ + public AuthenticationFailed asAuthenticationFailed() { + return (AuthenticationFailed) this; } - public State state() { - return state; + @Override + public String toString() { + return "ChannelState(" + value.toString() + ")"; } - public AuthenticationException exception() { - return exception; + private static class BasicChannelState extends ChannelState { + BasicChannelState(State state) { + super(state); + } + } + + public static class AuthenticationFailed extends ChannelState { + + private final AuthenticationException exception; + + public AuthenticationFailed(AuthenticationException exception) { + super(State.AUTHENTICATION_FAILED); + Objects.requireNonNull(exception, "exception should not be null"); + this.exception = exception; + } + + public AuthenticationException exception() { + return exception; + } + + @Override + public String toString() { + return "AuthenticationFailed(exception=" + exception + ")"; + } } } + diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 24cd9cfabea19..05ff95b497996 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -82,7 +82,7 @@ public void prepare() throws IOException { case SASL_AUTHENTICATION_FAILED: case ILLEGAL_SASL_STATE: case UNSUPPORTED_SASL_MECHANISM: - state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e); + state = new ChannelState.AuthenticationFailed(e); break; default: // Other errors are handled as network exceptions in Selector diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java index a4ce66cecaf69..ff09e7c9eb35d 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java @@ -89,6 +89,6 @@ public static void waitForChannelClose(Selector selector, String node, ChannelSt } } assertTrue("Channel was not closed by timeout", closed); - assertEquals(channelState, selector.disconnected().get(node).state()); + assertEquals(channelState, selector.disconnected().get(node).value()); } } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index cffcc89d9134e..ea7edfcc43c65 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -159,7 +159,7 @@ public void testEndpointIdentificationNoReverseLookup() throws Exception { InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); } /** @@ -230,7 +230,7 @@ public void testInvalidEndpointIdentification() throws Exception { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); } /** @@ -295,7 +295,7 @@ public void testListenerConfigOverride() throws Exception { sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); createSelector(sslClientConfigs); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); selector.close(); server.close(); @@ -323,7 +323,7 @@ public void testClientAuthenticationRequiredUntrustedProvided() throws Exception InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); } /** @@ -343,7 +343,7 @@ public void testClientAuthenticationRequiredNotProvided() throws Exception { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); } /** @@ -495,7 +495,7 @@ public void testInvalidKeyPassword() throws Exception { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); } /** @@ -512,7 +512,7 @@ public void testUnsupportedTLSVersion() throws Exception { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); } /** @@ -530,7 +530,7 @@ public void testUnsupportedCiphers() throws Exception { InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index c59f2c957da81..b6f683ec12294 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -461,7 +461,7 @@ public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception { selector.send(request.toSend(node1, header)); // This test uses a non-SASL PLAINTEXT client in order to do manual handshake. // So the channel is in READY state. - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value()); selector.close(); // Test good connection still works @@ -487,7 +487,7 @@ public void testInvalidSaslPacket() throws Exception { byte[] bytes = new byte[1024]; random.nextBytes(bytes); selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes))); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value()); selector.close(); // Test good connection still works @@ -498,7 +498,7 @@ public void testInvalidSaslPacket() throws Exception { createClientConnection(SecurityProtocol.PLAINTEXT, node2); random.nextBytes(bytes); selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes))); - NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state()); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.value()); selector.close(); // Test good connection still works @@ -526,7 +526,7 @@ public void testInvalidApiVersionsRequestSequence() throws Exception { ApiVersionsRequest request = createApiVersionsRequestV0(); RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS, request.version(), "someclient", 2); selector.send(request.toSend(node1, versionsHeader)); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value()); selector.close(); // Test good connection still works @@ -553,7 +553,7 @@ public void testPacketSizeTooBig() throws Exception { buffer.put(new byte[buffer.capacity() - 4]); buffer.rewind(); selector.send(new NetworkSend(node1, buffer)); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value()); selector.close(); // Test good connection still works @@ -567,7 +567,7 @@ public void testPacketSizeTooBig() throws Exception { buffer.put(new byte[buffer.capacity() - 4]); buffer.rewind(); selector.send(new NetworkSend(node2, buffer)); - NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state()); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.value()); selector.close(); // Test good connection still works @@ -592,7 +592,7 @@ public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception { RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA, metadataRequest1.version(), "someclient", 1); selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1)); - NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.state()); + NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY.value()); selector.close(); // Test good connection still works @@ -606,7 +606,7 @@ public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception { RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA, metadataRequest2.version(), "someclient", 2); selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2)); - NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state()); + NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.value()); selector.close(); // Test good connection still works @@ -912,7 +912,7 @@ private void verifySaslAuthenticateHeaderInteropWithFailure(boolean enableHeader // Without SASL_AUTHENTICATE headers, disconnect state is ChannelState.AUTHENTICATE which is // a hint that channel was closed during authentication, unlike ChannelState.AUTHENTICATE_FAILED // which is an actual authentication failure reported by the broker. - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state()); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.value()); } private void createServer(SecurityProtocol securityProtocol, String saslMechanism, diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index bd193c7f8a416..306d64a5af577 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -22,7 +22,7 @@ import joptsimple._ import kafka.common.Config import kafka.common.InvalidConfigException import kafka.log.LogConfig -import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId} +import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} import kafka.utils.{CommandLineUtils, ZkUtils} import kafka.utils.Implicits._ import org.apache.kafka.common.security.JaasUtils diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index c84fbcbe989b4..afaa5ddb79ad7 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -16,8 +16,6 @@ */ package kafka.server -import java.net.{URLEncoder, URLDecoder} -import java.nio.charset.StandardCharsets import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock