NIFI-3518 Create a Morphlines processor#2028
NIFI-3518 Create a Morphlines processor#2028williampn wants to merge 6 commits intoapache:masterfrom
Conversation
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml new file mode 100644 index 0000000..afb93b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-morphlines-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-morphlines-nar</artifactId> + <version>1.4.0-SNAPSHOT</version> + <packaging>nar</packaging> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-morphlines-processors</artifactId> + <version>1.4.0-SNAPSHOT</version> + </dependency> + </dependencies> + +</project> diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml new file mode 100644 index 0000000..c9ebb31 --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml @@ -0,0 +1,61 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-morphlines-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-morphlines-processors</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>${nifi.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>${nifi.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-morphlines-core</artifactId> + <version>${kite.version}</version> + </dependency> + </dependencies> +</project> diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java new file mode 100644 index 0000000..56bf66f --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.morphlines; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.kitesdk.morphline.api.Command; +import org.kitesdk.morphline.api.MorphlineContext; +import org.kitesdk.morphline.api.Record; +import org.kitesdk.morphline.base.Fields; +import org.kitesdk.morphline.base.Compiler; +import org.kitesdk.morphline.base.Notifications; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Set; +import java.util.Iterator; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +@tags({"kitesdk", "morphlines", "ETL", "HDFS", "avro", "Solr", "HBase"}) +@CapabilityDescription("Implements Morphlines (http://kitesdk.org/docs/1.1.0/morphlines/) framework, which performs in-memory container of transformation " + + "commands in oder to perform tasks such as loading, parsing, transforming, or otherwise processing a single record.") +@Dynamicproperty(name = "Relationship Name", value = "A Regular Expression", supportsExpressionLanguage = true, description = "Adds the dynamic property key and value " + + "as key-value pair to Morphlines content.") + +public class ImplementMorphlines extends AbstractProcessor { + public static final PropertyDescriptor MORPHLINES_ID = new PropertyDescriptor + .Builder().name("Morphlines ID") + .description("Identifier of the morphlines context") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MORPHLINES_FILE = new PropertyDescriptor + .Builder().name("Morphlines File") + .description("File for the morphlines context") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MORPHLINES_OUTPUT_FIELD = new PropertyDescriptor + .Builder().name("Morphlines output field") + .description("Field name of output in Morphlines. Default is '_attachment_body'.") + .required(false) + .defaultValue("_attachment_body") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Relationship for success.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Relationship for failure of morphlines.") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Relationship for original flowfiles.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor>builder() + .add(MORPHLINES_FILE) + .add(MORPHLINES_ID) + .add(MORPHLINES_OUTPUT_FIELD) + .build(); + + private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship>builder() + .add(REL_SUCCESS) + .add(REL_FAILURE) + .build(); + + public PropertyValue morphlinesFileProperty; + public PropertyValue morphlinesIdProperty; + public PropertyValue morphlinesOutputFieldProperty; + public Map<String, PropertyValue> dynamicPropertyMap = new HashMap(); + + @OverRide + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OverRide + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OverRide + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .required(false) + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + @OnScheduled + public void onScheduled(ProcessContext context) throws Exception { + morphlinesFileProperty = context.getProperty(MORPHLINES_FILE); + morphlinesIdProperty = context.getProperty(MORPHLINES_ID); + morphlinesOutputFieldProperty = context.getProperty(MORPHLINES_OUTPUT_FIELD); + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + dynamicPropertyMap.put(descriptor.getName(), context.getProperty(descriptor)); + } + } + } + + @OverRide + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + FlowFile originalFlowFile = session.clone(flowFile); + final AtomicLong written = new AtomicLong(0L); + final byte[] value = new byte[(int) flowFile.getSize()]; + + final File morphlinesFile = new File(morphlinesFileProperty.evaluateAttributeExpressions(flowFile).getValue()); + final String morphlinesId = morphlinesIdProperty.evaluateAttributeExpressions(flowFile).getValue(); + final String morphlinesOutputField = morphlinesOutputFieldProperty.evaluateAttributeExpressions(flowFile).getValue(); + Map<String, Object> settings = new HashMap(); + for (final String descriptorName : dynamicPropertyMap.keySet()) { + final PropertyValue dynamicPropertyValue = dynamicPropertyMap.get(descriptorName); + settings.put(descriptorName, dynamicPropertyValue.evaluateAttributeExpressions(flowFile).getValue()); + } + final MorphlineContext morphlineContext = new MorphlineContext.Builder().setSettings(settings).build(); + + final Collector collectorRecord = new Collector(); + final Command morphline = new Compiler().compile(morphlinesFile, morphlinesId, morphlineContext, collectorRecord); + + try{ + flowFile = session.write(flowFile, new StreamCallback() { + @OverRide + public void process(InputStream in, OutputStream out) throws IOException { + Record record = new Record(); + StreamUtils.fillBuffer(in, value); + record.put(Fields.ATTACHMENT_BODY, value); + Notifications.notifyStartSession(morphline); + if (morphline.process(record)) { + List<Record> results = collectorRecord.getRecords(); + for (Iterator<Record> it = results.iterator(); it.hasNext();) { + Record result = it.next(); + if (result.getFirstValue(morphlinesOutputField) != null) { + String outputValue = it.next().getFirstValue(morphlinesOutputField).toString() + "/n"; + out.write(outputValue.getBytes()); + written.incrementAndGet(); + } else { + getLogger().warn(String.format("Unable to get %s within processed record: %s", morphlinesOutputField, result.toString())); + } + } + Notifications.notifyCommitTransaction(morphline); + } + } + }); + + if (written.get() > 0L) { + // false to only update if file transfer is successful + session.adjustCounter("Processed records in morphlines", written.get(), false); + session.transfer(flowFile, REL_SUCCESS); + session.transfer(originalFlowFile, REL_ORIGINAL); + } else { + getLogger().warn(String.format("Morphlines transformations did not march any of the input records for %s Morphlines ID", morphlinesId)); + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(originalFlowFile, REL_FAILURE); + } + } catch (ProcessException e) { + getLogger().error("Error while processing the flowFile through Morphlines"); + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(originalFlowFile, REL_FAILURE); + Notifications.notifyRollbackTransaction(morphline); + morphlineContext.getExceptionHandler().handleException(e, null); + } + Notifications.notifyShutdown(morphline); + } + + private static final class Collector implements Command { + + private volatile List<Record> results = new ArrayList(); + + public List<Record> getRecords() { + return results; + } + + public void reset() { + results.clear(); + } + + @OverRide + public Command getParent() { + return null; + } + + @OverRide + public void notify(Record notification) { + } + + @OverRide + public boolean process(Record record) { + results.add(record); + return true; + } + + public int getRecordCount() { + return results.size(); + } + } +} diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..65b2511 --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1 @@ +org.apache.nifi.processors.morphlines.ImplementMorphlines diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml new file mode 100644 index 0000000..c4a9abb --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.2.0</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-morphlines-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-morphlines-processors</module> + <module>nifi-morphlines-nar</module> + </modules> + + <properties> + <kite.version>1.1.0</kite.version> + <nifi.version>1.2.0</nifi.version> + <skipNexusStaginDeployMojo>true</skipNexusStaginDeployMojo> + </properties> + +</project>
|
Is there anyone familiar with morphlines that can help test/review? |
|
Dear all,
First, let me introduce myself. My name is William Nouet and I am a big fan of the NiFi project. I believe this is revolutionizing the way the industry is handling data transport and transformation. I would love to be involved in the project and its open source community; so I created a JIRA (NIFI-3518) and decided to help close another one (NIFI-385).
I got some amazing feedback and guidance from the community while working on the above 2 JIRAs, updated my code to make sure it lives up to the NiFi standards. That being said, I haven’t been able to get any traction on getting the code pushed to the main branch for the past few months, hence this emailZ
Please let me know if there is anything I can do to get the ball rolling. I would be happy to re-work on my code if you think it can be improved and would more than happy to meet in person in a bar or a coffee shop if some of you guys are in the NYC metro area.
Thank you in advance.
Best,
William
Envoyé de mon iPhone
… Le 6 oct. 2017 à 16:16, Joe Witt ***@***.***> a écrit :
Is there anyone familiar with morphlines that can help test/review?
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub, or mute the thread.
|
|
@WilliamNouet Sorry for the trouble. We try to have a good review model for these things and it seems like we just dont have anyone familiar with morphlines that can help. I am happy to help review it and work with you to get it in though. Can you please rebase, change nifi version references to 1.5.0-SNAPSHOT, and we can go from there. I'm almost certain there will be a lot of LICENSE and NOTICE updates required which I dont see yet. |
|
@WilliamNouet can you please see my last set of comments. A rebase and some progress on the licensing will be necessary. I can definitely help that. I have some time over the next couple of days and this is one I'd be happy to try and knock out if time allows. |
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml new file mode 100644 index 0000000..afb93b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-morphlines-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-morphlines-nar</artifactId> + <version>1.4.0-SNAPSHOT</version> + <packaging>nar</packaging> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-morphlines-processors</artifactId> + <version>1.4.0-SNAPSHOT</version> + </dependency> + </dependencies> + +</project> diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml new file mode 100644 index 0000000..c9ebb31 --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml @@ -0,0 +1,61 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-morphlines-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-morphlines-processors</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>${nifi.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>${nifi.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.kitesdk</groupId> + <artifactId>kite-morphlines-core</artifactId> + <version>${kite.version}</version> + </dependency> + </dependencies> +</project> diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java new file mode 100644 index 0000000..56bf66f --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.morphlines; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.kitesdk.morphline.api.Command; +import org.kitesdk.morphline.api.MorphlineContext; +import org.kitesdk.morphline.api.Record; +import org.kitesdk.morphline.base.Fields; +import org.kitesdk.morphline.base.Compiler; +import org.kitesdk.morphline.base.Notifications; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Set; +import java.util.Iterator; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +@tags({"kitesdk", "morphlines", "ETL", "HDFS", "avro", "Solr", "HBase"}) +@CapabilityDescription("Implements Morphlines (http://kitesdk.org/docs/1.1.0/morphlines/) framework, which performs in-memory container of transformation " + + "commands in oder to perform tasks such as loading, parsing, transforming, or otherwise processing a single record.") +@Dynamicproperty(name = "Relationship Name", value = "A Regular Expression", supportsExpressionLanguage = true, description = "Adds the dynamic property key and value " + + "as key-value pair to Morphlines content.") + +public class ImplementMorphlines extends AbstractProcessor { + public static final PropertyDescriptor MORPHLINES_ID = new PropertyDescriptor + .Builder().name("Morphlines ID") + .description("Identifier of the morphlines context") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MORPHLINES_FILE = new PropertyDescriptor + .Builder().name("Morphlines File") + .description("File for the morphlines context") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MORPHLINES_OUTPUT_FIELD = new PropertyDescriptor + .Builder().name("Morphlines output field") + .description("Field name of output in Morphlines. Default is '_attachment_body'.") + .required(false) + .defaultValue("_attachment_body") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Relationship for success.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Relationship for failure of morphlines.") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Relationship for original flowfiles.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor>builder() + .add(MORPHLINES_FILE) + .add(MORPHLINES_ID) + .add(MORPHLINES_OUTPUT_FIELD) + .build(); + + private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship>builder() + .add(REL_SUCCESS) + .add(REL_FAILURE) + .build(); + + public PropertyValue morphlinesFileProperty; + public PropertyValue morphlinesIdProperty; + public PropertyValue morphlinesOutputFieldProperty; + public Map<String, PropertyValue> dynamicPropertyMap = new HashMap(); + + @OverRide + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @OverRide + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OverRide + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .required(false) + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + @OnScheduled + public void onScheduled(ProcessContext context) throws Exception { + morphlinesFileProperty = context.getProperty(MORPHLINES_FILE); + morphlinesIdProperty = context.getProperty(MORPHLINES_ID); + morphlinesOutputFieldProperty = context.getProperty(MORPHLINES_OUTPUT_FIELD); + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + dynamicPropertyMap.put(descriptor.getName(), context.getProperty(descriptor)); + } + } + } + + @OverRide + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + FlowFile originalFlowFile = session.clone(flowFile); + final AtomicLong written = new AtomicLong(0L); + final byte[] value = new byte[(int) flowFile.getSize()]; + + final File morphlinesFile = new File(morphlinesFileProperty.evaluateAttributeExpressions(flowFile).getValue()); + final String morphlinesId = morphlinesIdProperty.evaluateAttributeExpressions(flowFile).getValue(); + final String morphlinesOutputField = morphlinesOutputFieldProperty.evaluateAttributeExpressions(flowFile).getValue(); + Map<String, Object> settings = new HashMap(); + for (final String descriptorName : dynamicPropertyMap.keySet()) { + final PropertyValue dynamicPropertyValue = dynamicPropertyMap.get(descriptorName); + settings.put(descriptorName, dynamicPropertyValue.evaluateAttributeExpressions(flowFile).getValue()); + } + final MorphlineContext morphlineContext = new MorphlineContext.Builder().setSettings(settings).build(); + + final Collector collectorRecord = new Collector(); + final Command morphline = new Compiler().compile(morphlinesFile, morphlinesId, morphlineContext, collectorRecord); + + try{ + flowFile = session.write(flowFile, new StreamCallback() { + @OverRide + public void process(InputStream in, OutputStream out) throws IOException { + Record record = new Record(); + StreamUtils.fillBuffer(in, value); + record.put(Fields.ATTACHMENT_BODY, value); + Notifications.notifyStartSession(morphline); + if (morphline.process(record)) { + List<Record> results = collectorRecord.getRecords(); + for (Iterator<Record> it = results.iterator(); it.hasNext();) { + Record result = it.next(); + if (result.getFirstValue(morphlinesOutputField) != null) { + String outputValue = it.next().getFirstValue(morphlinesOutputField).toString() + "/n"; + out.write(outputValue.getBytes()); + written.incrementAndGet(); + } else { + getLogger().warn(String.format("Unable to get %s within processed record: %s", morphlinesOutputField, result.toString())); + } + } + Notifications.notifyCommitTransaction(morphline); + } + } + }); + + if (written.get() > 0L) { + // false to only update if file transfer is successful + session.adjustCounter("Processed records in morphlines", written.get(), false); + session.transfer(flowFile, REL_SUCCESS); + session.transfer(originalFlowFile, REL_ORIGINAL); + } else { + getLogger().warn(String.format("Morphlines transformations did not march any of the input records for %s Morphlines ID", morphlinesId)); + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(originalFlowFile, REL_FAILURE); + } + } catch (ProcessException e) { + getLogger().error("Error while processing the flowFile through Morphlines"); + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(originalFlowFile, REL_FAILURE); + Notifications.notifyRollbackTransaction(morphline); + morphlineContext.getExceptionHandler().handleException(e, null); + } + Notifications.notifyShutdown(morphline); + } + + private static final class Collector implements Command { + + private volatile List<Record> results = new ArrayList(); + + public List<Record> getRecords() { + return results; + } + + public void reset() { + results.clear(); + } + + @OverRide + public Command getParent() { + return null; + } + + @OverRide + public void notify(Record notification) { + } + + @OverRide + public boolean process(Record record) { + results.add(record); + return true; + } + + public int getRecordCount() { + return results.size(); + } + } +} diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..65b2511 --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1 @@ +org.apache.nifi.processors.morphlines.ImplementMorphlines diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml new file mode 100644 index 0000000..c4a9abb --- /dev/null +++ b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.2.0</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-morphlines-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-morphlines-processors</module> + <module>nifi-morphlines-nar</module> + </modules> + + <properties> + <kite.version>1.1.0</kite.version> + <nifi.version>1.2.0</nifi.version> + <skipNexusStaginDeployMojo>true</skipNexusStaginDeployMojo> + </properties> + +</project> NIFI-3518: Updated NiFi snapshot version & added licensing
|
@joewitt Thanks for getting back to me, and sorry for the delay; I was away without access to a laptop. I just pushed a commit with an updated snapshot version and licensing. |
|
@joewitt, did you get a chance to review the latest commit? |
|
Bill, Some feedback after reviewing in more detail.
This looks like a lot but for the L&N we have tons of examples. The rest are pretty straightforward to sort out. Let me know if you plan to tackle these. Thanks |
|
i added a file to the JIRA which is a patch you can apply for the simple pom updates for version |
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
old mode 100755
new mode 100644
index b875f1c..ba8a136
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1,20 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+license agreements. See the NOTICE file distributed with this work for additional
+information regarding copyright ownership. The ASF licenses this file to
+You under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+by applicable law or agreed to in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, either express or implied. See the License for the specific
+language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-assembly</artifactId>
<packaging>pom</packaging>
@@ -44,6 +44,11 @@
<descriptor>src/main/assembly/dependencies.xml</descriptor>
</descriptors>
<tarLongFileMode>posix</tarLongFileMode>
+ <formats>
+ <format>dir</format>
+ <format>zip</format>
+ <format>tar.gz</format>
+ </formats>
</configuration>
</execution>
</executions>
@@ -188,6 +193,16 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kafka-0-11-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kafka-1-0-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-confluent-platform-nar</artifactId>
<type>nar</type>
</dependency>
@@ -218,6 +233,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kudu-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-nar</artifactId>
<type>nar</type>
</dependency>
@@ -243,6 +263,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mongodb-services-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-solr-nar</artifactId>
<type>nar</type>
</dependency>
@@ -278,6 +303,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-aws-service-api-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-ambari-nar</artifactId>
<type>nar</type>
</dependency>
@@ -308,6 +338,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-couchbase-services-api-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-nar</artifactId>
<type>nar</type>
</dependency>
@@ -333,6 +368,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-groovyx-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-nar</artifactId>
<type>nar</type>
</dependency>
@@ -468,7 +508,7 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-grpc-nar</artifactId>
+ <artifactId>nifi-gcp-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
@@ -501,9 +541,76 @@
<artifactId>nifi-redis-nar</artifactId>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-metrics-reporter-service-api-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-metrics-reporting-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-livy-controller-service-api-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-livy-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-druid-controller-service-api-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-druid-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-morphlines-nar</artifactId>
+ <type>nar</type>
+ </dependency>
</dependencies>
<profiles>
<profile>
+ <id>include-grpc</id>
+ <!-- This profile handles the inclusion of grpc artifacts. They are notoriously
+ environment specific in terms of build such as not working in os/arch=ppc64le
+ or in Centos6 due to requiring a newer version of GLIBC. -->
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-grpc-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>include-atlas</id>
+ <!-- This profile handles the inclusion of atlas artifacts. The NAR
+ is quite large and makes the resultant binary distribution significantly
+ larger (60+ MB). -->
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-atlas-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>rpm</id>
<activation>
<activeByDefault>false</activeByDefault>
@@ -638,12 +745,12 @@
<mapping>
<directory>/opt/nifi/nifi-${project.version}/lib</directory>
</mapping>
- <!-- The lib excludes and lib/bootstrap
- includes are computed by looking at the desired contents of lib vs the desired
- contents of bootstrap directories. The bootstrap directory should be comprised
- of explicitly included items as found from the lib/bootstrap of a non rpm
- build and the lib folder should be specific excludes being those which we
- want in bootstrap and NOT in lib. -->
+ <!-- The lib excludes and lib/bootstrap
+ includes are computed by looking at the desired contents of lib vs the desired
+ contents of bootstrap directories. The bootstrap directory should be comprised
+ of explicitly included items as found from the lib/bootstrap of a non rpm
+ build and the lib folder should be specific excludes being those which we
+ want in bootstrap and NOT in lib. -->
<mapping>
<directory>/opt/nifi/nifi-${project.version}/lib</directory>
<dependency>
@@ -676,8 +783,8 @@
<!-- must be in lib <exclude>ch.qos.logback:logback-core</exclude> -->
<exclude>org.apache.nifi:nifi-security-utils</exclude>
<exclude>org.apache.nifi:nifi-utils</exclude>
- <!-- Items to not include
- which are also not in bootstrap -->
+ <!-- Items to not include
+ which are also not in bootstrap -->
<exclude>org.apache.nifi:nifi-resources</exclude>
<exclude>org.apache.nifi:nifi-docs</exclude>
</excludes>
@@ -787,42 +894,11 @@
</dependencies>
</profile>
<profile>
- <id>generateArchives</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <finalName>nifi-${project.version}</finalName>
- <attach>false</attach>
- </configuration>
- <executions>
- <execution>
- <id>make shared resource</id>
- <goals>
- <goal>single</goal>
- </goals>
- <phase>package</phase>
- <configuration>
- <formats>
- <format>dir</format>
- <format>zip</format>
- <format>tar.gz</format>
- </formats>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>dir-only</id>
+ <id>avoid-archive-formats</id>
<activation>
- <activeByDefault>false</activeByDefault>
+ <property>
+ <name>dir-only</name>
+ </property>
</activation>
<build>
<plugins>
@@ -840,6 +916,15 @@
</goals>
<phase>package</phase>
<configuration>
+ <archiverConfig>
+ <defaultDirectoryMode>0775</defaultDirectoryMode>
+ <directoryMode>0775</directoryMode>
+ <fileMode>0664</fileMode>
+ </archiverConfig>
+ <descriptors>
+ <descriptor>src/main/assembly/dependencies.xml</descriptor>
+ </descriptors>
+ <tarLongFileMode>posix</tarLongFileMode>
<formats>
<format>dir</format>
</formats>
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/README.md b/nifi-nar-bundles/nifi-morphlines-bundle/README.md
new file mode 100644
index 0000000..ea93881
--- /dev/null
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/README.md
@@ -0,0 +1,23 @@
+# Kite Morphlines Processor
+
+## What is Morphlines?
+
+Morphlines is an open sour framework, which performs in-memory container of transformation commands in oder to perform tasks such as loading, parsing, transforming, or otherwise processing a single record.
+A morphline is a rich configuration file containing a set of commands that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component.
+
+## How can it be used within NiFi?
+
+Given the wide scope of commands provided by Morphlines; the Morphlines processor can be used for multiple applications such as real time log parsing, data enrichment, data transformation and data storage (Solr). The processor also allows data enrichment from flowfiles attributes.
+
+## What is needed to use the processor?
+
+Two properties are required to run the processor; namely, the absolute local path of the Morphlines configuration file containing the set of commands. The Morphlines ID of the set of transformations the user wants to run.
+Optionally, the user can chose what should be the output of the processor given that all the output of transformations are stored in memory. Also, the user can add properties to enrich the data: the properties can be from the flowfiles attributes or can be any hard coded values.
+
+## What is the performance the user can expect from the processor?
+
+The performance greatly depends on the commands that will be ran. Ballpark-wise, simple commands such as readLine or readAvro or grok can process O(100k) records per second per CPU core. A command that does almost nothing runs up to O(5 M) records/sec/core. Complex commands such as GeoIP which returns Geolocation information for a given IP address process O(10k) records/sec/core.
+
+## Reference
+
+Kite SDK Morphlines: http://kitesdk.org/docs/1.1.0/morphlines/
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml
index 1215f11..b15846f 100644
--- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml
@@ -19,11 +19,11 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-morphlines-bundle</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-morphlines-nar</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
@@ -34,7 +34,7 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-morphlines-processors</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</dependency>
</dependencies>
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/src/main/resources/META-INF/LICENSE
index 7256c19..04afb64 100644
--- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/src/main/resources/META-INF/LICENSE
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/src/main/resources/META-INF/LICENSE
@@ -205,114 +205,27 @@ APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
-licenses.
-
-The binary distribution of this product bundles 'SUAsync Library' which is
-available under a 3-Clause BSD License.
-
- Copyright (c) 2010 StumbleUpon, Inc. All rights reserved.
-
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
- - Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
- - Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- - Neither the name of the StumbleUpon nor the names of its contributors
- may be used to endorse or promote products derived from this software
- without specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- POSSIBILITY OF SUCH DAMAGE.
-
-The binary distribution of this product bundles 'Asynchronous HBase Client'
-which is available under a 3-Clause BSD License.
-
- Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved.
-
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
- - Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
- - Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- - Neither the name of the StumbleUpon nor the names of its contributors
- may be used to endorse or promote products derived from this software
- without specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- POSSIBILITY OF SUCH DAMAGE.
-
-The binary distribution of this product bundles 'JOpt Simple' which is
-available under the MIT license.
-
- The MIT License
-
- Copyright (c) 2004-2011 Paul R. Holser, Jr.
-
- Permission is hereby granted, free of charge, to any person obtaining
- a copy of this software and associated documentation files (the
- "Software"), to deal in the Software without restriction, including
- without limitation the rights to use, copy, modify, merge, publish,
- distribute, sublicense, and/or sell copies of the Software, and to
- permit persons to whom the Software is furnished to do so, subject to
- the following conditions:
-
- The above copyright notice and this permission notice shall be
- included in all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
- LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
- OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
- WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-The binary distribution of this product bundles 'Scala Library' under a BSD
-style license.
-
- Copyright (c) 2002-2015 EPFL
- Copyright (c) 2011-2015 Typesafe, Inc.
-
- All rights reserved.
-
- Redistribution and use in source and binary forms, with or without modification,
- are permitted provided that the following conditions are met:
-
- Redistributions of source code must retain the above copyright notice, this list of
- conditions and the following disclaimer.
-
- Redistributions in binary form must reproduce the above copyright notice, this list of
- conditions and the following disclaimer in the documentation and/or other materials
- provided with the distribution.
-
- Neither the name of the EPFL nor the names of its contributors may be used to endorse
- or promote products derived from this software without specific prior written permission.
-
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS
- OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
- AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
- IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
- OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+licenses.
+
+ The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
+ under an MIT style license.
+
+ Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/src/main/resources/META-INF/NOTICE
index cae5ab0..dc31398 100644
--- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/src/main/resources/META-INF/NOTICE
@@ -21,6 +21,82 @@ The following binary components are provided under the Apache Software License v
This product includes software developed by
Saxonica (http://www.saxonica.com/).
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2016 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+ (ASLv2) Apache Commons IO
+ The following NOTICE information applies:
+ Apache Commons IO
+ Copyright 2002-2016 The Apache Software Foundation
+ (ASLv2) Apache Commons Codec
+ The following NOTICE information applies:
+ Apache Commons Codec
+ Copyright 2002-2014 The Apache Software Foundation
+
+ src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+ contains test data from http://aspell.net/test/orig/batch0.tab.
+ Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+ ===============================================================================
+
+ The content of package org.apache.commons.codec.language.bm has been translated
+ from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+ with permission from the original authors.
+ Original source copyright:
+ Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+ (ASLv2) Guava
+ The following NOTICE information applies:
+ Guava
+ Copyright 2015 The Guava Authors
+
+ (ASLv2) Apache Commons Configuration
+ The following NOTICE information applies:
+ Apache Commons Configuration
+ Copyright 2001-2008 The Apache Software Foundation
+
+ (ASLv2) Jackson JSON processor
+ The following NOTICE information applies:
+ # Jackson JSON processor
+
+ Jackson is a high-performance, Free/Open Source JSON processing library.
+ It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+ been in development since 2007.
+ It is currently developed by a community of developers, as well as supported
+ commercially by FasterXML.com.
+
+ ## Licensing
+
+ Jackson core and extension components may licensed under different licenses.
+ To find the details that apply to this artifact see the accompanying LICENSE file.
+ For more information, including possible other licensing options, contact
+ FasterXML.com (http://fasterxml.com).
+
+ ## Credits
+
+ A list of contributors may be found from CREDITS file, which is included
+ in some artifacts (usually source distributions); but is always available
+ from the source code management (SCM) system project uses.
+
+ (ASLv2) Yammer Metrics
+ The following NOTICE information applies:
+ Metrics
+ Copyright 2010-2012 Coda Hale and Yammer, Inc.
+
+ This product includes software developed by Coda Hale and Yammer, Inc.
+
+ This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released
+ with the following comments:
+
+ Written by Doug Lea with assistance from members of JCP JSR-166
+ Expert Group and released to the public domain, as explained at
+ http://creativecommons.org/publicdomain/zero/1.0/
+
This product optionally depends on 'SLF4J', a simple logging facade for Java,
which can be obtained at:
@@ -36,4 +112,3 @@ The following binary components are provided under the Apache Software License v
* http://junit.sourceforge.net/cpl-v10.html (Common Public License v. 1.0)
* HOMEPAGE:
* http://www.junit.org/
-
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml
index 07f0a15..7559cec 100644
--- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-morphlines-bundle</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-morphlines-processors</artifactId>
@@ -29,7 +29,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
- <version>${nifi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -38,7 +37,6 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
- <version>${nifi.version}</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ExecuteMorphline.java
similarity index 94%
rename from nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java
rename to nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ExecuteMorphline.java
index 56bf66f..fba2dd0 100644
--- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ExecuteMorphline.java
@@ -23,14 +23,13 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
@@ -56,12 +55,13 @@ import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
@Tags({"kitesdk", "morphlines", "ETL", "HDFS", "avro", "Solr", "HBase"})
-@CapabilityDescription("Implements Morphlines (http://kitesdk.org/docs/1.1.0/morphlines/) framework, which performs in-memory container of transformation "
+@CapabilityDescription("Executes Morphlines (http://kitesdk.org/docs/1.1.0/morphlines/) framework, which performs in-memory container of transformation "
+ "commands in oder to perform tasks such as loading, parsing, transforming, or otherwise processing a single record.")
@DynamicProperty(name = "Relationship Name", value = "A Regular Expression", supportsExpressionLanguage = true, description = "Adds the dynamic property key and value "
+ "as key-value pair to Morphlines content.")
+@Restricted("Provides operator the ability to read/write to any file that NiFi has access to.")
-public class ImplementMorphlines extends AbstractProcessor {
+public class ExecuteMorphline extends AbstractProcessor {
public static final PropertyDescriptor MORPHLINES_ID = new PropertyDescriptor
.Builder().name("Morphlines ID")
.description("Identifier of the morphlines context")
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 65b2511..0e37ddd 100644
--- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -1 +1 @@
-org.apache.nifi.processors.morphlines.ImplementMorphlines
+org.apache.nifi.processors.morphlines.ExecuteMorphline
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/java/org/apache/nifi/processors/morphlines/TestExecuteMorphline.java b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/java/org/apache/nifi/processors/morphlines/TestExecuteMorphline.java
new file mode 100644
index 0000000..f855078
--- /dev/null
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/java/org/apache/nifi/processors/morphlines/TestExecuteMorphline.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.morphlines;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+
+public class TestExecuteMorphline {
+
+ private TestRunner testRunner;
+
+ @Before
+ public void init() {
+ testRunner = TestRunners.newTestRunner(ExecuteMorphline.class);
+ URL file = ExecuteMorphlineTest.class.getClassLoader().getResource("morphlines.conf");
+ testRunner.setProperty(ExecuteMorphline.MORPHLINES_FILE, file.getPath());
+ testRunner.setProperty(ExecuteMorphline.MORPHLINES_ID, "test");
+ }
+
+ @Test
+ public void testProcessorSuccess() throws IOException {
+ try (
+ InputStream inputStream = getClass().getResourceAsStream("/good_record.txt")
+ ) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "good_record.txt");
+ testRunner.enqueue(inputStream, attributes);
+ testRunner.run();
+ }
+ List<MockFlowFile> result = testRunner.getFlowFilesForRelationship(ExecuteMorphline.REL_SUCCESS);
+
+ assertEquals(1, result.size());
+ }
+
+ @Test
+ public void testProcessorFail() throws IOException {
+ try (
+ InputStream inputStream = getClass().getResourceAsStream("/bad_record.txt")
+ ) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "bad_record.txt");
+ testRunner.enqueue(inputStream, attributes);
+ testRunner.run();
+ }
+ List<MockFlowFile> result = testRunner.getFlowFilesForRelationship(ExecuteMorphline.REL_FAILURE);
+
+ assertEquals(1, result.size());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/bad_record.txt b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/bad_record.txt
new file mode 100644
index 0000000..5a24202
--- /dev/null
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/bad_record.txt
@@ -0,0 +1 @@
+2018-02-02 12:30:98 syslog sshd[607]: listening on 0.0.0.0 port 22
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/good_record.txt b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/good_record.txt
new file mode 100644
index 0000000..f0d72e7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/good_record.txt
@@ -0,0 +1 @@
+Feb 4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/grok-dictionaries/grok-patterns b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/grok-dictionaries/grok-patterns
new file mode 100644
index 0000000..599d3be
--- /dev/null
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/grok-dictionaries/grok-patterns
@@ -0,0 +1,96 @@
+USERNAME [a-zA-Z0-9._-]+
+USER %{USERNAME}
+INT (?:[+-]?(?:[0-9]+))
+BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
+NUMBER (?:%{BASE10NUM})
+BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
+BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
+
+POSINT \b(?:[1-9][0-9]*)\b
+NONNEGINT \b(?:[0-9]+)\b
+WORD \b\w+\b
+NOTSPACE \S+
+SPACE \s*
+DATA .*?
+GREEDYDATA .*
+#QUOTEDSTRING (?:(?<!\\)(?:"(?:\\.|[^\\"])*"|(?:'(?:\\.|[^\\'])*')|(?:`(?:\\.|[^\\`])*`)))
+QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
+UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
+
+# Networking
+MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
+CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
+WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
+COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
+IP (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
+HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
+HOST %{HOSTNAME}
+IPORHOST (?:%{HOSTNAME}|%{IP})
+#HOSTPORT (?:%{IPORHOST=~/\./}:%{POSINT}) # WH
+
+# paths
+PATH (?:%{UNIXPATH}|%{WINPATH})
+UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
+#UNIXPATH (?<![\w\/])(?:/[^\/\s?*]*)+
+TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
+WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
+URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
+URIHOST %{IPORHOST}(?::%{POSINT:port})?
+# uripath comes loosely from RFC1738, but mostly from what Firefox
+# doesn't turn into %XX
+URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=#%_\-]*)+
+#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
+URIPARAM \?[A-Za-z0-9$.+!*'|(){},~#%&/=:;_?\-\[\]]*
+URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
+URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
+
+# Months: January, Feb, 3, 03, 12, December
+MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
+MONTHNUM (?:0?[1-9]|1[0-2])
+MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
+
+# Days: Monday, Tue, Thu, etc...
+DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
+
+# Years?
+YEAR (?>\d\d){1,2}
+# Time: HH:MM:SS
+#TIME \d{2}:\d{2}(?::\d{2}(?:\.\d+)?)?
+# I'm still on the fence about using grok to perform the time match,
+# since it's probably slower.
+# TIME %{POSINT<24}:%{POSINT<60}(?::%{POSINT<60}(?:\.%{POSINT})?)?
+HOUR (?:2[0123]|[01]?[0-9])
+MINUTE (?:[0-5][0-9])
+# '60' is a leap second in most time standards and thus is valid.
+SECOND (?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?)
+TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
+# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
+DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
+DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
+ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE})?)
+ISO8601_SECOND (?:%{SECOND}|60)
+TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
+DATE %{DATE_US}|%{DATE_EU}
+DATESTAMP %{DATE}[- ]%{TIME}
+TZ (?:[PMCE][SD]T)
+DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
+DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
+
+# Syslog Dates: Month Day HH:MM:SS
+SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
+PROG (?:[\w._/%-]+)
+SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
+SYSLOGHOST %{IPORHOST}
+SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
+HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
+
+# Shortcuts
+QS %{QUOTEDSTRING}
+
+# Log formats
+SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
+COMBINEDAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}
+
+# Log Levels
+LOGLEVEL ([T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
+
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/morphlines.conf b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/morphlines.conf
new file mode 100644
index 0000000..8516b0d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/morphlines.conf
@@ -0,0 +1,69 @@
+morphlines : [
+ {
+ id : test
+ importCommands : [
+ "org.kitesdk.morphline.stdio.ReadLineBuilder",
+ "org.kitesdk.morphline.stdlib.TryRulesBuilder",
+ "org.kitesdk.morphline.stdlib.GrokBuilder",
+ "org.kitesdk.morphline.stdlib.SplitBuilder",
+ "org.kitesdk.morphline.stdlib.ConvertTimestampBuilder",
+ "org.kitesdk.morphline.stdlib.SetValuesBuilder",
+ ]
+
+ commands : [
+ {
+ readLine {
+ charset : UTF-8
+ }
+ }
+
+ {
+ tryRules {
+ catchExceptions : true
+ throwExceptionIfAllRulesFailed : true
+ rules : [
+ {
+ commands : [
+ {
+ grok {
+ dictionaryFiles : [grok-dictionaries/grok-patterns]
+ expressions : {
+ message : """%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"""
+ }
+ }
+ }
+
+ {
+ convertTimestamp {
+ field : syslog_timestamp
+ inputFormats : ["MMM d HH:mm:ss"]
+ outputFormat : "unixTimeInMillis"
+ }
+ }
+
+ {
+ setValues {
+ key : "success"
+ value : "@{syslog_timestamp},@{syslog_hostname},@{syslog_program},@{syslog_message}"
+ }
+ }
+ ]
+ }
+
+ # Handle exception
+ {
+ commands : [
+ {
+ setValues {
+ key : "exception"
+ value : "@{message}"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+]
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml
index 4f8149d..88b3297 100644
--- a/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml
@@ -19,12 +19,12 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-morphlines-bundle</artifactId>
- <version>1.5.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
@@ -34,8 +34,5 @@
<properties>
<kite.version>1.1.0</kite.version>
- <nifi.version>1.5.0-SNAPSHOT</nifi.version>
- <skipNexusStaginDeployMojo>true</skipNexusStaginDeployMojo>
</properties>
-
</project>
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
old mode 100755
new mode 100644
index 6aef817..0966a77
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
@@ -34,6 +34,7 @@
<module>nifi-update-attribute-bundle</module>
<module>nifi-kafka-bundle</module>
<module>nifi-kite-bundle</module>
+ <module>nifi-kudu-bundle</module>
<module>nifi-solr-bundle</module>
<module>nifi-confluent-platform-bundle</module>
<module>nifi-aws-bundle</module>
@@ -74,6 +75,7 @@
<module>nifi-ignite-bundle</module>
<module>nifi-rethinkdb-bundle</module>
<module>nifi-email-bundle</module>
+ <module>nifi-groovyx-bundle</module>
<module>nifi-ranger-bundle</module>
<module>nifi-websocket-bundle</module>
<module>nifi-tcp-bundle</module>
@@ -85,8 +87,12 @@
<module>nifi-cybersecurity-bundle</module>
<module>nifi-parquet-bundle</module>
<module>nifi-extension-utils</module>
- <module>nifi-grpc-bundle</module>
<module>nifi-redis-bundle</module>
+ <module>nifi-metrics-reporting-bundle</module>
+ <module>nifi-spark-bundle</module>
+ <module>nifi-atlas-bundle</module>
+ <module>nifi-druid-bundle</module>
+ <module>nifi-morphlines-bundle</module>
</modules>
<build>
@@ -132,6 +138,18 @@
<buildBranch />
</properties>
</profile>
+ <profile>
+ <id>include-grpc</id>
+ <!-- This profile handles the inclusion of grpc artifacts. They are notoriously
+ environment specific in terms of build such as not working in os/arch=ppc64le
+ or in Centos6 due to requiring a newer version of GLIBC. -->
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <modules>
+ <module>nifi-grpc-bundle</module>
+ </modules>
+ </profile>
</profiles>
<dependencyManagement>
@@ -139,105 +157,117 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-load-distribution-service-api</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-lookup-service-api</artifactId>
+ <version>1.6.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mongodb-client-service-api</artifactId>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-protocol</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-server</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-volatile-provenance-repository</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<!-- The following dependencies are marked provided because they must be provided by the container. Nars can assume they are there-->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-runtime</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.6.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependencies>
|
@joewitt thanks for the feedback! That was quite a bit of changes to make on my side, but hopefully the latest commit captures everything. I will need to work on the rebase a bit later though. |
|
@WilliamNouet there is already a |
|
|
||
| FlowFile originalFlowFile = session.clone(flowFile); | ||
| final AtomicLong written = new AtomicLong(0L); | ||
| final byte[] value = new byte[(int) flowFile.getSize()]; |
There was a problem hiding this comment.
It depends on your definition of "big" files. I believe that is not in the user interest to have big files (> 1 GB) being processed through the data flow as most processors under perform with this kind of files. So I do not think it is an issue here; I have seen the processor getting tremendous results on files up to 0.5 GB.
| Notifications.notifyShutdown(morphline); | ||
| } | ||
|
|
||
| private static final class Collector implements Command { |
There was a problem hiding this comment.
The same as above, this implementation is not going to work with big input file
binhnv
left a comment
There was a problem hiding this comment.
The current implementation is not going to work well with big input file.
|
@binhnv I could move it to the nifi-kite-bundle if needed, but even if they both leverages the kite dependencies, they have different outcomes: one regards limited data conversions (avro, json, csv) and storage, whereas the other one offer way more flexibly in terms of data transformations (please refer to the Morphlines doc), and as such should be a subset of its own as it could be developed further in the future. As such, it makes sense for me to keep it separate. |
diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml index b15846f..61c05b5 100644 --- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml @@ -19,11 +19,11 @@ <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-morphlines-bundle</artifactId> - <version>1.6.0-SNAPSHOT</version> + <version>1.7.0-SNAPSHOT</version> </parent> <artifactId>nifi-morphlines-nar</artifactId> - <version>1.6.0-SNAPSHOT</version> + <version>1.7.0-SNAPSHOT</version> <packaging>nar</packaging> <properties> <maven.javadoc.skip>true</maven.javadoc.skip> @@ -34,7 +34,7 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-morphlines-processors</artifactId> - <version>1.6.0-SNAPSHOT</version> + <version>1.7.0-SNAPSHOT</version> </dependency> </dependencies> diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml index 7559cec..65182a8 100644 --- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-morphlines-bundle</artifactId> - <version>1.6.0-SNAPSHOT</version> + <version>1.7.0-SNAPSHOT</version> </parent> <artifactId>nifi-morphlines-processors</artifactId> diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ExecuteMorphline.java b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ExecuteMorphline.java index fba2dd0..917880d 100644 --- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ExecuteMorphline.java +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ExecuteMorphline.java @@ -110,6 +110,7 @@ public class ExecuteMorphline extends AbstractProcessor { private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship>builder() .add(REL_SUCCESS) .add(REL_FAILURE) + .add(REL_ORIGINAL) .build(); public PropertyValue morphlinesFileProperty; @@ -187,7 +188,7 @@ public class ExecuteMorphline extends AbstractProcessor { for (Iterator<Record> it = results.iterator(); it.hasNext();) { Record result = it.next(); if (result.getFirstValue(morphlinesOutputField) != null) { - String outputValue = it.next().getFirstValue(morphlinesOutputField).toString() + "/n"; + String outputValue = result.getFirstValue(morphlinesOutputField).toString() + "\n"; out.write(outputValue.getBytes()); written.incrementAndGet(); } else { @@ -205,7 +206,7 @@ public class ExecuteMorphline extends AbstractProcessor { session.transfer(flowFile, REL_SUCCESS); session.transfer(originalFlowFile, REL_ORIGINAL); } else { - getLogger().warn(String.format("Morphlines transformations did not march any of the input records for %s Morphlines ID", morphlinesId)); + getLogger().warn(String.format("Morphlines transformations did not match any of the input records for %s Morphlines ID", morphlinesId)); session.transfer(flowFile, REL_ORIGINAL); session.transfer(originalFlowFile, REL_FAILURE); } diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/java/org/apache/nifi/processors/morphlines/TestExecuteMorphline.java b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/java/org/apache/nifi/processors/morphlines/TestExecuteMorphline.java index f855078..0d76efb 100644 --- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/java/org/apache/nifi/processors/morphlines/TestExecuteMorphline.java +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/java/org/apache/nifi/processors/morphlines/TestExecuteMorphline.java @@ -39,21 +39,26 @@ public class TestExecuteMorphline { @before public void init() { testRunner = TestRunners.newTestRunner(ExecuteMorphline.class); - URL file = ExecuteMorphlineTest.class.getClassLoader().getResource("morphlines.conf"); + testRunner.setValidateExpressionUsage(false); + URL file = TestExecuteMorphline.class.getClassLoader().getResource("morphlines.conf"); testRunner.setProperty(ExecuteMorphline.MORPHLINES_FILE, file.getPath()); testRunner.setProperty(ExecuteMorphline.MORPHLINES_ID, "test"); + // Try to get one of the fields which was parsed; syslog_timestamp in our case + testRunner.setProperty(ExecuteMorphline.MORPHLINES_OUTPUT_FIELD, "syslog_timestamp"); } @test public void testProcessorSuccess() throws IOException { - try ( - InputStream inputStream = getClass().getResourceAsStream("/good_record.txt") - ) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), "good_record.txt"); - testRunner.enqueue(inputStream, attributes); - testRunner.run(); + InputStream inputStream = null; + try { + inputStream = getClass().getResourceAsStream("/good_record.txt"); + } catch(Exception e) { + System.out.println("ERROR: Good record file does not exist"); } + Map<String, String> attributes = new HashMap<String, String>(); + attributes.put(CoreAttributes.FILENAME.key(), "good_record.txt"); + testRunner.enqueue(inputStream, attributes); + testRunner.run(); List<MockFlowFile> result = testRunner.getFlowFilesForRelationship(ExecuteMorphline.REL_SUCCESS); assertEquals(1, result.size()); @@ -61,14 +66,16 @@ public class TestExecuteMorphline { @test public void testProcessorFail() throws IOException { - try ( - InputStream inputStream = getClass().getResourceAsStream("/bad_record.txt") - ) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), "bad_record.txt"); - testRunner.enqueue(inputStream, attributes); - testRunner.run(); + InputStream inputStream = null; + try { + inputStream = getClass().getResourceAsStream("/bad_record.txt"); + } catch(Exception e) { + System.out.println("ERROR: Bad record file does not exist"); } + Map<String, String> attributes = new HashMap<String, String>(); + attributes.put(CoreAttributes.FILENAME.key(), "bad_record.txt"); + testRunner.enqueue(inputStream, attributes); + testRunner.run(); List<MockFlowFile> result = testRunner.getFlowFilesForRelationship(ExecuteMorphline.REL_FAILURE); assertEquals(1, result.size()); diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/morphlines.conf b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/morphlines.conf index 8516b0d..d6a6db3 100644 --- a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/morphlines.conf +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/test/resources/morphlines.conf @@ -26,7 +26,21 @@ morphlines : [ commands : [ { grok { - dictionaryFiles : [grok-dictionaries/grok-patterns] + dictionaryString : """ + MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b + MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) + TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) + HOUR (?:2[0123]|[01]?[0-9]) + MINUTE (?:[0-5][0-9]) + SECOND (?:(?:[0-5][0-9]|60)(?:[:.,][0-9]+)?) + HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b) + IP (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9]) + DATA .*? + GREEDYDATA .* + SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME} + SYSLOGHOST (?:%{HOSTNAME}|%{IP}) + POSINT \b(?:[1-9][0-9]*)\b + """ expressions : { message : """%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}""" } diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml index 88b3297..2dca1cb 100644 --- a/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml @@ -19,12 +19,12 @@ <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nar-bundles</artifactId> - <version>1.6.0-SNAPSHOT</version> + <version>1.7.0-SNAPSHOT</version> </parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-morphlines-bundle</artifactId> - <version>1.6.0-SNAPSHOT</version> + <version>1.7.0-SNAPSHOT</version> <packaging>pom</packaging> <modules>
|
@WilliamNouet I did some searching, and it appears that Morphline is abandoned. The last update to the Kitesdk morphline package is in 2015. I'm going to close this out because we can't accept submissions that are based on abandoned core dependencies. If I missed something and Morphline is still active, feel free to reopen. Regardless, thanks for the contribution. It was a solid effort. |
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
[Y] Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
[Y] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
[Y] Has your PR been rebased against the latest commit within the target branch (typically master)?
[Y] Is your initial contribution a single, squashed commit?
For code changes:
[Y] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
[Y] Have you written or updated unit tests to verify your changes?
[N/A] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
[N/A] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
[N/A] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
[N/A] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
For documentation related changes:
[N/A] Have you ensured that format looks appropriate for the output in which it is rendered?
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.