-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix leader/scheduler assignment processing lag problem #7237
Fix leader/scheduler assignment processing lag problem #7237
Conversation
e8f550c
to
8a54e0f
Compare
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
} | ||
|
||
hasExited = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is re-initializing the variable, so if we can "start" again, the completeable future is not already completed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think its better to recreate the object? That way this re-create logic becomes simpler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating a new FunctionAssignmentTailer doesn't really simplify the logic much. "hasExited" is needed regardless of whether we recreate the object from scratch or not. We are also keeping the track of the "lastMessageId" in FunctionAssignmentTailer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then maybe we can create this at start instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just dislike creating new objects in something like close. Seems like not the usual pattern
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok I re-initialize it in the start method
try { | ||
// trigger read to the end of the topic and exit | ||
// Since the leader can just update its in memory assignments cache directly | ||
functionAssignmentTailer.triggerReadToTheEndAndExit().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be abstracted out from leaderservice to respective class(in this case functionruntimemanager)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also we need to create the producer here right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be abstracted out from leaderservice to respective class(in this case functionruntimemanager)
yup done
Also we need to create the producer here right?
Why do we need to create a producer? To start producing to the assignment topic? We initialize the producer in the constructor. I guess we don't need to do that and only when the worker becomes the leader will it create the producer and close the producer when it looses leadership
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. That is the same pattern in #7255 as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} catch (Exception e) { | ||
log.warn("Failed to invoke scheduler", e); | ||
throw e; | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to simplify this massively.
I think part of the pr that I'm working on wrt metadata simplification will impact this as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are you thinking? What is the complexity here?
lastMessageId = msg.getMessageId(); | ||
} | ||
} catch (Throwable th) { | ||
if (isRunning) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we check for exitOnEndofTopic as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to since even if "exitOnEndOfTopic" is set ,"isRunning" will still be set to true and any error will be bubbled up as expected
log.warn("Encountered error when assignment tailer is not running", th); | ||
} | ||
} | ||
this.tailerThread = getTailerThread(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe defer this till start?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
...unctions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
Show resolved
Hide resolved
try { | ||
// trigger read to the end of the topic and exit | ||
// Since the leader can just update its in memory assignments cache directly | ||
functionRuntimeManager.stopReadingAssignments(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you instead call functionRunTimeManager.acquireLeadership() and functionRunTimeManager.giveupLeadership()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't make to call "schedulerManager.initialize();" there or add the SchedulerManager as a dependency in FunctionRuntimeManager just for this
// when a worker has lost leadership it needs to start reading from the assignment topic again | ||
try { | ||
// acquire scheduler lock to make sure a scheduling is not in process | ||
schedulerManager.getSchedulerLock().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the better way is to make scheduler aware of the leadership changes(just like runtime manger) and call acquireLeadership and giveupLeadership
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a way to do that. You will need synchronization somewhere and someone will have to wait
private volatile boolean isRunning = false; | ||
private volatile boolean exitOnEndOfTopic = false; | ||
private CompletableFuture<Void> hasExited; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exitFuture might be a better name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
|
||
private final Thread tailerThread; | ||
@Getter | ||
private MessageId lastMessageId = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn' t we init this to MessageId.earliest?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No
public void start() { | ||
isRunning = true; | ||
tailerThread.start(); | ||
public synchronized void start() throws PulsarClientException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its cleaner to consolidate this and above method to start(MessageId) { ... }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also think that some logic will be simpler if we create Tailer object every time we go thru leadership transistion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also think that some logic will be simpler if we create Tailer object every time we go thru leadership transition
That is not correct. The functionAssignmentTailer is also responsible for keeping track of a message id. If a worker becomes a leader and then loses leadership prior to creating any assignments, we shouldn't just start reading the assignment topic from the beginning. We should resume from the message id stored in the functionAssignmentTailer
if (msg == null) { | ||
if (exitOnEndOfTopic && !reader.hasMessageAvailable()) { | ||
break; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it simpler if we do
while(isRunning) {
if (exitOnEndOfTopic && !available) break;
try { read message... }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's safer to wait for a timeout period to make sure no messages just arrived late
|
||
} catch (Exception e) { | ||
log.error("Failed to initialize meta data store", e); | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
public void start() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialize and start? what cannot be done during constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because we cannot start prior to the SchedulerManager is setup since function metadata manager can invoke the scheduler. We can initialize prior to to setting up the SchedulerManager but we cannot start
this.getWorkerService().getClient().newReader(), | ||
this.workerConfig, | ||
this.errorNotifier); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that we are no longer using FunctionAssignmenttailer here. However maybe a static method that consolidates. this reader creation and the one in assignment tailer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
@@ -33,8 +33,11 @@ | |||
import org.apache.pulsar.client.admin.PulsarAdmin; | |||
import org.apache.pulsar.client.admin.PulsarAdminBuilder; | |||
import org.apache.pulsar.client.api.ClientBuilder; | |||
import org.apache.pulsar.client.api.MessageId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
* Fix leader/scheduler assignment processing lag problem * add license header * adding more comments * improving impl * fixing bugs * improving impl * fixing tests * adding comments * add more testing * addressing comments * cleaning up * refactoring implementation * addressing comments Co-authored-by: Jerry Peng <jerryp@splunk.com>
* Fix leader/scheduler assignment processing lag problem * add license header * adding more comments * improving impl * fixing bugs * improving impl * fixing tests * adding comments * add more testing * addressing comments * cleaning up * refactoring implementation * addressing comments Co-authored-by: Jerry Peng <jerryp@splunk.com>
Motivation
When the leader worker isn't processing assignment messages fast enough. The background routine that checks for unassigned functions instances will trigger scheduler to schedule and write more assignments to the assignment topic. There is essentially a feedback loop that can cause many assignment updates to be published in the assignment topic that are unnecessary.
Modifications
Allow leader to modify/update locally in-memory assignments map