From a97917c596cd0d63d2f9c193e37b5375768caef9 Mon Sep 17 00:00:00 2001 From: jzonthemtn Date: Tue, 1 Aug 2017 10:25:44 -0400 Subject: [PATCH] NIFI-4248: Adding Rya processor. --- nifi-assembly/pom.xml | 5 + .../nifi-rya-bundle/nifi-rya-nar/pom.xml | 36 +++ .../nifi-rya-processors/pom.xml | 59 +++++ .../apache/nifi/processors/rya/PutRya.java | 211 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 15 ++ .../rya/RyaIngestProcessorTest.java | 68 ++++++ nifi-nar-bundles/nifi-rya-bundle/pom.xml | 39 ++++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 9 files changed, 440 insertions(+) create mode 100644 nifi-nar-bundles/nifi-rya-bundle/nifi-rya-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/main/java/org/apache/nifi/processors/rya/PutRya.java create mode 100644 nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/test/java/org/apache/nifi/processors/rya/RyaIngestProcessorTest.java create mode 100644 nifi-nar-bundles/nifi-rya-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index b875f1c49050..6c8f7c360305 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -436,6 +436,11 @@ nifi-slack-nar nar + + org.apache.nifi + nifi-rya-nar + nar + org.apache.nifi nifi-windows-event-log-nar diff --git a/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-nar/pom.xml b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-nar/pom.xml new file mode 100644 index 000000000000..876669fe7331 --- /dev/null +++ b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-nar/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-rya-bundle + 1.4.0-SNAPSHOT + + + nifi-rya-nar + nar + + + + org.apache.nifi + nifi-rya-processor + ${project.version} + + + + diff --git a/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/pom.xml b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/pom.xml new file mode 100644 index 000000000000..cf056e8d103d --- /dev/null +++ b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/pom.xml @@ -0,0 +1,59 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-rya-bundle + 1.4.0-SNAPSHOT + + nifi-rya-processor + jar + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + + + com.squareup.okhttp3 + okhttp + + + commons-io + commons-io + + + org.apache.commons + commons-lang3 + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + diff --git a/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/main/java/org/apache/nifi/processors/rya/PutRya.java b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/main/java/org/apache/nifi/processors/rya/PutRya.java new file mode 100644 index 000000000000..7ebc08a22274 --- /dev/null +++ b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/main/java/org/apache/nifi/processors/rya/PutRya.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rya; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +@Tags({ "rya, ingest, triples" }) +@CapabilityDescription("Provides triples ingesting into Rya.") +@SeeAlso({}) +@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") }) +@WritesAttributes({ @WritesAttribute(attribute = "", description = "") }) +public class PutRya extends AbstractProcessor { + + public static final PropertyDescriptor RYA_API_ENDPOINT = new PropertyDescriptor.Builder() + .name("Rya API Endpoint") + .defaultValue("http://server:8080/web.rya/loadrdf") + .description("The Rya API endpoint for loading data.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RYA_TRIPLES_FORMAT = new PropertyDescriptor.Builder() + .name("Triples Format") + .defaultValue("RDF/XML") + .description("The format of the triples.") + .allowableValues("RDF/XML", "N-Triples", "Turtle", "N3", "TriX", "TriG") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success").description("success").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure").description("failure").build(); + + public static final MediaType MEDIA_TYPE_TEXT_PLAIN = MediaType.parse("text/plain"); + + private List descriptors; + private Set relationships; + + private final OkHttpClient client = new OkHttpClient(); + + @Override + protected void init(final ProcessorInitializationContext context) { + + descriptors = new ArrayList(); + descriptors.add(RYA_API_ENDPOINT); + descriptors.add(RYA_TRIPLES_FORMAT); + descriptors = Collections.unmodifiableList(descriptors); + + relationships = new HashSet(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(relationships); + + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + FlowFile flowFile = session.get(); + + if (flowFile == null) { + context.yield(); + return; + } + + final String ryaApiEndpoint = context.getProperty(RYA_API_ENDPOINT).getValue(); + final String ryaTriplesFormat = context.getProperty(RYA_TRIPLES_FORMAT).getValue(); + + if(StringUtils.isBlank(ryaApiEndpoint)) { + getLogger().debug("Rya API endpoint is required but was empty."); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + return; + } + + if(StringUtils.isBlank(ryaTriplesFormat)) { + getLogger().debug("Triples format is required but was empty."); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + return; + } + + try { + + flowFile = session.write(flowFile, new StreamCallback() { + + @Override + public void process(InputStream inputStream, OutputStream outputStream) throws IOException { + + final String input = IOUtils.toString(inputStream, StandardCharsets.UTF_8); + sendToRya(ryaApiEndpoint, ryaTriplesFormat, input); + + } + + }); + + session.transfer(flowFile, REL_SUCCESS); + + } catch (Exception ex) { + + getLogger().error(String.format("Unable to send to Rya host: %s. Exception: %s", RYA_API_ENDPOINT, ex.getMessage()), ex); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + + } + + } + + private void sendToRya(String url, String format, String triples) throws IOException { + + MediaType mediaType = MediaType.parse(getContentType(format)); + + Request request = new Request.Builder() + .url(url + "?format=" + format) + .post(RequestBody.create(mediaType, triples)) + .build(); + + Response response = client.newCall(request).execute(); + + if (!response.isSuccessful()) { + throw new IOException("Unexpected code " + response); + } + + } + + private String getContentType(String format) { + + String contentType = "plain/text"; + + if(StringUtils.equalsIgnoreCase(format, "RDF/XML")) { + contentType = "rdf/xml"; + } else if(StringUtils.equalsIgnoreCase(format, "N-Triples")) { + contentType = "plain/text"; + } else if(StringUtils.equalsIgnoreCase(format, "Turtle")) { + contentType = "application/x-turtle"; + } else if(StringUtils.equalsIgnoreCase(format, "N3")) { + contentType = "text/rdf+n3"; + } else if(StringUtils.equalsIgnoreCase(format, "TriX")) { + contentType = "application/trix"; + } else if(StringUtils.equalsIgnoreCase(format, "TriG")) { + contentType = "application/trig"; + } + + return contentType; + + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 000000000000..360ca725f5c0 --- /dev/null +++ b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.rya.PutRya \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/test/java/org/apache/nifi/processors/rya/RyaIngestProcessorTest.java b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/test/java/org/apache/nifi/processors/rya/RyaIngestProcessorTest.java new file mode 100644 index 000000000000..bd7b160778ac --- /dev/null +++ b/nifi-nar-bundles/nifi-rya-bundle/nifi-rya-processors/src/test/java/org/apache/nifi/processors/rya/RyaIngestProcessorTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.rya; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.apache.nifi.processors.rya.PutRya; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class RyaIngestProcessorTest { + + private static final String RYA_ENDPOINT = "http://localhost:8080/web.rya/loadrdf"; + + private TestRunner runner; + + @Before + public void init() { + runner = TestRunners.newTestRunner(PutRya.class); + } + + /** + * Note that this test requires Rya to be listening at the defined endpoint. + */ + @Test + @Ignore + public void testOnTrigger() throws IOException { + + final String triple = " ."; + + InputStream content = new ByteArrayInputStream(triple.getBytes(StandardCharsets.UTF_8)); + + runner.setProperty(PutRya.RYA_API_ENDPOINT, RYA_ENDPOINT); + runner.setProperty(PutRya.RYA_TRIPLES_FORMAT, "N-Triples"); + + runner.enqueue(content); + runner.run(1); + runner.assertQueueEmpty(); + + List results = runner.getFlowFilesForRelationship(PutRya.REL_SUCCESS); + Assert.assertTrue("1 ingested", results.size() == 1); + + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-rya-bundle/pom.xml b/nifi-nar-bundles/nifi-rya-bundle/pom.xml new file mode 100644 index 000000000000..9c780d77d2b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-rya-bundle/pom.xml @@ -0,0 +1,39 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.4.0-SNAPSHOT + + + org.apache.nifi + nifi-rya-bundle + 1.4.0-SNAPSHOT + pom + + + 2.3.0 + + + + nifi-rya-processors + nifi-rya-nar + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 6aef817e94a0..99dfd43146f0 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -87,6 +87,7 @@ nifi-extension-utils nifi-grpc-bundle nifi-redis-bundle + nifi-rya-bundle diff --git a/pom.xml b/pom.xml index 7c741255ecc0..acfc99f77549 100644 --- a/pom.xml +++ b/pom.xml @@ -1425,6 +1425,12 @@ 1.4.0-SNAPSHOT nar + + org.apache.nifi + nifi-rya-nar + 1.4.0-SNAPSHOT + nar + org.apache.nifi nifi-gcp-nar