diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1e7ccfb298a653..93f8a12c55219a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -868,6 +868,7 @@ protected void handleConnect(CommandConnect connect) { doAuthentication(clientData, false, clientProtocolVersion, clientVersion); } catch (Exception e) { service.getPulsarStats().recordConnectionCreateFail(); + state = State.Failed; logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e); String msg = "Unable to authenticate"; ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)); @@ -893,11 +894,13 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY); } catch (AuthenticationException e) { service.getPulsarStats().recordConnectionCreateFail(); + state = State.Failed; log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage()); ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage())); close(); } catch (Exception e) { service.getPulsarStats().recordConnectionCreateFail(); + state = State.Failed; String msg = "Unable to handleAuthResponse"; log.warn("[{}] {} ", remoteAddress, msg, e); ctx.writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java new file mode 100644 index 00000000000000..db94467e8bd52f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import javax.naming.AuthenticationException; +import javax.net.ssl.SSLSession; +import java.net.SocketAddress; +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.common.api.AuthData; + +/** + * Class that provides the same authentication semantics as the {@link MockAuthenticationProvider} except + * that this one initializes the {@link MockMultiStageAuthenticationState} class to support testing + * multistage authentication. + */ +public class MockMultiStageAuthenticationProvider extends MockAuthenticationProvider { + + @Override + public String getAuthMethodName() { + return "multi-stage"; + } + + @Override + public AuthenticationState newAuthState(AuthData authData, + SocketAddress remoteAddress, + SSLSession sslSession) throws AuthenticationException { + return new MockMultiStageAuthenticationState(this); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java new file mode 100644 index 00000000000000..e5095c82c91f2e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.common.api.AuthData; + +import javax.naming.AuthenticationException; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Performs multistage authentication by extending the paradigm created in {@link MockAuthenticationProvider}. + */ +public class MockMultiStageAuthenticationState implements AuthenticationState { + + private final MockMultiStageAuthenticationProvider provider; + private String authRole = null; + + MockMultiStageAuthenticationState(MockMultiStageAuthenticationProvider provider) { + this.provider = provider; + } + + @Override + public String getAuthRole() throws AuthenticationException { + if (authRole == null) { + throw new AuthenticationException("Must authenticate first"); + } + return null; + } + + @Override + public AuthData authenticate(AuthData authData) throws AuthenticationException { + String data = new String(authData.getBytes(), UTF_8); + String[] parts = data.split("\\."); + if (parts.length == 2) { + if ("challenge".equals(parts[0])) { + return AuthData.of("challenged".getBytes()); + } else { + AuthenticationDataCommand command = new AuthenticationDataCommand(data); + authRole = provider.authenticate(command); + // Auth successful, no more auth required + return null; + } + } + throw new AuthenticationException("Failed to authenticate"); + } + + @Override + public AuthenticationDataSource getAuthDataSource() { + return null; + } + + @Override + public boolean isComplete() { + return authRole != null; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index f55cc387705d44..7d30b91e8d2c1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -73,6 +73,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockAuthenticationProvider; +import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -94,6 +95,7 @@ import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnected; @@ -454,11 +456,84 @@ public void testConnectCommandWithAuthenticationNegative() throws Exception { ByteBuf clientCommand = Commands.newConnect("none", "", null); channel.writeInbound(clientCommand); - assertEquals(serverCnx.getState(), State.Start); + assertEquals(serverCnx.getState(), State.Failed); assertTrue(getResponse() instanceof CommandError); channel.finish(); } + @Test(timeOut = 30000) + public void testConnectCommandWithFailingOriginalAuthData() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); + svcConfig.setProxyRoles(Collections.singleton("proxy")); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1,null, + null, "client", "fail", authMethodName); + channel.writeInbound(clientCommand); + + // We currently expect two responses because the originalAuthData is verified after sending + // a successful response to the proxy. Because this is a synchronous operation, there is currently + // no risk. It would be better to fix this. See https://github.com/apache/pulsar/issues/19311. + Object response1 = getResponse(); + assertTrue(response1 instanceof CommandConnected); + Object response2 = getResponse(); + assertTrue(response2 instanceof CommandError); + assertEquals(((CommandError) response2).getMessage(), "Unable to authenticate"); + assertEquals(serverCnx.getState(), State.Failed); + assertFalse(serverCnx.isActive()); + channel.finish(); + } + + @Test(timeOut = 30000) + public void testAuthResponseWithFailingAuthData() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + // Trigger connect command to result in AuthChallenge + ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1"); + channel.writeInbound(clientCommand); + + Object challenge1 = getResponse(); + assertTrue(challenge1 instanceof CommandAuthChallenge); + + // Trigger another AuthChallenge to verify that code path continues to challenge + ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1"); + channel.writeInbound(authResponse1); + + Object challenge2 = getResponse(); + assertTrue(challenge2 instanceof CommandAuthChallenge); + + // Trigger failure + ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1"); + channel.writeInbound(authResponse2); + + Object response3 = getResponse(); + assertTrue(response3 instanceof CommandError); + assertEquals(((CommandError) response3).getMessage(), "Unable to authenticate"); + assertEquals(serverCnx.getState(), State.Failed); + assertFalse(serverCnx.isActive()); + channel.finish(); + } + @Test(timeOut = 30000) public void testProducerCommand() throws Exception { resetChannel(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index cfcaa5cdc127ec..5b9bd5e74ac290 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.service.utils; import java.util.Queue; - +import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; @@ -76,6 +76,11 @@ protected void handleConnected(CommandConnected connected) { queue.offer(new CommandConnected().copyFrom(connected)); } + @Override + protected void handleAuthChallenge(CommandAuthChallenge challenge) { + queue.offer(new CommandAuthChallenge().copyFrom(challenge)); + } + @Override protected void handleSubscribe(CommandSubscribe subscribe) { queue.offer(new CommandSubscribe().copyFrom(subscribe));