Skip to content

Commit

Permalink
[Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#1…
Browse files Browse the repository at this point in the history
…4078)

(cherry picked from commit 640b4e6)
  • Loading branch information
lhotari committed Feb 9, 2022
1 parent 1742df8 commit 3d2e6ce
Show file tree
Hide file tree
Showing 40 changed files with 655 additions and 85 deletions.
2 changes: 2 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ The Apache Software License, Version 2.0
- com.google.http-client-google-http-client-1.38.0.jar
- com.google.auto.value-auto-value-annotations-1.7.4.jar
- com.google.re2j-re2j-1.5.jar
* IPAddress
- com.github.seancfoley-ipaddress-5.3.3.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ flexible messaging model and an intuitive client API.</description>
<cron-utils.version>9.1.6</cron-utils.version>
<spring-context.version>5.3.15</spring-context.version>
<apache-http-client.version>4.5.13</apache-http-client.version>
<seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>

<!-- test dependencies -->
<cassandra.version>3.6.0</cassandra.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ void testAuthentication() throws Exception {
ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private void handleKeepAliveTimeout() {
}
}

protected void cancelKeepAliveTask() {
public void cancelKeepAliveTask() {
if (keepAliveTask != null) {
keepAliveTask.cancel(false);
keepAliveTask = null;
Expand Down
13 changes: 12 additions & 1 deletion pulsar-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
Expand Down Expand Up @@ -174,6 +174,17 @@
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>

<dependency>
<groupId>com.github.seancfoley</groupId>
<artifactId>ipaddress</artifactId>
<version>${seancfoley.ipaddress.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,6 +77,15 @@ public BrokerDiscoveryProvider(ProxyConfiguration config, PulsarResources pulsar
}
}

/**
* Access the list of available brokers.
* @return the list of available brokers
* @throws PulsarServerException
*/
public List<? extends ServiceLookupData> getAvailableBrokers() throws PulsarServerException {
return metadataStoreCacheLoader.getAvailableBrokers();
}

/**
* Find next broker {@link LoadManagerReport} in round-robin fashion.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.proxy.server;

import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import inet.ipaddr.ipv4.IPv4Address;
import inet.ipaddr.ipv6.IPv6Address;
import io.netty.resolver.AddressResolver;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.StringTokenizer;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;

@Slf4j
public class BrokerProxyValidator {
private static final String SEPARATOR = "\\s*,\\s*";
private static final String ALLOW_ANY = "*";
private final int[] allowedTargetPorts;
private final boolean allowAnyTargetPort;
private final List<IPAddress> allowedIPAddresses;
private final boolean allowAnyIPAddress;
private final AddressResolver<InetSocketAddress> inetSocketAddressResolver;
private final List<Pattern> allowedHostNames;
private final boolean allowAnyHostName;

public BrokerProxyValidator(AddressResolver<InetSocketAddress> inetSocketAddressResolver, String allowedHostNames,
String allowedIPAddresses, String allowedTargetPorts) {
this.inetSocketAddressResolver = inetSocketAddressResolver;
List<String> allowedHostNamesStrings = parseCommaSeparatedConfigValue(allowedHostNames);
if (allowedHostNamesStrings.contains(ALLOW_ANY)) {
this.allowAnyHostName = true;
this.allowedHostNames = Collections.emptyList();
} else {
this.allowAnyHostName = false;
this.allowedHostNames = allowedHostNamesStrings.stream()
.map(BrokerProxyValidator::parseWildcardPattern).collect(Collectors.toList());
}
List<String> allowedIPAddressesStrings = parseCommaSeparatedConfigValue(allowedIPAddresses);
if (allowedIPAddressesStrings.contains(ALLOW_ANY)) {
allowAnyIPAddress = true;
this.allowedIPAddresses = Collections.emptyList();
} else {
allowAnyIPAddress = false;
this.allowedIPAddresses = allowedIPAddressesStrings.stream().map(IPAddressString::new)
.filter(ipAddressString -> {
if (ipAddressString.isValid()) {
return true;
} else {
throw new IllegalArgumentException("Invalid IP address filter '" + ipAddressString + "'",
ipAddressString.getAddressStringException());
}
}).map(IPAddressString::getAddress)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
List<String> allowedTargetPortsStrings = parseCommaSeparatedConfigValue(allowedTargetPorts);
if (allowedTargetPortsStrings.contains(ALLOW_ANY)) {
allowAnyTargetPort = true;
this.allowedTargetPorts = new int[0];
} else {
allowAnyTargetPort = false;
this.allowedTargetPorts =
allowedTargetPortsStrings.stream().mapToInt(Integer::parseInt).toArray();
}
}

private static Pattern parseWildcardPattern(String wildcardPattern) {
String regexPattern =
Collections.list(new StringTokenizer(wildcardPattern, "*", true))
.stream()
.map(String::valueOf)
.map(token -> {
if ("*".equals(token)) {
return ".*";
} else {
return Pattern.quote(token);
}
}).collect(Collectors.joining());
return Pattern.compile(
"^" + regexPattern + "$",
Pattern.CASE_INSENSITIVE);
}

private static List<String> parseCommaSeparatedConfigValue(String configValue) {
return Arrays.stream(configValue.split(SEPARATOR)).map(String::trim).filter(s -> s.length() > 0)
.collect(Collectors.toList());
}

public CompletableFuture<InetSocketAddress> resolveAndCheckTargetAddress(String hostAndPort) {
int pos = hostAndPort.indexOf(':');
String host = hostAndPort.substring(0, pos);
int port = Integer.parseInt(hostAndPort.substring(pos + 1));
if (!isPortAllowed(port)) {
return FutureUtil.failedFuture(
new TargetAddressDeniedException("Given port in '" + hostAndPort + "' isn't allowed."));
} else if (!isHostAllowed(host)) {
return FutureUtil.failedFuture(
new TargetAddressDeniedException("Given host in '" + hostAndPort + "' isn't allowed."));
} else {
return NettyFutureUtil.toCompletableFuture(
inetSocketAddressResolver.resolve(InetSocketAddress.createUnresolved(host, port)))
.thenCompose(resolvedAddress -> {
CompletableFuture<InetSocketAddress> result = new CompletableFuture();
if (isIPAddressAllowed(resolvedAddress)) {
result.complete(resolvedAddress);
} else {
result.completeExceptionally(new TargetAddressDeniedException(
"The IP address of the given host and port '" + hostAndPort + "' isn't allowed."));
}
return result;
});
}
}

private boolean isPortAllowed(int port) {
if (allowAnyTargetPort) {
return true;
}
for (int allowedPort : allowedTargetPorts) {
if (allowedPort == port) {
return true;
}
}
return false;
}

private boolean isIPAddressAllowed(InetSocketAddress resolvedAddress) {
if (allowAnyIPAddress) {
return true;
}
byte[] addressBytes = resolvedAddress.getAddress().getAddress();
IPAddress candidateAddress =
addressBytes.length == 4 ? new IPv4Address(addressBytes) : new IPv6Address(addressBytes);
for (IPAddress allowedAddress : allowedIPAddresses) {
if (allowedAddress.contains(candidateAddress)) {
return true;
}
}
return false;
}

private boolean isHostAllowed(String host) {
if (allowAnyHostName) {
return true;
}
boolean matched = false;
for (Pattern allowedHostName : allowedHostNames) {
if (allowedHostName.matcher(host).matches()) {
matched = true;
break;
}
}
return matched;
}
}
Loading

0 comments on commit 3d2e6ce

Please sign in to comment.