From f58316a906af820f9f056c6ebee171015685f86b Mon Sep 17 00:00:00 2001 From: mans2singh Date: Mon, 7 May 2018 22:11:00 -0700 Subject: [PATCH 1/7] NIFI-5166 - Deep learning classification and regression processor with deeplearning4j --- nifi-assembly/pom.xml | 6 + .../nifi-deeplearning4j-nar/pom.xml | 45 +++ .../src/main/resources/META-INF/LICENSE | 208 +++++++++++ .../src/main/resources/META-INF/NOTICE | 20 + .../nifi-deeplearning4j-processors/pom.xml | 114 ++++++ .../AbstractDeepLearning4JProcessor.java | 106 ++++++ .../DeepLearning4JPredictor.java | 218 +++++++++++ .../org.apache.nifi.processor.Processor | 15 + .../additionalDetails.html | 353 ++++++++++++++++++ ...DeepLearning4JProcessorClassification.java | 336 +++++++++++++++++ ...TestDeepLearning4JProcessorRegression.java | 185 +++++++++ .../test/resources/classification_test.txt | 100 +++++ .../nifi-deeplearning4j-bundle/pom.xml | 43 +++ nifi-nar-bundles/pom.xml | 1 + 14 files changed, 1750 insertions(+) create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorClassification.java create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorRegression.java create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 344e30c4429e..682308d13db9 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -379,6 +379,12 @@ language governing permissions and limitations under the License. 
--> 1.7.0-SNAPSHOT nar + + org.apache.nifi + nifi-deeplearning4j-nar + 1.7.0-SNAPSHOT + nar + org.apache.nifi nifi-avro-nar diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/pom.xml b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/pom.xml new file mode 100644 index 000000000000..b7cd95a62b25 --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-deeplearning4j-bundle + 1.7.0-SNAPSHOT + + + nifi-deeplearning4j-nar + nar + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + 1.7.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-deeplearning4j-processors + + + + diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 000000000000..4a251d4cccb5 --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,208 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for these +subcomponents is subject to the terms and conditions of the following +licenses. diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000000..1af7838e7cc9 --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,20 @@ +nifi-deeplearning4j-nar +Copyright 2018 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + (ASLv2) Google GSON + The following NOTICE information applies: + Copyright 2008 Google Inc.
\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml new file mode 100644 index 000000000000..7e77c70af227 --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml @@ -0,0 +1,114 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-deeplearning4j-bundle + 1.7.0-SNAPSHOT + + + nifi-deeplearning4j-processors + jar + + + + org.nd4j + nd4j-api + 1.0.0-alpha + + + org.nd4j + nd4j-native-platform + 1.0.0-alpha + + + org.nd4j + nd4j-cuda-8.0-platform + 1.0.0-alpha + + + org.nd4j + nd4j-cuda-9.0-platform + 1.0.0-alpha + + + org.nd4j + nd4j-cuda-9.1-platform + 1.0.0-alpha + + + org.deeplearning4j + deeplearning4j-core + 1.0.0-alpha + + + org.apache.commons + commons-lang3 + 3.7 + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 1.7.0-SNAPSHOT + + + org.apache.nifi + nifi-mock + 1.7.0-SNAPSHOT + test + + + org.slf4j + slf4j-simple + test + + + com.google.code.gson + gson + 2.7 + + + junit + junit + test + + + com.google.guava + guava + 18.0 + test + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/classification_test.txt + + + + + + diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java new file mode 100644 index 000000000000..6652b0f2ee0f --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import java.io.IOException;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.deeplearning4j.util.ModelSerializer;
+
+/**
+ * Base class for deeplearning4j processors.
+ *
+ * <p>Declares the property descriptors and FlowFile attribute names shared by
+ * subclasses, and owns the model instance, which is loaded on first use by
+ * {@link #getModel(ProcessContext)} and released by {@link #close()}.
+ */
+public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor {
+
+    /** Character set of the FlowFile content; defaults to UTF-8. */
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("deeplearning4j-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set of the document data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    /** Separator between the fields of one record; defaults to a comma. */
+    public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder()
+            .name("deeplearning4j-field-separator")
+            .displayName("Field Separator")
+            .description("Specifies the field separator in the records. (default is comma)")
+            .required(true)
+            .defaultValue(",")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Separator between records in the FlowFile content. The default is taken from
+     * {@link System#lineSeparator()}, so it depends on the platform NiFi runs on.
+     */
+    public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
+            .name("deeplearning4j-record-separator")
+            .displayName("Record Separator")
+            .description("Specifies the records separator in the message body. (defaults to new line)")
+            .required(true)
+            .defaultValue(System.lineSeparator())
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Location of the serialized model; validated to exist when the processor is configured. */
+    public static final PropertyDescriptor MODEL_FILE = new PropertyDescriptor.Builder()
+            .name("model-file")
+            .displayName("Model File")
+            .description("Location of the Deeplearning4J model zip file")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .build();
+
+    /** Dimensions of the array built from each record. */
+    public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder()
+            .name("deeplearning4j-record-dimension")
+            .displayName("Record dimensions separated by field separator")
+            .description("Dimension of array in each a record (eg: 2,4 - a 2x4 array)")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** FlowFile attribute name for an error message. */
+    public static final String DEEPLEARNING4J_ERROR_MESSAGE = "deeplearning4j.error.message";
+
+    /** FlowFile attribute name for the output shape. */
+    public static final String DEEPLEARNING4J_OUTPUT_SHAPE = "deeplearning4j.output.shape";
+
+    /** Cached model; read and written only while holding this instance's monitor. */
+    protected MultiLayerNetwork model = null;
+
+    /**
+     * Returns the model, loading it from the {@link #MODEL_FILE} property on first use.
+     *
+     * @param context process context used to resolve the model file location
+     * @return the cached or newly loaded network
+     * @throws IOException declared for failures while restoring the model
+     */
+    protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException {
+        if ( model == null ) {
+            String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue();
+            if ( getLogger().isDebugEnabled()) {
+                getLogger().debug("Loading model from {}", new Object[] {modelFile});
+            }
+            long start = System.currentTimeMillis();
+            // NOTE(review): the 'false' argument presumably skips restoring the updater
+            // state - confirm against the ModelSerializer API.
+            model = ModelSerializer.restoreMultiLayerNetwork(modelFile,false);
+            long end = System.currentTimeMillis();
+            getLogger().info("Time to load model " + (end-start) + " ms");
+        }
+        return model;
+    }
+
+    /**
+     * Drops the cached model when the processor is stopped so the next call to
+     * {@link #getModel(ProcessContext)} reloads it. Synchronized on the same monitor
+     * as {@code getModel} so the field is never cleared outside the lock that guards it.
+     */
+    @OnStopped
+    public synchronized void close() {
+        getLogger().info("Closing");
+        model = null;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
new file mode 100644
index 000000000000..e5e1068d130c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " + + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. 
" + + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " + + "Each record can contain multiple fields with each field separated by the 'Field Separator' property." + ) +@WritesAttributes({ + @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), + @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), + }) +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { + + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successful DeepLearning4j results are routed to this relationship").build(); + + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed DeepLearning4j results are routed to this relationship").build(); + + protected Gson gson = new Gson(); + + private static final Set relationships; + private static final List propertyDescriptors; + static { + final Set tempRelationships = new HashSet<>(); + tempRelationships.add(REL_SUCCESS); + tempRelationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(tempRelationships); + final List tempDescriptors = new ArrayList<>(); + tempDescriptors.add(MODEL_FILE); + tempDescriptors.add(RECORD_DIMENSIONS); + tempDescriptors.add(CHARSET); + tempDescriptors.add(FIELD_SEPARATOR); + tempDescriptors.add(RECORD_SEPARATOR); + propertyDescriptors = Collections.unmodifiableList(tempDescriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = 
session.get(); + if ( flowFile == null ) { + return; + } + + Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); + if ( flowFile.getSize() == 0 ) { + String message = "FlowFile query is empty"; + getLogger().error(message); + flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, message); + session.transfer(flowFile, REL_FAILURE); + return; + } + + String input = null; + try { + input = getFlowFileContents(session, charset, flowFile); + String fieldSeparator = context.getProperty(FIELD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue(); + String recordSeparator = context.getProperty(RECORD_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue(); + + int [] dimensions = getInputDimensions(context, charset, flowFile, fieldSeparator); + + if ( getLogger().isDebugEnabled() ) { + getLogger().debug("Received input {} with dimensions {}", new Object[] { input, dimensions }); + } + + MultiLayerNetwork model = getModel(context); + + long startTimeMillis = System.currentTimeMillis(); + + String [] inputRecords = input.split(recordSeparator); + + List features = Arrays.stream(inputRecords).map( + record -> { + double [] parameters = Arrays.stream(record.split(fieldSeparator)).mapToDouble( + field -> Double.parseDouble(field)).toArray(); + + INDArray featureInput = Nd4j.create(parameters, dimensions); + + if ( getLogger().isDebugEnabled() ) { + getLogger().debug("Features for record {} parameters {} dims {} featureInput {} ", + new Object[] {record, parameters, dimensions, featureInput}); + } + + return featureInput; + + }).collect(Collectors.toList()); + + INDArray allFeatures = Nd4j.vstack(features); + + INDArray results = model.output(allFeatures); + + double [][] partitionedResults = new double[inputRecords.length][]; + for (int row = 0; row < inputRecords.length; row++) { + INDArray result = results.getRow(row); + partitionedResults[row] = 
Nd4j.toFlattened(result).toDoubleVector(); + } + + String jsonResult = gson.toJson(partitionedResults); + int [] shape = results.shape(); + String jsonShape = gson.toJson(Arrays.copyOfRange(shape, 1, shape.length)); + + if ( getLogger().isDebugEnabled() ) { + getLogger().debug("Prediction for inputRecords {}, dims {}, results {}, result.shape {}, partitionedResults {}, jsonResult {}, shape {}, jsonShape {}", + new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape}); + } + + try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) { + flowFile = session.importFrom(bais, flowFile); + } + + session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape); + + final long endTimeMillis = System.currentTimeMillis(); + + session.transfer(flowFile, REL_SUCCESS); + + session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context), + (endTimeMillis - startTimeMillis)); + } catch (Exception exception) { + flowFile = populateErrorAttributes(session, flowFile, exception.getMessage()); + getLogger().error("Failed to process data due to {} for input {}", + new Object[]{exception.getLocalizedMessage(), input}, exception); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + + protected String getFlowFileContents(final ProcessSession session, Charset charset, FlowFile incomingFlowFile) + throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + session.exportTo(incomingFlowFile, baos); + return new String(baos.toByteArray(), charset); + } + } + + protected int [] getInputDimensions(final ProcessContext context, Charset charset, FlowFile flowFile, String separator) + throws IOException { + String values = context.getProperty(RECORD_DIMENSIONS).evaluateAttributeExpressions(flowFile).getValue(); + return Arrays.stream( + values.split(separator)) + .mapToInt(val -> Integer.parseInt(val)).toArray(); + } + + 
/** Builds the transit URI that onTrigger passes to the provenance reporter: the deeplearning4j scheme prefix followed by the evaluated value of the MODEL_FILE property. The property expression is evaluated with the no-argument overload, i.e. no FlowFile is supplied. @param context process context supplying the MODEL_FILE property @return the provenance URI as a String */ protected String makeProvenanceUrl(final ProcessContext context) { + return new StringBuilder("deeplearning4j://") + .append(context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue()).toString(); + } + + /** Records the given message in the DEEPLEARNING4J_ERROR_MESSAGE attribute of the FlowFile. The message goes through String.valueOf, so a null message is stored as the text "null". @param session session used to update the attributes @param flowFile FlowFile to annotate @param message error text to record, may be null @return the FlowFile returned by the session after the attribute update */ protected FlowFile populateErrorAttributes(final ProcessSession session, FlowFile flowFile, + String message) { + Map attributes = new HashMap<>(); + attributes.put(DEEPLEARNING4J_ERROR_MESSAGE, String.valueOf(message)); + flowFile = session.putAllAttributes(flowFile, attributes); + return flowFile; + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 000000000000..b4fae5120932 --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor/additionalDetails.html b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor/additionalDetails.html new file mode 100644 index 000000000000..5a69a1a1a6a7 --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor/additionalDetails.html @@ -0,0 +1,353 @@ + + + + + +Deeplearning4JPredictor + + + + + +

Usage Information

+ +

+ DeepLearning4JPredictor processor produces predictions by applying the + provided deeplearning4j + model to the FlowFile contents. +

+ +

The processor can perform: +

    +
  • classification, or
  • +
  • regression
  • +
+ based on the model used. + +

The processor has the following properties: +

    +
  • a deeplearning4j classification or regression model (required)
  • +
  • record dimensions (required)
  • +
  • field separator (default comma)
  • +
  • record separator (default new line character)
  • +
  • character set for FlowFile message body (default UTF8)
  • +
+

It then applies the model to the FlowFile contents to produce + the prediction result.

+

The FlowFile content can contain a single row for a single input + record or multiple rows (separated by record separator) for multiple + inputs.

+

Both the input and the output for the classification/regression are flattened double arrays. The input dimensions are specified in the property 'Record Dimensions' + and the output dimensions are set in the deeplearning4j.output.shape attribute.

+

Each record has fields separated by field separator and a record + dimension that indicates how to transform the single row record into a + multi-dimensional feature array.

+

The output is flattened and returned as JSON in the body of the FlowFile. The shape of the + array is returned as an attribute deeplearning4j.output.shape +

+ +

The following are example classification and regression inputs and + their respective outputs

+
    +
      +
    • Classification - Single Record Prediction with 4 classes
    • +
        + +
      1. Input : +
          +
        • Record dimension : 1,4 (i.e., 1x4 array)
        • + +
          1,4
          +
          +
        • FlowFile content, i.e., input (comma separated): + +
          2.0,0.5,0.4,0.2
          +
          +
        • + +
        +
      2. Output: +
          + +
          +[[0.8661,0.0226,0.0455,0.0658]]
          +
          +
          +
        +
      3. +
      4. Shape: +
          + +
          +[4]
          +
          +
          +
        +
      5. +

        This classification probabilities of the first record are + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
        InputClassProbabilities
        2.0,0.5,0.4,0.2 +
        10.8661
        20.0226
        30.0455
        40.0658
        + +

      +
    • Classification - Multiple Input Record (2) Prediction with 4 + classes
    • +
        +
      1. Input : +
          +
        • Record dimension : 1,4 (i.e., 1x4 array)
        • + +
          +1,4
          +
          +
          +
        • FlowFile content, i.e., input (new line separated records and comma + separated fields): + +
          +2.0,0.5,0.4,0.2
          +3.0,.2,.1,.2
          +
          +
          +
        • +
        +
      2. +
      3. Output - first dimension corresponds to the input record and + second contains probabilities of each of the 4 classes for the + respective record + +
        +[[0.8661,0.0226,0.0455,0.0658],[0.9935,0.0011,0.0024,0.0030]]
        +
        +
        +
      4. Shape: +
          + +
          +[4]
          +
          +
          +
        +
      5. +

        This classification probabilities of the two input records are + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
        InputClassProbabilities
        2.0,0.5,0.4,0.2 +
        10.8661
        20.0226
        30.0455
        40.0658
        3.0,.2,.1,.2 +
        10.9935
        20.0011
        30.0024
        40.0030
        + +

      +
    • Regression - Single Record Regression Prediction
    • +
        +
      1. Input : +
          +
        • Record dimension : 1,1 (i.e., 1x1 array)
        • + +
          +1,1
          +
          +
          +
        • FlowFile content, i.e., input: + +
          +2.0
          +
          +
          +
        • +
        +
      2. +
      3. Output: + +
        +[[0.8661]]
        +
        +
        +
      4. Shape: +
          + +
          +[1]
          +
          +
          +
        +
      5. + + + + + + + + + + + +
        InputRegression Output
        2.0 + 0.8661
        +

        The regression output for the first and only input record (2.0) is + 0.8661

        + +
      +
    • Regression - Multiple Input Records (2) Regression Prediction +
    • +
        +
      1. Input : +
          +
        • Record dimension : 1,1 (i.e., 1x1 array)
        • + +
          +1,1
          +
          +
          +
        • FlowFile content, i.e., input (new line separated records): + +
          +2.0
          3.0 +
          +
          +
        • + +
        +
      2. Output - the first dimension is the number of input records and + the second contains the regression output for each respective record. + +
        +[[0.8661],[0.9935]]
        +
        +
        +
      3. Shape: +
          + +
          +[1]
          +
          +
          +
        +
      4. + + + + + + + + + + + + + + + +
        InputRegression Output
        2.0 + 0.8661
        3.0 + 0.9935
        +

        The regression output for the first record (2.0) is 0.8661 and for the + second record (3.0) is 0.9935

        + +
      +
    + + diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorClassification.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorClassification.java new file mode 100644 index 000000000000..2e64d057893f --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorClassification.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.deeplearning4j; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.datavec.api.records.reader.RecordReader; +import org.datavec.api.records.reader.impl.csv.CSVRecordReader; +import org.datavec.api.split.FileSplit; +import org.datavec.api.util.ClassPathResource; +import org.deeplearning4j.datasets.datavec.RecordReaderDataSetIterator; +import org.deeplearning4j.nn.conf.MultiLayerConfiguration; +import org.deeplearning4j.nn.conf.NeuralNetConfiguration; +import org.deeplearning4j.nn.conf.layers.DenseLayer; +import org.deeplearning4j.nn.conf.layers.OutputLayer; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.nn.weights.WeightInit; +import org.deeplearning4j.optimize.listeners.ScoreIterationListener; +import org.deeplearning4j.util.ModelSerializer; +import org.nd4j.linalg.activations.Activation; +import org.nd4j.linalg.dataset.DataSet; +import org.nd4j.linalg.dataset.SplitTestAndTrain; +import org.nd4j.linalg.dataset.api.iterator.DataSetIterator; +import org.nd4j.linalg.dataset.api.preprocessor.DataNormalization; +import org.nd4j.linalg.dataset.api.preprocessor.NormalizerStandardize; +import org.nd4j.linalg.learning.config.Sgd; +import org.nd4j.linalg.lossfunctions.LossFunctions; +import com.google.gson.Gson; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * This test case builds a classification model based on deeplearning4j examples. 
+ */ +public class TestDeepLearning4JProcessorClassification { + + private static File classificationModelFile; + private static int classficationInputNumber; + private static int classificationOutputNumber; + private TestRunner runner; + protected MultiLayerNetwork model = null; + protected Gson gson = new Gson(); + + /** Trains a 4-input, 4-class network on classification_test.txt and serializes it to a temporary zip file that every test in this class loads through the MODEL_FILE property. */ @BeforeClass + public static void setUpModel() throws FileNotFoundException, IOException, InterruptedException { + int numLinesToSkip = 0; + char delimiter = ','; + RecordReader recordReader = new CSVRecordReader(numLinesToSkip,delimiter); + recordReader.initialize(new FileSplit(new ClassPathResource("classification_test.txt").getFile())); + int labelIndex = 4; + int numClasses = 4; + int batchSize = 100; + DataSetIterator iterator = new RecordReaderDataSetIterator(recordReader,batchSize,labelIndex,numClasses); + DataSet allData = iterator.next(); + allData.shuffle(); + + SplitTestAndTrain testAndTrain = allData.splitTestAndTrain(0.70); + DataSet trainingData = testAndTrain.getTrain(); + DataSet testData = testAndTrain.getTest(); + + DataNormalization normalizer = new NormalizerStandardize(); + normalizer.fit(trainingData); + normalizer.transform(trainingData); + normalizer.transform(testData); + classficationInputNumber = 4; + classificationOutputNumber = 4; + long seed = 42; + + MultiLayerConfiguration conf = new NeuralNetConfiguration.Builder() + .seed(seed) + .weightInit(WeightInit.RELU) + .activation(Activation.RELU) + .updater(new Sgd(0.1)) + .l2(1e-3) + .list() + .layer(0, new DenseLayer.Builder().nIn(classficationInputNumber).nOut(5).build()) + .layer(1, new DenseLayer.Builder().nIn(5).nOut(5).build()) + .layer(2, new OutputLayer.Builder(LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD) + .activation(Activation.SOFTMAX) + .nIn(5).nOut(classificationOutputNumber).build()) + .backprop(true).pretrain(false) + .build(); + + MultiLayerNetwork classificationModel = new MultiLayerNetwork(conf); + classificationModel.init(); +
classificationModel.setListeners(new ScoreIterationListener(100)); + for(int i=0; i< 1000; i++ ) { + classificationModel.fit(trainingData); + } + classificationModelFile = File.createTempFile("classification-model", ".zip", new File("./")); + classificationModelFile.deleteOnExit(); + // save the model + ModelSerializer.writeModel(classificationModel, classificationModelFile ,false); + } + + /** Removes the temporary model file once all tests have run. */ @AfterClass + public static void deleteModel() throws FileNotFoundException, IOException, InterruptedException { + classificationModelFile.delete(); + } + + /** Creates a fresh runner configured with record dimensions 1,4 and the trained model file. */ @Before + public void setUp() throws IOException { + runner = TestRunners.newTestRunner(DeepLearning4JPredictor.class); + runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,4"); + runner.setProperty(DeepLearning4JPredictor.MODEL_FILE,classificationModelFile.getAbsolutePath()); + runner.assertValid(); + } + + @After + public void teardown() { + runner = null; + } + + /** A single record whose first feature dominates must be routed to success with the first class scoring highest. */ @Test + public void testMatchWith4ParamsClass1() { + runner.enqueue("2.0,.5,.4,0.2"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + String result = new String(flowFiles.get(0).toByteArray()); + Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); + assertEquals("size should be same", classificationOutputNumber, value[0].length); + assertTrue("Should be first class",value[0][0] > value[0][1]); + assertTrue("Should be first class",value[0][0] > value[0][2]); +
assertTrue("Should be first class",value[0][0] > value[0][3]); + } + + /** Two records in one FlowFile, both expected to score highest for the first class. */ @Test + public void testMatchWith2Rows4ColumnsParamsClass11() { + runner.enqueue("2.0,.5,.4,0.2" + System.lineSeparator() + "3.0,.2,.1,.2"); + runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,4"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String result = new String(flowFiles.get(0).toByteArray()); + Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); + assertEquals("size should be same", 2, value.length); + assertTrue("Should be first class",value[0][0] > value[0][1]); + assertTrue("Should be first class",value[0][0] > value[0][2]); + assertTrue("Should be first class",value[0][0] > value[0][3]); + assertTrue("Should be first class",value[1][0] > value[1][1]); + assertTrue("Should be first class",value[1][0] > value[1][2]); + assertTrue("Should be first class",value[1][0] > value[1][3]); + } + + /** Four records in one FlowFile expected to be classified as first, first, second and third class respectively. */ @Test + public void testMatchWith4Rows4ColumnsParamsClass1123() { + runner.enqueue("2.0,.5,.4,0.2" + System.lineSeparator() + "3.0,.2,.1,.2" + System.lineSeparator() + ".2,2.0,.4,0.2" + System.lineSeparator() + ".2,.2,3.4,.2"); + runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,4"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); +
String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String result = new String(flowFiles.get(0).toByteArray()); + Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); + assertEquals("size should be same", 4, value.length); + assertTrue("Should be first class",value[0][0] > value[0][1]); + assertTrue("Should be first class",value[0][0] > value[0][2]); + assertTrue("Should be first class",value[0][0] > value[0][3]); + assertTrue("Should be first class",value[1][0] > value[1][1]); + assertTrue("Should be first class",value[1][0] > value[1][2]); + assertTrue("Should be first class",value[1][0] > value[1][3]); + /* Row 2 must peak at the second class and row 3 at the third class; every comparison stays within the row being checked. */ assertTrue("Should be second class",value[2][1] > value[2][0]); + assertTrue("Should be second class",value[2][1] > value[2][2]); + assertTrue("Should be second class",value[2][1] > value[2][3]); + assertTrue("Should be third class",value[3][2] > value[3][0]); + assertTrue("Should be third class",value[3][2] > value[3][1]); + assertTrue("Should be third class",value[3][2] > value[3][3]); + } + + /** The model file path may be supplied through an expression resolved from a runner variable. */ @Test + public void testMatchWith4ParamsClass1ModelFileEL() { + runner.setVariable("modelPath", classificationModelFile.getAbsolutePath()); + runner.setProperty(DeepLearning4JPredictor.MODEL_FILE,"${modelPath}"); + + runner.enqueue("2.0,.5,.4,0.2"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString =
flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); + assertEquals("size should be same", classificationOutputNumber, value[0].length); + assertTrue("Should be first class",value[0][0] > value[0][1]); + assertTrue("Should be first class",value[0][0] > value[0][2]); + assertTrue("Should be first class",value[0][0] > value[0][3]); + } + + /** The record dimensions may be supplied through an expression resolved from a FlowFile attribute. */ @Test + public void testMatchWith4ParamsClass1DimensionEL() { + Map map = new HashMap(); + map.put("dimension", "1,4"); + + runner.enqueue("2.0,.5,.4,0.2",map); + runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"${dimension}"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); + assertEquals("size should be same", classificationOutputNumber, value[0].length); + assertTrue("Should be first class",value[0][0] > value[0][1]); + assertTrue("Should be first class",value[0][0] > value[0][2]); + assertTrue("Should be first class",value[0][0] > value[0][3]); + } + + /** The field separator may be supplied through an expression; the same separator also delimits the record dimensions. */ @Test + public void
testMatchWith4ParamsClass1SeparatorEL() { + Map map = new HashMap(); + map.put("separator", ";"); + + runner.enqueue("2.0;0.5;0.4;0.2",map); + runner.setProperty(DeepLearning4JPredictor.FIELD_SEPARATOR,"${separator}"); + runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1;4"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); + assertEquals("size should be same", classificationOutputNumber, value[0].length); + assertTrue("Should be first class",value[0][0] > value[0][1]); + assertTrue("Should be first class",value[0][0] > value[0][2]); + assertTrue("Should be first class",value[0][0] > value[0][3]); + } + + /** A non-numeric field must route the FlowFile to failure with an error message attribute. */ @Test + public void testMatchWith4ParamsOneNonNumber() { + runner.enqueue("5.1,3.5,1.4,abcd"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + } + + /** The processor is invalid when the record dimensions are not configured. */ @Test + public void testNoDimensionsInvalid() { + runner = TestRunners.newTestRunner(DeepLearning4JPredictor.class); + runner.setProperty(DeepLearning4JPredictor.MODEL_FILE,classificationModelFile.getAbsolutePath()); + runner.assertNotValid(); + } + + /** The processor is invalid when the model file is not configured. */ @Test + public void
testNoModelInvalid() { + runner = TestRunners.newTestRunner(DeepLearning4JPredictor.class); + runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,4"); + runner.assertNotValid(); + } + + /** A record with only three fields must route the FlowFile to failure. */ @Test + public void testMatchWith3ParamsBad() { + runner.enqueue("5.1,3.5,1.4"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + } + + /** Empty FlowFile content must route the FlowFile to failure. */ @Test + public void testMatchWithEmptyParams() { + runner.enqueue(""); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + } + + /** A record with five fields is expected to succeed and still yield one probability per class. */ @Test + public void testMatchWith5ParamOk() { + runner.enqueue("5.1,3.5,1.4,0.2,.6"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); + assertEquals("size should be same", classificationOutputNumber, value[0].length); + } +} diff --git
a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorRegression.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorRegression.java new file mode 100644 index 000000000000..77c7dce5b073 --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorRegression.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.deeplearning4j; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.deeplearning4j.datasets.iterator.impl.ListDataSetIterator; +import org.deeplearning4j.nn.conf.MultiLayerConfiguration; +import org.deeplearning4j.nn.conf.NeuralNetConfiguration; +import org.deeplearning4j.nn.conf.layers.DenseLayer; +import org.deeplearning4j.nn.conf.layers.OutputLayer; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.nn.weights.WeightInit; +import org.deeplearning4j.optimize.listeners.ScoreIterationListener; +import org.deeplearning4j.util.ModelSerializer; +import org.nd4j.linalg.activations.Activation; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.api.ops.impl.transforms.Sin; +import org.nd4j.linalg.dataset.DataSet; +import org.nd4j.linalg.dataset.api.iterator.DataSetIterator; +import org.nd4j.linalg.factory.Nd4j; +import org.nd4j.linalg.learning.config.Nesterovs; +import org.nd4j.linalg.lossfunctions.LossFunctions; +import com.google.gson.Gson; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * This test case builds a regression model based on deeplearning4j examples. 
+ */ +public class TestDeepLearning4JProcessorRegression { + + private static File regressionModelFile; + private static int regressionInputNumber; + private TestRunner runner; + protected MultiLayerNetwork model = null; + protected Gson gson = new Gson(); + + /** Trains a network on 1000 shuffled samples of sin(x) taken over the range -PI to PI and serializes it to a temporary zip file that every test in this class loads through the MODEL_FILE property. */ @BeforeClass + public static void setUpModel() throws FileNotFoundException, IOException, InterruptedException { + int nSamples = 1000; + Random random = new Random(42); + INDArray x = Nd4j.linspace(-Math.PI,Math.PI, nSamples ).reshape(nSamples, 1); + + INDArray sinX = Nd4j.getExecutioner().execAndReturn(new Sin(x.dup())); + DataSet allData = new DataSet(x,sinX); + final List list = allData.asList(); + Collections.shuffle(list,random); + final DataSetIterator iterator = new ListDataSetIterator(list,100); + regressionInputNumber = 1; + long seed = 42; + int numberOfNodes = 50; + + MultiLayerConfiguration conf = new NeuralNetConfiguration.Builder() + .seed(seed) + .weightInit(WeightInit.XAVIER) + .updater(new Nesterovs(0.01, 0.9)) + .list() + .layer(0, new DenseLayer.Builder().nIn(regressionInputNumber).nOut(numberOfNodes) + .activation(Activation.TANH) + .build()) + .layer(1, new DenseLayer.Builder().nIn(numberOfNodes).nOut(numberOfNodes) + .activation(Activation.TANH).build()) + /* NOTE(review): this output layer is registered with index 1, the same index as the dense layer directly above; index 2 was probably intended. Confirm how the list builder treats a repeated index before changing it, because the 0.01 tolerance assertions below depend on the model trained with the current configuration. */ .layer(1, new OutputLayer.Builder(LossFunctions.LossFunction.MSE) + .activation(Activation.IDENTITY) + .nIn(numberOfNodes).nOut(1).build()) + .pretrain(false).backprop(true).build(); + + MultiLayerNetwork regressionModel = new MultiLayerNetwork(conf); + regressionModel.init(); + regressionModel.setListeners(new ScoreIterationListener(100)); + for(int i = 0; i< 2000; i++ ) { + regressionModel.fit(iterator); + } + regressionModelFile = File.createTempFile("regression-model", ".zip", new File("./")); + regressionModelFile.deleteOnExit(); + + ModelSerializer.writeModel(regressionModel, regressionModelFile ,false); + } + + /** Removes the temporary model file once all tests have run. */ @AfterClass + public static void deleteModel() throws FileNotFoundException, IOException, InterruptedException { +
regressionModelFile.delete(); + } + + /** Creates a fresh runner configured with record dimensions 1,1 and the trained model file. */ @Before + public void setUp() throws IOException { + runner = TestRunners.newTestRunner(DeepLearning4JPredictor.class); + runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,1"); + runner.setProperty(DeepLearning4JPredictor.MODEL_FILE,regressionModelFile.getAbsolutePath()); + runner.assertValid(); + } + + @After + public void teardown() { + runner = null; + } + + /** Single record 3.14: expects an output shape of [1] and a prediction within 0.01 of 0.0. */ @Test + public void testMatchWithParamsPi() { + runner.enqueue("3.14"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)1, shape[0]); + + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); + assertEquals("size should be same", 1, value[0].length); + assertEquals("value should be equal", 0.0d, value[0][0], 0.01d); + } + + /** Single record 1.57: expects an output shape of [1] and a prediction within 0.01 of 1.0. */ @Test + public void testMatchWithParamsPiBy2() { + runner.enqueue("1.57"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); +
assertEquals("shape should be equal", (Integer)1, shape[0]); + + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); + assertEquals("size should be same", 1, value[0].length); + + assertEquals("value should be equal", 1.0d, value[0][0], 0.01d); + } + + /** Two records joined by the platform line separator, the first carrying a trailing comma; expects two result rows within 0.01 of 0.0 and 1.0 respectively. */ @Test + public void testMatchWith2ParamsPiAndPiBy2() { + runner.enqueue("3.14," + System.lineSeparator() + "1.57"); + runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,1"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)1, shape[0]); + + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String result = new String(flowFiles.get(0).toByteArray()); + Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); + assertEquals("size should be same", 2, value.length); + + assertEquals("value should be equal", 0.0d, value[0][0], 0.01d); + assertEquals("value should be equal", 1.0d, value[1][0], 0.01d); + } +} diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt new file mode 100644 index 000000000000..c3ea5674c5e4 --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt @@ -0,0 +1,100 @@ +1.1,0.5,0.5,0.2,0 +1.9,0,0,0.2,0 +1.7,0.2,0.2,0.2,0 +1.6,0.1,0.1,0.2,0
+1,0.6,0.6,0.2,0 +1.4,0.9,0.9,0.4,0 +1.6,0.4,0.4,0.3,0 +1,0.4,0.4,0.2,0 +1.4,0.9,0.9,0.2,0 +2.9,0.1,0.1,0.1,0 +2.4,0.7,0.5,0.2,0 +2.8,0.4,0,0.2,0 +3.8,0,0.2,0.1,0 +1.3,0,0.1,0.1,0 +1.8,0,0.6,0.2,0 +1.7,0.5,0.9,0.4,0 +2.4,0,0.4,0.4,0 +1.1,0.2,0.4,0.3,0 +2.7,0.1,0.9,0.3,0 +1.1,0.6,0.1,0.3,0 +1.4,0.9,0.5,0.2,0 +2.1,0.4,0,0.4,0 +2.6,0.4,0.2,0.2,0 +1.1,0.9,0.1,0.5,0 +1.8,0.1,0.6,0.2,0 +0.5,3,0.9,0.2,1 +0,3.4,0.4,0.4,1 +0.2,3.5,0.4,0.2,1 +0.1,3.4,0.9,0.2,1 +0.6,3.2,0.1,0.2,1 +0.9,3.1,0.5,0.2,1 +0.4,3.4,0,0.4,1 +0.4,4.1,0.2,0.1,1 +0.9,4.2,0.1,0.2,1 +0.1,3.1,0.6,0.1,1 +0.5,3.2,0.9,0.2,1 +0,3.5,0.4,0.2,1 +0.2,3.1,0.4,0.1,1 +0.1,3,0.9,0.2,1 +0.6,3.4,0.1,0.2,1 +0.9,3.5,0.5,0.3,1 +0.4,2.3,0,0.3,1 +0.4,3.2,0.2,0.2,1 +0.9,3.5,0.1,0.6,1 +0.1,3.8,0.6,0.4,1 +0.5,3,0.9,0.3,1 +0,3.8,0.4,0.2,1 +0.2,3.2,0.4,0.2,1 +0.1,3.7,0.9,0.2,1 +0.6,3.3,0.1,0.2,1 +0.9,0.5,3,0.9,2 +0.4,0,3.4,0.4,2 +0.4,0.2,3.5,0.4,2 +0.9,0.1,3.4,0.9,2 +0.1,0.6,3.2,0.1,2 +0.5,0.9,3.1,0.5,2 +0,0.4,3.4,0,2 +0.2,0.4,4.1,0.2,2 +0.1,0.9,4.2,0.1,2 +0.6,0.1,3.1,0.6,2 +0.9,0.5,3.2,0.9,2 +0.4,0,3.5,0.4,2 +0.4,0.2,3.1,0.4,2 +0.9,0.1,3,0.9,2 +0.1,0.6,3.4,0.1,2 +0.5,0.9,3.5,0.5,2 +0,0.4,2.3,0,2 +0.2,0.4,3.2,0.2,2 +0.1,0.9,3.5,0.1,2 +0.6,0.1,3.8,0.6,2 +0.9,0.5,3,0.9,2 +0.4,0,3.8,0.4,2 +0.4,0.2,3.2,0.4,2 +0.9,0.1,3.7,0.9,2 +0.1,0.6,3.3,0.1,2 +0.5,0.9,0.9,3.4,3 +0,0.4,0.4,3.5,3 +0.2,0.4,0.4,3.4,3 +0.1,0.9,0.9,3.2,3 +0.6,0.1,0.1,3.1,3 +0.9,0.5,0.5,3.4,3 +0.4,0,0,4.1,3 +0.4,0.2,0.2,4.2,3 +0.9,0.1,0.1,3.1,3 +0.1,0.6,0.6,3.2,3 +0.5,0.9,0.9,3.5,3 +0,0.4,0.4,3.1,3 +0.2,0.4,0.4,3,3 +0.1,0.9,0.9,3.4,3 +0.6,0.1,0.1,3.5,3 +0.9,0.5,0.5,2.3,3 +0.4,0,0,3.2,3 +0.4,0.2,0.2,3.5,3 +0.9,0.1,0.1,3.8,3 +0.1,0.6,0.6,3,3 +0.5,0.9,0.9,3.8,3 +0,0.4,0.4,3.2,3 +0.2,0.4,0.4,3.7,3 +0.1,0.9,0.9,3.3,3 +0.6,0.1,0.1,1.8,3 diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/pom.xml b/nifi-nar-bundles/nifi-deeplearning4j-bundle/pom.xml new file mode 100644 index 000000000000..060d231775ea --- /dev/null +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/pom.xml 
@@ -0,0 +1,43 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.7.0-SNAPSHOT + + + nifi-deeplearning4j-bundle + pom + + + nifi-deeplearning4j-processors + nifi-deeplearning4j-nar + + + + + + org.apache.nifi + nifi-deeplearning4j-processors + 1.7.0-SNAPSHOT + + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 80e579c08df8..7dac7080852f 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -93,6 +93,7 @@ nifi-spark-bundle nifi-atlas-bundle nifi-druid-bundle + nifi-deeplearning4j-bundle From 7565c648c264ffd88fb67abd6007b41c3ab02cd0 Mon Sep 17 00:00:00 2001 From: mans2singh Date: Fri, 11 May 2018 08:42:11 -0700 Subject: [PATCH 2/7] NIFI-5166 - Updated code based on review comments (logging, provenance, property description/displayname,etc) --- .../AbstractDeepLearning4JProcessor.java | 16 ++++++------- .../DeepLearning4JPredictor.java | 23 ++++++++++--------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java index 6652b0f2ee0f..0596557a580a 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java @@ -43,7 +43,7 @@ public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder() .name("deeplearning4j-field-separator") .displayName("Field Separator") - 
.description("Specifies the field separator in the records. (default is comma)") + .description("Specifies the field separator in the records.") .required(true) .defaultValue(",") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @@ -53,9 +53,9 @@ public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder() .name("deeplearning4j-record-separator") .displayName("Record Separator") - .description("Specifies the records separator in the message body. (defaults to new line)") + .description("Specifies the records separator in the message body.") .required(true) - .defaultValue(System.lineSeparator()) + .defaultValue("\n") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -71,8 +71,8 @@ public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder() .name("deeplearning4j-record-dimension") - .displayName("Record dimensions separated by field separator") - .description("Dimension of array in each a record (eg: 2,4 - a 2x4 array)") + .displayName("Record Dimensions") + .description("Dimension of array in each a record (eg: 2,4 - a 2x4 array).") .required(true) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -87,12 +87,12 @@ public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException { if ( model == null ) { String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue(); - if ( getLogger().isDebugEnabled()) { - getLogger().debug("Loading model from {}", new Object[] {modelFile}); - } + getLogger().debug("Loading model from 
{}", new Object[] {modelFile}); + long start = System.currentTimeMillis(); model = ModelSerializer.restoreMultiLayerNetwork(modelFile,false); long end = System.currentTimeMillis(); + getLogger().info("Time to load model " + (end-start) + " ms"); } return model; diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java index e5e1068d130c..a9cf01085d4e 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java @@ -29,13 +29,14 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.stream.io.StreamUtils; import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.factory.Nd4j; import com.google.gson.Gson; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; @@ -68,7 +69,7 @@ public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") .description("Failed DeepLearning4j results are routed to this relationship").build(); - protected Gson gson = new Gson(); + protected final Gson gson = new Gson(); private static final Set relationships; private static final List 
propertyDescriptors; @@ -175,23 +176,23 @@ record -> { session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().send(flowFile, makeProvenanceUrl(context), + session.getProvenanceReporter().modifyContent(flowFile, makeProvenanceUrl(context), (endTimeMillis - startTimeMillis)); } catch (Exception exception) { - flowFile = populateErrorAttributes(session, flowFile, exception.getMessage()); - getLogger().error("Failed to process data due to {} for input {}", - new Object[]{exception.getLocalizedMessage(), input}, exception); - session.transfer(flowFile, REL_FAILURE); - context.yield(); + flowFile = session.putAttribute(flowFile, DEEPLEARNING4J_ERROR_MESSAGE, String.valueOf(exception.getMessage())); + getLogger().error("Failed to process data due to {} for input {}", + new Object[]{exception.getLocalizedMessage(), input}, exception); + session.transfer(flowFile, REL_FAILURE); } } protected String getFlowFileContents(final ProcessSession session, Charset charset, FlowFile incomingFlowFile) throws IOException { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - session.exportTo(incomingFlowFile, baos); - return new String(baos.toByteArray(), charset); + final byte[] buffer = new byte[(int) incomingFlowFile.getSize()]; + try (final InputStream in = session.read(incomingFlowFile)) { + StreamUtils.fillBuffer(in, buffer); } + return new String(buffer, charset); } protected int [] getInputDimensions(final ProcessContext context, Charset charset, FlowFile flowFile, String separator) From 93f674bb4d89c1da0ac1f217606f3b9d6a3715de Mon Sep 17 00:00:00 2001 From: mans2singh Date: Fri, 11 May 2018 09:15:14 -0700 Subject: [PATCH 3/7] NIFI-5166 - Updated writing to flow file based on comments --- .../processors/deeplearning4j/DeepLearning4JPredictor.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git 
a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java index a9cf01085d4e..d20bd9170077 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java @@ -34,7 +34,6 @@ import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.factory.Nd4j; import com.google.gson.Gson; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -166,9 +165,7 @@ record -> { new Object[] {inputRecords, dimensions, results, Arrays.toString(results.shape()), partitionedResults, jsonResult, shape, jsonShape}); } - try (ByteArrayInputStream bais = new ByteArrayInputStream(jsonResult.getBytes(charset))) { - flowFile = session.importFrom(bais, flowFile); - } + flowFile = session.write(flowFile, out -> out.write(jsonResult.getBytes(charset))); session.putAttribute(flowFile, DEEPLEARNING4J_OUTPUT_SHAPE, jsonShape); From 14ef4549ef732788ea6ae8fb740e4525befafac3 Mon Sep 17 00:00:00 2001 From: mans2singh Date: Tue, 15 May 2018 04:48:48 -0700 Subject: [PATCH 4/7] NIFI-5166 - Added tags --- .../nifi/processors/deeplearning4j/DeepLearning4JPredictor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java 
b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java index d20bd9170077..d0b27cd55f97 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java @@ -50,7 +50,7 @@ @EventDriven @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning", "neural", "network"}) @CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. 
" From 023128cfebaddde26d7d90367a20a86e2367ff6b Mon Sep 17 00:00:00 2001 From: mans2singh Date: Tue, 12 Jun 2018 19:41:02 -0700 Subject: [PATCH 5/7] NIFI-5166 - Updated based on feedback comments (renamed to multilayer processor, added tags, added test with correlation attribute, and corrected documentation) --- .../AbstractDeepLearning4JProcessor.java | 26 --- ...=> DeepLearning4JMultiLayerPredictor.java} | 30 ++- .../org.apache.nifi.processor.Processor | 2 +- .../additionalDetails.html | 16 +- ...g4JMultiLayerPredictorClassification.java} | 177 +++++++++++++----- ...rning4JMultiLayerPredictorRegression.java} | 34 ++-- 6 files changed, 175 insertions(+), 110 deletions(-) rename nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/{DeepLearning4JPredictor.java => DeepLearning4JMultiLayerPredictor.java} (88%) rename nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/{org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor => org.apache.nifi.processors.deeplearning4j.DeepLearning4JMultiLayerPredictor}/additionalDetails.html (92%) rename nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/{TestDeepLearning4JProcessorClassification.java => TestDeepLearning4JMultiLayerPredictorClassification.java} (64%) rename nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/{TestDeepLearning4JProcessorRegression.java => TestDeepLearning4JMultiLayerPredictorRegression.java} (83%) diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java 
b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java index 0596557a580a..19baa20393d6 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java @@ -15,15 +15,10 @@ * limitations under the License. */ package org.apache.nifi.processors.deeplearning4j; -import java.io.IOException; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; -import org.deeplearning4j.util.ModelSerializer; /** * Base class for deeplearning4j processors @@ -82,25 +77,4 @@ public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor public static final String DEEPLEARNING4J_OUTPUT_SHAPE = "deeplearning4j.output.shape"; - protected MultiLayerNetwork model = null; - - protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException { - if ( model == null ) { - String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue(); - getLogger().debug("Loading model from {}", new Object[] {modelFile}); - - long start = System.currentTimeMillis(); - model = ModelSerializer.restoreMultiLayerNetwork(modelFile,false); - long end = System.currentTimeMillis(); - - getLogger().info("Time to load model " + (end-start) + " ms"); - } - return model; - } - - @OnStopped - public void close() { - 
getLogger().info("Closing"); - model = null; - } } diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java similarity index 88% rename from nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java rename to nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java index d0b27cd55f97..bd7afce9bd88 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java @@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; @@ -31,6 +32,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.stream.io.StreamUtils; import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.util.ModelSerializer; import org.nd4j.linalg.api.ndarray.INDArray; import org.nd4j.linalg.factory.Nd4j; import com.google.gson.Gson; @@ -50,8 +52,8 @@ @EventDriven @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) 
-@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning", "neural", "network"}) -@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " +@Tags({"deeplearning4j", "dl4j", "multilayer", "predict", "classification", "regression", "deep", "learning", "neural", "network"}) +@CapabilityDescription("The DeepLearning4JMultiLayerPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " + "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " + "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " + "Each record can contain multiple fields with each field separated by the 'Field Separator' property." 
@@ -60,7 +62,7 @@ @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), @WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), }) -public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { +public class DeepLearning4JMultiLayerPredictor extends AbstractDeepLearning4JProcessor { static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("Successful DeepLearning4j results are routed to this relationship").build(); @@ -70,6 +72,14 @@ public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { protected final Gson gson = new Gson(); + protected MultiLayerNetwork model = null; + + @OnStopped + public void close() { + getLogger().info("Closing"); + model = null; + } + private static final Set relationships; private static final List propertyDescriptors; static { @@ -96,6 +106,20 @@ public final List getSupportedPropertyDescriptors() { return propertyDescriptors; } + protected synchronized MultiLayerNetwork getModel(ProcessContext context) throws IOException { + if ( model == null ) { + String modelFile = context.getProperty(MODEL_FILE).evaluateAttributeExpressions().getValue(); + getLogger().debug("Loading model from {}", new Object[] {modelFile}); + + long start = System.currentTimeMillis(); + model = ModelSerializer.restoreMultiLayerNetwork(modelFile,false); + long end = System.currentTimeMillis(); + + getLogger().info("Time to load model " + (end-start) + " ms"); + } + return (MultiLayerNetwork)model; + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor 
b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b4fae5120932..3897ca0c86d8 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,4 +12,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor +org.apache.nifi.processors.deeplearning4j.DeepLearning4JMultiLayerPredictor diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor/additionalDetails.html b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JMultiLayerPredictor/additionalDetails.html similarity index 92% rename from nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor/additionalDetails.html rename to nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JMultiLayerPredictor/additionalDetails.html index 5a69a1a1a6a7..b7ac74209aef 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JPredictor/additionalDetails.html +++ 
b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/resources/docs/org.apache.nifi.processors.deeplearning4j.DeepLearning4JMultiLayerPredictor/additionalDetails.html @@ -16,7 +16,7 @@ --> -Deeplearning4JPredictor +DeepLearning4JMultiLayerPredictor @@ -26,9 +26,9 @@

    Usage Information

    - DeepLearning4JPredictor processor produces predictions by applying the + DeepLearning4JMultiLayerPredictor processor produces predictions by applying the provided deeplearning4j - model to the FlowFile contents. + MultiLayerNetwork model to the FlowFile contents.

    The processor can perform: @@ -38,15 +38,7 @@

    Usage Information

based on the model used. -

The processors has the following properties: -

    -
  • a deeplearning4j classification or regression model (required)
  • -
  • record dimensions (required)
  • -
  • field separator (default comma)
  • -
  • record separator (default new line character)
  • -
  • character set for FlowFile message body (default UTF8)
  • -
-

It then applies the model to the FlowFile contents to produce +

The processor applies the model to the FlowFile contents to produce the prediction result.

The FlowFile content can contain a single row for a single input record or multiple rows (separated by record separator) for multiple diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorClassification.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorClassification.java similarity index 64% rename from nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorClassification.java rename to nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorClassification.java index 2e64d057893f..28d9dfeacb54 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorClassification.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorClassification.java @@ -62,7 +62,7 @@ /** * This test case builds a classification model based on deeplearning4j examples. 
*/ -public class TestDeepLearning4JProcessorClassification { +public class TestDeepLearning4JMultiLayerPredictorClassification { private static File classificationModelFile; private static int classficationInputNumber; @@ -70,6 +70,7 @@ public class TestDeepLearning4JProcessorClassification { private TestRunner runner; protected MultiLayerNetwork model = null; protected Gson gson = new Gson(); + protected static final String CORRELATION_ATTRIBUTE = "ids"; @BeforeClass public static void setUpModel() throws FileNotFoundException, IOException, InterruptedException { @@ -130,9 +131,9 @@ public static void deleteModel() throws FileNotFoundException, IOException, Inte @Before public void setUp() throws IOException { - runner = TestRunners.newTestRunner(DeepLearning4JPredictor.class); - runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,4"); - runner.setProperty(DeepLearning4JPredictor.MODEL_FILE,classificationModelFile.getAbsolutePath()); + runner = TestRunners.newTestRunner(DeepLearning4JMultiLayerPredictor.class); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"1,4"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.MODEL_FILE,classificationModelFile.getAbsolutePath()); runner.assertValid(); } @@ -145,10 +146,38 @@ public void teardown() { public void testMatchWith4ParamsClass1() { runner.enqueue("2.0,.5,.4,0.2"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + 
assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + String result = new String(flowFiles.get(0).toByteArray()); + + Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); + assertEquals("size should be same", classificationOutputNumber, value[0].length); + assertTrue("Should be first class",value[0][0] > value[0][1]); + assertTrue("Should be first class",value[0][0] > value[0][2]); + assertTrue("Should be first class",value[0][0] > value[0][3]); + } + + @Test + public void testMatchWith4ParamsClass1WithId() { + Map properties = new HashMap(); + properties.put(CORRELATION_ATTRIBUTE, "1234"); + runner.enqueue("2.0,.5,.4,0.2", properties); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + String propertyValue = flowFiles.get(0).getAttribute(CORRELATION_ATTRIBUTE); + assertEquals("Property values should be same", "1234", propertyValue); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", (Integer)4, shape[0]); @@ -163,15 +192,46 @@ public void testMatchWith4ParamsClass1() { @Test public void testMatchWith2Rows4ColumnsParamsClass11() { 
runner.enqueue("2.0,.5,.4,0.2" + System.lineSeparator() + "3.0,.2,.1,.2"); - runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,4"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"1,4"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", (Integer)4, shape[0]); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String result = new String(flowFiles.get(0).toByteArray()); + + Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); + assertEquals("size should be same", 2, value.length); + assertTrue("Should be first class",value[0][0] > value[0][1]); + assertTrue("Should be first class",value[0][0] > value[0][2]); + assertTrue("Should be first class",value[0][0] > value[0][3]); + assertTrue("Should be first class",value[1][0] > value[1][1]); + assertTrue("Should be first class",value[1][0] > value[1][2]); + assertTrue("Should be first class",value[1][0] > value[1][3]); + } + + @Test + public void testMatchWith2Rows4ColumnsParamsClass11WithIds() { + Map properties = new HashMap(); + properties.put(CORRELATION_ATTRIBUTE, 
"1234,3456"); + runner.enqueue("2.0,.5,.4,0.2" + System.lineSeparator() + "3.0,.2,.1,.2", properties); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"1,4"); + runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + String propertyValue = flowFiles.get(0).getAttribute(CORRELATION_ATTRIBUTE); + assertEquals("Property values should be same", "1234,3456", propertyValue); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); + assertEquals("size of shape should be equal", 1, shape.length); + assertEquals("shape should be equal", (Integer)4, shape[0]); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String result = new String(flowFiles.get(0).toByteArray()); Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); assertEquals("size should be same", 2, value.length); @@ -186,16 +246,19 @@ public void testMatchWith2Rows4ColumnsParamsClass11() { @Test public void testMatchWith4Rows4ColumnsParamsClass1123() { runner.enqueue("2.0,.5,.4,0.2" + System.lineSeparator() + "3.0,.2,.1,.2" + System.lineSeparator() + ".2,2.0,.4,0.2" + System.lineSeparator() + ".2,.2,3.4,.2"); - runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,4"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"1,4"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + 
runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", (Integer)4, shape[0]); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); String result = new String(flowFiles.get(0).toByteArray()); + Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); assertEquals("size should be same", 4, value.length); assertTrue("Should be first class",value[0][0] > value[0][1]); @@ -215,17 +278,20 @@ public void testMatchWith4Rows4ColumnsParamsClass1123() { @Test public void testMatchWith4ParamsClass1ModelFileEL() { runner.setVariable("modelPath", classificationModelFile.getAbsolutePath()); - runner.setProperty(DeepLearning4JPredictor.MODEL_FILE,"${modelPath}"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.MODEL_FILE,"${modelPath}"); runner.enqueue("2.0,.5,.4,0.2"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + 
assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", (Integer)4, shape[0]); + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); assertEquals("size should be same", classificationOutputNumber, value[0].length); assertTrue("Should be first class",value[0][0] > value[0][1]); @@ -239,15 +305,18 @@ public void testMatchWith4ParamsClass1DimensionEL() { map.put("dimension", "1,4"); runner.enqueue("2.0,.5,.4,0.2",map); - runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"${dimension}"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"${dimension}"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape 
should be equal", (Integer)4, shape[0]); + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); assertEquals("size should be same", classificationOutputNumber, value[0].length); assertTrue("Should be first class",value[0][0] > value[0][1]); @@ -261,16 +330,19 @@ public void testMatchWith4ParamsClass1SeparatorEL() { map.put("separator", ";"); runner.enqueue("2.0;0.5;0.4;0.2",map); - runner.setProperty(DeepLearning4JPredictor.FIELD_SEPARATOR,"${separator}"); - runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1;4"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.FIELD_SEPARATOR,"${separator}"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"1;4"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", (Integer)4, shape[0]); + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); assertEquals("size should be same", classificationOutputNumber, value[0].length); 
assertTrue("Should be first class",value[0][0] > value[0][1]); @@ -282,22 +354,22 @@ public void testMatchWith4ParamsClass1SeparatorEL() { public void testMatchWith4ParamsOneNonNumber() { runner.enqueue("5.1,3.5,1.4,abcd"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_FAILURE, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_FAILURE); - assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); } @Test public void testNoDimensionsInvalid() { - runner = TestRunners.newTestRunner(DeepLearning4JPredictor.class); - runner.setProperty(DeepLearning4JPredictor.MODEL_FILE,classificationModelFile.getAbsolutePath()); + runner = TestRunners.newTestRunner(DeepLearning4JMultiLayerPredictor.class); + runner.setProperty(DeepLearning4JMultiLayerPredictor.MODEL_FILE,classificationModelFile.getAbsolutePath()); runner.assertNotValid(); } @Test public void testNoModelInvalid() { - runner = TestRunners.newTestRunner(DeepLearning4JPredictor.class); - runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,4"); + runner = TestRunners.newTestRunner(DeepLearning4JMultiLayerPredictor.class); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"1,4"); runner.assertNotValid(); } @@ -305,31 +377,34 @@ public void testNoModelInvalid() { public void testMatchWith3ParamsBad() { runner.enqueue("5.1,3.5,1.4"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_FAILURE, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_FAILURE); - 
assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); } @Test public void testMatchWithEmptyParams() { runner.enqueue(""); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_FAILURE, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_FAILURE); - assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_FAILURE); + assertNotNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); } @Test public void testMatchWith5ParamOk() { runner.enqueue("5.1,3.5,1.4,0.2,.6"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", (Integer)4, shape[0]); - 
assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + Double[][] value = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())),Double[][].class); assertEquals("size should be same", classificationOutputNumber, value[0].length); } diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorRegression.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorRegression.java similarity index 83% rename from nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorRegression.java rename to nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorRegression.java index 77c7dce5b073..1117ad6516b1 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JProcessorRegression.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorRegression.java @@ -56,7 +56,7 @@ /** * This test case builds a regression model based on deeplearning4j examples. 
*/ -public class TestDeepLearning4JProcessorRegression { +public class TestDeepLearning4JMultiLayerPredictorRegression { private static File regressionModelFile; private static int regressionInputNumber; @@ -113,9 +113,9 @@ public static void deleteModel() throws FileNotFoundException, IOException, Inte @Before public void setUp() throws IOException { - runner = TestRunners.newTestRunner(DeepLearning4JPredictor.class); - runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,1"); - runner.setProperty(DeepLearning4JPredictor.MODEL_FILE,regressionModelFile.getAbsolutePath()); + runner = TestRunners.newTestRunner(DeepLearning4JMultiLayerPredictor.class); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"1,1"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.MODEL_FILE,regressionModelFile.getAbsolutePath()); runner.assertValid(); } @@ -128,11 +128,11 @@ public void teardown() { public void testMatchWithParamsPi() { runner.enqueue("3.14"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", 
(Integer)1, shape[0]); @@ -146,11 +146,11 @@ public void testMatchWithParamsPi() { public void testMatchWithParamsPiBy2() { runner.enqueue("1.57"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); + String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", (Integer)1, shape[0]); @@ -164,17 +164,17 @@ public void testMatchWithParamsPiBy2() { @Test public void testMatchWith2ParamsPiAndPiBy2() { runner.enqueue("3.14," + System.lineSeparator() + "1.57"); - runner.setProperty(DeepLearning4JPredictor.RECORD_DIMENSIONS,"1,1"); + runner.setProperty(DeepLearning4JMultiLayerPredictor.RECORD_DIMENSIONS,"1,1"); runner.run(1,true,true); - runner.assertAllFlowFilesTransferred(DeepLearning4JPredictor.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JPredictor.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(DeepLearning4JMultiLayerPredictor.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(DeepLearning4JMultiLayerPredictor.REL_SUCCESS); - String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); 
+ String shapeString = flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_OUTPUT_SHAPE); Integer [] shape = gson.fromJson(new StringReader(shapeString), Integer[].class); assertEquals("size of shape should be equal", 1, shape.length); assertEquals("shape should be equal", (Integer)1, shape[0]); - assertNull(flowFiles.get(0).getAttribute(DeepLearning4JPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); + assertNull(flowFiles.get(0).getAttribute(DeepLearning4JMultiLayerPredictor.DEEPLEARNING4J_ERROR_MESSAGE)); String result = new String(flowFiles.get(0).toByteArray()); Double[][] value = gson.fromJson(new StringReader(result),Double[][].class); assertEquals("size should be same", 2, value.length); From d5aa217d15ec90d846824fa16f2abb854a43fe87 Mon Sep 17 00:00:00 2001 From: mans2singh Date: Tue, 12 Jun 2018 20:05:24 -0700 Subject: [PATCH 6/7] NIFI-5166 - Removed nar from assembly --- nifi-assembly/pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 682308d13db9..344e30c4429e 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -379,12 +379,6 @@ language governing permissions and limitations under the License. --> 1.7.0-SNAPSHOT nar - - org.apache.nifi - nifi-deeplearning4j-nar - 1.7.0-SNAPSHOT - nar - org.apache.nifi nifi-avro-nar From 115d280eb4d8af6189e5681c8f2ce2a739b0f495 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 3 Jul 2018 16:36:15 +0900 Subject: [PATCH 7/7] NIFI-5166: DL4J integration - Removed CUDA dependencies to reduce NAR file size - Bumped DL4J to 1.0.0-beta - Fixed 'Unable to create a 1d array from a non vector!'
error - Use existing model files to minimize test time --- .../nifi-deeplearning4j-processors/pom.xml | 21 ++--------- .../DeepLearning4JMultiLayerPredictor.java | 6 +++- ...ng4JMultiLayerPredictorClassification.java | 33 +++++++++--------- ...arning4JMultiLayerPredictorRegression.java | 30 ++++++++-------- .../test/resources/classification-model.zip | Bin 0 -> 1368 bytes .../src/test/resources/regression-model.zip | Bin 0 -> 1621 bytes 6 files changed, 40 insertions(+), 50 deletions(-) create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification-model.zip create mode 100644 nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/regression-model.zip diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml index 7e77c70af227..d9e7c9b09433 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml @@ -29,32 +29,17 @@ org.nd4j nd4j-api - 1.0.0-alpha + 1.0.0-beta org.nd4j nd4j-native-platform - 1.0.0-alpha + 1.0.0-beta - - org.nd4j - nd4j-cuda-8.0-platform - 1.0.0-alpha - - - org.nd4j - nd4j-cuda-9.0-platform - 1.0.0-alpha - - - org.nd4j - nd4j-cuda-9.1-platform - 1.0.0-alpha - org.deeplearning4j deeplearning4j-core - 1.0.0-alpha + 1.0.0-beta org.apache.commons diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java index bd7afce9bd88..e779f056f3fa 100644 --- 
a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java @@ -177,7 +177,11 @@ record -> { double [][] partitionedResults = new double[inputRecords.length][]; for (int row = 0; row < inputRecords.length; row++) { INDArray result = results.getRow(row); - partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector(); + if (result.isScalar()) { + partitionedResults[row] = new double[]{result.getDouble(0)}; + } else { + partitionedResults[row] = Nd4j.toFlattened(result).toDoubleVector(); + } } String jsonResult = gson.toJson(partitionedResults); diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorClassification.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorClassification.java index 28d9dfeacb54..8d1482bf25fe 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorClassification.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorClassification.java @@ -64,16 +64,26 @@ */ public class TestDeepLearning4JMultiLayerPredictorClassification { - private static File classificationModelFile; - private static int classficationInputNumber; - private static int classificationOutputNumber; + private static final File classificationModelFile = new 
File("src/test/resources/classification-model.zip"); + private static final int classficationInputNumber = 4; + private static final int classificationOutputNumber = 4; private TestRunner runner; - protected MultiLayerNetwork model = null; - protected Gson gson = new Gson(); - protected static final String CORRELATION_ATTRIBUTE = "ids"; + private Gson gson = new Gson(); + private static final String CORRELATION_ATTRIBUTE = "ids"; @BeforeClass - public static void setUpModel() throws FileNotFoundException, IOException, InterruptedException { + public static void setUpModel() throws IOException, InterruptedException { + + if (classificationModelFile.isFile()) { + // An existing model was found, use it. + // Delete the existing zip file to recreate a model. + return; + } + + if (!classificationModelFile.createNewFile()) { + throw new RuntimeException("Failed to create new model file."); + } + int numLinesToSkip = 0; char delimiter = ','; RecordReader recordReader = new CSVRecordReader(numLinesToSkip,delimiter); @@ -93,8 +103,6 @@ public static void setUpModel() throws FileNotFoundException, IOException, Inter normalizer.fit(trainingData); normalizer.transform(trainingData); normalizer.transform(testData); - classficationInputNumber = 4; - classificationOutputNumber = 4; long seed = 42; MultiLayerConfiguration conf = new NeuralNetConfiguration.Builder() @@ -118,17 +126,10 @@ public static void setUpModel() throws FileNotFoundException, IOException, Inter for(int i=0; i< 1000; i++ ) { classificationModel.fit(trainingData); } - classificationModelFile = File.createTempFile("classification-model", ".zip", new File("./")); - classificationModelFile.deleteOnExit(); // save the model ModelSerializer.writeModel(classificationModel, classificationModelFile ,false); } - @AfterClass - public static void deleteModel() throws FileNotFoundException, IOException, InterruptedException { - classificationModelFile.delete(); - } - @Before public void setUp() throws IOException { runner 
= TestRunners.newTestRunner(DeepLearning4JMultiLayerPredictor.class); diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorRegression.java b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorRegression.java index 1117ad6516b1..bd087d07e814 100644 --- a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorRegression.java +++ b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/java/org/apache/nifi/processors/deeplearning4j/TestDeepLearning4JMultiLayerPredictorRegression.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertNull; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.StringReader; import java.util.Collections; @@ -48,7 +47,6 @@ import org.nd4j.linalg.lossfunctions.LossFunctions; import com.google.gson.Gson; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -58,14 +56,23 @@ */ public class TestDeepLearning4JMultiLayerPredictorRegression { - private static File regressionModelFile; - private static int regressionInputNumber; + private static final File regressionModelFile = new File("src/test/resources/regression-model.zip"); private TestRunner runner; - protected MultiLayerNetwork model = null; - protected Gson gson = new Gson(); + private Gson gson = new Gson(); @BeforeClass - public static void setUpModel() throws FileNotFoundException, IOException, InterruptedException { + public static void setUpModel() throws IOException { + + if (regressionModelFile.isFile()) { + // An existing model was found, use it. 
+ // Delete the existing zip file to recreate a model. + return; + } + + if (!regressionModelFile.createNewFile()) { + throw new RuntimeException("Failed to create new model file."); + } + int nSamples = 1000; Random random = new Random(42); INDArray x = Nd4j.linspace(-Math.PI,Math.PI, nSamples ).reshape(nSamples, 1); @@ -75,7 +82,7 @@ public static void setUpModel() throws FileNotFoundException, IOException, Inter final List list = allData.asList(); Collections.shuffle(list,random); final DataSetIterator iterator = new ListDataSetIterator(list,100); - regressionInputNumber = 1; + final int regressionInputNumber = 1; long seed = 42; int numberOfNodes = 50; @@ -100,17 +107,10 @@ public static void setUpModel() throws FileNotFoundException, IOException, Inter for(int i = 0; i< 2000; i++ ) { regressionModel.fit(iterator); } - regressionModelFile = File.createTempFile("regression-model", ".zip", new File("./")); - regressionModelFile.deleteOnExit(); ModelSerializer.writeModel(regressionModel, regressionModelFile ,false); } - @AfterClass - public static void deleteModel() throws FileNotFoundException, IOException, InterruptedException { - regressionModelFile.delete(); - } - @Before public void setUp() throws IOException { runner = TestRunners.newTestRunner(DeepLearning4JMultiLayerPredictor.class); diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification-model.zip b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification-model.zip new file mode 100644 index 0000000000000000000000000000000000000000..69a68a8fe1e4cac6ef0b7b96463b3d69b5118752 GIT binary patch literal 1368 zcmWIWW@Zs#;Nak3;A?*D!+->Yfb8V_ytK^p(xSwY%=|pPtm6Ef`K`?} zH`vkWlDGd?=SpPW%0UcwmP5MgEgXyIE6Zs4;ywF|2R0wNG#UPvDJ9jWVYXy z_g=rhZd}S@(Y@JHdxeqWdZ&#{jUs7V#1;R&dCA2&MS8=UJgKdkc~h4(&gR;1U;pdP zle^DV{d<`Ch2_zMC)Z~vWU`me@VLv;AneeTG;7}fBF$1>!F9IkcUc2C7p}c{rzNRX 
z{>>cmEzvvUXRGZ}Iw&&nO0K`B`13hCF0(~AR=f>yjhmIdP2!~L(N`4#&GDaERu<_+ z30yi-@;ZpKW#y#35lu^9HLi5vWLu@AR^;+w^V{Vy=8GTvvCW^%IYCmn)AB)i)R_r$ zUh3^$rNcTYBXZNC`)qo*8t=KK?={-3J7?ExlgwSF+3OE!$8J@*@Ltg8Pjc2)pViy= zFL)}i6_)Lg?SB44vM=G-)W+Ponx2uF8naLT{Qk0P z^PKy&(Z1rR6>iUC-I|K6V9vE6ayK~S@IZIAa)+v!=hnFT8kF10(dRFC^8_t*8gS!c_tVD%lecnG#o29d}Qwvjl8$! z9m_TwL|BB*D_g3~Sas0Q<7CU;jCi4{L+!Qawe>A5=Pce9vvTG?PB|vusZYxvZJpSC ze2Kh4=$FS;doNx*Eg#^`&QZ@xf#E$r1497LtRMi+3aM#nnaP={c_qbqNtt=c z3Ezu&ic9!75)u*~BqV(M!p&$jqhW?%$KQn}9L5eWQ`l5w|M9HJv-{Vl_`7u3+I#<3 zUAaI;(Gu`{;Jul}uB{quDE+t7N6 zH}@ClY(AfP^VVNsp*`RI{jY8|?fSXtx^nr`y6Hb|ml@})+~SUp&YfQsb2=}WcXMu@ z$H#w{Zm$0Bn;3TI_J5n3b)KIt)@n&dfiiKd2i{z+HvpNzk`9lgBQO3{!?S+?LwpDH}1~~$kbO~u_yog zuK4P3ZS%z7PFRa;Cn{sGN{mf0v?_Oq4+ZN3``K|4B+f7fO zKirf*d;8J6>A7=qZ=d~Dd;03yw^wDQzPq|bz1{uo#j$x`AMIxZMO?Luvr{!A149!K z2Y53wi7+5aG2}=Em13v>vrI$Rk8Cz5>JfmO8B`{sCWHWQRyL3dCLpW>(nnZ8JOCJ5 BZC3yQ literal 0 HcmV?d00001 diff --git a/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/regression-model.zip b/nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/regression-model.zip new file mode 100644 index 0000000000000000000000000000000000000000..19acbfd3abd51c391c113ac26c883b40e0a931a2 GIT binary patch literal 1621 zcmZ{kdpHw%7{^EE+7M1cD70J}8b&%3O4x{KGd7f6Sh<^o5^KnekR}dtU2-r(E{7y^ z+%LJO<5Idw+l~~+Er;_@bet`{p&VzjN zkM>mdrWo8efy`39rJP9b|kSNB!Tg!?vPY_NpdRd0gzp?mM z9Eo!q^sb9F&CoPam<0oPk=PE&Ew#M?=DerZ5L)O8`*xxV5kM7WGAcJ-Teo8f-b4MO`Z;`U`L(gXpJmR0Ad~tgWJ?ong+k1^8yOl z)qg1}-<3ThHzLPZKACVGj5>CB_ujb7n!BU#J|N)bk3`k+Wy4}WQWRe5U7teRTzsF4nlKk zJo-o>gS(hQY)zq2s7&BT+RNom*k*;7#Zz4!i1sF%=*!H!q!zKHOWZ0>2Px9ETs}aM z#Le-St!-|7APVQ^3%M&16}$Ndj24agw8CM(lp$u|nA>E@LSuTf1tPMJC-tfsC%Jyj zT3^iTbxx@13#?59uuW&4iTr3wj ztL)<<%!Sr?LuPeH*`-R?UWG><^Us#$9g!lUJ++q4*@6H7MgjnE{9oLq|G}L|B9ZZA zqCX`>*Ng0L&%oN;4%!!=K^oqwb8-%B9XSHAS5PVCV!4}H4+k;1)iP4~mT_d;h_m7Q zK=Ji8w9)RGb{o#QA?WhGrOnE`0^PD0X)Aj=BHN(c&a~cb)=FM#fd6(?j24MHR_Z=K 
zFBsE;oy|WxQd5Nz&c7kkpmLUq37ITPlRmc%x>2vhU%J72?slm3; z(u~01$mqm1n)n4CL(lVBj-=k(885uI&SMYx7<=V_z!8rm+hD0^3r{oj*~H)8CtaPu z$n#}aRCLifG+M7lCH?xJQR5$dZdc(P$pZJf=EHATOUU0YiOt6VCuiJSbd=qFy7k%) z^r3ytetp8*P15_fz+PKY(XYu`t4At=gc^hy)7nR0wXLsO|g&xbylO>)XsM literal 0 HcmV?d00001