-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Functions metadata compaction #7377
Functions metadata compaction #7377
Conversation
@srkukarni the reader in the FunctionMetadataTopicTailer needs to
|
https://github.com/apache/pulsar/pull/7377/files#diff-fbc6eb611de17f87b29ba52e30bb7fcbR140 |
break; | ||
default: | ||
log.warn("Received request with unrecognized type: {}", serviceRequest); | ||
if (workerConfig.getCompactMetadataTopic()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or should we just check if the message has a data or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually prefer explicit setting.
Plus with this change, key is being set for whether you are compacting the topic or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way for a user to that is already using functions to be able to turn compaction on? Is there a path of migration for these users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The simplest thing maybe to change this check from
if (workerConfig.getCompactMetadataTopic()
to
if (messsage.getdata() == null)
So old messages can get processed correctly as well as the the new messages in the new format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The format of the messages are now different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just the version is added as part of properties right? We can still process the messages differently based on whether the data == null or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not just the version. We actually write function metadata when compaction is turned on. In case when it’s not compacted, we write the service request for backwards compatibility.
category = CATEGORY_FUNC_METADATA_MNG, | ||
doc = "Should the metadata topic be compacted?" | ||
) | ||
private Boolean compactMetadataTopic = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the current approach, this is a dangerous flag. If a user mistakenly flips the flag on. It could render their cluster corrupt. We should perhaps add more warnings for what this config with do. Instead of "compactMetadataTopic" maybe we can rename it to "useCompactedMetadataTopic"? So that what it does is more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will change the name. Although the possibility of corruption is low. Because before a leader can actually start writing, it needs to read all existing messages and the messages will not deserialize either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to add some more content to the doc annotation on what the impact of flipping the flag is.
lastMessageSeen = exclusiveLeaderProducer.newMessage() | ||
.key(key) | ||
.value(toWrite) | ||
.property("version", Long.toString(functionMetaData.getVersion())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets use a
private const final String
variable for "version"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
try { | ||
lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray()); | ||
lastMessageSeen = exclusiveLeaderProducer.newMessage() | ||
.key(key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will add a key for both the old format and new format right? Not sure we should add a key for the old format. Perhaps, whether checking whether the message has a key or not can be used to determine whether its the old format or new format.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
break; | ||
default: | ||
log.warn("Received request with unrecognized type: {}", serviceRequest); | ||
if (workerConfig.getUseCompactedMetadataTopic()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we change this check to if a key exists or not? This creates an avenue in which a existing cluster can transition to use a compacted metadata topic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think thats a good idea. I would rather have worker fail here unless specifically configured to have compaction enabled or disabled.
* Function workers re-direct call update requests to the leader * Fixed test * tests pass * Working version * Fix test * Short circuit update * Fix test * Fix test * Fix tests * Added one more catch * Added one more catch * Seperated internal and external errors * Fix test * Address feedback * Do not expose updateOnLeader to functions * hide api * hide api * removed duplicate comments * Do leadership changes in function metadata manager * make the function sync * Added more comments * Throw error * Changed name * address comments * Deleted unused classes * Rework metadata manager * Working * Fix test * A better way for test * Address feedback * Added an option to compact function metadata topic * Address feedback * Incorporate feedback Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
(If this PR fixes a github issue, please add
Fixes #<xyz>
.)Fixes #
(or if this PR is one task of a github issue, please add
Master Issue: #<xyz>
to link to the master issue.)Master Issue: #
Motivation
Currently we do not compact FunctionMetadata topic. This pr adds the ability to do that
Modifications
Describe the modifications you've done.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation