Skip to content

Commit

Permalink
Expose broker URLs in loadbalancer registration node (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Sep 15, 2016
1 parent 5771aa4 commit 47a213c
Show file tree
Hide file tree
Showing 23 changed files with 87 additions and 57 deletions.
Expand Up @@ -38,9 +38,9 @@
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.stats.AllocatorStats;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.stats.AllocatorStatsGenerator;
import com.yahoo.pulsar.broker.stats.BookieClientStatsGenerator;
Expand Down
Expand Up @@ -19,8 +19,8 @@

import com.google.common.collect.Lists;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.stats.Metrics;

/**
Expand Down
Expand Up @@ -17,7 +17,7 @@

import java.util.Map;

import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;

/*
ResourceDescription is an abstraction to represent resources like memory, cpu, network and io combined;
Expand Down
Expand Up @@ -27,7 +27,7 @@
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.ServiceRequest;
import com.yahoo.pulsar.broker.loadbalance.ServiceUnit;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;

public class PulsarLoadReportImpl implements LoadReport {
Expand All @@ -50,8 +50,8 @@ public static LoadReport parse(String loadReportJson) {
PulsarLoadReportImpl pulsarLoadReport = new PulsarLoadReportImpl();
ObjectMapper mapper = ObjectMapperFactory.create();
try {
com.yahoo.pulsar.broker.loadbalance.data.LoadReport report = mapper.readValue(loadReportJson,
com.yahoo.pulsar.broker.loadbalance.data.LoadReport.class);
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport report = mapper.readValue(loadReportJson,
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport.class);
SystemResourceUsage sru = report.getSystemResourceUsage();
String resourceUnitName = report.getName();
pulsarLoadReport.resourceDescription = new PulsarResourceDescription();
Expand Down
Expand Up @@ -19,7 +19,7 @@
import java.util.Map;

import com.yahoo.pulsar.broker.loadbalance.ResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;

public class PulsarResourceDescription extends ResourceDescription {

Expand Down
Expand Up @@ -20,7 +20,7 @@

import com.yahoo.pulsar.broker.loadbalance.LoadRanker;
import com.yahoo.pulsar.broker.loadbalance.ResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;

import java.util.Comparator;
import java.util.Map;
Expand Down
Expand Up @@ -58,6 +58,11 @@
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUnitRanking;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.PulsarServerException;
Expand All @@ -68,11 +73,6 @@
import com.yahoo.pulsar.broker.loadbalance.LoadRanker;
import com.yahoo.pulsar.broker.loadbalance.PlacementStrategy;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUnitRanking;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.api.Authentication;
Expand Down Expand Up @@ -177,7 +177,8 @@ public SimpleLoadManagerImpl(PulsarService pulsar) {
this.realtimeResourceQuotas.set(new HashMap<>());
this.realtimeAvgResourceQuota = new ResourceQuota();
placementStrategy = new WRRPlacementStrategy();
lastLoadReport = new LoadReport();
lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
brokerHostUsage = new BrokerHostUsage(pulsar);
loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
@Override
Expand Down Expand Up @@ -279,7 +280,7 @@ public void start() throws PulsarServerException {
throw new PulsarServerException(e);
}
}

@Override
public void disableBroker() throws Exception {
if (isNotEmpty(brokerZnodePath)) {
Expand Down Expand Up @@ -625,7 +626,7 @@ public void writeResourceQuotasToZooKeeper() throws Exception {
* - Available capacity for weighted random selection (weightedRandomSelection): ranks ResourceUnits units based on
* estimation of their capacity which is basically how many bundles each ResourceUnit is able can handle with its
* available resources (CPU, memory, network, etc);
*
*
* - Load percentage for least loaded server (leastLoadedServer): ranks ResourceUnits units based on estimation of
* their load percentage which is basically how many percent of resource is allocated which is
* max(resource_actually_used, resource_quota)
Expand Down Expand Up @@ -1091,7 +1092,8 @@ public LoadReport generateLoadReport() throws Exception {
}

try {
LoadReport loadReport = new LoadReport();
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
loadReport.setName(String.format("%s:%s", pulsar.getHost(), pulsar.getConfiguration().getWebServicePort()));
SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
loadReport.setOverLoaded(
Expand Down
Expand Up @@ -55,7 +55,6 @@
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.authentication.AuthenticationService;
import com.yahoo.pulsar.broker.authorization.AuthorizationManager;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.service.BrokerServiceException.PersistenceException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand All @@ -81,6 +80,7 @@
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;

Expand Down
Expand Up @@ -25,13 +25,13 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.stats.BrokerOperabilityMetrics;
import com.yahoo.pulsar.broker.stats.ClusterReplicationMetrics;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.broker.stats.NamespaceStats;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.utils.StatsOutputStream;

Expand Down
Expand Up @@ -59,6 +59,7 @@
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.PublisherStats;
import com.yahoo.pulsar.common.policies.data.Policies;
Expand All @@ -67,7 +68,6 @@
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
Expand Down
Expand Up @@ -60,7 +60,6 @@
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.broker.cache.ConfigurationCacheService;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.broker.web.RestException;
Expand All @@ -83,6 +82,7 @@
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;

@Test
Expand Down
Expand Up @@ -61,17 +61,12 @@
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.data.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.impl.ResourceAvailabilityRanker;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.admin.internal.NamespacesImpl;
import com.yahoo.pulsar.client.admin.internal.BrokerStatsImpl;
import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceName;
Expand All @@ -81,6 +76,10 @@
import com.yahoo.pulsar.common.policies.data.NamespaceIsolationData;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
Expand Down
Expand Up @@ -57,12 +57,6 @@
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.data.BrokerUsage;
import com.yahoo.pulsar.broker.loadbalance.data.JvmUsage;
import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUnitRanking;
import com.yahoo.pulsar.broker.loadbalance.data.ResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarLoadReportImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.impl.ResourceAvailabilityRanker;
Expand All @@ -77,6 +71,12 @@
import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyType;
import com.yahoo.pulsar.common.policies.data.NamespaceIsolationData;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.BrokerUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.JvmUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUnitRanking;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
Expand Down Expand Up @@ -243,8 +243,8 @@ public void testPrimary() throws Exception {
LoadRanker lr = new ResourceAvailabilityRanker();

// inject the load report and rankings
Map<ResourceUnit, com.yahoo.pulsar.broker.loadbalance.data.LoadReport> loadReports = new HashMap<>();
com.yahoo.pulsar.broker.loadbalance.data.LoadReport loadReport = new com.yahoo.pulsar.broker.loadbalance.data.LoadReport();
Map<ResourceUnit, com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport> loadReports = new HashMap<>();
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport();
loadReport.setSystemResourceUsage(new SystemResourceUsage());
loadReports.put(ru1, loadReport);
setObjectField(SimpleLoadManagerImpl.class, loadManager, "currentLoadReports", loadReports);
Expand Down Expand Up @@ -349,7 +349,7 @@ public void testResourceDescription() {
public void testLoadReportParsing() throws Exception {

ObjectMapper mapper = ObjectMapperFactory.create();
com.yahoo.pulsar.broker.loadbalance.data.LoadReport reportData = new com.yahoo.pulsar.broker.loadbalance.data.LoadReport();
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport reportData = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport();
reportData.setName("b1");
SystemResourceUsage resource = new SystemResourceUsage();
ResourceUsage resourceUsage = new ResourceUsage();
Expand Down Expand Up @@ -398,11 +398,11 @@ public void testDoLoadShedding() throws Exception {
stats.put("property/cluster/namespace1/0x00000000_0xFFFFFFFF", nsb1);
stats.put("property/cluster/namespace2/0x00000000_0xFFFFFFFF", nsb2);

Map<ResourceUnit, com.yahoo.pulsar.broker.loadbalance.data.LoadReport> loadReports = new HashMap<>();
com.yahoo.pulsar.broker.loadbalance.data.LoadReport loadReport1 = new com.yahoo.pulsar.broker.loadbalance.data.LoadReport();
Map<ResourceUnit, com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport> loadReports = new HashMap<>();
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport1 = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport();
loadReport1.setSystemResourceUsage(systemResource);
loadReport1.setBundleStats(stats);
com.yahoo.pulsar.broker.loadbalance.data.LoadReport loadReport2 = new com.yahoo.pulsar.broker.loadbalance.data.LoadReport();
com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport2 = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport();
loadReport2.setSystemResourceUsage(new SystemResourceUsage());
loadReport2.setBundleStats(stats);
loadReports.put(ru1, loadReport1);
Expand Down
Expand Up @@ -44,7 +44,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import com.yahoo.pulsar.broker.loadbalance.data.NamespaceBundleStats;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.stats.Metrics;
Expand All @@ -65,6 +64,7 @@
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;

/**
*/
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.data;
package com.yahoo.pulsar.common.policies.data.loadbalancer;

import java.util.Map;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.data;
package com.yahoo.pulsar.common.policies.data.loadbalancer;

import java.util.Map;

Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.data;
package com.yahoo.pulsar.common.policies.data.loadbalancer;

import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -23,16 +23,24 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Maps;
import com.yahoo.pulsar.broker.loadbalance.data.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;


/**
* This class represents the overall load of the broker - it includes overall {@link SystemResourceUsage} and
* {@link NamespaceUsage} for all the namespaces hosted by this broker.
*/
public class LoadReport {
private String name;

private final String webServiceUrl;
private final String webServiceUrlTls;

private final String pulsarServiceUrl;
private final String pulsarServieUrlTls;

private boolean isUnderLoaded;
private boolean isOverLoaded;
private String name;
private long timestamp;
private double msgRateIn;
private double msgRateOut;
Expand All @@ -42,6 +50,15 @@ public class LoadReport {
private long numBundles;

public LoadReport() {
this(null, null, null, null);
}

public LoadReport(String webServiceUrl, String webServiceUrlTls, String pulsarServiceUrl, String pulsarServieUrlTls) {
this.webServiceUrl = webServiceUrl;
this.webServiceUrlTls = webServiceUrlTls;
this.pulsarServiceUrl = pulsarServiceUrl;
this.pulsarServieUrlTls = pulsarServieUrlTls;

isUnderLoaded = false;
isOverLoaded = false;
timestamp = 0;
Expand Down Expand Up @@ -208,4 +225,20 @@ public TreeMap<String, NamespaceBundleStats> getSortedBundleStats(ResourceType r
sortedBundleStats.putAll(bundleStats);
return sortedBundleStats;
}

public String getWebServiceUrl() {
return webServiceUrl;
}

public String getWebServiceUrlTls() {
return webServiceUrlTls;
}

public String getPulsarServiceUrl() {
return pulsarServiceUrl;
}

public String getPulsarServieUrlTls() {
return pulsarServieUrlTls;
}
}
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.data;
package com.yahoo.pulsar.common.policies.data.loadbalancer;

/**
*/
Expand Down

0 comments on commit 47a213c

Please sign in to comment.