Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(
category = CATEGORY_SERVER,
doc = "Hostname or IP address the service advertises to the outside world."
+ " If not set, the value of `InetAddress.getLocalHost().getHostname()` is used."
+ " If `advertisedAddress` is not set, the value of `InetAddress.getLocalHost().getHostname()` is "
+ "used by default."
+ " If `advertisedAddress` is not set and `ipAsAdvertisedAddress` is set to true, the value of "
+ "`InetAddress.getLocalHost().getHostAddress()` is used"
)
private String advertisedAddress;

@FieldContext(
category = CATEGORY_SERVER,
doc = "If `ipAsAdvertisedAddress` is set to true and `advertisedAddress` is not set,"
+ " the value of `InetAddress.getLocalHost().getHostAddress()` is used to set the advertised address."
)
private boolean ipAsAdvertisedAddress = false;

@FieldContext(category=CATEGORY_SERVER,
doc = "Used to specify multiple advertised listeners for the broker."
+ " The value must format as <listener_name>:pulsar://<host>:<port>,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ public class ServiceConfigurationUtils {

private static final Logger LOG = LoggerFactory.getLogger(ServiceConfigurationUtils.class);

public static String getDefaultOrConfiguredAddress(String configuredAddress) {
public static String getDefaultOrConfiguredAddress(String configuredAddress, boolean ipAsAdvertisedAddress) {
if (isBlank(configuredAddress)) {
return unsafeLocalhostResolve();
return unsafeLocalhostResolve(ipAsAdvertisedAddress);
}
return configuredAddress;
}

public static String unsafeLocalhostResolve() {
public static String unsafeLocalhostResolve(boolean ipAsAdvertisedAddress) {
try {
if (ipAsAdvertisedAddress) {
return InetAddress.getLocalHost().getHostAddress();
}
// Get the fully qualified hostname
return InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException ex) {
Expand Down Expand Up @@ -78,7 +81,7 @@ public static String getAppliedAdvertisedAddress(ServiceConfiguration configurat
}
}

return getDefaultOrConfiguredAddress(advertisedAddress);
return getDefaultOrConfiguredAddress(advertisedAddress, configuration.isIpAsAdvertisedAddress());
}

/**
Expand All @@ -91,7 +94,9 @@ public static AdvertisedListener getInternalListener(ServiceConfiguration config
AdvertisedListener internal = result.get(config.getInternalListenerName());
if (internal == null) {
// synthesize an advertised listener based on legacy configuration properties
String host = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
String host = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress(),
config.isIpAsAdvertisedAddress());
internal = AdvertisedListener.builder()
.brokerServiceUrl(createUriOrNull("pulsar", host, config.getBrokerServicePort()))
.brokerServiceUrlTls(createUriOrNull("pulsar+ssl", host, config.getBrokerServicePortTls()))
Expand All @@ -109,7 +114,9 @@ private static URI createUriOrNull(String scheme, String hostname, Optional<Inte
* Gets the web service address (hostname).
*/
public static String getWebServiceAddress(ServiceConfiguration config) {
return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress(),
config.isIpAsAdvertisedAddress());
}

public static String brokerUrl(String host, int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,15 @@ public void testGetAppliedAdvertised() throws Exception {

config.setAdvertisedAddress(null);
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null, false));
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null, false));

config.setIpAsAdvertisedAddress(true);
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
InetAddress.getLocalHost().getHostAddress());
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
InetAddress.getLocalHost().getHostAddress());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ public PulsarStandalone build() {
zkServers = pulsarStandalone.getAdvertisedAddress();
} else if (isBlank(pulsarStandalone.getConfig().getAdvertisedAddress())) {
// Use advertised address as local hostname
pulsarStandalone.getConfig().setAdvertisedAddress(ServiceConfigurationUtils.unsafeLocalhostResolve());
pulsarStandalone.getConfig().setAdvertisedAddress(
ServiceConfigurationUtils.unsafeLocalhostResolve(
pulsarStandalone.getConfig().isIpAsAdvertisedAddress()));
} else {
// Use advertised address from config file
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,14 @@ public PulsarService(ServiceConfiguration config,
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);

// the advertised address is defined as the host component of the broker's canonical name.
this.advertisedAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
this.advertisedAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress(),
config.isIpAsAdvertisedAddress());

