NIFI-2545: Ensure that when @OnUnscheduled and @OnStopped methods are…#836
NIFI-2545: Ensure that when @OnUnscheduled and @OnStopped methods are…#836markap14 wants to merge 2 commits intoapache:masterfrom
Conversation
| if (activeThreadMonitorCallback.call()) { | ||
| if (scheduleState.isScheduled()) { | ||
| schedulingAgent.unschedule(StandardProcessorNode.this, scheduleState); | ||
| } |
There was a problem hiding this comment.
Trying to understand. The @OnStopped is not invoked until unscheduled flag is set and given that OnUnschedule is a synchronous call how can if (scheduleState.isScheduled()) { ever be true if the internal 'unscheduled' flag is set.
There was a problem hiding this comment.
So, after further digging I see that invoking OnUnscheduled doesn't mean that the actual PN is unscheduled since we still need to invoke getSchedulingAgent(procNode).unschedule(procNode, state);.
If so we probably have to encapsulate this better where call to getSchedulingAgent(procNode).unschedule(procNode, state); should initiate call to @OnUnschedule.
Also, I am not sure I like leaking SchedulingAgent into the ProcessNode, that is why the callback felt more appropriate. With that in mind and in spirit of current changes wouldn't this (see below for full implementation of run()) be the same?
scheduler.execute(new Runnable() {
boolean unscheduled = false;
@Override
public void run() {
if (!this.unscheduled){
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
try {
this.unscheduled = activeThreadMonitorCallback.call();
if (this.unscheduled) {
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
}
scheduledState.set(ScheduledState.STOPPED);
} else {
scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
LOG.warn("Failed while shutting down processor " + processor, e);
}
}
}
});There was a problem hiding this comment.
I don't read it as a leak of ScheduleState. This class and the class invoking stop are part of a closed model. Having that here means logic for unscheduling, waiting for completion, and invoking onStopped are all nicely aligned in a single location.
@markap14 I am wondering why we don't unschedule the processorNode before we invoke onUnscheduled in the processor? Technically speaking couldn't we continue to give it threads between telling the processor it has been unscheduled and then actually unscheduling it? That and the finding that we're not wrapping the call to the actual processor's unscheduled method(s) in NarCloseable are my two outstanding questions here.
There was a problem hiding this comment.
I am ok with whatever decision but just to clarify; Logic for scheduling/unscheduling was tightly encapsulated in StandardProcessScheduler where it had access to other things that may be required to perform scheduling/unscheduling operations. A callback could always be modified without introducing any changes to ProcessNode. I am simply afraid that doing the above may lead to dragging more things into ProcessNode from StandardProcessScheduler which will eventually lead to package dependency cycles and all the nasty stuff it brings with it.
There was a problem hiding this comment.
Fair point regarding the previously tighter encapsulation. Given the closed nature of this model i think it comes down to a stylistic preference. I'm ok with either for this case.
… called that the active thread count takes that thread into account
…avaDocs, ensured that Nar ClassLoader is used when calling @OnUnscheduled methods, and ensure that we in fact unschedule the processor _before_ calling @OnUnscheduled methods
|
@olegz @joewitt I have pushed a new update to this PR. I did it as a separate commit so that you can see the changes made. I also rebased against master. Fixed issue where we were calling @OnUnscheduled methods before invoking SchedulingAgent.unschedule - that was a good catch. Also addressed the JavaDocs and ensured that we use the appropriate Nar ClassLoader. re: callback vs. passing in ScheduleState - I agree that both do have their pro's & con's. However, when I was reviewing the code I thought that it was quite difficult to follow, with the callback. The callback was only about 2-3 lines of code, and could easily just be moved into the StandardProcessorNode, and this made the code much more straight-forward to read and understand. Additionally, I do not consider it a leaky abstraction, given that this is the 'start' method of ProcessorNode, and this class's job is to handle the "framework-y aspect of processors." Should we need to change it to use a callback in the future, we certainly can do so. If we do, though, I think we should create an interface that has a specific method whose name makes sense and is well documented rather than using a Callable to perform the operation & documenting how that Callable should operate in the JavaDocs for the method called. I think this is what really made it confusing to understand. |
|
Makes total sense to me. Reviewing/testing locally |
|
code/docs changes look good and address the items noted. full clean build/contrib check good. Testing locally now |
|
+1. Please squash and merge to master @markap14 . |
… called that the active thread count takes that thread into account