From 12e6fbe4eeef8db4d09bd98bb444a9ea87b6a6af Mon Sep 17 00:00:00 2001 From: cam Date: Tue, 6 Jun 2017 18:25:41 -0700 Subject: [PATCH 1/3] NiFi 3973 Create a new Kudu Processor to ingest data --- nifi-assembly/pom.xml | 5 + .../nifi-kudu-bundle/nifi-kudu-nar/pom.xml | 35 +++++ .../src/main/resources/META-INF/NOTICE | 51 +++++++ .../nifi-kudu-processors/pom.xml | 68 +++++++++ .../nifi/processors/kudu/AbstractKudu.java | 140 ++++++++++++++++++ .../apache/nifi/processors/kudu/PutKudu.java | 130 ++++++++++++++++ .../org.apache.nifi.processor.Processor | 16 ++ .../nifi/processors/kudu/TestPutKudu.java | 132 +++++++++++++++++ nifi-nar-bundles/nifi-kudu-bundle/pom.xml | 43 ++++++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 11 files changed, 627 insertions(+) create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 5c7bff77ded2..7f7453560a88 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -211,6 +211,11 @@ nifi-kite-nar nar + + org.apache.nifi + nifi-kudu-nar + nar + org.apache.nifi nifi-flume-nar diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml new file mode 100644 index 000000000000..aa73976ece7a --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-kudu-bundle + 1.3.0-SNAPSHOT + + + nifi-kudu-nar + nar + + + + org.apache.nifi + nifi-kudu-processors + + + diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000000..34f44672cd9f --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,51 @@ +nifi-kudu-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2017 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons JEXL + The following NOTICE information applies: + Apache Commons JEXL + Copyright 2001-2011 The Apache Software Foundation + + (ASLv2) Kudu SDK + The following NOTICE information applies: + This product includes software developed by Cloudera, Inc. + (http://www.cloudera.com/). + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + This product includes software developed by + Saxonica (http://www.saxonica.com/). + + (ASLv2) Parquet MR + The following NOTICE information applies: + Parquet MR + Copyright 2012 Twitter, Inc. + + This project includes code from https://github.com/lemire/JavaFastPFOR + parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java + Apache License Version 2.0 http://www.apache.org/licenses/. + (c) Daniel Lemire, http://lemire.me/en/ + diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml new file mode 100644 index 000000000000..69b5b0c46317 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml @@ -0,0 +1,68 @@ + + + 4.0.0 + + + org.apache.nifi + nifi-kudu-bundle + 1.3.0-SNAPSHOT + + + nifi-kudu-processors + jar + + + + org.apache.nifi + nifi-distributed-cache-client-service-api + 1.3.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + + org.apache.kudu + kudu-client + 1.3.0 + + + + org.apache.nifi + nifi-mock + test + + + org.mockito + mockito-all + 1.10.19 + test + + + com.fasterxml.jackson.core + jackson-core + 2.5.4 + test + + + diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java new file mode 100644 index 000000000000..b061fcc7fa35 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kudu; + +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.OperationResponse; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.List; + +/** + * Created by Cam Mach - Inspur USA on 5/23/17. + */ +public abstract class AbstractKudu extends AbstractProcessor { + + protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder() + .name("KUDU Masters") + .description("List all kudu masters's ip with port (7051), comma separated") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .description("The name of the Kudu Table to put data into") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " + + "grouped by table, and a single Put per table will be performed.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("25") + .build(); + + protected static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu") + .build(); + protected static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if it cannot be sent to Kudu") + .build(); + + protected String kuduMasters; + protected String tableName; + protected int batchSize; + + protected KuduClient kuduClient = null; + protected KuduTable kuduTable = null; + protected KuduSession kuduSession = null; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + try { + tableName = context.getProperty(TABLE_NAME).getValue(); + kuduMasters = context.getProperty(KUDU_MASTERS).getValue(); + batchSize = context.getProperty(BATCH_SIZE).asInteger(); + + getLogger().debug("onScheduled tableName: " + tableName + " - Kudu Master: " + kuduMasters); + + kuduClient = (kuduClient == null) ? getKuduConnection(kuduMasters) : kuduClient; + kuduTable = (kuduTable == null) ? kuduClient.openTable(tableName) : kuduTable; + kuduSession = (kuduSession == null) ? kuduClient.newSession() : kuduSession; + } catch (KuduException ex) { + getLogger().error(ex.getMessage()); + } + } + + @OnStopped + public final void closeClient() throws KuduException { + if (kuduClient != null) { + getLogger().info("Closing KuduClient"); + kuduClient.close(); + kuduClient = null; + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + try { + List flowFiles = session.get(batchSize); + if (flowFiles == null || flowFiles.size() == 0) { + return; + } + + for (final FlowFile flowFile : flowFiles) { + final OperationResponse putFlowFile = createPut(session, context, flowFile); + + if (putFlowFile == null) { + session.transfer(flowFile, REL_FAILURE); + } else if (putFlowFile.hasRowError()) { + getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + } else { + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, "no transit url. Successfully added flowfile to kudu"); + } + } + } catch (Exception ex) { + getLogger().error(ex.getMessage()); + } + } + + protected KuduClient getKuduConnection(String masters) { + return new KuduClient.KuduClientBuilder(kuduMasters).build(); + } + + protected abstract OperationResponse createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); +} diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java new file mode 100644 index 000000000000..6b4cc98d3309 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kudu; + +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.OperationResponse; +import org.apache.kudu.client.PartialRow; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Created by Cam Mach - Inspur on 5/23/17. + */ + +@EventDriven +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"database", "NoSQL", "kudu", "inspur"}) +@CapabilityDescription("Ingest data into Kudu table") +public class PutKudu extends AbstractKudu { + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(KUDU_MASTERS); + properties.add(TABLE_NAME); + properties.add(BATCH_SIZE); + return properties; + } + + @Override + public Set getRelationships() { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + return rels; + } + + @Override + protected OperationResponse createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) { + try { + Set> attrSet = flowFile.getAttributes().entrySet(); + org.apache.kudu.client.Insert insert = kuduTable.newInsert(); + Schema colSchema = kuduTable.getSchema(); + PartialRow row = insert.getRow(); + + for (Map.Entry entry : attrSet) { + String colName = entry.getKey(); + int colIdx = this.getColumnIndex(colSchema, colName); + //getLogger().info("CreatePut: " + colName + " " + entry.getValue() + " - Found Col: " + colIdx); + if (colIdx != -1) { + Type colType = colSchema.getColumn(colName).getType(); + String val = entry.getValue(); + switch (colType.getDataType()) { + case BOOL: + row.addBoolean(colIdx, Boolean.parseBoolean(val)); + break; + case FLOAT: + row.addFloat(colIdx, Float.parseFloat(val)); + break; + case DOUBLE: + row.addDouble(colIdx, Double.parseDouble(val)); + break; + case BINARY: + row.addBinary(colIdx, val.getBytes()); + break; + case INT8: + case INT16: + row.addShort(colIdx, Short.parseShort(val)); + break; + case INT32: + row.addInt(colIdx, Integer.parseInt(val)); + break; + case INT64: + row.addLong(colIdx, Long.parseLong(val)); + break; + case STRING: + row.addString(colIdx, val); + break; + default: + throw new IllegalStateException(String.format("unknown column type %s", colType)); + } + } + } + + return kuduSession.apply(insert); + } catch (Exception ex) { + getLogger().error("CamMach My Error: " + ex.getMessage() + "::" + ex.getStackTrace()); + } + return null; + } + + private int getColumnIndex(Schema columns, String colName) { + try { + return columns.getColumnIndex(colName); + } catch (Exception ex) { + return -1; + } + } +} diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 000000000000..908723cbe490 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.kudu.PutKudu + diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java new file mode 100644 index 000000000000..bec85c6210ab --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kudu; + +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.OperationResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class TestPutKudu { + + public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table"; + public static final String DEFAULT_MASTERS = "localhost:7051"; + public static final String DEFAULT_BATCH_SIZE = "25"; + + private TestRunner testRunner; + private MockPutKudu processor; + + @Before + public void setUp() { + processor = new MockPutKudu(); + testRunner = TestRunners.newTestRunner(processor); + + testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME); + testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS); + testRunner.setProperty(PutKudu.BATCH_SIZE, DEFAULT_BATCH_SIZE); + } + + @After + public void close() { + testRunner = null; + } + + @Test + public void testInsertOne() throws IOException { + final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + byte[] bytes = content.getBytes("UTF-8"); + testRunner.enqueue(bytes); + testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS); + + MockFlowFile out = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0); + out.assertContentEquals(bytes); + } + + @Test + public void testInsertMany() throws Exception { + final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"valu11\" }"; + final String content2 = "{ \"field1\" : \"value1\", \"field2\" : \"value11\" }"; + final String content3 = "{ \"field1\" : \"value3\", \"field2\" : \"value33\" }"; + + testRunner.enqueue(content1.getBytes()); + testRunner.enqueue(content2.getBytes()); + testRunner.enqueue(content3.getBytes()); + + testRunner.run(3); + + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 3); + List flowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS); + + flowFiles.get(0).assertContentEquals(content1.getBytes()); + flowFiles.get(1).assertContentEquals(content2.getBytes()); + flowFiles.get(2).assertContentEquals(content3.getBytes()); + } + + @Test + public void testFailMany() throws Exception { + final String content1 = "TestContent1"; + final String content2 = "TestContent2"; + + //for testing failure case only + MockFlowFile mff = new MockFlowFile(1234567890); + + testRunner.enqueue(content1.getBytes()); + testRunner.enqueue(content2.getBytes()); + testRunner.enqueue(mff); + + testRunner.run(3); + + testRunner.assertTransferCount(PutKudu.REL_SUCCESS, 2); + testRunner.assertTransferCount(PutKudu.REL_FAILURE, 1); + + List flowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS); + + flowFiles.get(0).assertContentEquals(content1.getBytes()); + flowFiles.get(1).assertContentEquals(content2.getBytes()); + } + + private static class MockPutKudu extends PutKudu { + + @Override + protected OperationResponse createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) { + //for testing failure case only + if (flowFile.getId() == 1234567890) + return null; + + return mock(OperationResponse.class); + } + + @Override + protected KuduClient getKuduConnection(String masters) { + return mock(KuduClient.class); + } + } +} diff --git a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml new file mode 100644 index 000000000000..6e470253255a --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.3.0-SNAPSHOT + + + nifi-kudu-bundle + pom + + + nifi-kudu-processors + nifi-kudu-nar + + + + + + org.apache.nifi + nifi-kudu-processors + 1.3.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index a2887b8385da..b6d6ff3b33c5 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -34,6 +34,7 @@ nifi-update-attribute-bundle nifi-kafka-bundle nifi-kite-bundle + nifi-kudu-bundle nifi-solr-bundle nifi-aws-bundle nifi-social-media-bundle diff --git a/pom.xml b/pom.xml index 8794abd6e154..23822b1d4192 100644 --- a/pom.xml +++ b/pom.xml @@ -1085,6 +1085,12 @@ 1.3.0-SNAPSHOT nar + + org.apache.nifi + nifi-kudu-nar + 1.3.0-SNAPSHOT + nar + org.apache.nifi nifi-mongodb-nar From 98c67eb33f495c7062f18078ed75b2f99e48bfd3 Mon Sep 17 00:00:00 2001 From: cam Date: Mon, 19 Jun 2017 09:38:42 -0700 Subject: [PATCH 2/3] temp checkin --- .../nifi-kudu-bundle/nifi-kudu-processors/pom.xml | 4 ++++ .../org/apache/nifi/processors/kudu/AbstractKudu.java | 10 ++++++++++ .../java/org/apache/nifi/processors/kudu/PutKudu.java | 1 + 3 files changed, 15 insertions(+) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml index 69b5b0c46317..20e7f7c02a3a 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml @@ -40,6 +40,10 @@ org.apache.nifi nifi-processor-utils + + org.apache.nifi + nifi-record-serialization-service-api + org.apache.kudu diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java index b061fcc7fa35..e9f5107cb0b0 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java @@ -32,6 +32,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +//import org.apache.nifi.serialization.RecordReaderFactory; import java.util.List; @@ -54,6 +55,14 @@ public abstract class AbstractKudu extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The service for reading records from incoming flow files.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " + @@ -109,6 +118,7 @@ public final void closeClient() throws KuduException { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { try { + final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); List flowFiles = session.get(batchSize); if (flowFiles == null || flowFiles.size() == 0) { return; diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 6b4cc98d3309..84cac7901dce 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -54,6 +54,7 @@ protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); properties.add(KUDU_MASTERS); properties.add(TABLE_NAME); + properties.add(RECORD_READER); properties.add(BATCH_SIZE); return properties; } From 458dfb445d5dba53e5a9ad76167c244025fd0598 Mon Sep 17 00:00:00 2001 From: cam Date: Mon, 26 Jun 2017 14:28:17 -0700 Subject: [PATCH 3/3] temp --- .../main/java/org/apache/nifi/processors/kudu/AbstractKudu.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java index e9f5107cb0b0..fa55aafa9ccf 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java @@ -32,7 +32,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -//import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReaderFactory; import java.util.List;