From 4f94645c2ad371f689001d94b71e23918cb050ae Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 29 Jun 2018 09:25:57 -0400 Subject: [PATCH] NIFI-5361: When submitting many processors to start, calculate the 'timeout timestamp' immediately before calling @OnScheduled method, after the task has been scheduled to run, instead of before the task has a chance to run. --- .../org/apache/nifi/controller/StandardProcessorNode.java | 8 ++++++-- .../controller/scheduling/StandardProcessScheduler.java | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 6f51f87ba05c..413f0974a2f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1481,12 +1481,16 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l final Processor processor = getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); - final long completionTimestamp = System.currentTimeMillis() + onScheduleTimeoutMillis; + // Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run. + final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE); // Create a task to invoke the @OnScheduled annotation of the processor final Callable startupTask = () -> { LOG.debug("Invoking @OnScheduled methods of {}", processor); + // Now that the task has been scheduled, set the timeout + completionTimestampRef.set(System.currentTimeMillis() + onScheduleTimeoutMillis); + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { try { activateThread(); @@ -1571,7 +1575,7 @@ public void run() { return; } - monitorAsyncTask(taskFuture, monitoringFuture, completionTimestamp); + monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get()); } }; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 8f7eb2fdb057..b23e76356e4a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -319,7 +319,7 @@ public void trigger() { @Override public Future scheduleTask(final Callable task) { lifecycleState.incrementActiveThreadCount(null); - return componentMonitoringThreadPool.submit(task); + return componentLifeCycleThreadPool.submit(task); } @Override