[PIP-82] [pulsar-broker] Add a task to publish resource usage to a topic #10008
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceUsageTransportManager implements AutoCloseable {
Could you add a Javadoc header comment to explain what this class is for and why it exists?
A short summary of how it will be used, or how it relates to existing classes and modules, will help us remember in the future why this class was created.
Thank you. I added the javadoc comment.
/pulsabot run-failure-checks
BTW, we have introduced a SystemTopic mechanism in the broker, which is currently used by topic-level policies and transactions. I think we could improve the system topic to support non-persistent topics to achieve this purpose?
category = CATEGORY_POLICIES,
doc = "Topic to publish usage reports to if resourceUsagePublishToTopic is enabled."
)
private String resourceUsagePublishTopicName = "non-persistent://pulsar/system/resource-usage";
This one should be an internal topic, I think we don't need to expose it to users.
@codelipenghui thank you for the review. Prior to writing the code in this PR, I read through the SystemTopic/TopicPolicies implementation. My observation was that:
- it was added to publish namespace change events to a special system topic (created under the same namespace), so every namespace will have a special topic created.
- there is a SystemTopicClient wrapper that was added on top of PulsarClient(), which has methods to create reader/writer for each of the above topics. A reader/writer is created every time there is a topic policy change.
- the topic schema is hard-coded to PulsarEvent with AVRO encoding.
The resource-usage feature differs in the following way:
- one topic at the system level, one reader, one writer.
- message schema is protobuf
- periodic task to publish usages and reader callback to process usage messages.
I don't think we could have used SystemTopic for resource-usage without making any changes to SystemTopic. At a minimum I would have had to change SystemTopicClientBase implementation. In my opinion, the overlap in functionality is not large enough to make re-factoring SystemTopic and re-testing the SystemTopic feature worthwhile. So, I decided to write the simple code to create a single reader and writer (which is what the PR has).
W.r.t your other comment, the topic name is internal. The config option is exposed to the user in case the topic name conflicts with the topics that user wants to use. The user can decide that the default is OK, in which case the topic name gets decided by the implementation.
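For readers following the thread, the "one reader, one writer, periodic task" shape described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: `UsageTransportSketch`, `publishOnce`, and `onMessage` are hypothetical names, and the Pulsar producer and reader are replaced by plain `Consumer<byte[]>` callbacks so that the sketch runs without a broker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch, not the class from this PR.
class UsageTransportSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Supplier<byte[]> usageSource;  // builds the serialized usage report
    private final Consumer<byte[]> writer;       // stands in for the single producer
    private final List<Consumer<byte[]>> listeners = new CopyOnWriteArrayList<>();

    UsageTransportSketch(Supplier<byte[]> usageSource, Consumer<byte[]> writer) {
        this.usageSource = usageSource;
        this.writer = writer;
    }

    // Register a callback for usage reports received from other brokers.
    void addListener(Consumer<byte[]> listener) {
        listeners.add(listener);
    }

    // Start the periodic publish task.
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(
                this::publishOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Publish one report; swallow failures so the scheduled task keeps running.
    void publishOnce() {
        try {
            writer.accept(usageSource.get());
        } catch (RuntimeException e) {
            // A real implementation would log this.
        }
    }

    // Stands in for the single reader's message callback.
    void onMessage(byte[] payload) {
        for (Consumer<byte[]> listener : listeners) {
            listener.accept(payload);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Catching the exception inside `publishOnce` matters because `scheduleAtFixedRate` suppresses all later executions once a run throws.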
Shouldn't it be internal to the resource exchange implementation? Alternatively, we could add a config such as resourceExchangeProperties= so that this can work for other implementations.
doc = "Default policy for publishing usage reports to system topic is disabled."
+ "This enables publishing of usage reports"
)
private boolean resourceUsagePublishToTopic = false;
Would it be better to expose namespaceResourceLimiterEnabled or tenantResourceLimiterEnabled to users? resourceUsagePublishToTopic is an internal mechanism; exposing it requires users to understand the details of how it works.
When PIP-82 was presented in the community meeting, it was suggested that the mechanism used to exchange the usage information across brokers should be configurable/pluggable (for instance, one could use a third-party DB or zookeeper or something else). This config option was added for that purpose. There will be other configuration that allows the user to configure resource-groups to manage the quotas. That implementation is still work in progress and a PR will be raised when it is ready.
OK, if the mechanism used to exchange the usage information across brokers should be pluggable, would it be better to provide the implementation class name, such as resourceExchangeClassName=NonPersistentTopicBasedResourceExchange? That way we can support diverse implementations.
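To illustrate the class-name approach suggested here, a minimal sketch of loading an implementation by its configured name via reflection is shown below. The `ResourceUsageExchange` interface, `NoopExchange`, and `ExchangeLoader` are hypothetical names invented for this example; they are not types from the PR or from Pulsar.

```java
// Hypothetical plugin contract; the real interface in the PR may look different.
interface ResourceUsageExchange extends AutoCloseable {
    void publish(byte[] report);

    @Override
    void close();
}

// Trivial implementation used only to demonstrate loading by name.
class NoopExchange implements ResourceUsageExchange {
    @Override
    public void publish(byte[] report) {
        // Intentionally does nothing.
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

final class ExchangeLoader {
    private ExchangeLoader() {
    }

    // Instantiate the class named in the configuration through its no-arg constructor.
    static ResourceUsageExchange load(String className) {
        try {
            return Class.forName(className)
                    .asSubclass(ResourceUsageExchange.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException(
                    "Cannot load resource usage exchange: " + className, e);
        }
    }
}
```

With this shape, a broker would call something like `ExchangeLoader.load(configuredClassName)` once at startup, and a misconfigured class name fails fast with a clear message.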
@codelipenghui, I renamed the configuration option per your suggestion. Please take a look.
@codelipenghui, please let me know if you have any more comments. If all looks good, can you please approve, so we can get the changes merged? Thank you.
LGTM +1
…ageTransportClassName'.
If we don’t use |
Hi @315157973 , I have already outlined some of the reasons why we decided to not use SystemTopic (please see #10008 (comment)). If we are to use SystemTopic to exchange 'resource usage', it will require significant refactoring to SystemTopic. There is really not much overlap in functionality other than creating one producer and one consumer. In my opinion, there is nothing much to gain to refactor the SystemTopic code just to use a couple of lines of code. With respect to allowAutoTopicCreation=false,
[ Side note: from looking at the code, I am not able to figure out how SystemTopic handles allowAutoTopicCreation=false; I'd appreciate it if you could provide code pointers or documentation that explain it. It would help me get familiar with it. ] As for your other point about inactive topics: by default the resource usage topic is non-persistent, as the brokers are only interested in the current/recent usage for calculating the quotas, not historical usage. If the user chooses to use a persistent topic (via configuration) for some reason, it should be fine to assume that the user knows what they are doing and will take the necessary steps (they may or may not be OK with the topic being deleted as inactive).
This PR is a great feature, and we can later migrate other usage information from zk to here. So I have the above suggestions. Using
This is just my suggestion. Here is an explanation of your question above:
Are you suggesting that if 'resource quota enforcement' is enabled, it should override 'allowAutoTopicCreation=false'? My preference is not to do that, but if others agree, I can do that. Are you OK if I take it up as a separate PR? If you strongly feel that the 'resource-usage' topic name should not be exposed in the configuration, I can hide it. The 'resource-usage-exchange' mechanism is a broker-level mechanism. The implementation should be as simple as possible. I think it should not use higher-level features that are user-visible, such as topic compaction. That is one of the reasons why we have chosen not to have schema validation (even though the schema is defined in protobuf).
Very good.
I left one suggestion
PTAL
 * Listener for resource usage published by other brokers
 * @param broker name
 * @param Resource usage object
 */
What about 'acceptResourceUsage'?
@eolivelli , I changed the name of the method, per your suggestion. Please take a look.
@eolivelli I have incorporated your comments and the CI passed as well. Can you please take a look and approve. Thank you!
good, thanks
.batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
.sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
.blockIfQueueFull(false)
.compressionType(SNAPPY)
Just wondering, is there a particular reason snappy was chosen as the compression algorithm?
@jerrypeng there is no particular reason. If you believe we should use some other compression algorithm, please let me know. We may also change this after testing if we find that some other algorithm is better.
List<String> tenantList = admin.tenants().getTenants();
if (!tenantList.contains(tenant)) {
    admin.tenants().createTenant(tenant,
Every broker will attempt to run this, correct? If so, I would add some retry logic for this whole block in case multiple brokers try to do this at the same time. In that scenario only one succeeds, but the others can simply retry to confirm that the tenant has already been created.
@jerrypeng Yes, every broker will attempt to run this. I can add the retry logic, but I don't understand why we need one. One of the brokers will succeed and others will find that the tenant/namespace exist and move on. The one race condition that is not handled is if after the check for exists, the tenant gets created by some other broker, this broker might get ConflictException. I will add a try/catch to ignore this exception. However, can I please take that up in my next commit? I can create a github issue to track it.
The one race condition that is not handled is if after the check for exists, the tenant gets created by some other broker, this broker might get ConflictException
Yes that exact scenario. We have added similar logic in the worker:
To handle that exact situation. It is a much more lightweight solution than forcing a restart.
However, can I please take that up in my next commit? I can create a github issue to track it.
sure
@jerrypeng Yes, every broker will attempt to run this. I can add the retry logic, but I don't understand why we need one. One of the brokers will succeed and others will find that the tenant/namespace exist and move on. The one race condition that is not handled is if after the check for exists, the tenant gets created by some other broker, this broker might get ConflictException. I will add a try/catch to ignore this exception. However, can I please take that up in my next commit? I can create a github issue to track it.
Adding a try/catch is a good solution.
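As a rough illustration of the try/catch approach agreed on in this thread, the sketch below treats a conflict on creation as success, since it only means another broker won the race between the existence check and the create call. `TenantAdmin`, `ConflictException`, and `TenantBootstrap` here are hypothetical stand-ins defined for this example so that it runs without a cluster; they are not the Pulsar admin API.

```java
// Hypothetical stand-in for the conflict error raised when the tenant already exists.
class ConflictException extends Exception {
    ConflictException(String message) {
        super(message);
    }
}

// Hypothetical minimal admin interface; not the Pulsar admin client.
interface TenantAdmin {
    boolean exists(String tenant);

    void create(String tenant) throws ConflictException;
}

final class TenantBootstrap {
    private TenantBootstrap() {
    }

    // Create the tenant if it is missing; a conflict means another broker
    // created it after our existence check, which is the desired end state.
    static void ensureTenant(TenantAdmin admin, String tenant) {
        if (admin.exists(tenant)) {
            return;
        }
        try {
            admin.create(tenant);
        } catch (ConflictException e) {
            // Lost the race to another broker; nothing more to do.
        }
    }
}
```

Because the end state is the same whether this broker or another one creates the tenant, ignoring the conflict avoids both a retry loop and a restart.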
/pulsarbot run-failure-checks
LGTM
great work
 * Listener for resource usage published by other brokers
 * @param broker name
 * @param Resource usage object
 */
good, thanks
IIUC, @jerrypeng's comment will be addressed in the next commit, and he agrees with that. Thank you @bharanic-dev.
@eolivelli, thank you for the review and for merging the changes to master.
return pulsarClient.newProducer()
        .topic(pulsarService.getConfig().getResourceUsageTransportPublishTopicName())
        .batchingMaxPublishDelay(publishDelayMilliSecs, TimeUnit.MILLISECONDS)
        .sendTimeout(sendTimeoutSecs, TimeUnit.SECONDS)
I wonder, it seems we can hardcode 10 here, right? Why do we need the variable sendTimeoutSecs?
Motivation
PIP-82 adds support for distributed resource quota enforcement at a tenant, namespace or topic level.
Modifications
Verifying this change
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation