PIP-45: Implement load manager locks using coordination service #10391
Conversation
Overall looks good to me. I left a few comments and suggestions, mostly for some follow-up work.
@@ -363,24 +343,27 @@ private void reapDeadBrokerPreallocations(Set<String> aliveBrokers) {
     @Override
     public Set<String> getAvailableBrokers() {
         try {
-            return availableActiveBrokers.get();
+            return new TreeSet<>(brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).get());
         } catch (Exception e) {
             log.warn("Error when trying to get active brokers", e);
In case of a ZK problem, do we see lots of stacktraces written to the logs? Does it make sense to strip out the stacktrace here? (We should also deal with InterruptedException, as this method can be called from anywhere.)
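A minimal sketch of the error handling being suggested: log only the cause (no full stacktrace) for expected metadata-store failures, and restore the interrupt flag when the calling thread is interrupted. Names mirror the diff above but the helper shape is illustrative, not Pulsar's actual code.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class GetBrokersSketch {
    // Hypothetical helper: the listing future stands in for
    // brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).
    static Set<String> getAvailableBrokers(CompletableFuture<Set<String>> listing) {
        try {
            return listing.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            return Collections.emptySet();
        } catch (ExecutionException e) {
            // Log just the cause, avoiding a full stacktrace per failure.
            System.err.println("Error when trying to get active brokers: " + e.getCause());
            return Collections.emptySet();
        }
    }
}
```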
LGTM. A few minor comments.
@@ -363,24 +343,27 @@ private void reapDeadBrokerPreallocations(Set<String> aliveBrokers) {
-            return availableActiveBrokers.get();
+            return new TreeSet<>(brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT).get());
Is there any requirement for ordering? Can't we use a HashSet?
It was mostly that we used a TreeSet before for all the ZK-returned children. In this case we don't really care, so I'll change it to a HashSet.
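The only behavioral difference at this call site: a TreeSet yields the broker names in sorted order, while a HashSet makes no ordering guarantee (but skips the per-insert comparison cost). Set equality ignores iteration order either way, as this small stdlib-only illustration shows:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class BrokerSetOrdering {
    public static void main(String[] args) {
        List<String> brokers = List.of("broker-2:8080", "broker-0:8080", "broker-1:8080");

        // TreeSet iterates in sorted order.
        Set<String> sorted = new TreeSet<>(brokers);
        System.out.println(sorted); // [broker-0:8080, broker-1:8080, broker-2:8080]

        // HashSet iteration order is unspecified, but equality still holds.
        Set<String> unordered = new HashSet<>(brokers);
        System.out.println(sorted.equals(unordered)); // true
    }
}
```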
         } catch (Exception e) {
             throw new PulsarServerException(e);
+        lockManager.acquireLock(brokerReportPath, localData).join();
Can we add a log here? It's a blocking call, and we could use the log to troubleshoot if the server is taking time to come up or not coming up at all.
Done
-            bundleData = readJson(zkClient.getData(bundleZPath, null, null), BundleData.class);
-        } else if (zkClient.exists(quotaZPath, null) != null) {
-            final ResourceQuota quota = readJson(zkClient.getData(quotaZPath, null, null), ResourceQuota.class);
+        Optional<BundleData> optBundleData = bundlesCache.get(getBundleDataPath(bundle)).join();
Can we use a time-bounded get(timeout) instead of join() to break a potential deadlock?
Rather than putting get(timeout) at each call site, I prefer having timeouts on the async operations in the metadata store implementation.
Umm, you mean it will be the metadata store impl's responsibility to complete the returned CompletableFuture within a certain time? The ZK impl depends on the ZK client and returns its future, so if we put the timeout responsibility on the impl class, the impl has to add extra handling to return a time-bounded future. That doesn't seem like a feasible solution.
Yes, we can add the timeouts in the AbstractMetadataStoreImpl. Otherwise we only have timeouts for sync calls, but in most places we're doing (and should do more of) the async calls.
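One way to centralize this, as a hedged sketch rather than Pulsar's actual implementation: a base class wraps every future produced by the concrete store with `CompletableFuture.orTimeout` (Java 9+), so even a `join()` on a stuck future eventually fails instead of hanging. The class and constant names below are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MetadataTimeoutSketch {
    // Hypothetical operation timeout; kept short here for demonstration.
    static final long OPERATION_TIMEOUT_MS = 100;

    // Wrap a future from the concrete impl (e.g. the ZK-based store) so it
    // always completes: with a result, or exceptionally with TimeoutException.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future) {
        return future.orTimeout(OPERATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // A future that never completes, simulating a stuck ZK client call.
        CompletableFuture<String> stuck = new CompletableFuture<>();
        try {
            withTimeout(stuck).join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException.
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
    }
}
```

Completed futures pass through unchanged, so callers keep the async style and only pathological cases pay the timeout path.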
-    // ZooKeeper cache of the local broker data, stored in LoadManager.LOADBALANCE_BROKER_ROOT.
-    private ZooKeeperDataCache<LocalBrokerData> brokerDataCache;
+    // Cache of the local broker data, stored in LoadManager.LOADBALANCE_BROKER_ROOT.
+    private LockManager<LocalBrokerData> brokersData;
Can we use MetadataCache instead of LockManager? LocalBrokerData is metadata stored in local ZK right now, and it doesn't need to acquire a lock.
It is a lock though, in the sense that we're acquiring an ephemeral z-node with the broker name. If there's already a z-node there, we need to handle that scenario, and that's what the ResourceLock does.
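The lock semantics of broker registration can be modeled with a toy in-memory store: creating the ephemeral z-node succeeds only if no other broker holds the path, exactly like `putIfAbsent`. This is a simplification for illustration only; a real ResourceLock also handles session expiry and lock revalidation, which this sketch omits.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class EphemeralLockSketch {
    // Path -> owner data, standing in for ephemeral z-nodes.
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    // Returns true iff this caller created the "z-node" (i.e. now holds the lock).
    public boolean acquire(String path, String ownerData) {
        return nodes.putIfAbsent(path, ownerData) == null;
    }

    // Inspect the current holder's data, if any.
    public Optional<String> holder(String path) {
        return Optional.ofNullable(nodes.get(path));
    }
}
```

A second broker trying to register under the same path sees the acquisition fail and must handle it, which is the scenario the reply above describes.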
-            ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
-                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        }
+        brokerDataLock.updateValue(localData).join();
Same here: we should use a time-bounded get instead of join(). This can cause a deadlock in the system if the future never completes.
     * the object to insert in metadata store
     * @return a future to track the completion of the operation
     */
+    CompletableFuture<Void> updateOrCreate(String path, T value);
We have used readModifyUpdateOrCreate in a few places for this use case.
Good point. The usage here is slightly different in that we don't care about the existing value, but we can avoid adding a new method just for that.
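The distinction being discussed can be shown with a toy in-memory model (method names follow the conversation, not Pulsar's exact signatures): `updateOrCreate` overwrites blindly, while `readModifyUpdateOrCreate` consults any existing value through a modifier function.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.UnaryOperator;

public class CacheWriteSketch<T> {
    private final ConcurrentMap<String, T> store = new ConcurrentHashMap<>();

    // Unconditional write: the existing value, if any, is irrelevant.
    public void updateOrCreate(String path, T value) {
        store.put(path, value);
    }

    // Read-modify-write: start from `initial` if absent, else transform
    // the current value atomically.
    public T readModifyUpdateOrCreate(String path, T initial, UnaryOperator<T> modifier) {
        return store.compute(path, (k, old) -> old == null ? initial : modifier.apply(old));
    }

    public T get(String path) {
        return store.get(path);
    }
}
```

In a real metadata store the read-modify-write variant also needs version checks (compare-and-set) to stay correct under concurrency, which the `compute` call approximates here.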
@lhotari That's probably related, taking a look
This PR upgrades the pulsar dependency to 2.8.0-rc-202105092228, which has two major API changes. apache/pulsar#10391 changed the `LoadManager` API, so `MetadataCache` is used instead of `ZookeeperCache` in this PR. apache/pulsar#7406 changed the throttling strategy. However, KoP currently differs from Pulsar in that a produce and its callback may run on different threads: KoP calls `PersistentTopic#publishMessages` in a callback of `KafkaTopicManager#getTopic` if the returned future is not completed immediately; otherwise it's called directly in the I/O thread. Therefore, here we still use a **channel based** publish bytes stats for throttling, while apache/pulsar#7406 uses a **thread based** one.

The other refactors are:
1. Change the throttling-related fields from `InternalServerCnx` to `KafkaRequestHandler`.
2. Use `BrokerService#getPausedConnections` to check if the channel's auto-read is disabled, and modify the tests as well.

* Fix LoadManager interface
* Refactor publish throttling
* Remove ZookeeperCache usage
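The channel-based throttling described above can be sketched as follows: each connection tracks its own in-flight publish bytes and toggles its read state when crossing a limit. This is a minimal illustrative model, not KoP's actual code; the `autoRead` flag stands in for Netty's `channel.config().setAutoRead(...)`, and the callback may legitimately run on a different thread than the produce, hence the atomic counter.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ChannelThrottleSketch {
    private final long maxPendingBytes;
    private final AtomicLong pendingBytes = new AtomicLong();
    private volatile boolean autoRead = true;

    public ChannelThrottleSketch(long maxPendingBytes) {
        this.maxPendingBytes = maxPendingBytes;
    }

    // Called on produce: pause reads once this channel's budget is exceeded.
    public void recordPublish(long bytes) {
        if (pendingBytes.addAndGet(bytes) > maxPendingBytes) {
            autoRead = false;
        }
    }

    // Called from the publish callback, possibly on another thread.
    public void recordPublishComplete(long bytes) {
        if (pendingBytes.addAndGet(-bytes) <= maxPendingBytes) {
            autoRead = true;
        }
    }

    public boolean isAutoRead() {
        return autoRead;
    }
}
```

Because the budget is per channel rather than per thread, it remains correct even when produce and callback execute on different threads, which is the KoP constraint noted above.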
Motivation
Implemented the load manager lock and load report using `ResourceLock` from `CoordinationService` instead of direct ZK access.