From 23e4f685c3e6a5c83538b7e3e379f1bb1c6002b7 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Thu, 28 Apr 2016 09:53:16 -0400 Subject: [PATCH 01/11] NIFI-1822: Allow concurrent execution in ExecuteScript --- .../script/AbstractScriptProcessor.java | 24 +++++++++++++++---- .../nifi/processors/script/ExecuteScript.java | 20 +++++++++------- .../script/InvokeScriptedProcessor.java | 15 +++++++++--- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java index b7e73bc9503d..99a18bf73d9c 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.script; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -48,7 +49,9 @@ import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -102,7 +105,8 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc protected String scriptBody; protected String[] modules; protected List descriptors; - protected ScriptEngine scriptEngine; + + protected BlockingQueue engineQ = null; /** * Custom validation for ensuring exactly one of Script File or Script Body is populated @@ -197,7 +201,7 @@ protected boolean isFile(final String path) { * Performs common setup operations when the processor is scheduled to run. This method assumes the member * variables associated with properties have been filled. */ - public void setup() { + public void setup(int numberOfScriptEngines) { if (scriptEngineConfiguratorMap.isEmpty()) { ServiceLoader configuratorServiceLoader = @@ -206,7 +210,7 @@ public void setup() { scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator); } } - setupEngine(); + setupEngines(numberOfScriptEngines); } /** @@ -216,7 +220,8 @@ public void setup() { * * @see org.apache.nifi.processors.script.ScriptEngineConfigurator */ - protected void setupEngine() { + protected void setupEngines(int numberOfScriptEngines) { + engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines); ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); try { ProcessorLog log = getLogger(); @@ -248,11 +253,13 @@ protected void setupEngine() { Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); } - scriptEngine = createScriptEngine(); + ScriptEngine scriptEngine = createScriptEngine(); try { if (configurator != null) { configurator.init(scriptEngine, modules); } + engineQ.offer(scriptEngine); + } catch (ScriptException se) { log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); if (log.isDebugEnabled()) { @@ -300,4 +307,11 @@ protected ClassLoader createScriptEngineModuleClassLoader(URL[] modules) { return new URLClassLoader(modules, thisClassLoader); } + + @OnStopped + public void stop() { + if (engineQ != null) { + engineQ.clear(); + } + } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 3f7c202d517f..7d5ee5225ac8 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -18,8 +18,6 @@ import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -35,6 +33,7 @@ import javax.script.Bindings; import javax.script.ScriptContext; +import javax.script.ScriptEngine; import javax.script.ScriptException; import javax.script.SimpleBindings; import java.io.FileInputStream; @@ -45,7 +44,6 @@ import java.util.Map; import java.util.Set; -@TriggerSerially @Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"}) @CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for " + "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by " @@ -128,7 +126,9 @@ public void setup(final ProcessContext context) { } else { modules = new String[0]; } - super.setup(); + // Create a script engine for each possible task + int maxTasks = context.getMaxConcurrentTasks(); + super.setup(maxTasks); scriptToRun = scriptBody; try { @@ -161,6 +161,11 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto createResources(); } } + ScriptEngine scriptEngine = engineQ.poll(); + if (scriptEngine == null) { + // No engine available so nothing more to do here + return; + } ProcessorLog log = getLogger(); ProcessSession session = sessionFactory.createSession(); try { @@ -211,11 +216,8 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); session.rollback(true); throw t; + } finally { + engineQ.offer(scriptEngine); } } - - @OnStopped - public void stop() { - scriptEngine = null; - } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 2f287b610348..06c0a7f18bff 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.script.Invocable; +import javax.script.ScriptEngine; import javax.script.ScriptException; import org.apache.commons.io.IOUtils; @@ -67,6 +68,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); + private ScriptEngine scriptEngine = null; + /** * Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script * processor has defined additional relationships, those will be added as well. @@ -174,9 +177,14 @@ public void setup(final ProcessContext context) { setup(); } - @Override public void setup() { - super.setup(); + // Create a single script engine, the Processor object is reused by each task + super.setup(1); + scriptEngine = engineQ.poll(); + if (scriptEngine == null) { + throw new ProcessException("No script engine available!"); + } + if (scriptNeedsReload.get() || processor.get() == null) { if (isFile(scriptPath)) { reloadScriptFile(scriptPath); @@ -386,7 +394,7 @@ public ControllerServiceLookup getControllerServiceLookup() { protected Collection customValidate(final ValidationContext context) { Collection commonValidationResults = super.customValidate(context); - if(!commonValidationResults.isEmpty()) { + if (!commonValidationResults.isEmpty()) { return commonValidationResults; } @@ -486,6 +494,7 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto @OnStopped public void stop() { + super.stop(); processor.set(null); } } From fd9120c28bd84a8f88a49798e1290d32c16a024d Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Tue, 10 May 2016 16:08:02 -0700 Subject: [PATCH 02/11] NIFI-1822 Added unit test skeleton for pooled script processor execution. --- .../script/ExecuteScriptGroovyTest.groovy | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy new file mode 100644 index 000000000000..479ab6ff6f38 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -0,0 +1,71 @@ +package org.apache.nifi.processors.script + +import org.apache.nifi.security.util.EncryptionMethod +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.python.bouncycastle.util.encoders.Hex +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import javax.crypto.Cipher + +@RunWith(JUnit4.class) +class ExecuteScriptGroovyTest extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) + + @BeforeClass + public static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testShouldExecuteScript() throws Exception { + // Arrange + final String PASSWORD = "shortPassword"; + final byte[] SALT = cipherProvider.generateSalt() + + final String plaintext = "This is a plaintext message."; + + // Act + for (EncryptionMethod em : strongKDFEncryptionMethods) { + logger.info("Using algorithm: ${em.getAlgorithm()}"); + + // Initialize a cipher for encryption + Cipher cipher = cipherProvider.getCipher(em, PASSWORD, SALT, DEFAULT_KEY_LENGTH, true); + byte[] iv = cipher.getIV(); + logger.info("IV: ${Hex.encodeHexString(iv)}") + + byte[] cipherBytes = cipher.doFinal(plaintext.getBytes("UTF-8")); + logger.info("Cipher text: ${Hex.encodeHexString(cipherBytes)} ${cipherBytes.length}"); + + cipher = cipherProvider.getCipher(em, PASSWORD, SALT, iv, DEFAULT_KEY_LENGTH, false); + byte[] recoveredBytes = cipher.doFinal(cipherBytes); + String recovered = new String(recoveredBytes, "UTF-8"); + logger.info("Recovered: ${recovered}") + + // Assert + assert plaintext.equals(recovered); + } + } + + //testShouldExecuteScriptWithPool + //testShouldHandleFailingScript + //testShouldHandleNoAvailableEngine + //testPooledExecutionShouldBeFaster +} From 8c5ba511283d3ddcbeb56a43240e8640e66a7357 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Fri, 13 May 2016 19:03:34 -0700 Subject: [PATCH 03/11] NIFI-1822 Added variable max concurrent task field in MockProcessorContext because it was previously hardcoded to 1. Changed setNumThreads to setMaxConcurrentTasks to maintain naming convention. --- .../java/org/apache/nifi/util/MockProcessContext.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index b6272ee09682..deb6467f283e 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; - import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.ConfigurableComponent; @@ -55,7 +54,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private boolean allowExpressionValidation = true; private volatile boolean incomingConnection = true; private volatile boolean nonLoopConnection = true; - private int numThreads = 1; + + private int maxConcurrentTasks = 1; private volatile Set connections = new HashSet<>(); private volatile Set unavailableRelationships = new HashSet<>(); @@ -175,7 +175,7 @@ public void addControllerService(final String serviceIdentifier, final Controlle @Override public int getMaxConcurrentTasks() { - return numThreads; + return maxConcurrentTasks; } public void setAnnotationData(final String annotationData) { @@ -362,7 +362,7 @@ public String getName() { return ""; } - protected void setNumThreads(int numThreads) { - this.numThreads = numThreads; + protected void setMaxConcurrentTasks(int maxConcurrentTasks) { + this.maxConcurrentTasks = maxConcurrentTasks; } } From 8ab0ce59a408f5090c68a954dc1496de275aded1 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Fri, 13 May 2016 19:06:17 -0700 Subject: [PATCH 04/11] NIFI-1822 Added for loop to instantiate multiple script engines in queue. --- .../script/AbstractScriptProcessor.java | 58 ++++++++++--------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java index 99a18bf73d9c..a34c83c0d18f 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java @@ -16,22 +16,6 @@ */ package org.apache.nifi.processors.script; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StringUtils; - -import javax.script.ScriptEngine; -import javax.script.ScriptEngineFactory; -import javax.script.ScriptEngineManager; -import javax.script.ScriptException; import java.io.File; import java.net.MalformedURLException; import java.net.URL; @@ -53,6 +37,21 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineFactory; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; /** * This class contains variables and methods common to scripting processors @@ -226,6 +225,10 @@ protected void setupEngines(int numberOfScriptEngines) { try { ProcessorLog log = getLogger(); + if (StringUtils.isBlank(scriptEngineName)) { + throw new IllegalArgumentException("The script engine name cannot be null"); + } + ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); // Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs @@ -253,20 +256,21 @@ protected void setupEngines(int numberOfScriptEngines) { Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); } - ScriptEngine scriptEngine = createScriptEngine(); - try { - if (configurator != null) { - configurator.init(scriptEngine, modules); - } - engineQ.offer(scriptEngine); + for (int i = 0; i < numberOfScriptEngines; i++) { + ScriptEngine scriptEngine = createScriptEngine(); + try { + if (configurator != null) { + configurator.init(scriptEngine, modules); + } + engineQ.offer(scriptEngine); - } catch (ScriptException se) { - log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); - if (log.isDebugEnabled()) { - log.error("Error initializing script engine configurator", se); + } catch (ScriptException se) { + log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); + if (log.isDebugEnabled()) { + log.error("Error initializing script engine configurator", se); + } } } - } finally { // Restore original context class loader Thread.currentThread().setContextClassLoader(originalContextClassLoader); From 4c174c8ca76f5a9e64aa2bd2469ca2966c90b888 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Fri, 13 May 2016 19:06:53 -0700 Subject: [PATCH 05/11] NIFI-1822 Added debugging messages to script execution. --- .../nifi/processors/script/ExecuteScript.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 7d5ee5225ac8..ac450bd78754 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -16,13 +16,25 @@ */ package org.apache.nifi.processors.script; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.script.Bindings; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import javax.script.SimpleBindings; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; @@ -31,19 +43,6 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StringUtils; -import javax.script.Bindings; -import javax.script.ScriptContext; -import javax.script.ScriptEngine; -import javax.script.ScriptException; -import javax.script.SimpleBindings; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - @Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"}) @CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for " + "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by " @@ -162,11 +161,12 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto } } ScriptEngine scriptEngine = engineQ.poll(); + ProcessorLog log = getLogger(); + log.info("[REMOVE] After polling engine, Script Engine queue size: " + engineQ.size()); if (scriptEngine == null) { // No engine available so nothing more to do here return; } - ProcessorLog log = getLogger(); ProcessSession session = sessionFactory.createSession(); try { @@ -218,6 +218,7 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto throw t; } finally { engineQ.offer(scriptEngine); + log.info("[REMOVE] After offering engine, Script Engine queue size: " + engineQ.size()); } } } From 12bbe82381fb334ff236db6815ada8a277a782e7 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Fri, 13 May 2016 19:07:53 -0700 Subject: [PATCH 06/11] NIFI-1822 Moved unit test to correct directory. Added test script resource which generates flowfile and updates attribute with current thread. Added tests for single run, serial run, and pooled run (not complete). --- .../script/ExecuteScriptGroovyTest.groovy | 151 ++++++++++++++++++ .../script/ExecuteScriptGroovyTest.groovy | 71 -------- .../testAddTimeAndThreadAttribute.groovy | 22 +++ 3 files changed, 173 insertions(+), 71 deletions(-) create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy delete mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy new file mode 100644 index 000000000000..4a9c3c9849fe --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -0,0 +1,151 @@ +package org.apache.nifi.processors.script + +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.TestRunners +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static org.junit.Assert.assertNotNull + +@RunWith(JUnit4.class) +class ExecuteScriptGroovyTest extends BaseScriptTest { + private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) + + private static final String TEST_CSV_DATA = "gender,title,first,last\n" + + "female,miss,marlene,shaw\n" + + "male,mr,todd,graham" + + @BeforeClass + public static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + public void setUp() throws Exception { + super.setupExecuteScript() + + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + } + + @After + public void tearDown() throws Exception { + + } + + private void setupPooledExecuteScript(int poolSize = 2) { + final ExecuteScript executeScript = new ExecuteScript() + // Need to do something to initialize the properties, like retrieve the list of properties + assertNotNull(executeScript.getSupportedPropertyDescriptors()) + runner = TestRunners.newTestRunner(executeScript) + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + + // Override context value + runner.processContext.maxConcurrentTasks = poolSize + logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}") + + // Must set context properties on runner before calling setup with pool size +// executeScript.setup(poolSize) + } + + @Test + public void testShouldExecuteScript() throws Exception { + // Arrange + final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/ + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + MockFlowFile flowFile = result.get(0) + logger.info("Resulting flowfile attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ SINGLE_POOL_THREAD_PATTERN + } + + @Test + public void testShouldExecuteScriptSerially() throws Exception { + // Arrange + final int ITERATIONS = 10 + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + ITERATIONS.times { int i -> + logger.info("Running iteration ${i}") + runner.run() + } + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + } + + @Test + public void testShouldExecuteScriptWithPool() throws Exception { + // Arrange + final int ITERATIONS = 10 + final int POOL_SIZE = 2 + + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + + runner.setThreadCount(POOL_SIZE) + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + ITERATIONS.times { int i -> + logger.info("Running iteration ${i}") + runner.run() + } + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1") + } + } + + //testShouldExecuteScriptWithPool + //testShouldHandleFailingScript + //testShouldHandleNoAvailableEngine + //testPooledExecutionShouldBeFaster + //testPoolSizeVsThreadCount +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy deleted file mode 100644 index 479ab6ff6f38..000000000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy +++ /dev/null @@ -1,71 +0,0 @@ -package org.apache.nifi.processors.script - -import org.apache.nifi.security.util.EncryptionMethod -import org.junit.After -import org.junit.Before -import org.junit.BeforeClass -import org.junit.Test -import org.junit.runner.RunWith -import org.junit.runners.JUnit4 -import org.python.bouncycastle.util.encoders.Hex -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import javax.crypto.Cipher - -@RunWith(JUnit4.class) -class ExecuteScriptGroovyTest extends GroovyTestCase { - private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) - - @BeforeClass - public static void setUpOnce() throws Exception { - logger.metaClass.methodMissing = { String name, args -> - logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") - } - } - - @Before - public void setUp() throws Exception { - - } - - @After - public void tearDown() throws Exception { - - } - - @Test - public void testShouldExecuteScript() throws Exception { - // Arrange - final String PASSWORD = "shortPassword"; - final byte[] SALT = cipherProvider.generateSalt() - - final String plaintext = "This is a plaintext message."; - - // Act - for (EncryptionMethod em : strongKDFEncryptionMethods) { - logger.info("Using algorithm: ${em.getAlgorithm()}"); - - // Initialize a cipher for encryption - Cipher cipher = cipherProvider.getCipher(em, PASSWORD, SALT, DEFAULT_KEY_LENGTH, true); - byte[] iv = cipher.getIV(); - logger.info("IV: ${Hex.encodeHexString(iv)}") - - byte[] cipherBytes = cipher.doFinal(plaintext.getBytes("UTF-8")); - logger.info("Cipher text: ${Hex.encodeHexString(cipherBytes)} ${cipherBytes.length}"); - - cipher = cipherProvider.getCipher(em, PASSWORD, SALT, iv, DEFAULT_KEY_LENGTH, false); - byte[] recoveredBytes = cipher.doFinal(cipherBytes); - String recovered = new String(recoveredBytes, "UTF-8"); - logger.info("Recovered: ${recovered}") - - // Assert - assert plaintext.equals(recovered); - } - } - - //testShouldExecuteScriptWithPool - //testShouldHandleFailingScript - //testShouldHandleNoAvailableEngine - //testPooledExecutionShouldBeFaster -} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy new file mode 100644 index 000000000000..a9e599cacbbe --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +def flowFile = session.create() +flowFile = session.putAttribute(flowFile, "time-updated", new Date().toString()) +flowFile = session.putAttribute(flowFile, "thread", Thread.currentThread().getName()) +session.transfer(flowFile, REL_SUCCESS) + +// TODO: Way to add the pool "name" or script engine identifier to the flowfile? \ No newline at end of file From bdbc4bab04ae3953087fcfc5192fac19da813e83 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Fri, 13 May 2016 19:24:07 -0700 Subject: [PATCH 07/11] NIFI-1822 Renamed reference to MockProcessorContext#setNumThreads to setMaxConcurrentTasks after refactor. --- .../org/apache/nifi/util/StandardProcessorTestRunner.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 5aa6d435143f..7ce88463d8ba 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -402,7 +401,7 @@ public void enqueue(final byte[] data) { @Override public void enqueue(final String data) { - enqueue(data.getBytes(StandardCharsets.UTF_8), Collections. emptyMap()); + enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.emptyMap()); } @Override @@ -531,7 +530,7 @@ public void setThreadCount(final int threadCount) { } this.numThreads = threadCount; - this.context.setNumThreads(threadCount); + this.context.setMaxConcurrentTasks(threadCount); } @Override From 9264428bd8515aa74b73e0a84b9f46c3cd749400 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Sat, 14 May 2016 20:12:02 -0700 Subject: [PATCH 08/11] NIFI-1822 Removed debugging log statements for script engine queue size. Added unit tests demonstrating pooled execution timing and thread usage. --- .../nifi/processors/script/ExecuteScript.java | 2 - .../script/ExecuteScriptGroovyTest.groovy | 71 ++++++++++++++----- 2 files changed, 54 insertions(+), 19 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index ac450bd78754..f9fe2e3b3e64 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -162,7 +162,6 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto } ScriptEngine scriptEngine = engineQ.poll(); ProcessorLog log = getLogger(); - log.info("[REMOVE] After polling engine, Script Engine queue size: " + engineQ.size()); if (scriptEngine == null) { // No engine available so nothing more to do here return; @@ -218,7 +217,6 @@ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFacto throw t; } finally { engineQ.offer(scriptEngine); - log.info("[REMOVE] After offering engine, Script Engine queue size: " + engineQ.size()); } } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy index 4a9c3c9849fe..fc1c4450801b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -1,6 +1,7 @@ package org.apache.nifi.processors.script import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.StopWatch import org.apache.nifi.util.TestRunners import org.junit.After import org.junit.Before @@ -11,6 +12,8 @@ import org.junit.runners.JUnit4 import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.util.concurrent.TimeUnit + import static org.junit.Assert.assertNotNull @RunWith(JUnit4.class) @@ -56,9 +59,6 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { // Override context value runner.processContext.maxConcurrentTasks = poolSize logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}") - - // Must set context properties on runner before calling setup with pool size -// executeScript.setup(poolSize) } @Test @@ -92,10 +92,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { runner.assertValid() // Act - ITERATIONS.times { int i -> - logger.info("Running iteration ${i}") - runner.run() - } + runner.run(ITERATIONS) // Assert runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) @@ -125,10 +122,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { runner.assertValid() // Act - ITERATIONS.times { int i -> - logger.info("Running iteration ${i}") - runner.run() - } + runner.run(ITERATIONS) // Assert runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) @@ -139,13 +133,56 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { flowFile.assertAttributeExists("time-updated") flowFile.assertAttributeExists("thread") - flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/ } } - //testShouldExecuteScriptWithPool - //testShouldHandleFailingScript - //testShouldHandleNoAvailableEngine - //testPooledExecutionShouldBeFaster - //testPoolSizeVsThreadCount + @Test + public void testPooledExecutionShouldBeFaster() throws Exception { + // Arrange + final int ITERATIONS = 1000 + final int POOL_SIZE = 4 + + // Act + // Run serially and capture the timing + final StopWatch stopWatch = new StopWatch(true) + runner.run(ITERATIONS) + stopWatch.stop() + final long serialExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS) + logger.info("Serial execution time for ${ITERATIONS} executions: ${serialExecutionTime} ms") + + // Assert (1) + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List serialResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + // Now run parallel + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + runner.setThreadCount(POOL_SIZE) + runner.assertValid() + + stopWatch.start() + runner.run(ITERATIONS) + stopWatch.stop() + final long parallelExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS) + logger.info("Parallel execution time for ${ITERATIONS} executions using ${POOL_SIZE} threads: ${parallelExecutionTime} ms") + + // Assert (2) + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List parallelResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + parallelResults.eachWithIndex { MockFlowFile flowFile, int i -> + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/ + } + + serialResults.eachWithIndex { MockFlowFile flowFile, int i -> + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + + assert serialExecutionTime > parallelExecutionTime + } } From cb108cd4376c61b391a5d9d3d269d874bb0f65c5 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Sat, 14 May 2016 20:21:15 -0700 Subject: [PATCH 09/11] NIFI-1822 Added ASF License to unit test. --- .../script/ExecuteScriptGroovyTest.groovy | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy index fc1c4450801b..eff6b5aec7b7 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.script import org.apache.nifi.util.MockFlowFile From 7c5acc1ee28b711bb4878371a31ddb267fae79bd Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Sat, 14 May 2016 20:37:05 -0700 Subject: [PATCH 10/11] NIFI-1822 Removed trailing whitespace to conform with checkstyle rules. --- .../src/main/java/org/apache/nifi/util/MockProcessContext.java | 1 - 1 file changed, 1 deletion(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index deb6467f283e..81429e7f7ccf 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -54,7 +54,6 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private boolean allowExpressionValidation = true; private volatile boolean incomingConnection = true; private volatile boolean nonLoopConnection = true; - private int maxConcurrentTasks = 1; private volatile Set connections = new HashSet<>(); From 612a7599e240d2378e94c31abf2f2525b45ce9da Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Sat, 14 May 2016 20:37:38 -0700 Subject: [PATCH 11/11] NIFI-1822 Removed unused variable in unit test. --- .../nifi/processors/script/ExecuteScriptGroovyTest.groovy | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy index eff6b5aec7b7..6dbc5e1008a5 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -36,10 +36,6 @@ import static org.junit.Assert.assertNotNull class ExecuteScriptGroovyTest extends BaseScriptTest { private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) - private static final String TEST_CSV_DATA = "gender,title,first,last\n" + - "female,miss,marlene,shaw\n" + - "male,mr,todd,graham" - @BeforeClass public static void setUpOnce() throws Exception { logger.metaClass.methodMissing = { String name, args ->