From a5260e4c054afc0e09997d92e1c43eff7afd3975 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 23 Jun 2018 18:52:27 -0400 Subject: [PATCH 1/5] NIFI-5084 Added GenerateRecord processor. --- .../nifi-data-generation-nar/pom.xml | 36 +++ .../src/main/resources/META-INF/LICENSE | 270 +++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 51 ++++ .../nifi-data-generation-processors/pom.xml | 95 ++++++ .../processors/generation/GenerateRecord.java | 209 +++++++++++++ .../org.apache.nifi.processor.Processor | 15 + .../additionalDetails.html | 28 ++ .../generation/TestGenerateRecord.groovy | 274 ++++++++++++++++++ .../src/test/resources/map_test.avsc | 36 +++ .../nifi-data-generation-bundle/pom.xml | 35 +++ nifi-nar-bundles/pom.xml | 1 + 11 files changed, 1050 insertions(+) create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/resources/docs/org.apache.nifi.processors.generation.GenerateRecord/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/test/groovy/org/apache/nifi/processors/generation/TestGenerateRecord.groovy create mode 100644 
nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/test/resources/map_test.avsc create mode 100644 nifi-nar-bundles/nifi-data-generation-bundle/pom.xml diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/pom.xml b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/pom.xml new file mode 100644 index 000000000000..48ebcc16681b --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + org.apache.nifi + nifi-data-generation-bundle + 1.8.0-SNAPSHOT + + nifi-data-generation-nar + nar + NiFi Data Generation NAR + + true + true + + + + org.apache.nifi + nifi-data-generation-processors + 1.8.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 000000000000..80e866881171 --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,270 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + +The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core' + which is available under a BSD style license. + + Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. + + The binary distribution of this product bundles 'XMLENC' which is available + under a BSD license. More details found here: http://xmlenc.sourceforge.net. + + Copyright 2003-2005, Ernst de Haan + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + 3. Neither the name of the copyright holder nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000000..021acf775a3f --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,51 @@ +nifi-data-generation-nar +Copyright 2014-2018 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Snappy Java + The following NOTICE information applies: + This product includes software developed by Google + Snappy: http://code.google.com/p/snappy/ (New BSD License) + + This product includes software developed by Apache + PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/ + (Apache 2.0 license) + + This library contains statically linked libstdc++. 
This inclusion is allowed by + "GCC Runtime Library Exception" + http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html + (ASLv2) Generex + This library was released under the ASL 2.0 license according to its official repository at: + https://github.com/mifmif/Generex + + (ASLv2) Avro Random Generator + https://github.com/confluentinc/avro-random-generator/ + Schema test samples in test/resources taken directly from examples in Avro Random Generator documentation. + Copyright Confluent, Inc. + +**************** +BSD +**************** + + Brics Automaton (under core/src/java/org/apache/lucene/util/automaton) is + BSD-licensed, created by Anders Møller. See http://www.brics.dk/automaton/ + + +***************** +Public Domain +***************** + +The following binary components are provided to the 'Public Domain'. See project link for details. + + (Public Domain) XZ for Java (org.tukaani:xz:jar:1.5 - http://tukaani.org/xz/java.html) + + + diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/pom.xml b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/pom.xml new file mode 100644 index 000000000000..a4d217065705 --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/pom.xml @@ -0,0 +1,95 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-data-generation-bundle + 1.8.0-SNAPSHOT + + nifi-data-generation-processors + jar + + + io.confluent.avro + avro-random-generator + 0.2.1 + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-record-serialization-service-api + compile + + + org.apache.nifi + nifi-schema-registry-service-api + compile + + + org.apache.nifi + nifi-record + 1.8.0-SNAPSHOT + compile + + + org.apache.nifi + nifi-avro-record-utils + 1.8.0-SNAPSHOT + + + org.apache.nifi + nifi-mock + 1.8.0-SNAPSHOT + test + + + org.apache.nifi + nifi-record-serialization-services + 1.8.0-SNAPSHOT + test + + + org.apache.nifi + nifi-mock-record-utils + 1.8.0-SNAPSHOT 
+ test + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/map_test.avsc + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java new file mode 100644 index 000000000000..8bac228f63cd --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.generation; + +import io.confluent.avro.random.generator.Generator; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +public class GenerateRecord extends AbstractProcessor { + static final PropertyDescriptor WRITER = new PropertyDescriptor.Builder() + .name("generate-record-writer") + .displayName("Record Writer") + .identifiesControllerService(RecordSetWriterFactory.class) + .description("The record writer to use for serializing generated records.") + .required(true) + .build(); + + static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() + .name("generate-record-schema") + .displayName("Schema") + 
.expressionLanguageSupported(ExpressionLanguageScope.NONE) + .description("An Avro schema to use for generating records.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("generate-record-limit") + .displayName("Limit") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .description("") + .defaultValue("25") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor FIXED_SIZE = new PropertyDescriptor.Builder() + .name("generate-record-fixed-size") + .displayName("Fixed Size") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .description("") + .defaultValue("true") + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If there is an error building a record set, the input will go here.") + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Generated results go to this relationship.") + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("If input is provided, the input flowfile gets routed to this relationship.") + .build(); + + static final List PROPS; + static final Set RELS; + + static { + List _temp = new ArrayList<>(); + _temp.add(WRITER); + _temp.add(SCHEMA); + _temp.add(LIMIT); + _temp.add(FIXED_SIZE); + + Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_ORIGINAL); + + PROPS = Collections.unmodifiableList(_temp); + RELS = Collections.unmodifiableSet(_rels); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPS; + } + + @Override + public Set getRelationships() { + return RELS; + } + + private 
volatile RecordSetWriterFactory writerFactory; + + @OnScheduled + public void onScheduled(ProcessContext context) { + writerFactory = context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile input = null; + if (context.hasIncomingConnection()) { + input = session.get(); + + if (input == null && context.hasNonLoopConnection()) { + return; + } + } + + final int limit; + final boolean fixed = context.getProperty(FIXED_SIZE).asBoolean(); + if (fixed) { + limit = context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger(); + } else { + int ceiling = context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger(); + limit = new Random().nextInt(ceiling); + } + + final Generator generator; + final RecordSchema schema; + + try { + if (!context.getProperty(SCHEMA).isSet() && input != null) { + schema = writerFactory.getSchema(input.getAttributes(), null); + String text = schema.getSchemaText().get(); + generator = new Generator(text, new Random()); + } else if (!context.getProperty(SCHEMA).isSet() && input == null) { + throw new ProcessException("When there is no incoming connection, a avro schema must be set " + + "in the Schema configuration property for this processor."); + } else { + Schema parsed = new Schema.Parser().parse(context.getProperty(SCHEMA).getValue()); + schema = AvroTypeUtil.createSchema(parsed); + generator = new Generator(parsed, new Random()); + } + + List records = new ArrayList<>(); + for (int x = 0; x < limit; x++) { + GenericData.Record o = (GenericData.Record) generator.generate(); + Map y = AvroTypeUtil.convertAvroRecordToMap(o, schema); + records.add(new MapRecord(schema, y)); + } + + FlowFile out = session.create(input); + out = session.write(out, outputStream -> { + try { + RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, outputStream); + 
writer.beginRecordSet(); + for (int x = 0; x < records.size(); x++) { + writer.write(records.get(x)); + } + writer.finishRecordSet(); + writer.close(); + } catch (SchemaNotFoundException e) { + getLogger().error(e.getMessage()); + } + }); + + session.transfer(out, REL_SUCCESS); + if (input != null) { + session.transfer(input, REL_ORIGINAL); + } + } catch (Exception ex) { + getLogger().error(ex.getMessage()); + if (input != null) { + session.transfer(input, REL_FAILURE); + } + } + } +} diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 000000000000..c1df33c9c8d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.processors.generation.GenerateRecord \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/resources/docs/org.apache.nifi.processors.generation.GenerateRecord/additionalDetails.html b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/resources/docs/org.apache.nifi.processors.generation.GenerateRecord/additionalDetails.html new file mode 100644 index 000000000000..9a7b94821809 --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/resources/docs/org.apache.nifi.processors.generation.GenerateRecord/additionalDetails.html @@ -0,0 +1,28 @@ + + + + + + GenerateRecord + + + +

This processor is powered by the avro-random-generator library from Confluent. This library defines and uses additional + metadata for Avro to set up rules for generating random data. Documentation for those additions can be found + here. The additions are compatible with standard Avro + and the NiFi Record API.

+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/test/groovy/org/apache/nifi/processors/generation/TestGenerateRecord.groovy b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/test/groovy/org/apache/nifi/processors/generation/TestGenerateRecord.groovy new file mode 100644 index 000000000000..0acee4da319d --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/test/groovy/org/apache/nifi/processors/generation/TestGenerateRecord.groovy @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.generation + +import org.apache.avro.Schema +import org.apache.nifi.avro.AvroTypeUtil +import org.apache.nifi.json.JsonRecordSetWriter +import org.apache.nifi.json.JsonTreeReader +import org.apache.nifi.schema.access.SchemaAccessUtils +import org.apache.nifi.serialization.RecordReaderFactory +import org.apache.nifi.serialization.RecordSetWriterFactory +import org.apache.nifi.serialization.record.MockSchemaRegistry +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners +import org.junit.Assert +import org.junit.Before +import org.junit.Test + +import static groovy.json.JsonOutput.prettyPrint +import static groovy.json.JsonOutput.toJson + +class TestGenerateRecord { + TestRunner runner + MockSchemaRegistry registry + RecordSetWriterFactory writer + RecordReaderFactory reader + + @Before + void setup() { + runner = TestRunners.newTestRunner(GenerateRecord.class) + writer = new JsonRecordSetWriter() + reader = new JsonTreeReader() + registry = new MockSchemaRegistry() + runner.addControllerService("writer", writer) + runner.addControllerService("registry", registry) + runner.addControllerService("reader", reader) + [reader, writer].each { + runner.setProperty(it, SchemaAccessUtils.SCHEMA_REGISTRY, "registry") + runner.setProperty(it, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY) + } + runner.setProperty(GenerateRecord.WRITER, "writer") + runner.enableControllerService(registry) + runner.enableControllerService(writer) + runner.enableControllerService(reader) + } + + @Test + void testValidity() { + runner.assertValid() + } + + @Test + void testRun() { + def simpleSchema = prettyPrint(toJson([ + type: "record", + name: "SimpleTestRecord", + fields: [ + [ name: "msg", type: "string" ] + ] + ])) + + def attrs = [ "schema.name": "really_simple"] + + def validator = { byte[] input, int expected, boolean fixed -> + def is = new ByteArrayInputStream(input) + def jsonReader 
= reader.createRecordReader(attrs, is, runner.getLogger()) + int count = 0 + def rec = jsonReader.nextRecord() + while (rec) { + count++ + rec = jsonReader.nextRecord() + } + if (fixed) { + Assert.assertEquals(expected, count) + } else { + Assert.assertTrue(count <= expected && count > 0) + } + } + + registry.addSchema("really_simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(simpleSchema))) + runner.enqueue("", attrs) + runner.run() + + runner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GenerateRecord.REL_ORIGINAL, 1) + runner.assertTransferCount(GenerateRecord.REL_FAILURE, 0) + + validator(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS)[0]), 25, true) + + runner.clearTransferState() + + attrs['generate.limit'] = "100" + runner.setProperty(GenerateRecord.LIMIT, '${generate.limit}') + runner.enqueue("", attrs) + runner.run() + + runner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GenerateRecord.REL_ORIGINAL, 1) + runner.assertTransferCount(GenerateRecord.REL_FAILURE, 0) + + validator(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS)[0]), 100, true) + + runner.clearTransferState() + runner.setProperty(GenerateRecord.FIXED_SIZE, "false") + runner.setProperty(GenerateRecord.LIMIT, '${generate.limit}') + runner.enqueue("", attrs) + runner.run() + + runner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GenerateRecord.REL_ORIGINAL, 1) + runner.assertTransferCount(GenerateRecord.REL_FAILURE, 0) + validator(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS)[0]), 100, false) + + runner.clearTransferState() + runner.setIncomingConnection(false) + runner.setProperty(GenerateRecord.SCHEMA, simpleSchema) + runner.setVariable("generate.limit", "100") + runner.setProperty(GenerateRecord.FIXED_SIZE, "true") + runner.run() + 
runner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GenerateRecord.REL_ORIGINAL, 0) + runner.assertTransferCount(GenerateRecord.REL_FAILURE, 0) + validator(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS)[0]), 100, false) + } + + @Test + void testNumericParameters() { + def simpleSchema = prettyPrint(toJson([ + type: "record", + name: "SimpleTestRecord", + fields: [ + [ name: "msg", type: [ + type: "long", + "arg.properties": [ + length: [ + min: 5, + max: 10 + ] + ] + ]] + ] + ])) + + def attrs = [ "schema.name": "really_simple" ] + + def validator = { byte[] input, int expected -> + def is = new ByteArrayInputStream(input) + def jsonReader = reader.createRecordReader(attrs, is, runner.getLogger()) + int count = 0 + def rec = jsonReader.nextRecord() + while (rec) { + count++ + def msg = rec.getAsLong("msg") + Assert.assertNotNull(msg) + Assert.assertTrue(msg instanceof Long) + rec = jsonReader.nextRecord() + } + Assert.assertEquals(expected, count) + } + + registry.addSchema("really_simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(simpleSchema))) + runner.enqueue("", attrs) + runner.run() + runner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GenerateRecord.REL_ORIGINAL, 1) + runner.assertTransferCount(GenerateRecord.REL_FAILURE, 0) + + validator(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS)[0]), 25) + } + + @Test + void testStringParameters() { + def options = ["the", "this", "that"] + def simpleSchema = prettyPrint(toJson([ + type: "record", + name: "SimpleTestRecord", + fields: [ + [ name: "msg", type: [ + type: "string", + "arg.properties": [ + length: [ + min: 5, + max: 10 + ], + regex: "[a-zA-Z]{5,10}" + ] + ]], + [ name: "msg2", type: [ + type: "string", + "arg.properties": [ + options: options + ] + ]] + ] + ])) + + def attrs = [ "schema.name": "really_simple" ] + + def validator = { 
byte[] input, int expected -> + def is = new ByteArrayInputStream(input) + def jsonReader = reader.createRecordReader(attrs, is, runner.getLogger()) + int count = 0 + def rec = jsonReader.nextRecord() + while (rec) { + count++ + def str = rec.getAsString("msg") + def str2 = rec.getAsString("msg2") + Assert.assertNotNull(str) + Assert.assertNotNull(str2) + Assert.assertTrue(str.length() <= 10 && str.length() >= 5) + Assert.assertTrue(options.contains(str2)) + rec = jsonReader.nextRecord() + } + Assert.assertEquals(expected, count) + } + + registry.addSchema("really_simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(simpleSchema))) + runner.enqueue("", attrs) + runner.run() + runner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GenerateRecord.REL_ORIGINAL, 1) + runner.assertTransferCount(GenerateRecord.REL_FAILURE, 0) + + byte[] input = runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS)[0]) + validator(input, 25) + } + + @Test + void testMapField() { + def schema = this.class.getResourceAsStream("/map_test.avsc") + def attrs = [ "schema.name": "map_test" ] + + def validator = { byte[] input, int expected -> + def is = new ByteArrayInputStream(input) + def jsonReader = reader.createRecordReader(attrs, is, runner.getLogger()) + int count = 0 + def rec = jsonReader.nextRecord() + while (rec) { + count++ + def map = rec.getValue("map_field") + Assert.assertNotNull(map) + Assert.assertTrue(map instanceof Map) + map.each { Assert.assertTrue(it.value instanceof Integer) } + rec = jsonReader.nextRecord() + } + Assert.assertEquals(expected, count) + } + + registry.addSchema("map_test", AvroTypeUtil.createSchema(new Schema.Parser().parse(schema))) + runner.enqueue("", attrs) + runner.run() + runner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1) + runner.assertTransferCount(GenerateRecord.REL_ORIGINAL, 1) + runner.assertTransferCount(GenerateRecord.REL_FAILURE, 0) + + byte[] input = 
runner.getContentAsByteArray(runner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS)[0]) + validator(input, 25) + } +} diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/test/resources/map_test.avsc b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/test/resources/map_test.avsc new file mode 100644 index 000000000000..df4c6b38e449 --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/test/resources/map_test.avsc @@ -0,0 +1,36 @@ +{ + "type": "record", + "name": "MapTestRecord", + "fields": [ + { + "name": "map_field", + "type": { + "type": "map", + "values": "int", + "arg.properties": + { + "options": [ + { + "zero": 0 + }, + { + "one": 1, + "two": 2 + }, + { + "three": 3, + "four": 4, + "five": 5 + }, + { + "six": 6, + "seven": 7, + "eight": 8, + "nine": 9 + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/pom.xml b/nifi-nar-bundles/nifi-data-generation-bundle/pom.xml new file mode 100644 index 000000000000..4c30bb61e288 --- /dev/null +++ b/nifi-nar-bundles/nifi-data-generation-bundle/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-nar-bundles + 1.8.0-SNAPSHOT + + nifi-data-generation-bundle + pom + NiFi Data Generation Bundle + + nifi-data-generation-processors + nifi-data-generation-nar + + + + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 309be2afe467..f7cff70e4b25 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -93,6 +93,7 @@ nifi-spark-bundle nifi-atlas-bundle nifi-druid-bundle + nifi-data-generation-bundle From 52d88eb5f822cd974a52a46c0c34163acb5ba145 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Tue, 26 Jun 2018 07:37:44 -0400 Subject: [PATCH 2/5] NIFI-5084 Updated Assembly. 
--- nifi-assembly/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 128de680e6fd..8f268b267f4e 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -709,6 +709,12 @@ language governing permissions and limitations under the License. --> 1.8.0-SNAPSHOT nar + + org.apache.nifi + nifi-data-generation-nar + 1.8.0-SNAPSHOT + nar + From 2e78e836501b4446bfe28f1379393e1b1135a315 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sun, 1 Jul 2018 07:38:51 -0400 Subject: [PATCH 3/5] NIFI-5084 Updated LICENSE and NOTICE with more entries. --- .../src/main/resources/META-INF/LICENSE | 25 +++++++++++++++- .../src/main/resources/META-INF/NOTICE | 30 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/LICENSE index 80e866881171..f0178d6d7546 100644 --- a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/LICENSE +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/LICENSE @@ -267,4 +267,27 @@ The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core' SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + The binary distribution of this product bundles 'Bouncy Castle JDK 1.5' + under an MIT style license. + + Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. 
(http://www.bouncycastle.org) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. 
diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/NOTICE index 021acf775a3f..725661c6776d 100644 --- a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-nar/src/main/resources/META-INF/NOTICE @@ -26,6 +26,36 @@ The following binary components are provided under the Apache Software License v This library was released under the ASL 2.0 license according to its official repository at: https://github.com/mifmif/Generex + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2017 The Apache Software Foundation + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Lang + Copyright Apache Software Foundation + + (ASLv2) Apache Commons CSV + The following NOTICE information applies: + Apache Commons CSV + Copyright 2005-2016 The Apache Software Foundation + (ASLv2) Avro Random Generator https://github.com/confluentinc/avro-random-generator/ Schema test samples in test/resources taken directly from examples in Avro Random Generator documentation. 
From 3b4a6fecbdb88507c6da2b9d92d68ca4478d822e Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Thu, 5 Jul 2018 08:01:13 -0400 Subject: [PATCH 4/5] NIFI-5084 Added changes from code review. --- .../processors/generation/GenerateRecord.java | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java index 8bac228f63cd..635fb7c1f050 100644 --- a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java @@ -36,6 +36,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; @@ -47,6 +48,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) public class GenerateRecord extends AbstractProcessor { @@ -71,7 +73,8 @@ public class GenerateRecord extends AbstractProcessor { .name("generate-record-limit") .displayName("Limit") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .description("") + .description("The maximum number of records to generate per run. 
It is regulated by the Fixed Size configuration " + + "property.") .defaultValue("25") .required(false) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) @@ -81,7 +84,8 @@ public class GenerateRecord extends AbstractProcessor { .name("generate-record-fixed-size") .displayName("Fixed Size") .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .description("") + .description("If true, the limit configuration will be used to generate a consistently sized record set. If false " + + "the limit value will be the ceiling of a random number range from 1 to that value.") .defaultValue("true") .allowableValues("true", "false") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) @@ -153,12 +157,14 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro limit = context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger(); } else { int ceiling = context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger(); - limit = new Random().nextInt(ceiling); + limit = new Random().nextInt(ceiling) + 1; } final Generator generator; final RecordSchema schema; + FlowFile out = session.create(input); + try { if (!context.getProperty(SCHEMA).isSet() && input != null) { schema = writerFactory.getSchema(input.getAttributes(), null); @@ -180,7 +186,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro records.add(new MapRecord(schema, y)); } - FlowFile out = session.create(input); + final AtomicInteger integer = new AtomicInteger(); out = session.write(out, outputStream -> { try { RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, outputStream); @@ -188,13 +194,18 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro for (int x = 0; x < records.size(); x++) { writer.write(records.get(x)); } - writer.finishRecordSet(); + WriteResult result = writer.finishRecordSet(); writer.close(); + + integer.set(result.getRecordCount()); } catch 
(SchemaNotFoundException e) { getLogger().error(e.getMessage()); + throw new ProcessException(e); } }); + out = session.putAttribute(out, "record.count", String.valueOf(integer.get())); + session.transfer(out, REL_SUCCESS); if (input != null) { session.transfer(input, REL_ORIGINAL); @@ -204,6 +215,8 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro if (input != null) { session.transfer(input, REL_FAILURE); } + + session.remove(out); } } } From 3c3258676004ff746711b6bcb6edc13b486ba1c9 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Thu, 5 Jul 2018 08:06:27 -0400 Subject: [PATCH 5/5] NIFI-5084 Added a few missing changes requested. --- .../processors/generation/GenerateRecord.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java index 635fb7c1f050..4f0c77b46d3d 100644 --- a/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java +++ b/nifi-nar-bundles/nifi-data-generation-bundle/nifi-data-generation-processors/src/main/java/org/apache/nifi/processors/generation/GenerateRecord.java @@ -21,12 +21,17 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; 
import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -49,8 +54,15 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@Tags({ "generate", "record", "test", "data", "random" }) +@CapabilityDescription("Provides the ability to generate a record set of a configurable size using a supplied Avro schema.") +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "The record set mime type."), + @WritesAttribute(attribute = "record.count", description = "The number of records in the generated record set.") +}) public class GenerateRecord extends AbstractProcessor { static final PropertyDescriptor WRITER = new PropertyDescriptor.Builder() .name("generate-record-writer") @@ -187,6 +199,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro } final AtomicInteger integer = new AtomicInteger(); + final AtomicReference mime = new AtomicReference<>(); out = session.write(out, outputStream -> { try { RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, outputStream); @@ -195,6 +208,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro writer.write(records.get(x)); } WriteResult result = writer.finishRecordSet(); + mime.set(writer.getMimeType()); writer.close(); integer.set(result.getRecordCount()); @@ -205,6 +219,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro }); out = session.putAttribute(out, "record.count", String.valueOf(integer.get())); + out = session.putAttribute(out, 
CoreAttributes.MIME_TYPE.key(), mime.get()); session.transfer(out, REL_SUCCESS); if (input != null) {