From 3d2e6ce84b6e69667a1c2095b766d9941a258b61 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 9 Feb 2022 19:39:11 +0200 Subject: [PATCH] [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078) (cherry picked from commit 640b4e6ec14d9b812da608037c58b664e6778637) --- .../server/src/assemble/LICENSE.bin.txt | 2 + pom.xml | 1 + .../ProxySaslAuthenticationTest.java | 1 + .../pulsar/common/protocol/PulsarHandler.java | 2 +- pulsar-proxy/pom.xml | 13 +- .../proxy/server/BrokerDiscoveryProvider.java | 10 + .../proxy/server/BrokerProxyValidator.java | 181 ++++++++++++++++++ .../proxy/server/DirectProxyHandler.java | 154 ++++++++------- .../proxy/server/ProxyConfiguration.java | 39 ++++ .../pulsar/proxy/server/ProxyConnection.java | 101 ++++++++-- .../pulsar/proxy/server/ProxyService.java | 17 ++ .../proxy/server/ProxyServiceStarter.java | 2 + .../server/ServiceChannelInitializer.java | 8 + .../server/TargetAddressDeniedException.java | 26 +++ .../server/AuthedAdminProxyHandlerTest.java | 1 + .../server/BrokerProxyValidatorTest.java | 102 ++++++++++ .../server/ProxyAdditionalServletTest.java | 1 + ...roxyAuthenticatedProducerConsumerTest.java | 1 + .../proxy/server/ProxyAuthenticationTest.java | 1 + .../proxy/server/ProxyConnectionTest.java | 38 ++++ .../server/ProxyConnectionThrottlingTest.java | 1 + .../ProxyEnableHAProxyProtocolTest.java | 1 + .../server/ProxyForwardAuthDataTest.java | 1 + .../server/ProxyKeyStoreTlsTestWithAuth.java | 1 + .../ProxyKeyStoreTlsTestWithoutAuth.java | 1 + .../server/ProxyLookupThrottlingTest.java | 1 + .../pulsar/proxy/server/ProxyParserTest.java | 1 + .../server/ProxyRolesEnforcementTest.java | 1 + .../proxy/server/ProxyServiceStarterTest.java | 5 +- .../server/ProxyServiceTlsStarterTest.java | 15 +- .../pulsar/proxy/server/ProxyStatsTest.java | 1 + .../apache/pulsar/proxy/server/ProxyTest.java | 1 + .../pulsar/proxy/server/ProxyTlsTest.java | 1 + .../proxy/server/ProxyTlsTestWithAuth.java | 1 + .../server/ProxyWithAuthorizationNegTest.java | 1 + .../server/ProxyWithAuthorizationTest.java | 2 + .../server/ProxyWithJwtAuthorizationTest.java | 1 + .../ProxyWithoutServiceDiscoveryTest.java | 1 + .../SuperUserAuthedAdminProxyHandlerTest.java | 1 + .../server/UnauthedAdminProxyHandlerTest.java | 1 + 40 files changed, 655 insertions(+), 85 deletions(-) create mode 100644 pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java create mode 100644 pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index cd71b3818bbba..4653b71e1b5ba 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -522,6 +522,8 @@ The Apache Software License, Version 2.0 - com.google.http-client-google-http-client-1.38.0.jar - com.google.auto.value-auto-value-annotations-1.7.4.jar - com.google.re2j-re2j-1.5.jar + * IPAddress + - com.github.seancfoley-ipaddress-5.3.3.jar BSD 3-clause "New" or "Revised" License * Google auth library diff --git a/pom.xml b/pom.xml index 1075d571aa427..27dcb4eb6d330 100644 --- a/pom.xml +++ b/pom.xml @@ -195,6 +195,7 @@ flexible messaging model and an intuitive client API. 9.1.6 5.3.15 4.5.13 + 5.3.3 3.6.0 diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java index aabdcf81df387..e7b2c4411fdbe 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java @@ -224,6 +224,7 @@ void testAuthentication() throws Exception { ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*"); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java index cdf372dca02c8..48e7ffa7b887b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java @@ -118,7 +118,7 @@ private void handleKeepAliveTimeout() { } } - protected void cancelKeepAliveTask() { + public void cancelKeepAliveTask() { if (keepAliveTask != null) { keepAliveTask.cancel(false); keepAliveTask = null; diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index 39aecb539f459..aa2f4638ffe9e 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -19,7 +19,7 @@ --> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 org.apache.pulsar @@ -174,6 +174,17 @@ com.beust jcommander + + + org.apache.logging.log4j + log4j-core + + + + com.github.seancfoley + ipaddress + ${seancfoley.ipaddress.version} + diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java index ae8e13405bf01..c549b45b79fe1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java @@ -38,6 +38,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; +import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +77,15 @@ public BrokerDiscoveryProvider(ProxyConfiguration config, PulsarResources pulsar } } + /** + * Access the list of available brokers. + * @return the list of available brokers + * @throws PulsarServerException + */ + public List getAvailableBrokers() throws PulsarServerException { + return metadataStoreCacheLoader.getAvailableBrokers(); + } + /** * Find next broker {@link LoadManagerReport} in round-robin fashion. * diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java new file mode 100644 index 0000000000000..debe1f7fcac87 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerProxyValidator.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.proxy.server; + +import inet.ipaddr.IPAddress; +import inet.ipaddr.IPAddressString; +import inet.ipaddr.ipv4.IPv4Address; +import inet.ipaddr.ipv6.IPv6Address; +import io.netty.resolver.AddressResolver; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.StringTokenizer; +import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.netty.NettyFutureUtil; + +@Slf4j +public class BrokerProxyValidator { + private static final String SEPARATOR = "\\s*,\\s*"; + private static final String ALLOW_ANY = "*"; + private final int[] allowedTargetPorts; + private final boolean allowAnyTargetPort; + private final List allowedIPAddresses; + private final boolean allowAnyIPAddress; + private final AddressResolver inetSocketAddressResolver; + private final List allowedHostNames; + private final boolean allowAnyHostName; + + public BrokerProxyValidator(AddressResolver inetSocketAddressResolver, String allowedHostNames, + String allowedIPAddresses, String allowedTargetPorts) { + this.inetSocketAddressResolver = inetSocketAddressResolver; + List allowedHostNamesStrings = parseCommaSeparatedConfigValue(allowedHostNames); + if (allowedHostNamesStrings.contains(ALLOW_ANY)) { + this.allowAnyHostName = true; + this.allowedHostNames = Collections.emptyList(); + } else { + this.allowAnyHostName = false; + this.allowedHostNames = allowedHostNamesStrings.stream() + .map(BrokerProxyValidator::parseWildcardPattern).collect(Collectors.toList()); + } + List allowedIPAddressesStrings = parseCommaSeparatedConfigValue(allowedIPAddresses); + if (allowedIPAddressesStrings.contains(ALLOW_ANY)) { + allowAnyIPAddress = true; + this.allowedIPAddresses = Collections.emptyList(); + } else { + allowAnyIPAddress = false; + this.allowedIPAddresses = allowedIPAddressesStrings.stream().map(IPAddressString::new) + .filter(ipAddressString -> { + if (ipAddressString.isValid()) { + return true; + } else { + throw new IllegalArgumentException("Invalid IP address filter '" + ipAddressString + "'", + ipAddressString.getAddressStringException()); + } + }).map(IPAddressString::getAddress) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + List allowedTargetPortsStrings = parseCommaSeparatedConfigValue(allowedTargetPorts); + if (allowedTargetPortsStrings.contains(ALLOW_ANY)) { + allowAnyTargetPort = true; + this.allowedTargetPorts = new int[0]; + } else { + allowAnyTargetPort = false; + this.allowedTargetPorts = + allowedTargetPortsStrings.stream().mapToInt(Integer::parseInt).toArray(); + } + } + + private static Pattern parseWildcardPattern(String wildcardPattern) { + String regexPattern = + Collections.list(new StringTokenizer(wildcardPattern, "*", true)) + .stream() + .map(String::valueOf) + .map(token -> { + if ("*".equals(token)) { + return ".*"; + } else { + return Pattern.quote(token); + } + }).collect(Collectors.joining()); + return Pattern.compile( + "^" + regexPattern + "$", + Pattern.CASE_INSENSITIVE); + } + + private static List parseCommaSeparatedConfigValue(String configValue) { + return Arrays.stream(configValue.split(SEPARATOR)).map(String::trim).filter(s -> s.length() > 0) + .collect(Collectors.toList()); + } + + public CompletableFuture resolveAndCheckTargetAddress(String hostAndPort) { + int pos = hostAndPort.indexOf(':'); + String host = hostAndPort.substring(0, pos); + int port = Integer.parseInt(hostAndPort.substring(pos + 1)); + if (!isPortAllowed(port)) { + return FutureUtil.failedFuture( + new TargetAddressDeniedException("Given port in '" + hostAndPort + "' isn't allowed.")); + } else if (!isHostAllowed(host)) { + return FutureUtil.failedFuture( + new TargetAddressDeniedException("Given host in '" + hostAndPort + "' isn't allowed.")); + } else { + return NettyFutureUtil.toCompletableFuture( + inetSocketAddressResolver.resolve(InetSocketAddress.createUnresolved(host, port))) + .thenCompose(resolvedAddress -> { + CompletableFuture result = new CompletableFuture(); + if (isIPAddressAllowed(resolvedAddress)) { + result.complete(resolvedAddress); + } else { + result.completeExceptionally(new TargetAddressDeniedException( + "The IP address of the given host and port '" + hostAndPort + "' isn't allowed.")); + } + return result; + }); + } + } + + private boolean isPortAllowed(int port) { + if (allowAnyTargetPort) { + return true; + } + for (int allowedPort : allowedTargetPorts) { + if (allowedPort == port) { + return true; + } + } + return false; + } + + private boolean isIPAddressAllowed(InetSocketAddress resolvedAddress) { + if (allowAnyIPAddress) { + return true; + } + byte[] addressBytes = resolvedAddress.getAddress().getAddress(); + IPAddress candidateAddress = + addressBytes.length == 4 ? new IPv4Address(addressBytes) : new IPv6Address(addressBytes); + for (IPAddress allowedAddress : allowedIPAddresses) { + if (allowedAddress.contains(candidateAddress)) { + return true; + } + } + return false; + } + + private boolean isHostAllowed(String host) { + if (allowAnyHostName) { + return true; + } + boolean matched = false; + for (Pattern allowedHostName : allowedHostNames) { + if (allowedHostName.matcher(host).matches()) { + matched = true; + break; + } + } + return matched; + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index c896be5a62993..37fc3d5a8a298 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -39,6 +39,7 @@ import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; import io.netty.handler.ssl.SslHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -49,6 +50,7 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLSession; @@ -73,25 +75,25 @@ public class DirectProxyHandler { @Getter - private Channel inboundChannel; + private final Channel inboundChannel; @Getter Channel outboundChannel; @Getter private final Rate inboundChannelRequestsRate; protected static Map inboundOutboundChannelMap = new ConcurrentHashMap<>(); - private String originalPrincipal; - private AuthData clientAuthData; - private String clientAuthMethod; - private int protocolVersion; + private final String originalPrincipal; + private final AuthData clientAuthData; + private final String clientAuthMethod; public static final String TLS_HANDLER = "tls"; private final Authentication authentication; - private final Supplier sslHandlerSupplier; private AuthenticationDataProvider authenticationDataProvider; - private ProxyService service; + private final ProxyService service; + private final Runnable onHandshakeCompleteAction; public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl, - int protocolVersion, Supplier sslHandlerSupplier) { + InetSocketAddress targetBrokerAddress, int protocolVersion, + Supplier sslHandlerSupplier) { this.service = service; this.authentication = proxyConnection.getClientAuthentication(); this.inboundChannel = proxyConnection.ctx().channel(); @@ -99,8 +101,7 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, this.originalPrincipal = proxyConnection.clientAuthRole; this.clientAuthData = proxyConnection.clientAuthData; this.clientAuthMethod = proxyConnection.clientAuthMethod; - this.protocolVersion = protocolVersion; - this.sslHandlerSupplier = sslHandlerSupplier; + this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask; ProxyConfiguration config = service.getConfiguration(); // Start the connection attempt. @@ -109,13 +110,22 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, // switches when passing data between the 2 // connections b.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); + int brokerProxyConnectTimeoutMs = service.getConfiguration().getBrokerProxyConnectTimeoutMs(); + if (brokerProxyConnectTimeoutMs > 0) { + b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, brokerProxyConnectTimeoutMs); + } b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false); b.handler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(SocketChannel ch) { if (sslHandlerSupplier != null) { ch.pipeline().addLast(TLS_HANDLER, sslHandlerSupplier.get()); } + int brokerProxyReadTimeoutMs = service.getConfiguration().getBrokerProxyReadTimeoutMs(); + if (brokerProxyReadTimeoutMs > 0) { + ch.pipeline().addLast("readTimeoutHandler", + new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS)); + } ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion)); @@ -133,7 +143,7 @@ protected void initChannel(SocketChannel ch) throws Exception { return; } - ChannelFuture f = b.connect(targetBroker.getHost(), targetBroker.getPort()); + ChannelFuture f = b.connect(targetBrokerAddress); outboundChannel = f.channel(); f.addListener(future -> { if (!future.isSuccess()) { @@ -211,8 +221,8 @@ public class ProxyBackendHandler extends PulsarDecoder implements FutureListener private BackendState state = BackendState.Init; private String remoteHostName; protected ChannelHandlerContext ctx; - private ProxyConfiguration config; - private int protocolVersion; + private final ProxyConfiguration config; + private final int protocolVersion; public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) { this.config = config; @@ -225,7 +235,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { // Send the Connect command to broker authenticationDataProvider = authentication.getAuthData(remoteHostName); AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); - ByteBuf command = null; + ByteBuf command; command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy", null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod); outboundChannel.writeAndFlush(command); @@ -293,12 +303,11 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) { outboundChannel.read(); } catch (Exception e) { log.error("Error mutual verify", e); - return; } } @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) { // This is invoked when the write operation on the paired connection // is completed if (future.isSuccess()) { @@ -317,6 +326,7 @@ protected void messageReceived() { @Override protected void handleConnected(CommandConnected connected) { + checkArgument(state == BackendState.Init, "Unexpected state %s. BackendState.Init was expected.", state); if (log.isDebugEnabled()) { log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel); } @@ -332,58 +342,68 @@ protected void handleConnected(CommandConnected connected) { state = BackendState.HandshakeCompleted; - ChannelFuture channelFuture; - if (connected.hasMaxMessageSize()) { - channelFuture = inboundChannel.writeAndFlush( - Commands.newConnected(connected.getProtocolVersion(), connected.getMaxMessageSize())); - } else { - channelFuture = inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())); - } + onHandshakeCompleteAction.run(); + startDirectProxying(connected); + + int maxMessageSize = + connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE; + inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize)) + .addListener(future -> { + if (future.isSuccess()) { + // Start reading from both connections + inboundChannel.read(); + outboundChannel.read(); + } else { + log.warn("[{}] [{}] Failed to write to inbound connection. Closing both connections.", + inboundChannel, + outboundChannel, future.cause()); + inboundChannel.close(); + } + }); + } - channelFuture.addListener(future -> { - if (service.getProxyLogLevel() == 0) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel); - } - // direct tcp proxy - inboundChannel.pipeline().remove("frameDecoder"); - outboundChannel.pipeline().remove("frameDecoder"); + private void startDirectProxying(CommandConnected connected) { + if (service.getProxyLogLevel() == 0) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Removing decoder from pipeline", inboundChannel, outboundChannel); + } + // direct tcp proxy + inboundChannel.pipeline().remove("frameDecoder"); + outboundChannel.pipeline().remove("frameDecoder"); + } else { + // Enable parsing feature, proxyLogLevel(1 or 2) + // Add parser handler + if (connected.hasMaxMessageSize()) { + inboundChannel.pipeline() + .replace("frameDecoder", "newFrameDecoder", + new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() + + Commands.MESSAGE_SIZE_FRAME_PADDING, + 0, 4, 0, 4)); + outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", + new LengthFieldBasedFrameDecoder( + connected.getMaxMessageSize() + + Commands.MESSAGE_SIZE_FRAME_PADDING, + 0, 4, 0, 4)); + + inboundChannel.pipeline().addBefore("handler", "inboundParser", + new ParserProxyHandler(service, inboundChannel, + ParserProxyHandler.FRONTEND_CONN, + connected.getMaxMessageSize())); + outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", + new ParserProxyHandler(service, outboundChannel, + ParserProxyHandler.BACKEND_CONN, + connected.getMaxMessageSize())); } else { - // Enable parsing feature, proxyLogLevel(1 or 2) - // Add parser handler - if (connected.hasMaxMessageSize()) { - inboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", - new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() - + Commands.MESSAGE_SIZE_FRAME_PADDING, - 0, 4, 0, 4)); - outboundChannel.pipeline().replace("frameDecoder", "newFrameDecoder", - new LengthFieldBasedFrameDecoder( - connected.getMaxMessageSize() - + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); - - inboundChannel.pipeline().addBefore("handler", "inboundParser", - new ParserProxyHandler(service, inboundChannel, - ParserProxyHandler.FRONTEND_CONN, - connected.getMaxMessageSize())); - outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", - new ParserProxyHandler(service, outboundChannel, - ParserProxyHandler.BACKEND_CONN, - connected.getMaxMessageSize())); - } else { - inboundChannel.pipeline().addBefore("handler", "inboundParser", - new ParserProxyHandler(service, inboundChannel, - ParserProxyHandler.FRONTEND_CONN, - Commands.DEFAULT_MAX_MESSAGE_SIZE)); - outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", - new ParserProxyHandler(service, outboundChannel, - ParserProxyHandler.BACKEND_CONN, - Commands.DEFAULT_MAX_MESSAGE_SIZE)); - } + inboundChannel.pipeline().addBefore("handler", "inboundParser", + new ParserProxyHandler(service, inboundChannel, + ParserProxyHandler.FRONTEND_CONN, + Commands.DEFAULT_MAX_MESSAGE_SIZE)); + outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser", + new ParserProxyHandler(service, outboundChannel, + ParserProxyHandler.BACKEND_CONN, + Commands.DEFAULT_MAX_MESSAGE_SIZE)); } - // Start reading from both connections - inboundChannel.read(); - outboundChannel.read(); - }); + } } @Override @@ -404,7 +424,7 @@ public void setRemoteHostName(String remoteHostName) { private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) { ChannelHandler sslHandler = ctx.channel().pipeline().get("tls"); - SSLSession sslSession = null; + SSLSession sslSession; if (sslHandler != null) { sslSession = ((SslHandler) sslHandler).engine().getSession(); return (new TlsHostnameVerifier()).verify(hostname, sslSession); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index a50ac7055d9b7..fd56e18e9d4a0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -49,6 +49,8 @@ public class ProxyConfiguration implements PulsarConfiguration { @Category private static final String CATEGORY_BROKER_DISCOVERY = "Broker Discovery"; @Category + private static final String CATEGORY_BROKER_PROXY = "Broker Proxy"; + @Category private static final String CATEGORY_AUTHENTICATION = "Proxy Authentication"; @Category private static final String CATEGORY_AUTHORIZATION = "Proxy Authorization"; @@ -136,6 +138,43 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private String functionWorkerWebServiceURLTLS; + @FieldContext(category = CATEGORY_BROKER_PROXY, + doc = "When enabled, checks that the target broker is active before connecting. " + + "zookeeperServers and configurationStoreServers must be configured in proxy configuration " + + "for retrieving the active brokers.") + private boolean checkActiveBrokers = false; + + @FieldContext( + category = CATEGORY_BROKER_PROXY, + doc = "Broker proxy connect timeout.\n" + + "The timeout value for Broker proxy connect timeout is in millisecond. Set to 0 to disable." + ) + private int brokerProxyConnectTimeoutMs = 10000; + + @FieldContext( + category = CATEGORY_BROKER_PROXY, + doc = "Broker proxy read timeout.\n" + + "The timeout value for Broker proxy read timeout is in millisecond. Set to 0 to disable." + ) + private int brokerProxyReadTimeoutMs = 75000; + + @FieldContext( + category = CATEGORY_BROKER_PROXY, + doc = "Allowed broker target host names. " + + "Supports multiple comma separated entries and a wildcard.") + private String brokerProxyAllowedHostNames = "*"; + + @FieldContext( + category = CATEGORY_BROKER_PROXY, + doc = "Allowed broker target ip addresses or ip networks / netmasks. " + + "Supports multiple comma separated entries.") + private String brokerProxyAllowedIPAddresses = "*"; + + @FieldContext( + category = CATEGORY_BROKER_PROXY, + doc = "Allowed broker target ports") + private String brokerProxyAllowedTargetPorts = "6650,6651"; + @FieldContext( category = CATEGORY_SERVER, doc = "Hostname or IP address the service binds on" diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index ff392ca0f09e3..df99acacbef5d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -20,7 +20,10 @@ import static com.google.common.base.Preconditions.checkArgument; +import io.netty.handler.codec.haproxy.HAProxyMessage; import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -28,7 +31,7 @@ import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; -import io.netty.handler.codec.haproxy.HAProxyMessage; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; @@ -50,6 +53,7 @@ import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,11 +70,12 @@ * */ public class ProxyConnection extends PulsarHandler implements FutureListener { + private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class); // ConnectionPool is used by the proxy to issue lookup requests private ConnectionPool connectionPool; private final AtomicLong requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); - private ProxyService service; + private final ProxyService service; AuthenticationDataSource authenticationData; private State state; private final Supplier sslHandlerSupplier; @@ -78,6 +83,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener sslHandle this.service = proxyService; this.state = State.Init; this.sslHandlerSupplier = sslHandlerSupplier; + this.brokerProxyValidator = service.getBrokerProxyValidator(); } @Override @@ -128,6 +137,7 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); ProxyService.activeConnections.inc(); if (ProxyService.activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) { + state = State.Closing; ctx.close(); ProxyService.rejectedConnections.inc(); } @@ -173,6 +183,7 @@ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Excep @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + state = State.Closing; super.exceptionCaught(ctx, cause); LOG.warn("[{}] Got exception {} : {} {}", remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), ClientCnx.isKnownException(cause) ? null : cause); @@ -211,7 +222,7 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce } @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) { // This is invoked when the write operation on the paired connection is // completed if (future.isSuccess()) { @@ -247,14 +258,51 @@ private synchronized void completeConnect(AuthData clientData) throws PulsarClie } LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", - remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl); + remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl); if (hasProxyToBrokerUrl) { - // Client already knows which broker to connect. Let's open a - // connection there and just pass bytes in both directions - state = State.ProxyConnectionToBroker; - directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, - protocolVersionToAdvertise, sslHandlerSupplier); - cancelKeepAliveTask(); + // Optimize proxy connection to fail-fast if the target broker isn't active + // Pulsar client will retry connecting after a back off timeout + if (service.getConfiguration().isCheckActiveBrokers() + && !isBrokerActive(proxyToBrokerUrl)) { + state = State.Closing; + LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.", + remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole); + ctx() + .writeAndFlush( + Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available.")) + .addListener(future -> ctx().close()); + return; + } + + brokerProxyValidator.resolveAndCheckTargetAddress(proxyToBrokerUrl) + .thenAccept(address -> ctx().executor().submit(() -> { + // Client already knows which broker to connect. Let's open a + // connection there and just pass bytes in both directions + state = State.ProxyConnectionToBroker; + directProxyHandler = new DirectProxyHandler(service, this, proxyToBrokerUrl, address, + protocolVersionToAdvertise, sslHandlerSupplier); + })) + .exceptionally(throwable -> { + if (throwable instanceof TargetAddressDeniedException + || throwable.getCause() instanceof TargetAddressDeniedException) { + TargetAddressDeniedException targetAddressDeniedException = + (TargetAddressDeniedException) (throwable instanceof TargetAddressDeniedException + ? throwable : throwable.getCause()); + + LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.", + remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(), + authMethod, clientAuthRole); + } else { + LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.", + remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable); + } + ctx() + .writeAndFlush( + Commands.newError(-1, ServerError.ServiceNotReady, + "Target broker cannot be validated.")) + .addListener(future -> ctx().close()); + return null; + }); } else { // Client is doing a lookup, we can consider the handshake complete // and we'll take care of just topics and @@ -306,6 +354,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), if (getRemoteEndpointProtocolVersion() < ProtocolVersion.v10.getValue()) { LOG.warn("[{}] Client doesn't support connecting through proxy", remoteAddress); + state = State.Closing; ctx.close(); return; } @@ -485,6 +534,36 @@ public HAProxyMessage getHAProxyMessage() { return haProxyMessage; } - private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class); + private boolean isBrokerActive(String targetBrokerHostPort) { + for (ServiceLookupData serviceLookupData : getAvailableBrokers()) { + if (matchesHostAndPort("pulsar://", serviceLookupData.getPulsarServiceUrl(), targetBrokerHostPort) + || matchesHostAndPort("pulsar+ssl://", serviceLookupData.getPulsarServiceUrlTls(), + targetBrokerHostPort)) { + return true; + } + } + return false; + } + + private List getAvailableBrokers() { + if (service.getDiscoveryProvider() == null) { + LOG.warn("Unable to retrieve active brokers. service.getDiscoveryProvider() is null." + + "zookeeperServers and configurationStoreServers must be configured in proxy configuration " + + "when checkActiveBrokers is enabled."); + return Collections.emptyList(); + } + try { + return service.getDiscoveryProvider().getAvailableBrokers(); + } catch (PulsarServerException e) { + LOG.error("Unable to get available brokers", e); + return Collections.emptyList(); + } + } + static boolean matchesHostAndPort(String expectedPrefix, String pulsarServiceUrl, String brokerHostPort) { + return pulsarServiceUrl != null + && pulsarServiceUrl.length() == expectedPrefix.length() + brokerHostPort.length() + && pulsarServiceUrl.startsWith(expectedPrefix) + && pulsarServiceUrl.startsWith(brokerHostPort, expectedPrefix.length()); + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 75f922d1f5e67..302ceba044d2a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -26,6 +26,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.resolver.dns.DnsNameResolver; +import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.util.concurrent.DefaultThreadFactory; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; @@ -73,6 +75,10 @@ public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; private final Authentication proxyClientAuthentication; + @Getter + private final DnsNameResolver dnsNameResolver; + @Getter + private final BrokerProxyValidator brokerProxyValidator; private String serviceUrl; private String serviceUrlTls; private ConfigurationMetadataCacheService configurationCacheService; @@ -147,6 +153,15 @@ public ProxyService(ProxyConfiguration proxyConfig, false, workersThreadFactory); this.authenticationService = authenticationService; + DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder(workerGroup.next()) + .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup)); + dnsNameResolver = dnsNameResolverBuilder.build(); + + brokerProxyValidator = new BrokerProxyValidator(dnsNameResolver.asAddressResolver(), + proxyConfig.getBrokerProxyAllowedHostNames(), + proxyConfig.getBrokerProxyAllowedIPAddresses(), + proxyConfig.getBrokerProxyAllowedTargetPorts()); + statsExecutor = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor")); statsExecutor.schedule(()->{ @@ -234,6 +249,8 @@ public BrokerDiscoveryProvider getDiscoveryProvider() { } public void close() throws IOException { + dnsNameResolver.close(); + if (discoveryProvider != null) { discoveryProvider.close(); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 86ce236d39159..6bbcc88b97059 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -27,6 +27,7 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger; import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -88,6 +89,7 @@ public class ProxyServiceStarter { private ProxyConfiguration config; + @Getter private ProxyService proxyService; private WebServer server; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index 658dd8762a7f2..a033a87912d87 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -21,6 +21,8 @@ import static org.apache.commons.lang3.StringUtils.isEmpty; import io.netty.handler.ssl.SslHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.AuthenticationFactory; @@ -46,6 +48,7 @@ public class ServiceChannelInitializer extends ChannelInitializer private final ProxyService proxyService; private final boolean enableTls; private final boolean tlsEnabledWithKeyStore; + private final int brokerProxyReadTimeoutMs; private SslContextAutoRefreshBuilder serverSslCtxRefresher; private SslContextAutoRefreshBuilder clientSslCtxRefresher; @@ -58,6 +61,7 @@ public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration s this.proxyService = proxyService; this.enableTls = enableTls; this.tlsEnabledWithKeyStore = serviceConfig.isTlsEnabledWithKeyStore(); + this.brokerProxyReadTimeoutMs = serviceConfig.getBrokerProxyReadTimeoutMs(); if (enableTls) { if (tlsEnabledWithKeyStore) { @@ -127,6 +131,10 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(TLS_HANDLER, new SslHandler(serverSSLContextAutoRefreshBuilder.get().createSSLEngine())); } + if (brokerProxyReadTimeoutMs > 0) { + ch.pipeline().addLast("readTimeoutHandler", + new ReadTimeoutHandler(brokerProxyReadTimeoutMs, TimeUnit.MILLISECONDS)); + } if (proxyService.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder()); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java new file mode 100644 index 0000000000000..e62525fbca175 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/TargetAddressDeniedException.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.proxy.server; + +class TargetAddressDeniedException extends RuntimeException { + public TargetAddressDeniedException(String message) { + super(message); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index 545912dcdc030..88de4b37469fb 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -83,6 +83,7 @@ protected void setup() throws Exception { proxyConfig.setAuthenticationEnabled(true); proxyConfig.setAuthorizationEnabled(true); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java new file mode 100644 index 0000000000000..8e457554cf5ad --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/BrokerProxyValidatorTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.proxy.server; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import io.netty.resolver.AddressResolver; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.SucceededFuture; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.ExecutionException; +import org.apache.curator.shaded.com.google.common.net.InetAddresses; +import org.testng.annotations.Test; + +public class BrokerProxyValidatorTest { + + @Test + public void shouldAllowValidInput() throws Exception { + BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator( + createMockedAddressResolver("1.2.3.4"), + "myhost" + , "1.2.0.0/16" + , "6650"); + InetSocketAddress inetSocketAddress = brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get(); + assertNotNull(inetSocketAddress); + assertEquals(inetSocketAddress.getAddress().getHostAddress(), "1.2.3.4"); + assertEquals(inetSocketAddress.getPort(), 6650); + } + + @Test(expectedExceptions = ExecutionException.class, + expectedExceptionsMessageRegExp = ".*Given host in 'myhost:6650' isn't allowed.") + public void shouldPreventInvalidHostName() throws Exception { + BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator( + createMockedAddressResolver("1.2.3.4"), + "allowedhost" + , "1.2.0.0/16" + , "6650"); + brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get(); + } + + @Test(expectedExceptions = ExecutionException.class, + expectedExceptionsMessageRegExp = ".* The IP address of the given host and port 'myhost:6650' isn't allowed.") + public void shouldPreventInvalidIPAddress() throws Exception { + BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator( + createMockedAddressResolver("1.2.3.4"), + "myhost" + , "1.3.0.0/16" + , "6650"); + brokerProxyValidator.resolveAndCheckTargetAddress("myhost:6650").get(); + } + + @Test + public void shouldSupportHostNamePattern() throws Exception { + BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator( + createMockedAddressResolver("1.2.3.4"), + "*.mydomain" + , "1.2.0.0/16" + , "6650"); + brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get(); + } + + @Test + public void shouldAllowAllWithWildcard() throws Exception { + BrokerProxyValidator brokerProxyValidator = new BrokerProxyValidator( + createMockedAddressResolver("1.2.3.4"), + "*" + , "*" + , "6650"); + brokerProxyValidator.resolveAndCheckTargetAddress("myhost.mydomain:6650").get(); + } + + private AddressResolver createMockedAddressResolver(String ipAddressResult) { + AddressResolver inetSocketAddressResolver = mock(AddressResolver.class); + when(inetSocketAddressResolver.resolve(any())).then(invocationOnMock -> { + InetSocketAddress address = (InetSocketAddress) invocationOnMock.getArgument(0); + return new SucceededFuture(mock(EventExecutor.class), + new InetSocketAddress(InetAddresses.forString(ipAddressResult), address.getPort())); + }); + return inetSocketAddressResolver; + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java index a909a9ff3b818..94009c84f0e64 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java @@ -72,6 +72,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index e63d3aeb4cb96..b37dedfe6a93a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -106,6 +106,7 @@ protected void setup() throws Exception { proxyConfig.setAuthenticationEnabled(true); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 57aa781989522..25eee72e0d700 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -216,6 +216,7 @@ void testAuthentication() throws Exception { ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java new file mode 100644 index 0000000000000..5f533e37d3594 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import org.testng.annotations.Test; + +public class ProxyConnectionTest { + + @Test + public void testMatchesHostAndPort() { + assertTrue(ProxyConnection + .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:6650", "1.2.3.4:6650")); + assertTrue(ProxyConnection + .matchesHostAndPort("pulsar+ssl://", "pulsar+ssl://1.2.3.4:6650", "1.2.3.4:6650")); + assertFalse(ProxyConnection + .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "5.6.7.8:1234")); + assertFalse(ProxyConnection + .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "1.2.3.4:1234")); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 062db184e0682..128d33fbf19db 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -53,6 +53,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java index 44403fbb39b16..496b3ca5c4d4a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java @@ -56,6 +56,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.ofNullable(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); proxyConfig.setHaProxyProtocolEnabled(true); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index cf61dac0e6b68..aa8475565155b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -104,6 +104,7 @@ public void testForwardAuthData() throws Exception { proxyConfig.setAuthenticationEnabled(true); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java index af76bfaeb2bb4..f1cb69f782da2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java @@ -78,6 +78,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java index 9b0e9b427e56c..03d0b2b2a8fcc 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java @@ -73,6 +73,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index fa3c485335581..51450264c8d15 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -52,6 +52,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index 905ca2066c738..654686dedf2f4 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -71,6 +71,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); //enable full parsing feature diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index 9ae3fbc09f3ff..39446af99a577 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -209,6 +209,7 @@ public void testIncorrectRoles() throws Exception { proxyConfig.setAuthenticationEnabled(true); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index bdba8d35c5515..62b65d32e8c2b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -51,6 +51,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"}; private ProxyServiceStarter serviceStarter; + private String serviceUrl; @Override @BeforeClass @@ -62,7 +63,9 @@ protected void setup() throws Exception { serviceStarter.getConfig().setWebServicePort(Optional.of(0)); serviceStarter.getConfig().setServicePort(Optional.of(0)); serviceStarter.getConfig().setWebSocketServiceEnabled(true); + serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*"); serviceStarter.start(); + serviceUrl = serviceStarter.getProxyService().getServiceUrl(); } @Override @@ -92,7 +95,7 @@ public void testEnableWebSocketServer() throws Exception { @Test public void testProducer() throws Exception { @Cleanup - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + this.pulsar.getBrokerService().getListenPort().get()) + PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl) .build(); @Cleanup diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java index 7e6c0f5f25f6c..742cfbb6581ee 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java @@ -52,6 +52,8 @@ public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest { private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem"; private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem"; private ProxyServiceStarter serviceStarter; + private String serviceUrl; + private int webPort; @Override @BeforeClass @@ -62,12 +64,17 @@ protected void setup() throws Exception { serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress()); serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); - serviceStarter.getConfig().setServicePortTls(Optional.of(11043)); + serviceStarter.getConfig().setServicePort(Optional.empty()); + serviceStarter.getConfig().setServicePortTls(Optional.of(0)); + serviceStarter.getConfig().setWebServicePort(Optional.of(0)); serviceStarter.getConfig().setTlsEnabledWithBroker(true); serviceStarter.getConfig().setWebSocketServiceEnabled(true); serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH); + serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*"); serviceStarter.start(); + serviceUrl = serviceStarter.getProxyService().getServiceUrlTls(); + webPort = serviceStarter.getServer().getListenPortHTTP().get(); } protected void doInitConf() throws Exception { @@ -86,7 +93,7 @@ protected void cleanup() throws Exception { @Test public void testProducer() throws Exception { @Cleanup - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043") + PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl) .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH) .build(); @@ -106,7 +113,7 @@ public void testProduceAndConsumeMessageWithWebsocket() throws Exception { WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient); producerWebSocketClient.start(); MyWebSocket producerSocket = new MyWebSocket(); - String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic"; + String produceUri = "ws://localhost:" + webPort + "/ws/producer/persistent/sample/test/local/websocket-topic"; Future producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri)); ProducerMessage produceRequest = new ProducerMessage(); @@ -117,7 +124,7 @@ public void testProduceAndConsumeMessageWithWebsocket() throws Exception { WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient); consumerWebSocketClient.start(); MyWebSocket consumerSocket = new MyWebSocket(); - String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub"; + String consumeUri = "ws://localhost:" + webPort + "/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub"; Future consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri)); consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes())); producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java index 2b1c22c22d012..1859c243436f4 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java @@ -67,6 +67,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 92f6a63d0f185..a90243fe019c7 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -90,6 +90,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.ofNullable(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setZookeeperServers(DUMMY_VALUE); proxyConfig.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index 59beb94712c7d..5081d0e3bb596 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -56,6 +56,7 @@ protected void setup() throws Exception { internalSetup(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java index 0d3d3a041f75c..ece35cf7b22f6 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java @@ -58,6 +58,7 @@ protected void setup() throws Exception { writer.close(); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index 5d05867d4fffd..b9d9b04ae3d74 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -113,6 +113,7 @@ protected void setup() throws Exception { proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index 14c72881b2994..d813777f7eb2d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -184,6 +184,7 @@ protected void setup() throws Exception { proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); @@ -402,6 +403,7 @@ public void tlsCiphersAndProtocols(Set tlsCiphers, Set tlsProtoc proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java index 693e4ca5db9d6..6178454dd1900 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -98,6 +98,7 @@ protected void setup() throws Exception { proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); // enable auth&auth and use JWT at proxy diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java index f20401c33aebf..59c50deafec1a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java @@ -104,6 +104,7 @@ protected void setup() throws Exception { proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index 7dc927a8a5cc2..342df28c31079 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -80,6 +80,7 @@ protected void setup() throws Exception { proxyConfig.setAuthenticationEnabled(true); proxyConfig.setAuthorizationEnabled(true); proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setServicePortTls(Optional.of(0)); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setWebServicePortTls(Optional.of(0)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java index 628f7cc05224a..ee18b60a11e5f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java @@ -68,6 +68,7 @@ protected void setup() throws Exception { // start proxy service proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); proxyConfig.setBrokerWebServiceURL(brokerUrl.toString()); proxyConfig.setStatusFilePath(STATUS_FILE_PATH);