// use `internalListenerName` listener as `advertisedAddress`
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress(),
config.isIpAsAdvertisedAddress());
this.brokerVersion = PulsarVersion.getVersion();
this.config = config;
this.processTerminator = processTerminator;
Expand Down Expand Up @@ -1562,7 +1566,8 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu

// worker talks to local broker
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
brokerConfig.getAdvertisedAddress());
brokerConfig.getAdvertisedAddress(),
brokerConfig.isIpAsAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
workerConfig.setPulsarFunctionsCluster(brokerConfig.getClusterName());
// inherit broker authorization setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setInstanceLivenessCheckFreqMs(100);
workerConfig.setWorkerPort(0);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress(), false);
this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ protected WorkerConfig createWorkerConfig(ServiceConfiguration config) {
workerConfig.setInstanceLivenessCheckFreqMs(100);
workerConfig.setWorkerPort(0);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress(), false);
this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setInstanceLivenessCheckFreqMs(100);
workerConfig.setWorkerPort(0);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress(), false);
this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setInstanceLivenessCheckFreqMs(100);
workerConfig.setWorkerPort(0);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
final String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
final String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress(),
false);
workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setInstanceLivenessCheckFreqMs(100);
workerConfig.setWorkerPort(0);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress(), false);
this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setInstanceLivenessCheckFreqMs(100);
workerConfig.setWorkerPort(0);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress(), false);
workerConfig.setWorkerHostname(hostname);
workerConfig
.setWorkerId("c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf
workerConfig.setInstanceLivenessCheckFreqMs(100);
workerConfig.setWorkerPort(0);
workerConfig.setPulsarFunctionsCluster(config.getClusterName());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress(), false);
this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
workerConfig.setWorkerHostname(hostname);
workerConfig.setWorkerId(workerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,20 @@ public class ProxyConfiguration implements PulsarConfiguration {
@FieldContext(
category = CATEGORY_SERVER,
doc = "Hostname or IP address the service advertises to the outside world."
+ " If not set, the value of `InetAddress.getLocalHost().getCanonicalHostName()` is used."
+ " If `advertisedAddress` is not set, the value of `InetAddress.getLocalHost().getHostname()` is "
+ "used by default."
+ " If `advertisedAddress` is not set and `ipAsAdvertisedAddress` is set to true, the value of "
+ "`InetAddress.getLocalHost().getHostAddress()` is used"
)
private String advertisedAddress;

@FieldContext(
category = CATEGORY_SERVER,
doc = "If `ipAsAdvertisedAddress` is set to true and `advertisedAddress` is not set,"
+ " the value of `InetAddress.getLocalHost().getHostAddress()` is used to set the advertised address."
)
private boolean ipAsAdvertisedAddress = false;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enable or disable the proxy protocol.")
private boolean haProxyProtocolEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ public void start() throws Exception {
}

final String hostname =
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(proxyConfig.getAdvertisedAddress());
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
proxyConfig.getAdvertisedAddress(),
proxyConfig.isIpAsAdvertisedAddress());

if (proxyConfig.getServicePort().isPresent()) {
this.serviceUrl = String.format("pulsar://%s:%d/", hostname, getListenPort().get());
Expand Down
9 changes: 6 additions & 3 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|jvmGCMetricsLoggerClassName|Classname of Pluggable JVM GC metrics logger that can log GC specific metrics.|N/A|
|bindAddress| Hostname or IP address the service binds on, default is 0.0.0.0. |0.0.0.0|
|bindAddresses| Additional Hostname or IP addresses the service binds on: `listener_name:scheme://host:port,...`. ||
|advertisedAddress| Hostname or IP address the service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostName()` is used. ||
|advertisedAddress| Hostname or IP address the service advertises to the outside world. If `advertisedAddress` is not set, the value of `InetAddress.getLocalHost().getHostName()` is used by default. If `advertisedAddress` is not set and `ipAsAdvertisedAddress` is set to true, the value of `InetAddress.getLocalHost().getHostAddress()` is used ||
|ipAsAdvertisedAddress| If `ipAsAdvertisedAddress` is set to true and `advertisedAddress` is not set, the value of `InetAddress.getLocalHost().getHostAddress()` is used to set the advertised address.|false|
|clusterName| Name of the cluster to which this broker belongs to ||
|brokerDeduplicationEnabled| Sets the default behavior for message deduplication in the broker. If enabled, the broker will reject messages that were already stored in the topic. This setting can be overridden on a per-namespace basis. |false|
|brokerDeduplicationMaxNumberOfProducers| The maximum number of producers for which information will be stored for deduplication purposes. |10000|
Expand Down Expand Up @@ -437,7 +438,8 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|webServicePort| The port used by the standalone broker for HTTP requests |8080|
|bindAddress| The hostname or IP address on which the standalone service binds |0.0.0.0|
|bindAddresses| Additional Hostname or IP addresses the service binds on: `listener_name:scheme://host:port,...`. ||
|advertisedAddress| The hostname or IP address that the standalone service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostName()` is used. ||
|advertisedAddress| Hostname or IP address the service advertises to the outside world. If `advertisedAddress` is not set, the value of `InetAddress.getLocalHost().getHostName()` is used by default. If `advertisedAddress` is not set and `ipAsAdvertisedAddress` is set to true, the value of `InetAddress.getLocalHost().getHostAddress()` is used ||
|ipAsAdvertisedAddress| If `ipAsAdvertisedAddress` is set to true and `advertisedAddress` is not set, the value of `InetAddress.getLocalHost().getHostAddress()` is used to set the advertised address.|false|
| numAcceptorThreads | Number of threads to use for Netty Acceptor | 1 |
| numIOThreads | Number of threads to use for Netty IO | 2 * Runtime.getRuntime().availableProcessors() |
| numHttpServerThreads | Number of threads to use for HTTP requests processing | 2 * Runtime.getRuntime().availableProcessors()|
Expand Down Expand Up @@ -705,7 +707,8 @@ The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be config
| functionWorkerWebServiceURLTLS | The TLS Web service URL pointing to the function worker cluster. It is only configured when you setup function workers in a separate cluster. | |
|zookeeperSessionTimeoutMs| ZooKeeper session timeout (in milliseconds) |30000|
|zooKeeperCacheExpirySeconds|ZooKeeper cache expiry time in seconds|300|
|advertisedAddress|Hostname or IP address the service advertises to the outside world. If not set, the value of `InetAddress.getLocalHost().getHostname()` is used.|N/A|
|advertisedAddress| Hostname or IP address the service advertises to the outside world. If `advertisedAddress` is not set, the value of `InetAddress.getLocalHost().getHostName()` is used by default. If `advertisedAddress` is not set and `ipAsAdvertisedAddress` is set to true, the value of `InetAddress.getLocalHost().getHostAddress()` is used ||
|ipAsAdvertisedAddress| If `ipAsAdvertisedAddress` is set to true and `advertisedAddress` is not set, the value of `InetAddress.getLocalHost().getHostAddress()` is used to set the advertised address.|false|
|servicePort| The port to use for server binary Protobuf requests |6650|
|servicePortTls| The port to use to server binary Protobuf TLS requests |6651|
|statusFilePath| Path for the file used to determine the rotation status for the proxy instance when responding to service discovery health checks ||
Expand Down