
Add topic loading throttling #306

Merged · 4 commits merged into apache:master on Mar 24, 2017

Conversation

rdhabalia
Contributor

Motivation

Addressing #300: at the time of a broker cold-restart, clients make the broker load all their topics concurrently, which can create back pressure at ZK and may increase ZK latency. So, the broker should have a way to throttle the number of concurrent topic loads.

Modifications

  • Introduce a dynamic throttling limit to control concurrent topic loading.
  • Above that limit the broker rejects producer/consumer create requests, so producers and consumers can retry with backoff.

Result

It will help the broker control the number of ZK operations on cold-restart. A minimal sketch of the throttling idea is shown below.
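A minimal sketch of the core mechanism, assuming the semaphore name used later in this PR (the wrapper class and method names here are hypothetical):

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

class TopicLoadThrottleSketch {
    // Held in an AtomicReference so the limit can be changed at runtime via
    // the dynamic-configuration listener.
    private final AtomicReference<Semaphore> topicLoadRequestSemaphore =
            new AtomicReference<>(new Semaphore(10));

    boolean tryStartTopicLoad() {
        // Non-blocking: when no permit is available, the broker rejects (or,
        // as discussed below, queues) the topic-load request.
        return topicLoadRequestSemaphore.get().tryAcquire();
    }

    void onLimitChanged(int newLimit) {
        // Swap in a fresh semaphore rather than adjusting permits in place.
        topicLoadRequestSemaphore.set(new Semaphore(newLimit));
    }
}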

@rdhabalia rdhabalia added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Mar 22, 2017
@rdhabalia rdhabalia added this to the 1.17 milestone Mar 22, 2017
@rdhabalia rdhabalia self-assigned this Mar 22, 2017
@merlimat merlimat left a comment (Contributor)

The change looks good. I just have one small concern.

Topic loading is initiated for all the topics in a given bundle/namespace, even if a particular topic is not being used. The main reason was to ensure stats visibility for that topic.

If we reject the loading, an unused topic will not get loaded later. That's why I was also thinking of "queuing" the topic load operations.

@@ -458,20 +467,23 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {

return null;
});
topicLoadRequestSemaphore.get().release();
Contributor

The acquire() and release() operations might not happen on the same semaphore if the limit is being changed.

You should save the instance into a local variable and call release on the same object.
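A minimal sketch of the suggested fix (synchronous and simplified here; in the PR the release happens in an async completion callback):

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

class SameInstanceReleaseSketch {
    private final AtomicReference<Semaphore> topicLoadRequestSemaphore =
            new AtomicReference<>(new Semaphore(10));

    void loadTopic(Runnable doLoad) {
        // Capture the semaphore once: if a dynamic-config update swaps the
        // AtomicReference mid-load, release() still goes to the instance we
        // acquired from, keeping its permit count consistent.
        final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
        if (topicLoadSemaphore.tryAcquire()) {
            try {
                doLoad.run();
            } finally {
                topicLoadSemaphore.release();
            }
        }
    }
}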

Contributor Author

Yes, that makes sense. Updated it.

@rdhabalia
Contributor Author

If we reject the loading, an unused topic will not get loaded later. That's why I was also thinking of "queuing" the topic load operations.

That's correct. However, if a topic is unused (no producer/consumer), isn't it deleted by GC on that broker? After that it will not show up in stats anyway, and if any producer wants to connect, the topic will be loaded again after some time.
Is that correct?

@merlimat
Contributor

However, if topic is unused (no producer/consumer) then isn't it deleted by GC from that broker.

The topic can have subscriptions and no producers/consumers using it.

@rdhabalia
Contributor Author

The topic can have subscriptions and no producers/consumers using it

That's true; GC doesn't delete a topic with a non-empty subscription, so we can still get the stats. Updated the change.


if (!topicLoadSemaphore.tryAcquire()) {
if (failOnTooManyConcurrentTopics) {
throw new TooManyRequestsException("Too many concurrent topic load exception");
Contributor

Couldn't we just return a failed future at this point?

Contributor Author

The main idea behind failing immediately with an exception: while retrying topic creation from pendingTopicLoadingQueue, if it gets throttled again here, we can put the same topic-future back into the queue, because it holds the original future we returned to ServerCnx when the very first createPersistentTopic request came in. A small sketch of this re-queue pattern is below.
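A hypothetical sketch of that re-queue pattern (queue and Pair shapes follow the PR; loadOrThrow is a made-up stand-in for createPersistentTopic(topic, true)):

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.Pair;

class RequeueSketch {
    private final Queue<Pair<String, CompletableFuture<Object>>> pendingTopicLoadingQueue =
            new ConcurrentLinkedQueue<>();

    void retryPendingTopic(Pair<String, CompletableFuture<Object>> pendingTopic) {
        try {
            // Throws when the topic-load semaphore has no free permits.
            loadOrThrow(pendingTopic.getLeft());
        } catch (Exception tooManyRequests) {
            // Throttled again: re-queue the SAME pair, so the original future
            // returned to ServerCnx on the first request stays pending.
            pendingTopicLoadingQueue.add(pendingTopic);
        }
    }

    private void loadOrThrow(String topic) throws Exception { /* ... */ }
}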

@@ -418,6 +431,15 @@ public PulsarClient getReplicationClient(String cluster) {
}

private CompletableFuture<Topic> createPersistentTopic(final String topic) throws RuntimeException {
try {
return createPersistentTopic(topic, false);
Contributor

It is not immediately obvious what false means here.

Contributor Author

Yes, we can add a comment. But the idea of having this flag:

  1. When BrokerService receives a createPersistentTopic request, passing failOnTooManyConcurrentTopics=false means we will not fail the request due to throttling but will add it to the queue.
  2. When we retry from pendingTopicLoadingQueue and it gets throttled again, we add that topic-future back into the queue at line 1037.

@@ -870,7 +913,10 @@ private void updateConfigurationAndRegisterListeners() {
updateDynamicServiceConfiguration();
// add listener on "maxConcurrentLookupRequest" value change
registerConfigurationListener("maxConcurrentLookupRequest",
(pendingLookupRequest) -> lookupRequestSemaphore.set(new Semaphore((int) pendingLookupRequest, true)));
(maxConcurrentLookupRequest) -> lookupRequestSemaphore.set(new Semaphore((int) maxConcurrentLookupRequest, true)));
Contributor

I think we shouldn't strictly require the "fairness" in the semaphore.

Contributor Author

You mean we should not create a new semaphore? I set a new instance rather than updating permits on the existing semaphore, so that we don't mess up the permit count (reduce/increment) while lookupRequestSemaphore is already being used concurrently by multiple threads.

Contributor

No no, I was referring to the true passed as the 2nd parameter of the semaphore constructor. That's about the fairness of the semaphore itself. I think we might not need exact fairness, so we could relax that option. The two constructor forms in question are illustrated below.
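For illustration, the two java.util.concurrent.Semaphore constructors being discussed (the wrapper class is just for context):

import java.util.concurrent.Semaphore;

class FairnessSketch {
    // Fair: waiting threads acquire permits in FIFO order, at some throughput cost.
    static final Semaphore FAIR = new Semaphore(10, true);
    // Non-fair (default): cheaper; ordering doesn't matter when callers only
    // ever use tryAcquire() and never block waiting for a permit.
    static final Semaphore NON_FAIR = new Semaphore(10);
}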

@@ -972,4 +1018,31 @@ public void onUpdate(String path, Map<String, String> data, Stat stat) {
return dynamicConfigurationMap;
}

}
private void processPendingLoadTopic() {
lock.lock();
Contributor

Why do we need this lock? Would it be enough to poll() one item out of the queue and process it?

Contributor Author

It is to prevent a race condition that could leave a topic sitting in pendingTopicLoadingQueue forever.
E.g.:

  • If maxConcurrentTopicLoadRequest=1 and two topics (topic1, topic2) arrive at the same time, then one of them (topic2) is added to the pending queue.
  • As soon as topic1 completes, it releases the semaphore and polls topic2 from the queue to create it again. But if a new topic3 acquired the permit before topic2's retry, then when topic3 releases, it may find pendingTopicLoadingQueue empty, because we already polled topic2. Only after that is topic2 re-added at line 1037, with nobody left to drain it.

So I added the lock, because pendingTopicLoadingQueue.poll() followed by pendingTopicLoadingQueue.add(pendingTopic) is not atomic.
Do you think we should ignore this situation? (A minimal sketch of the lock-guarded drain is below.)
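A minimal sketch of the lock-guarded drain being described (simplified; the names follow the PR but this is not its exact code):

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

class LockedDrainSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Semaphore topicLoadSemaphore = new Semaphore(1);
    private final Queue<CompletableFuture<Object>> pendingTopicLoadingQueue =
            new ConcurrentLinkedQueue<>();

    // Called when a topic load completes: releasing the permit and polling the
    // queue happen under one lock, so the release-then-poll sequence cannot
    // interleave with another thread's poll-fail-then-re-add.
    void processPendingLoadTopic() {
        lock.lock();
        try {
            topicLoadSemaphore.release();
            CompletableFuture<Object> pendingTopic = pendingTopicLoadingQueue.poll();
            if (pendingTopic != null) {
                // re-attempt topic creation, completing this original future ...
            }
        } finally {
            lock.unlock();
        }
    }
}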

Pair<String, CompletableFuture<Topic>> pendingTopic = pendingTopicLoadingQueue.poll();
if (pendingTopic != null) {
try {
createPersistentTopic(pendingTopic.getLeft(), true).handle((topic, ex) -> {
Contributor

We should check if we still own the topic

}
}

private CompletableFuture<Topic> createPersistentTopic(final String topic, boolean failOnTooManyConcurrentTopics)
Contributor

Instead of having these 2 methods, I was thinking something like:

CompletableFuture<Topic> createPersistentTopic(final String topic) {
    // Creates future and call method below or queue the request
}


void createPersistentTopic(String topic, CompletableFuture<Topic> topicFuture) {
   // To be called when we are ready to load the topic
} 

Contributor Author

void createPersistentTopic(String topic, CompletableFuture<Topic> topicFuture) {
   // To be called when we are ready to load the topic
}

Does this method throttle and queue again, or does it just fail the future if the semaphore doesn't have any additional permits?

Contributor Author

@merlimat I think I've updated the changes as per your suggestion.

}
return null;
});
} catch (TooManyRequestsException te) {
Contributor

We shouldn't even have to check again at this point.

Contributor Author
@rdhabalia rdhabalia Mar 23, 2017

What if at this point (as mentioned in the comment above) some other new topic3 (from a different bundle) comes in and acquires a permit? Then creating this topic may fail again with TooManyRequestsException, and we have to put it back into the queue, as we don't want to fail it but try again.
So, shouldn't we handle the retry failure and put it back into the queue again?

latch.await();
FutureUtil.waitForAll(futures).get();

pulsarClient.close();
Contributor

Can you add a test to verify the negative scenario with the TooManyRequests exception?

Contributor Author

Actually, as we decided, the broker should not return TooManyRequestsException to the client but should put the topic into the queue, so that unused topics get a chance to be loaded for stats visibility. So eventually all producers should get connected, even if the broker hits the error internally.

final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();

if (topicLoadSemaphore.tryAcquire()) {
CompletableFuture<Topic> future = new CompletableFuture<>();
Contributor

I think this could be done with just one future, the same way you attach the handle method for behavior after completion. A sketch of what that could look like is below.
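A hypothetical sketch of the single-future variant (startAsyncLoad is a made-up stand-in for the async managed-ledger open):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

class SingleFutureSketch {
    CompletableFuture<String> loadTopic(Semaphore topicLoadSemaphore) {
        // One future only: chain the semaphore release onto it instead of
        // bridging results into a second, separately created future.
        CompletableFuture<String> future = startAsyncLoad();
        future.handle((topic, ex) -> {
            topicLoadSemaphore.release(); // runs on success or failure
            return null;
        });
        return future;
    }

    private CompletableFuture<String> startAsyncLoad() {
        return CompletableFuture.completedFuture("persistent://prop/cluster/ns/topic");
    }
}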


*/
private void createPendingLoadTopic() {
Pair<String, CompletableFuture<Topic>> pendingTopic = pendingTopicLoadingQueue.poll();
if (pendingTopic != null) {
Contributor

if (pendingTopic == null) return;

checkTopicNsOwnership(topic);
CompletableFuture<Topic> future = new CompletableFuture<>();
final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
Contributor

What is the reason for acquiring the semaphore here, if we're not waiting for it anyway?

Contributor Author

Actually, this approach fails to provide throttling in certain use cases compared to my previous commit 406b0fc.
E.g.:

  1. maxConcurrentTopicLoadRequest=10 and the broker receives 21 requests, so 10 are executed and the rest go to the pending queue.
  2. Once the first 10 complete, they try to execute the pending 10 without any throttling. But if 31 additional requests arrive at the same time, 10 of them will also be served, because the semaphore has already been released. So now we are serving 20 concurrent requests in total (10 new requests + the 10 completed-topic threads that invoked createPendingLoadTopic).
  3. This process can go on, which keeps increasing the concurrent count, so we are not doing real throttling.
  • However, by acquiring the semaphore here, at least those 31 additional incoming requests will be throttled. Still, this approach doesn't throttle 100% compared to commit 406b0fc. (A sketch of this compromise is below.)
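A minimal sketch of that compromise (simplified; in the PR the release happens when the load future completes, not in a finally block):

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

class DrainWithPermitSketch {
    private final AtomicReference<Semaphore> topicLoadRequestSemaphore =
            new AtomicReference<>(new Semaphore(10));

    void createPendingLoadTopic() {
        // Take a permit if one is free, but proceed either way: the drain path
        // must make progress, while brand-new incoming requests stay throttled
        // because the permit pool is kept (mostly) exhausted.
        final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
        final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
        try {
            // ... load the pending topic ...
        } finally {
            if (acquiredPermit) {
                topicLoadSemaphore.release(); // give back only what we took
            }
        }
    }
}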

Contributor

Ok, that makes sense

final String topic = pendingTopic.getLeft();
try {
checkTopicNsOwnership(topic);
CompletableFuture<Topic> future = new CompletableFuture<>();
Contributor

Same as above: probably we don't need an additional future.

log.error("Failed to create pending topic {} {}", topic, re);
pendingTopic.getRight().completeExceptionally(re.getCause());
// process next pending topic
createPendingLoadTopic();
Contributor

This would be a recursive call

Contributor Author

createPersistentTopic is an async call, so we make the recursion happen only once the future completes; otherwise, with throttling-limit=10, if the queue has 1000 pending requests then all 1000 could go through createPersistentTopic(topic, future) concurrently. So, we have to call it recursively only after receiving the completed future, right?

Contributor

But in this case, if you get an exception when starting the topic creation asynchronously, it will immediately start the next one. If all the queued topics start failing, we'd likely get stack overflow errors. An easy fix would be to schedule the next topic load from the executor, as sketched below.
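A minimal sketch of that fix (the executor choice is illustrative; the broker would use one of its existing executors):

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class ScheduledDrainSketch {
    private final Executor executor = Executors.newSingleThreadExecutor();

    // Hop through the executor instead of calling createPendingLoadTopic()
    // directly from the failure path: each retry starts on a fresh stack
    // frame, so a long run of failing topics cannot overflow the stack.
    void scheduleNextPendingLoad() {
        executor.execute(this::createPendingLoadTopic);
    }

    private void createPendingLoadTopic() { /* drain one queued topic */ }
}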

Contributor Author

That makes sense.

@rdhabalia rdhabalia force-pushed the topic_throttle branch 3 times, most recently from 78b2aca to 8027740, on March 24, 2017 18:44
@merlimat merlimat left a comment (Contributor)

👍

@merlimat
Contributor

A couple of seemingly legitimate unit test failures:

ServerCnxTest.testProducerOnNotOwnedTopic:397 expected [ServiceNotReady] but found [UnknownError]
  BrokerClientIntegrationTest.testMaxConcurrentTopicLoading:579 » Execution com....

@rdhabalia
Contributor Author

Yes, fixed.

@merlimat merlimat merged commit 2886250 into apache:master Mar 24, 2017
@rdhabalia rdhabalia deleted the topic_throttle branch June 21, 2017 18:54