-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
STORM-3660: Remove use of queues for updating credentials #3295
Conversation
.forEach(taskObject -> { | ||
((ICredentialsListener) taskObject).setCredentials(creds == null ? null : creds.get_creds()); | ||
}); | ||
this.needToRefreshCreds = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think maybe we should first set this.needToRefreshCreds to false and then update the creds. Imagine we uploaded credentials twice in a row and this loaded the first creds and the thread paused. It would then clear the update flag and miss the second credentials upload.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
} catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
executor.needToRefreshCreds = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: Does this method name and signature needs to change since it is setting a flag and not using the supplied parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the SpoutExecutor and BoltExecutor during init uses the initialCredentials, the accept method does not need to worry about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -152,6 +153,7 @@ | |||
private final AtomicLong nextLoadUpdate = new AtomicLong(0); | |||
private final boolean trySerializeLocal; | |||
private final Collection<IAutoCredentials> autoCredentials; | |||
AtomicReference<Credentials> credentialsAtom; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed
storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
Outdated
Show resolved
Hide resolved
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception { | |||
|
|||
@Override | |||
public void accept(Object event) { | |||
if (this.needToRefreshCreds) { | |||
this.needToRefreshCreds = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There seems to be a race condition here.
The executor thread consumes from receiveQ and every time it consumes one tuple, the accept
method is invoked and needToRefreshCreds
is checked.
This needToRefreshCreds
variable is accessed by others threads (which invoke credentialsChanged
) and the value will be updated to true
.
So first of all, needToRefreshCreds
needs to be at least violate
. But violate
doesn't seem enough because needToRefreshCreds
is set to false
in this executor thread, and set to true
in other threads.
Between checking if needToRefreshCreds
is true and setting it to be false
in the executor thread, needToRefreshCreds
could be modified to true
in other threads but this information is lost.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The needToRefreshCreds could be updated to true anytime after if condition is evaluated by the worker credentials refresh thread. If there are consecutive updates, it will either execute the setCredentials twice or will execute only once with latest credentials - no harm/no deadlock.
@@ -270,6 +273,15 @@ public ExecutorShutdown execute() throws Exception { | |||
|
|||
@Override | |||
public void accept(Object event) { | |||
if (this.needToRefreshCreds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An issue about checking the value here is it might be blocked.
This accept
is only called when receiveQueue.consume
is called. But it could be blocked so credential update will be delayed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved this to separate method and being called fro withing BoltExecutor
and SpoutExecutor
call at the beginning of each iteration in the asyncloop. This would ensure that credentials are updated on executor irrespective of back pressure or no tuples to process scenarios. For backward compatibility and agreement about invoking setCreentials on user implementation on ICredentialsListener from executor thread, if Executor thread is stuck in execute or nextTuple methods delays can not be avoided. But otherwise we should be able to get credentials update on next iteration.
protected void updateExecCredsIfRequired() { | ||
if (this.needToRefreshCreds) { | ||
this.needToRefreshCreds = false; | ||
LOG.info("The credentials are being updated {}.", executorId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should change it to be DEBUG
since this is on a critical path (even though this method
might not be called very frequently)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think info should be fine.
@@ -124,6 +126,7 @@ | |||
protected int idToTaskBase; | |||
protected String hostname; | |||
private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1); | |||
private boolean needToRefreshCreds = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be volatile
to make sure the changes from other threads can be seen immediately
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since multiple threads are changing this flag- switching to AtomicBoolean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If only simple get()
and set()
methods are used in AtomicBoolean
, it is basically the same as using volatile boolean
unless methods like compareAndSet
and getAndSet
are required.
https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/master/jdk/src/share/classes/java/util/concurrent/atomic/AtomicBoolean.java#L85-L87 (although the implementation uses volatile int
)
But anyway, using AtomicBoolean
is easier to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 Looks good to me
Thanks
What is the purpose of the change
How was the change tested
After launching topology, wait for workertokens to be refreshed by nimbus, I noticed below changes in the worker log suggesting each executor has received the newer worker tokens: