Skip to content

Commit

Permalink
[fix][broker] Fix returns wrong webServiceUrl when both webServicePor…
Browse files Browse the repository at this point in the history
…t and webServicePortTls are set (apache#21633)

Co-authored-by: Jiwe Guo <technoboy@apache.org>
  • Loading branch information
coderzc and Technoboy- committed Dec 5, 2023
1 parent fe2d6d3 commit f8067b5
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void start() throws PulsarServerException {
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());

LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public BrokerRegistryImpl(PulsarService pulsar) {
this.listeners = new ArrayList<>();
this.brokerId = pulsar.getLookupServiceAddress();
this.brokerLookupData = new BrokerLookupData(
pulsar.getSafeWebServiceAddress(),
pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,14 +961,14 @@ public void start() throws PulsarServerException {
// At this point, the ports will be updated with the real port number that the server was assigned
Map<String, String> protocolData = pulsar.getProtocolDataToAdvertise();

lastData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
lastData.setProtocols(protocolData);
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(protocolData);
localData.setBrokerVersionString(pulsar.getBrokerVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void initialize(final PulsarService pulsar) {
brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
}
this.policies = new SimpleResourceAllocationPolicies(pulsar);
lastLoadReport = new LoadReport(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastLoadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
Expand Down Expand Up @@ -1072,7 +1072,7 @@ public LoadReport generateLoadReport() throws Exception {
private LoadReport generateLoadReportForcefully() throws Exception {
synchronized (bundleGainsCache) {
try {
LoadReport loadReport = new LoadReport(pulsar.getSafeWebServiceAddress(),
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls());
loadReport.setProtocols(pulsar.getProtocolDataToAdvertise());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
false, pulsar.getAdvertisedListeners());
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
true, pulsar.getAdvertisedListeners());
this.lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
this.locallyAcquiredLocks = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -336,7 +336,7 @@ public Map<NamespaceBundle, ResourceLock<NamespaceEphemeralData>> getLocallyAcqu

public synchronized boolean refreshSelfOwnerInfo() {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
pulsar.getBrokerServiceUrlTls(), pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ protected CompletableFuture<Void> canUpdateCluster(String tenant, Set<String> ol
protected void validateBrokerName(String broker) {
String brokerUrl = String.format("http://%s", broker);
String brokerUrlTls = String.format("https://%s", broker);
if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
if (!brokerUrl.equals(pulsar().getWebServiceAddress())
&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -56,12 +56,16 @@
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
Expand All @@ -72,6 +76,7 @@
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -93,8 +98,14 @@ public class SimpleLoadManagerImplTest {
BrokerStats brokerStatsClient2;

String primaryHost;

String primaryTlsHost;

String secondaryHost;

private String defaultNamespace;
private String defaultTenant;

ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

@BeforeMethod
Expand All @@ -108,6 +119,7 @@ void setup() throws Exception {
ServiceConfiguration config1 = new ServiceConfiguration();
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
Expand All @@ -122,11 +134,13 @@ void setup() throws Exception {
admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
brokerStatsClient1 = admin1.brokerStats();
primaryHost = pulsar1.getWebServiceAddress();
primaryTlsHost = pulsar1.getWebServiceAddressTls();

// Start broker 2
ServiceConfiguration config2 = new ServiceConfiguration();
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
Expand All @@ -143,6 +157,8 @@ void setup() throws Exception {
brokerStatsClient2 = admin2.brokerStats();
secondaryHost = pulsar2.getWebServiceAddress();
Thread.sleep(100);

setupClusters();
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -256,10 +272,9 @@ public void testPrimary() throws Exception {
sortedRankingsInstance.get().put(lr.getRank(rd), rus);
setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance);

ResourceUnit found = loadManager
.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10")).get();
final Optional<ResourceUnit> leastLoaded = loadManager.getLeastLoaded(NamespaceName.get("pulsar/use/primary-ns.10"));
// broker is not active so found should be null
assertNotEquals(found, null, "did not find a broker when expected one to be found");
assertFalse(leastLoaded.isPresent());

}

Expand Down Expand Up @@ -399,7 +414,7 @@ public void testEvenBundleDistribution() throws Exception {
final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get();

for (final NamespaceBundle bundle : bundles) {
if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(primaryHost)) {
if (loadManager.getLeastLoaded(bundle).get().getResourceId().equals(getAddress(primaryTlsHost))) {
++numAssignedToPrimary;
} else {
++numAssignedToSecondary;
Expand All @@ -411,6 +426,10 @@ public void testEvenBundleDistribution() throws Exception {
}
}

private static String getAddress(String url) {
return url.replaceAll("https", "http");
}

@Test
public void testNamespaceBundleStats() {
NamespaceBundleStats nsb1 = new NamespaceBundleStats();
Expand Down Expand Up @@ -479,4 +498,33 @@ public void testUsage() {
assertEquals(usage.getBandwidthIn().usage, usageLimit);
}

@Test
public void testGetWebSerUrl() throws PulsarAdminException {
String webServiceUrl = admin1.brokerStats().getLoadReport().getWebServiceUrl();
Assert.assertEquals(webServiceUrl, pulsar1.getWebServiceAddress());

String webServiceUrl2 = admin2.brokerStats().getLoadReport().getWebServiceUrl();
Assert.assertEquals(webServiceUrl2, pulsar2.getWebServiceAddress());
}

@Test
public void testRedirectOwner() throws PulsarAdminException {
final String topicName = "persistent://" + defaultNamespace + "/" + "test-topic";
admin1.topics().createNonPartitionedTopic(topicName);
TopicStats stats = admin1.topics().getStats(topicName);
Assert.assertNotNull(stats);

TopicStats stats2 = admin2.topics().getStats(topicName);
Assert.assertNotNull(stats2);
}

private void setupClusters() throws PulsarAdminException {
admin1.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar1.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use"));
defaultTenant = "prop-xyz";
admin1.tenants().createTenant(defaultTenant, tenantInfo);
defaultNamespace = defaultTenant + "/ns1";
admin1.namespaces().createNamespace(defaultNamespace, Set.of("use"));
}

}

0 comments on commit f8067b5

Please sign in to comment.