From f7d494cf968970448f7789d23e2b820a382a0edd Mon Sep 17 00:00:00 2001 From: rtempleton Date: Thu, 9 Jun 2016 10:16:49 -0500 Subject: [PATCH 1/3] PutHBaseJSON processor treats all values as Strings The processor will now inspect each node value to determine its type and convert it accordingly: Numeric, integral - Long (assumes widest type); Numeric, not integral - Double (assumes widest type); Logical - Boolean; everything else (including the current Complex Type logic) - String. Values that represent the row key continue to be implicitly treated as Strings by the processor. --- .../nifi-hbase-processors/pom.xml | 10 ++ .../org/apache/nifi/hbase/PutHBaseJSON.java | 32 ++++++- .../org/apache/nifi/hbase/HBaseTestUtil.java | 19 ++-- .../apache/nifi/hbase/TestPutHBaseJSON.java | 94 +++++++++++++------ 4 files changed, 110 insertions(+), 45 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml index ed24a36e2649..a8e94542221c 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml @@ -30,6 +30,16 @@ 1.0.0-SNAPSHOT provided + + org.apache.hbase + hbase-client + + + org.slf4j + slf4j-log4j12 + + + org.apache.nifi nifi-distributed-cache-client-service-api diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 0dba7eecd42e..3e4a4c615077 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -18,6 +18,7 @@ import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -175,13 +176,13 @@ public void process(final InputStream in) throws IOException { final Iterator fieldNames = rootNode.getFieldNames(); while (fieldNames.hasNext()) { final String fieldName = fieldNames.next(); - final ObjectHolder fieldValueHolder = new ObjectHolder<>(null); + final ObjectHolder fieldValueHolder = new ObjectHolder<>(null); final JsonNode fieldNode = rootNode.get(fieldName); if (fieldNode.isNull()) { getLogger().debug("Skipping {} because value was null", new Object[]{fieldName}); } else if (fieldNode.isValueNode()) { - fieldValueHolder.set(fieldNode.asText()); + fieldValueHolder.set(extractJNodeValue(fieldNode)); } else { // for non-null, non-value nodes, determine what to do based on the handling strategy switch (complexFieldStrategy) { @@ -194,7 +195,7 @@ public void process(final InputStream in) throws IOException { case TEXT_VALUE: // use toString() here because asText() is only guaranteed to be supported on value nodes // some other types of nodes, like ArrayNode, provide toString implementations - fieldValueHolder.set(fieldNode.toString()); + fieldValueHolder.set(Bytes.toBytes(fieldNode.toString())); break; case IGNORE_VALUE: // silently skip @@ -208,9 +209,9 @@ public void process(final InputStream in) throws IOException { // otherwise add a new column where the fieldName and fieldValue are the column qualifier and value if (fieldValueHolder.get() != null) { if
(extractRowId && fieldName.equals(rowFieldName)) { - rowIdHolder.set(fieldValueHolder.get()); + rowIdHolder.set(fieldNode.asText()); } else { - columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8))); + columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get())); } } } @@ -226,5 +227,26 @@ public void process(final InputStream in) throws IOException { final String putRowId = (extractRowId ? rowIdHolder.get() : rowId); return new PutFlowFile(tableName, putRowId, columns, flowFile); } + + /* + *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function + */ + private byte[] extractJNodeValue(JsonNode n){ + if (n.isBoolean()){ + //boolean + return Bytes.toBytes(n.asBoolean()); + }else if(n.isNumber()){ + if(n.isIntegralNumber()){ + //interpret as Long + return Bytes.toBytes(n.asLong()); + }else{ + //interpret as Double + return Bytes.toBytes(n.asDouble()); + } + }else{ + //if all else fails, interpret as String + return Bytes.toBytes(n.asText()); + } + } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java index fc30f73948f3..f1c6689f0e5f 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java @@ -17,20 +17,20 @@ */ package org.apache.nifi.hbase; -import org.apache.nifi.hbase.put.PutColumn; -import org.apache.nifi.hbase.put.PutFlowFile; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.ProvenanceEventType; +import static org.junit.Assert.assertTrue; -import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertTrue; +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; public class HBaseTestUtil { - public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { + public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { boolean foundPut = false; for (final PutFlowFile put : puts) { @@ -45,13 +45,12 @@ public static void verifyPut(final String row, final String columnFamily, final // start off assuming we have all the columns boolean foundAllColumns = true; - for (Map.Entry entry : columns.entrySet()) { + for (Map.Entry entry : columns.entrySet()) { // determine if we have the current expected column boolean foundColumn = false; for (PutColumn putColumn : put.getColumns()) { - final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8); if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier()) - && entry.getValue().equals(colVal)) { + && Arrays.equals(entry.getValue(), putColumn.getBuffer())) { foundColumn = true; break; } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index 7b5991986fc2..d64b1e6e6fa7 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -16,6 +16,16 @@ */ package org.apache.nifi.hbase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.util.Bytes; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -25,15 +35,6 @@ import org.apache.nifi.util.TestRunners; import org.junit.Test; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - public class TestPutHBaseJSON { public static final String DEFAULT_TABLE_NAME = "nifi"; @@ -87,9 +88,42 @@ public void testSingleJsonDocAndProvidedRowId() throws IOException, Initializati final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "value1"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", Bytes.toBytes("value1")); + expectedColumns.put("field2", Bytes.toBytes("value2")); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + + final List events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri()); + } + + @Test + public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + + final String content = "{ \"field1\" : 1.23456, \"field2\" : 2345235, \"field3\" : false }"; + runner.enqueue(content.getBytes("UTF-8")); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", Bytes.toBytes(1.23456d)); + expectedColumns.put("field2", Bytes.toBytes(2345235l)); + expectedColumns.put("field3", Bytes.toBytes(false)); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -120,9 +154,9 @@ public void testSingJsonDocAndExtractedRowId() throws IOException, Initializatio assertEquals(1, puts.size()); // should 
be a put with row id of myRowId, and rowField shouldn't end up in the columns - final Map expectedColumns1 = new HashMap<>(); - expectedColumns1.put("field1", "value1"); - expectedColumns1.put("field2", "value2"); + final Map expectedColumns1 = new HashMap<>(); + expectedColumns1.put("field1", Bytes.toBytes("value1")); + expectedColumns1.put("field2", Bytes.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts); final List events = runner.getProvenanceEvents(); @@ -200,9 +234,9 @@ public void testELWithProvidedRowId() throws IOException, InitializationExceptio final List puts = hBaseClient.getFlowFilePuts().get("myTable"); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "value1"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", Bytes.toBytes("value1")); + expectedColumns.put("field2", Bytes.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -235,8 +269,8 @@ public void testELWithExtractedRowId() throws IOException, InitializationExcepti final List puts = hBaseClient.getFlowFilePuts().get("myTable"); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", Bytes.toBytes("value2")); HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -264,8 +298,8 @@ public void testNullAndArrayElementsWithWarnStrategy() throws InitializationExce assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", Bytes.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -289,8 +323,8 @@ public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationEx assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", Bytes.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -337,9 +371,9 @@ public void testNullAndArrayElementsWithTextStrategy() throws InitializationExce assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", Bytes.toBytes("[{\"child_field1\":\"child_value1\"}]")); + expectedColumns.put("field2", Bytes.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -363,9 +397,9 @@ public void testNestedDocWithTextStrategy() throws InitializationException { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + 
expectedColumns.put("field1", Bytes.toBytes("{\"child_field1\":\"child_value1\"}")); + expectedColumns.put("field2", Bytes.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } From 8ac95f034619974a25fd741d7943a72fcb8750b8 Mon Sep 17 00:00:00 2001 From: rtempleton Date: Thu, 9 Jun 2016 16:11:26 -0500 Subject: [PATCH 2/3] Removed dependency on HBase utility Bytes class from the PutHBaseJSON processor. Convenience methods for encoding values to byte arrays are now wrapped by the appropriate HBaseClientService instance. --- .../nifi-hbase-processors/pom.xml | 10 ---- .../apache/nifi/hbase/AbstractPutHBase.java | 20 ++++--- .../org/apache/nifi/hbase/PutHBaseJSON.java | 54 +++++++++---------- .../nifi/hbase/MockHBaseClientService.java | 26 +++++++++ .../apache/nifi/hbase/TestPutHBaseJSON.java | 33 ++++++------ .../apache/nifi/hbase/HBaseClientService.java | 4 ++ .../nifi/hbase/HBase_1_1_2_ClientService.java | 21 ++++++++ 7 files changed, 107 insertions(+), 61 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml index a8e94542221c..ed24a36e2649 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml @@ -30,16 +30,6 @@ 1.0.0-SNAPSHOT provided - - org.apache.hbase - hbase-client - - - org.slf4j - slf4j-log4j12 - - - org.apache.nifi nifi-distributed-cache-client-service-api diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 87424f967f13..f0171e815c94 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -17,7 +17,14 @@ package org.apache.nifi.hbase; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hbase.put.PutFlowFile; @@ -28,12 +35,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** * Base class for processors that put data to HBase.
*/ @@ -179,5 +180,12 @@ protected String getTransitUri(PutFlowFile putFlowFile) { * @return a PutFlowFile instance for the given FlowFile */ protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); + + protected HBaseClientService cliSvc; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + cliSvc = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 3e4a4c615077..4ab223483aa1 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -17,8 +17,17 @@ package org.apache.nifi.hbase; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -41,17 +50,6 @@ import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - @EventDriven @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -195,7 +193,7 @@ public void process(final InputStream in) throws IOException { case TEXT_VALUE: // use toString() here because asText() is only guaranteed to be supported on value nodes // some other types of nodes, like ArrayNode, provide toString implementations - fieldValueHolder.set(Bytes.toBytes(fieldNode.toString())); + fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString())); break; case IGNORE_VALUE: // silently skip @@ -232,21 +230,21 @@ public void process(final InputStream in) throws IOException { *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function */ private byte[] extractJNodeValue(JsonNode n){ - if (n.isBoolean()){ - //boolean - return Bytes.toBytes(n.asBoolean()); - }else if(n.isNumber()){ - if(n.isIntegralNumber()){ - //interpret as Long - return Bytes.toBytes(n.asLong()); - }else{ - //interpret as Double - return Bytes.toBytes(n.asDouble()); - } - }else{ - //if all else fails, interpret as String - return Bytes.toBytes(n.asText()); - } + if (n.isBoolean()){ + //boolean + return cliSvc.toBytes(n.asBoolean()); + }else if(n.isNumber()){ + if(n.isIntegralNumber()){ + //interpret as Long + return cliSvc.toBytes(n.asLong()); + }else{ + //interpret as Double + return cliSvc.toBytes(n.asDouble()); + } + }else{ + //if all else fails, interpret as String + return cliSvc.toBytes(n.asText()); + } } } diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index bca8b4f98382..35a96bb09566 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -105,4 +105,30 @@ public Map> getFlowFilePuts() { public void setThrowException(boolean throwException) { this.throwException = throwException; } + + @Override + public byte[] toBytes(final boolean b) { + return new byte[] { b ? (byte) -1 : (byte) 0 }; + } + + @Override + public byte[] toBytes(long l) { + byte [] b = new byte[8]; + for (int i = 7; i > 0; i--) { + b[i] = (byte) l; + l >>>= 8; + } + b[0] = (byte) l; + return b; + } + + @Override + public byte[] toBytes(final double d) { + return toBytes(Double.doubleToRawLongBits(d)); + } + + @Override + public byte[] toBytes(final String s) { + return s.getBytes(StandardCharsets.UTF_8); + } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index d64b1e6e6fa7..9a3e0ad3c0b7 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -89,8 +88,8 @@ public void testSingleJsonDocAndProvidedRowId() throws IOException, Initializati assertEquals(1, puts.size()); final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", Bytes.toBytes("value1")); - expectedColumns.put("field2", Bytes.toBytes("value2")); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -121,9 +120,9 @@ public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, assertEquals(1, puts.size()); final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", Bytes.toBytes(1.23456d)); - expectedColumns.put("field2", Bytes.toBytes(2345235l)); - expectedColumns.put("field3", Bytes.toBytes(false)); + expectedColumns.put("field1", hBaseClient.toBytes(1.23456d)); + expectedColumns.put("field2", hBaseClient.toBytes(2345235l)); + expectedColumns.put("field3", hBaseClient.toBytes(false)); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -155,8 +154,8 @@ public void testSingJsonDocAndExtractedRowId() throws IOException, Initializatio // should be a put with row id of myRowId, and rowField shouldn't end up in the columns final Map expectedColumns1 = new HashMap<>(); - expectedColumns1.put("field1", Bytes.toBytes("value1")); - expectedColumns1.put("field2", 
Bytes.toBytes("value2")); + expectedColumns1.put("field1", hBaseClient.toBytes("value1")); + expectedColumns1.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts); final List events = runner.getProvenanceEvents(); @@ -235,8 +234,8 @@ public void testELWithProvidedRowId() throws IOException, InitializationExceptio assertEquals(1, puts.size()); final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", Bytes.toBytes("value1")); - expectedColumns.put("field2", Bytes.toBytes("value2")); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -270,7 +269,7 @@ public void testELWithExtractedRowId() throws IOException, InitializationExcepti assertEquals(1, puts.size()); final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", Bytes.toBytes("value2")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -299,7 +298,7 @@ public void testNullAndArrayElementsWithWarnStrategy() throws InitializationExce // should have skipped field1 and field3 final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", Bytes.toBytes("value2")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -324,7 +323,7 @@ public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationEx // should have skipped field1 and field3 final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", Bytes.toBytes("value2")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -372,8 +371,8 @@ public void testNullAndArrayElementsWithTextStrategy() throws InitializationExce // should have skipped field1 and field3 final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", Bytes.toBytes("[{\"child_field1\":\"child_value1\"}]")); - expectedColumns.put("field2", Bytes.toBytes("value2")); + expectedColumns.put("field1", hBaseClient.toBytes("[{\"child_field1\":\"child_value1\"}]")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -398,8 +397,8 @@ public void testNestedDocWithTextStrategy() throws InitializationException { // should have skipped field1 and field3 final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", Bytes.toBytes("{\"child_field1\":\"child_value1\"}")); - expectedColumns.put("field2", Bytes.toBytes("value2")); + expectedColumns.put("field1", hBaseClient.toBytes("{\"child_field1\":\"child_value1\"}")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 2f5b6a5e5749..3a65f5dee5c2 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -98,4 +98,8 @@ public interface HBaseClientService extends ControllerService { */ void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; + byte[] toBytes(boolean b); + byte[] toBytes(long l); + byte[] toBytes(double d); + byte[] toBytes(String s); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 1791cfee0062..e07b728950f9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -405,4 +406,24 @@ public Configuration getConfiguration() { } } + @Override + public byte[] toBytes(boolean b) { + return Bytes.toBytes(b); + } + + @Override + public byte[] toBytes(long l) { + return Bytes.toBytes(l); + } + + @Override + public byte[] toBytes(double d) { + return Bytes.toBytes(d); + } + + @Override + public byte[] toBytes(String s) { + return Bytes.toBytes(s); + } + } From 2f2e1e584cdb5cd98757dcadeace57bfa6e7c130 Mon Sep 17 00:00:00 2001 From: rtempleton Date: Thu, 9 Jun 2016 19:34:06 -0500 Subject: [PATCH 3/3] remove whitespace --- .../src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java | 4 ++-- .../src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java | 4 ++-- .../src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index f0171e815c94..50813963aeb8 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -180,9 +180,9 @@ protected String getTransitUri(PutFlowFile putFlowFile) { * @return a PutFlowFile instance for the given FlowFile */ protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); - + protected HBaseClientService cliSvc; - + @OnScheduled public void onScheduled(final ProcessContext context) { cliSvc = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 4ab223483aa1..3c10e66ec23d 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -225,9 +225,9 @@ public void process(final InputStream in) throws IOException { final String putRowId = (extractRowId ? rowIdHolder.get() : rowId); return new PutFlowFile(tableName, putRowId, columns, flowFile); } - + /* - *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function + *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function */ private byte[] extractJNodeValue(JsonNode n){ if (n.isBoolean()){ diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index 9a3e0ad3c0b7..92c96cc8f03a 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -98,7 +98,7 @@ public void testSingleJsonDocAndProvidedRowId() throws IOException, Initializati final ProvenanceEventRecord event = events.get(0); assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri()); } - + @Test public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException { final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
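
The type mapping introduced in PATCH 1/3 can be exercised outside the processor. Below is a minimal, self-contained sketch of the same decision tree, using the Jackson 1.x API the processor already imports and HBase's Bytes utility; the class name and the sample document are illustrative only and are not part of the patch.

import java.util.Iterator;

import org.apache.hadoop.hbase.util.Bytes;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

public class JsonToBytesSketch {

    // Mirrors the mapping described in the commit message:
    // integral -> long, non-integral -> double, boolean -> boolean, everything else -> String.
    static byte[] toColumnValue(final JsonNode node) {
        if (node.isBoolean()) {
            return Bytes.toBytes(node.asBoolean());
        } else if (node.isIntegralNumber()) {
            return Bytes.toBytes(node.asLong());      // widest integral type
        } else if (node.isNumber()) {
            return Bytes.toBytes(node.asDouble());    // widest floating-point type
        }
        return Bytes.toBytes(node.asText());          // fall back to the UTF-8 string form
    }

    public static void main(final String[] args) throws Exception {
        // Hypothetical document, same shape as the ones used in the tests.
        final JsonNode root = new ObjectMapper().readTree(
                "{ \"field1\" : 1.23456, \"field2\" : 2345235, \"field3\" : false, \"field4\" : \"value4\" }");
        final Iterator<String> names = root.getFieldNames();
        while (names.hasNext()) {
            final String name = names.next();
            final byte[] value = toColumnValue(root.get(name));
            System.out.println(name + " -> " + value.length + " byte(s)");
        }
    }
}

Integral values are widened to long (8 bytes) and non-integral values to double (8 bytes), so anything reading the table back must decode with the matching Bytes.toLong or Bytes.toDouble call.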
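
PATCH 2/3 moves the encoding behind HBaseClientService so the processor no longer needs hbase-client on its compile classpath. The sketch below restates that seam with hypothetical names (ValueEncoder, ManualValueEncoder); the method bodies mirror MockHBaseClientService from the patch, while the real HBase_1_1_2_ClientService simply delegates to org.apache.hadoop.hbase.util.Bytes.

import java.nio.charset.StandardCharsets;

// A minimal stand-in for the four encoding methods the patch adds to HBaseClientService.
interface ValueEncoder {
    byte[] toBytes(boolean b);
    byte[] toBytes(long l);
    byte[] toBytes(double d);
    byte[] toBytes(String s);
}

// Hand-rolled implementation, as used by MockHBaseClientService in the tests.
class ManualValueEncoder implements ValueEncoder {
    @Override
    public byte[] toBytes(final boolean b) {
        return new byte[] { b ? (byte) -1 : (byte) 0 };   // same single-byte convention as HBase's Bytes
    }

    @Override
    public byte[] toBytes(long l) {
        final byte[] b = new byte[8];                     // big-endian, most significant byte first
        for (int i = 7; i > 0; i--) {
            b[i] = (byte) l;
            l >>>= 8;
        }
        b[0] = (byte) l;
        return b;
    }

    @Override
    public byte[] toBytes(final double d) {
        return toBytes(Double.doubleToRawLongBits(d));    // IEEE-754 raw bits, then the long encoding
    }

    @Override
    public byte[] toBytes(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}

These hand-rolled encodings produce the same bytes as Bytes.toBytes for the four types (boolean as a single 0x00/0xFF byte, long in big-endian order, double via its raw long bits, String as UTF-8), which is why HBaseTestUtil can compare the mock's output against the processor's PutColumn buffers with Arrays.equals.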
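
For readers checking the expected cell values in testSingleJsonDocAndProvidedRowIdwithNonString, a small JUnit 4 round-trip sketch (assuming hbase-client is available to the test module, as in PATCH 1/3) shows how the typed encodings decode back; the class name and row-key literal are illustrative.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

public class BytesRoundTripSketch {

    @Test
    public void typedValuesSurviveTheRoundTrip() {
        // Same literals as the new non-String test in PATCH 1/3.
        assertEquals(1.23456d, Bytes.toDouble(Bytes.toBytes(1.23456d)), 0.0d);
        assertEquals(2345235L, Bytes.toLong(Bytes.toBytes(2345235L)));
        assertFalse(Bytes.toBoolean(Bytes.toBytes(false)));

        // Row keys are still written as plain strings by the processor.
        assertEquals("myRowId", Bytes.toString(Bytes.toBytes("myRowId")));
    }
}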