Re-work Function MetaDataManager to make all metadata writes only by the leader #7255
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
...nctions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
...ons/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
...ons/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
...ons/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
...nctions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
...nctions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
...nctions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
        pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
// read all existing messages
this.setInitializePhase(true);
while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
    this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
It is a bit odd that functionMetaDataTopicTailer.processRequest() calls back into FunctionMetaDataManager. This seems like an awkward interaction between the two classes. Perhaps we can refactor it in a subsequent PR.
Agreed
...nctions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
    readerThread.start();
}

@Override
public void run() {
-    while(running) {
+    while (running) {
To check whether we have really reached the end of the topic, I think it's safer to require both conditions: reader.hasMessageAvailable() == false and reader.readNext(5, TimeUnit.SECONDS) returns null.
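A minimal sketch of the double check suggested above, assuming a hypothetical `MessageSource` interface that stands in for the two reader calls named in the comment (`hasMessageAvailable()` and `readNext(timeout, unit)`). The `QueueSource` class exists only to make the example runnable; it is not part of Pulsar, and this is not the code that was merged.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class EndOfTopicCheck {

    /** Hypothetical stand-in for the two reader calls named in the review comment. */
    interface MessageSource {
        boolean hasMessageAvailable() throws Exception;

        /** Returns the next message, or null if none arrives within the timeout. */
        byte[] readNext(int timeout, TimeUnit unit) throws Exception;
    }

    /** In-memory source used only to exercise the loop; not part of Pulsar. */
    static class QueueSource implements MessageSource {
        private final Queue<byte[]> pending = new ArrayDeque<>();

        void add(byte[] payload) { pending.add(payload); }

        @Override public boolean hasMessageAvailable() { return !pending.isEmpty(); }

        @Override public byte[] readNext(int timeout, TimeUnit unit) { return pending.poll(); }
    }

    /**
     * Drains the source and returns the number of messages processed.
     * The end of the topic is declared only when a timed read returns null
     * AND the source reports that nothing is available.
     */
    static int drain(MessageSource source, int timeoutSeconds) throws Exception {
        int processed = 0;
        while (true) {
            byte[] msg = source.readNext(timeoutSeconds, TimeUnit.SECONDS);
            if (msg != null) {
                processed++; // a real tailer would hand the message to its processing code here
                continue;
            }
            if (!source.hasMessageAvailable()) {
                return processed; // timed out and nothing pending: treat as end of topic
            }
            // The read timed out but more data is reported: loop and try again.
        }
    }

    public static void main(String[] args) throws Exception {
        QueueSource source = new QueueSource();
        source.add(new byte[] {1});
        source.add(new byte[] {2});
        System.out.println("processed " + drain(source, 5));
    }
}
```

Requiring both signals trades a few seconds of startup latency for protection against a single check briefly reporting that the topic is empty.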
        .build();
try {
    lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
} catch (Exception e) {
Shouldn't we return a 500 error to the end user? If we just call "errorNotifier.triggerError(e)", the worker dies and the end user will likely get either no response or a timeout error.
Good point. Changed.
"errorNotifier.triggerError(e);" is still being called. The worker might exit before the exception is bubbled up and a response is sent back.
@srkukarni ^^^
So the question here is what the right thing to do is. If we are having trouble writing to the producer, should the leader just reject the request with an internal server error and hope that things will be better next time? Or is the right approach to trigger worker death?
We should return the error to the worker that made the call to the leader; otherwise that worker might have to wait for a timeout. I think we should just return an error and let the user retry. There is no guarantee that restarting the worker or electing another leader will solve the issue, since all the workers have the same configuration. Restarting can also be heavy, and I would prefer to keep the number of forced restarts to a minimum.
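The approach argued for in this thread can be sketched roughly as follows. Every name here is hypothetical: `MetadataProducer` stands in for the leader's exclusive producer, and `InternalServerErrorException` stands in for whatever exception the REST layer maps to an HTTP 500. It illustrates the idea of rejecting the request rather than triggering worker death, not the code that was merged.

```java
import java.io.IOException;

public class LeaderWriteErrorHandling {

    /** Hypothetical stand-in for the leader's exclusive producer. */
    interface MetadataProducer {
        String send(byte[] payload) throws Exception;
    }

    /** Hypothetical exception that a REST layer could map to HTTP 500. */
    static class InternalServerErrorException extends RuntimeException {
        final int status = 500;

        InternalServerErrorException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Writes one request to the metadata topic. On failure the error is
     * surfaced to the caller, who can retry; the worker keeps running.
     */
    static String writeOrReject(MetadataProducer producer, byte[] request) {
        try {
            return producer.send(request);
        } catch (Exception e) {
            throw new InternalServerErrorException(
                    "Could not write to the function metadata topic", e);
        }
    }

    public static void main(String[] args) {
        MetadataProducer failing = payload -> {
            throw new IOException("broker unavailable");
        };
        try {
            writeOrReject(failing, new byte[0]);
        } catch (InternalServerErrorException e) {
            System.out.println(e.status + " " + e.getMessage());
        }
    }
}
```

Because the exception propagates synchronously, the worker that forwarded the request gets an answer immediately instead of waiting for a timeout.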
...nctions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
...unctions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
...unctions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
...nctions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
…the leader (apache#7255)
* Function workers re-direct call update requests to the leader
* Fixed test
* tests pass
* Working version
* Fix test
* Short circuit update
* Fix test
* Fix test
* Fix tests
* Added one more catch
* Added one more catch
* Seperated internal and external errors
* Fix test
* Address feedback
* Do not expose updateOnLeader to functions
* hide api
* hide api
* removed duplicate comments
* Do leadership changes in function metadata manager
* make the function sync
* Added more comments
* Throw error
* Changed name
* address comments
* Deleted unused classes
* Rework metadata manager
* Working
* Fix test
* A better way for test
* Address feedback
Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
*Motivation* apache#7255 re-worked Function MetaDataManager so that all metadata writes are made only by the leader. This unintentionally broke Pulsar Functions when m-TLS is used for authentication, because the change doesn't take the TLS port into consideration and always uses a non-TLS port to communicate with the leader broker. This PR fixes the broken implementation and ensures that Pulsar Functions use the right service URL and authentication plugin to communicate with the leader. *Tests* Add an integration test to reproduce the issue and ensure functions worker with m-TLS
Motivation
Currently the Function Metadata topic is not compacted, which means that in a long-running system with a sufficient number of function submissions, updates, and state changes, the startup lag for workers reading the topic from the beginning increases linearly.
However, the current mechanism for writing to the Function Metadata topic does not lend itself to compaction. This is because all workers write into the topic and only one of them wins (it need not be the last).
This PR makes a first stab at simplifying the current workflow. Now, upon a function submission, update, or state change, the workers simply pass the request to the leader. The leader is the arbiter of what goes in (just as it is today) and is the only one writing to the function metadata topic. The rest of the workers continue to tail the topic to receive the appropriate updates. The leader does not run the tailer; instead it updates its in-memory state directly when it writes to the metadata topic.
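The workflow described above can be sketched as a small in-memory model. All class and method names below are hypothetical and chosen for illustration; in particular, `MetadataTopic` is a plain list standing in for the real topic, and leader election is reduced to a fixed reference.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeaderOnlyWrites {

    /** Hypothetical in-memory stand-in for the function metadata topic. */
    static class MetadataTopic {
        final List<String[]> log = new ArrayList<>();

        void append(String function, String metadata) {
            log.add(new String[] {function, metadata});
        }
    }

    static class Worker {
        final Map<String, String> state = new HashMap<>();
        final MetadataTopic topic;
        final Worker leader;   // null means this worker is the leader
        private int position;  // next log entry this worker will tail

        Worker(MetadataTopic topic, Worker leader) {
            this.topic = topic;
            this.leader = leader;
        }

        /** Non-leaders forward the request; only the leader writes to the topic. */
        void update(String function, String metadata) {
            if (leader != null) {
                leader.update(function, metadata);
                return;
            }
            topic.append(function, metadata);
            state.put(function, metadata); // leader updates its own state directly, no tailer
        }

        /** Non-leaders apply whatever the leader has written since the last call. */
        void tail() {
            if (leader == null) {
                return;
            }
            while (position < topic.log.size()) {
                String[] entry = topic.log.get(position++);
                state.put(entry[0], entry[1]);
            }
        }
    }

    public static void main(String[] args) {
        MetadataTopic topic = new MetadataTopic();
        Worker leader = new Worker(topic, null);
        Worker follower = new Worker(topic, leader);

        follower.update("word-count", "v1"); // forwarded to the leader
        follower.tail();                     // follower catches up from the topic

        System.out.println(topic.log.size() + " entry, leader=" + leader.state
                + ", follower=" + follower.state);
    }
}
```

With a single writer, the last entry for each function is always the accepted one, which is the property that topic compaction relies on.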