From 0ca89d019d1f04fbcb08a8ac6ff2916cf094f114 Mon Sep 17 00:00:00 2001 From: rtempleton Date: Thu, 9 Jun 2016 10:16:49 -0500 Subject: [PATCH 1/2] NIFI-1895 PutHBaseJSON processor treats all values as Strings The operator will now inspect the node value to determine type and convert as such. Numeric integral - Long (assumes widest type) Numeric not integral - Double (assumes widest type) Logical - Boolean everything else (including current Complex Type logic) - String Values that represent the row key continue to be implictly treated as Strings by the processor Removed depenency on HBase utility Bytes class from the PutHBaseJSON processor. Convenience methods to encode to byte array are now wrapped by the appropriate HBaseClientService instance. --- .../apache/nifi/hbase/AbstractPutHBase.java | 20 ++-- .../org/apache/nifi/hbase/PutHBaseJSON.java | 52 +++++++---- .../org/apache/nifi/hbase/HBaseTestUtil.java | 19 ++-- .../nifi/hbase/MockHBaseClientService.java | 26 ++++++ .../apache/nifi/hbase/TestPutHBaseJSON.java | 93 +++++++++++++------ .../apache/nifi/hbase/HBaseClientService.java | 4 + .../nifi/hbase/HBase_1_1_2_ClientService.java | 21 +++++ 7 files changed, 173 insertions(+), 62 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 87424f967f13..50813963aeb8 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -17,7 +17,14 @@ package org.apache.nifi.hbase; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hbase.put.PutFlowFile; @@ -28,12 +35,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** * Base class for processors that put data to HBase. */ @@ -180,4 +181,11 @@ protected String getTransitUri(PutFlowFile putFlowFile) { */ protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); + protected HBaseClientService cliSvc; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + cliSvc = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + } + } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 0dba7eecd42e..3c10e66ec23d 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -17,6 +17,16 @@ package org.apache.nifi.hbase; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -40,17 +50,6 @@ import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - @EventDriven @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @@ -175,13 +174,13 @@ public void process(final InputStream in) throws IOException { final Iterator fieldNames = rootNode.getFieldNames(); while (fieldNames.hasNext()) { final String fieldName = fieldNames.next(); - final ObjectHolder fieldValueHolder = new ObjectHolder<>(null); + final ObjectHolder fieldValueHolder = new ObjectHolder<>(null); final JsonNode fieldNode = rootNode.get(fieldName); if (fieldNode.isNull()) { getLogger().debug("Skipping {} because value was null", new Object[]{fieldName}); } else if (fieldNode.isValueNode()) { - fieldValueHolder.set(fieldNode.asText()); + fieldValueHolder.set(extractJNodeValue(fieldNode)); } else { // for non-null, non-value nodes, determine what to do based on the handling strategy switch (complexFieldStrategy) { @@ -194,7 +193,7 @@ public void process(final InputStream in) throws IOException { case TEXT_VALUE: // use toString() here because asText() is only guaranteed to be supported on value nodes // some other types of nodes, like ArrayNode, provide toString implementations - fieldValueHolder.set(fieldNode.toString()); + fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString())); break; case IGNORE_VALUE: // silently skip @@ -208,9 +207,9 @@ public void process(final InputStream in) throws IOException { // otherwise add a new column where the fieldName and fieldValue are the column qualifier and value if (fieldValueHolder.get() != null) { if (extractRowId && fieldName.equals(rowFieldName)) { - rowIdHolder.set(fieldValueHolder.get()); + rowIdHolder.set(fieldNode.asText()); } else { - columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8))); + columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get())); } } } @@ -227,4 +226,25 @@ public void process(final InputStream in) throws IOException { return new PutFlowFile(tableName, putRowId, columns, flowFile); } + /* + *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function + */ + private byte[] extractJNodeValue(JsonNode n){ + if (n.isBoolean()){ + //boolean + return cliSvc.toBytes(n.asBoolean()); + }else if(n.isNumber()){ + if(n.isIntegralNumber()){ + //interpret as Long + return cliSvc.toBytes(n.asLong()); + }else{ + //interpret as Double + return cliSvc.toBytes(n.asDouble()); + } + }else{ + //if all else fails, interpret as String + return cliSvc.toBytes(n.asText()); + } + } + } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java index fc30f73948f3..f1c6689f0e5f 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java @@ -17,20 +17,20 @@ */ package org.apache.nifi.hbase; -import org.apache.nifi.hbase.put.PutColumn; -import org.apache.nifi.hbase.put.PutFlowFile; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.ProvenanceEventType; +import static org.junit.Assert.assertTrue; -import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.Map; -import static org.junit.Assert.assertTrue; +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; public class HBaseTestUtil { - public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { + public static void verifyPut(final String row, final String columnFamily, final Map columns, final List puts) { boolean foundPut = false; for (final PutFlowFile put : puts) { @@ -45,13 +45,12 @@ public static void verifyPut(final String row, final String columnFamily, final // start off assuming we have all the columns boolean foundAllColumns = true; - for (Map.Entry entry : columns.entrySet()) { + for (Map.Entry entry : columns.entrySet()) { // determine if we have the current expected column boolean foundColumn = false; for (PutColumn putColumn : put.getColumns()) { - final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8); if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier()) - && entry.getValue().equals(colVal)) { + && Arrays.equals(entry.getValue(), putColumn.getBuffer())) { foundColumn = true; break; } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index bca8b4f98382..35a96bb09566 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -105,4 +105,30 @@ public Map> getFlowFilePuts() { public void setThrowException(boolean throwException) { this.throwException = throwException; } + + @Override + public byte[] toBytes(final boolean b) { + return new byte[] { b ? (byte) -1 : (byte) 0 }; + } + + @Override + public byte[] toBytes(long l) { + byte [] b = new byte[8]; + for (int i = 7; i > 0; i--) { + b[i] = (byte) l; + l >>>= 8; + } + b[0] = (byte) l; + return b; + } + + @Override + public byte[] toBytes(final double d) { + return toBytes(Double.doubleToRawLongBits(d)); + } + + @Override + public byte[] toBytes(final String s) { + return s.getBytes(StandardCharsets.UTF_8); + } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index 7b5991986fc2..92c96cc8f03a 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.hbase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -25,15 +34,6 @@ import org.apache.nifi.util.TestRunners; import org.junit.Test; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - public class TestPutHBaseJSON { public static final String DEFAULT_TABLE_NAME = "nifi"; @@ -87,9 +87,42 @@ public void testSingleJsonDocAndProvidedRowId() throws IOException, Initializati final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "value1"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + + final List events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri()); + } + + @Test + public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + + final String content = "{ \"field1\" : 1.23456, \"field2\" : 2345235, \"field3\" : false }"; + runner.enqueue(content.getBytes("UTF-8")); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes(1.23456d)); + expectedColumns.put("field2", hBaseClient.toBytes(2345235l)); + expectedColumns.put("field3", hBaseClient.toBytes(false)); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -120,9 +153,9 @@ public void testSingJsonDocAndExtractedRowId() throws IOException, Initializatio assertEquals(1, puts.size()); // should be a put with row id of myRowId, and rowField shouldn't end up in the columns - final Map expectedColumns1 = new HashMap<>(); - expectedColumns1.put("field1", "value1"); - expectedColumns1.put("field2", "value2"); + final Map expectedColumns1 = new HashMap<>(); + expectedColumns1.put("field1", hBaseClient.toBytes("value1")); + expectedColumns1.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts); final List events = runner.getProvenanceEvents(); @@ -200,9 +233,9 @@ public void testELWithProvidedRowId() throws IOException, InitializationExceptio final List puts = hBaseClient.getFlowFilePuts().get("myTable"); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "value1"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("value1")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -235,8 +268,8 @@ public void testELWithExtractedRowId() throws IOException, InitializationExcepti final List puts = hBaseClient.getFlowFilePuts().get("myTable"); assertEquals(1, puts.size()); - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts); final List events = runner.getProvenanceEvents(); @@ -264,8 +297,8 @@ public void testNullAndArrayElementsWithWarnStrategy() throws InitializationExce assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -289,8 +322,8 @@ public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationEx assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -337,9 +370,9 @@ public void testNullAndArrayElementsWithTextStrategy() throws InitializationExce assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("[{\"child_field1\":\"child_value1\"}]")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } @@ -363,9 +396,9 @@ public void testNestedDocWithTextStrategy() throws InitializationException { assertEquals(1, puts.size()); // should have skipped field1 and field3 - final Map expectedColumns = new HashMap<>(); - expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}"); - expectedColumns.put("field2", "value2"); + final Map expectedColumns = new HashMap<>(); + expectedColumns.put("field1", hBaseClient.toBytes("{\"child_field1\":\"child_value1\"}")); + expectedColumns.put("field2", hBaseClient.toBytes("value2")); HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 2f5b6a5e5749..3a65f5dee5c2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -98,4 +98,8 @@ public interface HBaseClientService extends ControllerService { */ void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; + byte[] toBytes(boolean b); + byte[] toBytes(long l); + byte[] toBytes(double d); + byte[] toBytes(String s); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 1791cfee0062..e07b728950f9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -405,4 +406,24 @@ public Configuration getConfiguration() { } } + @Override + public byte[] toBytes(boolean b) { + return Bytes.toBytes(b); + } + + @Override + public byte[] toBytes(long l) { + return Bytes.toBytes(l); + } + + @Override + public byte[] toBytes(double d) { + return Bytes.toBytes(d); + } + + @Override + public byte[] toBytes(String s) { + return Bytes.toBytes(s); + } + } From 04285af71b3aba741b3cbf480f7ea466c84791c3 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Fri, 17 Jun 2016 17:10:40 -0400 Subject: [PATCH 2/2] NIFI-1895 Adding a property to PutHBaseJSON to allow specifying how to store the values --- .../apache/nifi/hbase/AbstractPutHBase.java | 17 ++++---- .../org/apache/nifi/hbase/PutHBaseJSON.java | 41 +++++++++++++++---- .../apache/nifi/hbase/TestPutHBaseJSON.java | 2 + .../apache/nifi/hbase/HBaseClientService.java | 28 +++++++++++++ 4 files changed, 72 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java index 50813963aeb8..05f4b7ebbc48 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -92,6 +92,13 @@ public abstract class AbstractPutHBase extends AbstractProcessor { .description("A FlowFile is routed to this relationship if it cannot be sent to HBase") .build(); + protected HBaseClientService clientService; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); @@ -135,11 +142,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final long start = System.nanoTime(); final List successes = new ArrayList<>(); - final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); for (Map.Entry> entry : tablePuts.entrySet()) { try { - hBaseClientService.put(entry.getKey(), entry.getValue()); + clientService.put(entry.getKey(), entry.getValue()); successes.addAll(entry.getValue()); } catch (Exception e) { getLogger().error(e.getMessage(), e); @@ -181,11 +187,4 @@ protected String getTransitUri(PutFlowFile putFlowFile) { */ protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); - protected HBaseClientService cliSvc; - - @OnScheduled - public void onScheduled(final ProcessContext context) { - cliSvc = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); - } - } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java index 3c10e66ec23d..9a57d6e4460f 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -89,6 +89,25 @@ public class PutHBaseJSON extends AbstractPutHBase { .defaultValue(COMPLEX_FIELD_TEXT.getValue()) .build(); + protected static final String STRING_ENCODING_VALUE = "String"; + protected static final String BYTES_ENCODING_VALUE = "Bytes"; + + protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE, + "Stores the value of each field as a UTF-8 String."); + protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE, + "Stores the value of each field as the byte representation of the type derived from the JSON."); + + protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder() + .name("Field Encoding Strategy") + .description(("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the " + + "JSON to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from " + + "the JSON, and convert the value to the byte representation of that type, meaning an integer will be stored as the " + + "byte representation of that integer.")) + .required(true) + .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES) + .defaultValue(FIELD_ENCODING_STRING.getValue()) + .build(); + @Override public final List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); @@ -99,6 +118,7 @@ public final List getSupportedPropertyDescriptors() { properties.add(COLUMN_FAMILY); properties.add(BATCH_SIZE); properties.add(COMPLEX_FIELD_STRATEGY); + properties.add(FIELD_ENCODING_STRATEGY); return properties; } @@ -142,6 +162,7 @@ protected PutFlowFile createPut(final ProcessSession session, final ProcessConte final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); final boolean extractRowId = !StringUtils.isBlank(rowFieldName); final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); + final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue(); // Parse the JSON document final ObjectMapper mapper = new ObjectMapper(); @@ -180,7 +201,13 @@ public void process(final InputStream in) throws IOException { if (fieldNode.isNull()) { getLogger().debug("Skipping {} because value was null", new Object[]{fieldName}); } else if (fieldNode.isValueNode()) { - fieldValueHolder.set(extractJNodeValue(fieldNode)); + // for a value node we need to determine if we are storing the bytes of a string, or the bytes of actual types + if (STRING_ENCODING_VALUE.equals(fieldEncodingStrategy)) { + final byte[] valueBytes = clientService.toBytes(fieldNode.asText()); + fieldValueHolder.set(valueBytes); + } else { + fieldValueHolder.set(extractJNodeValue(fieldNode)); + } } else { // for non-null, non-value nodes, determine what to do based on the handling strategy switch (complexFieldStrategy) { @@ -193,7 +220,7 @@ public void process(final InputStream in) throws IOException { case TEXT_VALUE: // use toString() here because asText() is only guaranteed to be supported on value nodes // some other types of nodes, like ArrayNode, provide toString implementations - fieldValueHolder.set(cliSvc.toBytes(fieldNode.toString())); + fieldValueHolder.set(clientService.toBytes(fieldNode.toString())); break; case IGNORE_VALUE: // silently skip @@ -229,21 +256,21 @@ public void process(final InputStream in) throws IOException { /* *Handles the conversion of the JsonNode value into it correct underlying data type in the form of a byte array as expected by the columns.add function */ - private byte[] extractJNodeValue(JsonNode n){ + private byte[] extractJNodeValue(final JsonNode n){ if (n.isBoolean()){ //boolean - return cliSvc.toBytes(n.asBoolean()); + return clientService.toBytes(n.asBoolean()); }else if(n.isNumber()){ if(n.isIntegralNumber()){ //interpret as Long - return cliSvc.toBytes(n.asLong()); + return clientService.toBytes(n.asLong()); }else{ //interpret as Double - return cliSvc.toBytes(n.asDouble()); + return clientService.toBytes(n.asDouble()); } }else{ //if all else fails, interpret as String - return cliSvc.toBytes(n.asText()); + return clientService.toBytes(n.asText()); } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java index 92c96cc8f03a..28d9105d4cf4 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -102,6 +102,8 @@ public void testSingleJsonDocAndProvidedRowId() throws IOException, Initializati @Test public void testSingleJsonDocAndProvidedRowIdwithNonString() throws IOException, InitializationException { final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + runner.setProperty(PutHBaseJSON.FIELD_ENCODING_STRATEGY, PutHBaseJSON.BYTES_ENCODING_VALUE); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 3a65f5dee5c2..47f6e2ecb5da 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -98,8 +98,36 @@ public interface HBaseClientService extends ControllerService { */ void scan(String tableName, Collection columns, String filterExpression, long minTime, ResultHandler handler) throws IOException; + /** + * Converts the given boolean to it's byte representation. + * + * @param b a boolean + * @return the boolean represented as bytes + */ byte[] toBytes(boolean b); + + /** + * Converts the given long to it's byte representation. + * + * @param l a long + * @return the long represented as bytes + */ byte[] toBytes(long l); + + /** + * Converts the given double to it's byte representation. + * + * @param d a double + * @return the double represented as bytes + */ byte[] toBytes(double d); + + /** + * Converts the given string to it's byte representation. + * + * @param s a string + * @return the string represented as bytes + */ byte[] toBytes(String s); + }