Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-45: Implement load managers locks using coordination service #10391

Merged
merged 15 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Data;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;

/**
Expand All @@ -28,6 +29,7 @@
* bundle data which is written to ZooKeeper by the leader broker (TimeAverageBrokerData). - The preallocated bundles
* which are not written to ZooKeeper but are maintained by the leader broker (Map<String, BundleData>).
*/
@Data
public class BrokerData {
private LocalBrokerData localData;
private TimeAverageBrokerData timeAverageData;
Expand All @@ -44,28 +46,4 @@ public BrokerData(final LocalBrokerData localData) {
timeAverageData = new TimeAverageBrokerData();
preallocatedBundleData = new ConcurrentHashMap<>();
}

public LocalBrokerData getLocalData() {
return localData;
}

public void setLocalData(LocalBrokerData localData) {
this.localData = localData;
}

public TimeAverageBrokerData getTimeAverageData() {
return timeAverageData;
}

public void setTimeAverageData(TimeAverageBrokerData timeAverageData) {
this.timeAverageData = timeAverageData;
}

public Map<String, BundleData> getPreallocatedBundleData() {
return preallocatedBundleData;
}

public void setPreallocatedBundleData(Map<String, BundleData> preallocatedBundleData) {
this.preallocatedBundleData = preallocatedBundleData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
*/
package org.apache.pulsar.broker;

import org.apache.pulsar.policies.data.loadbalancer.JSONWritable;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;

/**
* Data class comprising the short term and long term historical data for this bundle.
*/
public class BundleData extends JSONWritable {
public class BundleData {
// Short term data for this bundle. The time frame of this data is
// determined by the number of short term samples
// and the bundle update period.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
*/
package org.apache.pulsar.broker;

import com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.policies.data.loadbalancer.JSONWritable;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;

/**
* Data class aggregating the short term and long term data across all bundles belonging to a broker.
*/
public class TimeAverageBrokerData extends JSONWritable {
@Data
@NoArgsConstructor
public class TimeAverageBrokerData {
private double shortTermMsgThroughputIn;
private double shortTermMsgThroughputOut;
private double shortTermMsgRateIn;
Expand All @@ -37,9 +39,6 @@ public class TimeAverageBrokerData extends JSONWritable {
private double longTermMsgRateIn;
private double longTermMsgRateOut;

public TimeAverageBrokerData() {
}

/**
* Initialize a TimeAverageBrokerData.
*
Expand Down Expand Up @@ -105,78 +104,4 @@ public void reset(final Set<String> bundles, final Map<String, BundleData> data,
}
}
}

public double getShortTermMsgThroughputIn() {
return shortTermMsgThroughputIn;
}

public void setShortTermMsgThroughputIn(double shortTermMsgThroughputIn) {
this.shortTermMsgThroughputIn = shortTermMsgThroughputIn;
}

public double getShortTermMsgThroughputOut() {
return shortTermMsgThroughputOut;
}

public void setShortTermMsgThroughputOut(double shortTermMsgThroughputOut) {
this.shortTermMsgThroughputOut = shortTermMsgThroughputOut;
}

public double getShortTermMsgRateIn() {
return shortTermMsgRateIn;
}

public void setShortTermMsgRateIn(double shortTermMsgRateIn) {
this.shortTermMsgRateIn = shortTermMsgRateIn;
}

public double getShortTermMsgRateOut() {
return shortTermMsgRateOut;
}

public void setShortTermMsgRateOut(double shortTermMsgRateOut) {
this.shortTermMsgRateOut = shortTermMsgRateOut;
}

public double getLongTermMsgThroughputIn() {
return longTermMsgThroughputIn;
}

public void setLongTermMsgThroughputIn(double longTermMsgThroughputIn) {
this.longTermMsgThroughputIn = longTermMsgThroughputIn;
}

public double getLongTermMsgThroughputOut() {
return longTermMsgThroughputOut;
}

public void setLongTermMsgThroughputOut(double longTermMsgThroughputOut) {
this.longTermMsgThroughputOut = longTermMsgThroughputOut;
}

public double getLongTermMsgRateIn() {
return longTermMsgRateIn;
}

public void setLongTermMsgRateIn(double longTermMsgRateIn) {
this.longTermMsgRateIn = longTermMsgRateIn;
}

public double getLongTermMsgRateOut() {
return longTermMsgRateOut;
}

public void setLongTermMsgRateOut(double longTermMsgRateOut) {
this.longTermMsgRateOut = longTermMsgRateOut;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("shortTermMsgThroughputIn", shortTermMsgThroughputIn)
.add("shortTermMsgThroughputOut", shortTermMsgThroughputOut)
.add("shortTermMsgRateIn", shortTermMsgRateIn).add("shortTermMsgRateOut", shortTermMsgRateOut)
.add("longTermMsgThroughputIn", longTermMsgThroughputIn)
.add("longTermMsgThroughputOut", longTermMsgThroughputOut).add("longTermMsgRateIn", longTermMsgRateIn)
.add("longTermMsgRateOut", longTermMsgRateOut).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -63,13 +61,6 @@ public interface LoadManager {
*/
LoadManagerReport generateLoadReport() throws Exception;

/**
* Returns {@link Deserializer} to deserialize load report.
*
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();

/**
* Set flag to force load report update.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;

/**
* New proposal for a load manager interface which attempts to use more intuitive method names and provide a starting
Expand Down Expand Up @@ -106,13 +104,6 @@ default void writeBrokerDataOnZooKeeper(boolean force) {
*/
void writeBundleDataOnZooKeeper();

/**
* Return :{@link Deserializer} to deserialize load-manager load report.
*
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();

/**
* Get available broker list in cluster.
*
Expand All @@ -123,7 +114,7 @@ default void writeBrokerDataOnZooKeeper(boolean force) {
/**
* Fetch local-broker data from load-manager broker cache.
*
* @param broker load-balancer zk-path
* @param broker load-balancer path
* @return
*/
LocalBrokerData getBrokerLocalData(String broker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,31 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.bookkeeper.util.ZkUtils;
import java.util.concurrent.CompletionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

@Slf4j
public class NoopLoadManager implements LoadManager {

private PulsarService pulsar;
private String lookupServiceAddress;
private ResourceUnit localResourceUnit;
private ZooKeeper zkClient;

LocalBrokerData localData;

private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> ObjectMapperFactory
.getThreadLocal()
.readValue(content, LocalBrokerData.class);
private LockManager<LocalBrokerData> lockManager;

@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.lockManager = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
}

@Override
Expand All @@ -63,33 +55,19 @@ public void start() throws PulsarServerException {
+ pulsar.getConfiguration().getWebServicePort().get();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());
zkClient = pulsar.getZkClient();

localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
LocalBrokerData localData = new LocalBrokerData(pulsar.getSafeWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getSafeBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
String brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;

try {
// When running in standalone, this error can happen when killing the "standalone" process
// ungracefully since the ZK session will not be closed and it will take some time for ZK server
// to prune the expired sessions after startup.
// Since there's a single broker instance running, it's safe, in this mode, to remove the old lock

// Delete and recreate z-node
try {
if (zkClient.exists(brokerZnodePath, null) != null) {
zkClient.delete(brokerZnodePath, -1);
}
} catch (NoNodeException nne) {
// Ignore if z-node was just expired
}

ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

} catch (Exception e) {
throw new PulsarServerException(e);
log.info("Acquiring broker resource lock on {}", brokerReportPath);
lockManager.acquireLock(brokerReportPath, localData).join();
Copy link
Contributor

@rdhabalia rdhabalia Apr 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a log statement here? This is a blocking call, so a log message would help us troubleshoot cases where the server is slow to come up or does not come up at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

log.info("Acquired broker resource lock on {}", brokerReportPath);
} catch (CompletionException ce) {
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
throw new PulsarServerException(MetadataStoreException.unwrap(ce));
}
}

Expand All @@ -108,11 +86,6 @@ public LoadManagerReport generateLoadReport() throws Exception {
return null;
}

@Override
public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
return loadReportDeserializer;
}

@Override
public void setLoadReportForceUpdateFlag() {
// do nothing
Expand Down Expand Up @@ -155,7 +128,13 @@ public Set<String> getAvailableBrokers() throws Exception {

@Override
public void stop() throws PulsarServerException {
// do nothing
if (lockManager != null) {
try {
lockManager.close();
} catch (Exception e) {
throw new PulsarServerException(e);
}
}
}

}
Loading