From 7a9dc565f43284a8535de052a812a252c6950613 Mon Sep 17 00:00:00 2001 From: Ed Date: Wed, 31 Jan 2018 16:20:35 -0500 Subject: [PATCH 1/3] NIFI-4833 Add ScanHBase processor --- .../java/org/apache/nifi/hbase/ScanHBase.java | 564 ++++++++++++++++++ .../nifi/hbase/MockHBaseClientService.java | 24 + .../org/apache/nifi/hbase/TestScanHBase.java | 381 ++++++++++++ .../apache/nifi/hbase/HBaseClientService.java | 18 + .../nifi/hbase/HBase_1_1_2_ClientService.java | 74 ++- .../nifi/hbase/MockHBaseClientService.java | 8 + 6 files changed, 1068 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java create mode 100644 nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java new file mode 100644 index 000000000000..cb6c5756d8b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hbase.io.JsonFullRowSerializer; +import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer; +import org.apache.nifi.hbase.io.RowSerializer; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.Tuple; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import 
java.util.regex.Pattern; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hbase", "scan", "fetch", "get"}) +@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end )," + + "by time range, by filter expression, or any combination of them. \n" + + "Order of records can be controlled by a property Reversed" + + "Number of rows retrieved by the processor can be limited.") +@WritesAttributes({ + @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"), + @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), + @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), + @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), + @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions.
Could be null (not present) if transfered to FAILURE") +}) +public class ScanHBase extends AbstractProcessor { + //enhanced regex for columns to allow "-" in column qualifier names + static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); + static final byte[] nl = System.lineSeparator().getBytes(); + + static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .displayName("HBase Client Service") + .name("scanhbase-client-service") + .description("Specifies the Controller Service to use for accessing HBase.") + .required(true) + .identifiesControllerService(HBaseClientService.class) + .build(); + + static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .displayName("Table Name") + .name("scanhbase-table-name") + .description("The name of the HBase Table to fetch from.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor START_ROW = new PropertyDescriptor.Builder() + .displayName("Start rowkey") + .name("scanhbase-start-rowkey") + .description("The rowkey to start scan from.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder() + .displayName("End rowkey") + .name("scanhbase-end-rowkey") + .description("The row key to end scan by.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor TIME_RANGE_MIN = new PropertyDescriptor.Builder() + .displayName("Time range min") + .name("scanhbase-time-range-min") + .description("Time range min value. 
Both min and max values for time range should be either blank or provided.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + static final PropertyDescriptor TIME_RANGE_MAX = new PropertyDescriptor.Builder() + .displayName("Time range max") + .name("scanhbase-time-range-max") + .description("Time range max value. Both min and max values for time range should be either blank or provided.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.LONG_VALIDATOR) + .build(); + + static final PropertyDescriptor LIMIT_ROWS = new PropertyDescriptor.Builder() + .displayName("Limit rows") + .name("scanhbase-limit") + .description("Limit number of rows retrieved by scan.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor BULK_SIZE = new PropertyDescriptor.Builder() + .displayName("Max rows per flow file") + .name("scanhbase-bulk-size") + .description("Limits number of rows in single flow file content. Set to 0 to avoid multiple flow files.") + .required(false) + .expressionLanguageSupported(true) + .defaultValue("0") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + + static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder() + .displayName("Reversed order") + .name("scanhbase-reversed-order") + .description("Set whether this scan is a reversed one. 
This is false by default which means forward(normal) scan.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .required(false) + .defaultValue("false") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder() + .displayName("Filter expression") + .name("scanhbase-filter-expression") + .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder() + .displayName("Columns") + .name("scanhbase-columns") + .description("An optional comma-separated list of \":\" pairs to fetch. To return all columns " + + "for a given family, leave off the qualifier such as \",\".") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN)) + .build(); + + static final AllowableValue JSON_FORMAT_FULL_ROW = new AllowableValue("full-row", "full-row", + "Creates a JSON document with the format: {\"row\":, \"cells\":[{\"fam\":, \"qual\":, \"val\":, \"ts\":}]}."); + static final AllowableValue JSON_FORMAT_QUALIFIER_AND_VALUE = new AllowableValue("col-qual-and-val", "col-qual-and-val", + "Creates a JSON document with the format: {\"\":\"\", \"\":\"\"."); + + static final PropertyDescriptor JSON_FORMAT = new PropertyDescriptor.Builder() + .displayName("JSON Format") + .name("scanhbase-json-format") + .description("Specifies how to represent the HBase row as a JSON document.") + .required(true) + .allowableValues(JSON_FORMAT_FULL_ROW, JSON_FORMAT_QUALIFIER_AND_VALUE) + .defaultValue(JSON_FORMAT_FULL_ROW.getValue()) + .build(); + + static final PropertyDescriptor DECODE_CHARSET = new PropertyDescriptor.Builder() + 
.displayName("Decode Character Set") + .name("scanhbase-decode-charset") + .description("The character set used to decode data from HBase.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final PropertyDescriptor ENCODE_CHARSET = new PropertyDescriptor.Builder() + .displayName("Encode Character Set") + .name("scanhbase-encode-charset") + .description("The character set used to encode the JSON representation of the row.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original input file will be routed to this destination, even if no rows are retrieved based on provided conditions.") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All successful fetches are routed to this relationship.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All failed fetches are routed to this relationship.") + .build(); + + static final String HBASE_TABLE_ATTR = "hbase.table"; + static final String HBASE_ROWS_COUNT_ATTR = "hbase.rows.count"; + + static final List properties; + static { + List props = new ArrayList<>(); + props.add(HBASE_CLIENT_SERVICE); + props.add(TABLE_NAME); + props.add(START_ROW); + props.add(END_ROW); + props.add(TIME_RANGE_MIN); + props.add(TIME_RANGE_MAX); + props.add(LIMIT_ROWS); + props.add(REVERSED_SCAN); + props.add(BULK_SIZE); + props.add(FILTER_EXPRESSION); + props.add(COLUMNS); + props.add(JSON_FORMAT); + props.add(ENCODE_CHARSET); + props.add(DECODE_CHARSET); + properties = Collections.unmodifiableList(props); + } + + static final Set relationships; + static { + Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_ORIGINAL); + 
rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + } + + private volatile Charset decodeCharset; + private volatile Charset encodeCharset; + private RowSerializer serializer = null; + + + @OnScheduled + public void onScheduled(ProcessContext context) { + this.decodeCharset = Charset.forName(context.getProperty(DECODE_CHARSET).getValue()); + this.encodeCharset = Charset.forName(context.getProperty(ENCODE_CHARSET).getValue()); + + final String jsonFormat = context.getProperty(JSON_FORMAT).getValue(); + if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) { + this.serializer = new JsonFullRowSerializer(decodeCharset, encodeCharset); + } else { + this.serializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset); + } + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + + final List problems = new ArrayList<>(); + + final String columns = validationContext.getProperty(COLUMNS).getValue(); + final String filter = validationContext.getProperty(FILTER_EXPRESSION).getValue(); + + if (!StringUtils.isBlank(columns) && !StringUtils.isBlank(filter)) { + problems.add(new ValidationResult.Builder() + .subject(FILTER_EXPRESSION.getDisplayName()) + .input(filter).valid(false) + .explanation("a filter expression can not be used in conjunction with the Columns property") + .build()); + } + + + return problems; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try{ + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isBlank(tableName)) { + getLogger().error("Table Name is blank or null for {}, transferring to 
failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final String startRow = context.getProperty(START_ROW).evaluateAttributeExpressions(flowFile).getValue(); + final String endRow = context.getProperty(END_ROW).evaluateAttributeExpressions(flowFile).getValue(); + + final String filterExpression = context.getProperty(FILTER_EXPRESSION).evaluateAttributeExpressions(flowFile).getValue(); + + //evaluate and validate time range min and max values. They both should be either empty or provided. + Long timerangeMin = null; + Long timerangeMax = null; + + try{ + timerangeMin = context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).asLong(); + }catch(Exception e){ + getLogger().error("Time range min value is not a number ({}) for {}, transferring to failure", + new Object[] {context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).getValue(), flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + try{ + timerangeMax = context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).asLong(); + }catch(Exception e){ + getLogger().error("Time range max value is not a number ({}) for {}, transferring to failure", + new Object[] {context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).getValue(), flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + if (timerangeMin == null && timerangeMax != null) { + getLogger().error("Time range min value cannot be blank when max value provided for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + }else if (timerangeMin != null && timerangeMax == null) { + getLogger().error("Time range max value cannot be blank when min value provided for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + 
final Integer limitRows = context.getProperty(LIMIT_ROWS).evaluateAttributeExpressions(flowFile).asInteger(); + + final Boolean isReversed = context.getProperty(REVERSED_SCAN).asBoolean(); + + final Integer bulkSize = context.getProperty(BULK_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); + + final List columns = getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue()); + final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + + final AtomicReference rowsPulledHolder = new AtomicReference<>(0L); + final AtomicReference ffCountHolder = new AtomicReference<>(0L); + ScanHBaseResultHandler handler = new ScanHBaseResultHandler(context, session, flowFile, rowsPulledHolder, ffCountHolder, hBaseClientService, tableName, bulkSize); + + try { + hBaseClientService.scan(tableName, + filterExpression, + startRow, endRow, + timerangeMin, timerangeMax, + limitRows, + isReversed, + columns, + handler); + } catch (IOException e) { + getLogger().error("Unable to fetch rows from HBase table {} due to {}", new Object[] {tableName, e}); + flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_FAILURE); + return; + } + + LinkedList> hangingRows = handler.getHangingRows(); + if (!handler.isHandledAny() || // no rows found in hbase + (handler.isHandledAny() && (hangingRows == null || hangingRows.isEmpty())) // all the rows are flushed to FF inside handlers + ){ + flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_ORIGINAL); + session.commit(); + return; + } + + if (hangingRows != null && !hangingRows.isEmpty()) { + FlowFile lastFF = session.create(flowFile); + final Map attributes = new HashMap<>(); + attributes.put(HBASE_TABLE_ATTR, tableName); + attributes.put(HBASE_ROWS_COUNT_ATTR, 
Long.toString(rowsPulledHolder.get())); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + attributes.put(HBASE_ROWS_COUNT_ATTR, Long.toString(hangingRows.size())); + lastFF = session.putAllAttributes(lastFF, attributes); + + final AtomicReference ioe = new AtomicReference<>(null); + session.write(lastFF, (out) -> { + for (Iterator> iter = hangingRows.iterator(); iter.hasNext();){ + Tuple r = iter.next(); + serializer.serialize(r.getKey(), r.getValue(), out); + if (iter.hasNext()){ + out.write(nl); + } + } + }); + + + Relationship rel = REL_SUCCESS; + IOException error = ioe.get(); + if (error != null){ + lastFF = session.putAttribute(lastFF, "scanhbase.error", error.toString()); + rel = REL_FAILURE; + } + session.transfer(lastFF, rel); + flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_ORIGINAL); + } + session.commit(); + + }catch (final Exception e) { + getLogger().error("Failed to receive data from HBase due to {}", e); + session.rollback(); + // if we failed, we want to yield so that we don't hammer hbase. + context.yield(); + } + } + + /** + * @param columnsValue a String in the form colFam:colQual,colFam:colQual + * @return a list of Columns based on parsing the given String + */ + private List getColumns(final String columnsValue) { + final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? 
new String[0] : columnsValue.split(",")); + + List columnsList = new ArrayList<>(columns.length); + + for (final String column : columns) { + if (column.contains(":")) { + final String[] parts = column.split(":"); + final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8); + final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8); + columnsList.add(new Column(cf, cq)); + } else { + final byte[] cf = column.getBytes(StandardCharsets.UTF_8); + columnsList.add(new Column(cf, null)); + } + } + + return columnsList; + } + + /** + * @return number of rows to be committed to session. + */ + protected int getBatchSize(){ + return 500; + } + + /** + * Result Handler for Scan operation + */ + private class ScanHBaseResultHandler implements ResultHandler { + + final private ProcessSession session; + final private FlowFile origFF; + final private AtomicReference rowsPulledHolder; + final private AtomicReference ffCountHolder; + final private HBaseClientService hBaseClientService; + final private String tableName; + final private Integer bulkSize; + + private boolean handledAny = false; + private LinkedList> rows = null; + + ScanHBaseResultHandler(final ProcessContext context, final ProcessSession session, + final FlowFile origFF, final AtomicReference rowsPulledHolder, final AtomicReference ffCountHolder, final HBaseClientService hBaseClientService, + final String tableName, final Integer bulkSize){ + this.session = session; + this.rowsPulledHolder = rowsPulledHolder; + this.ffCountHolder = ffCountHolder; + this.hBaseClientService = hBaseClientService; + this.tableName = tableName; + this.bulkSize = bulkSize; + this.origFF = origFF; + } + + @Override + public void handle(final byte[] rowKey, final ResultCell[] resultCells) { + handledAny = true; + final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8); + + long rowsPulled = rowsPulledHolder.get(); + long ffUncommittedCount = ffCountHolder.get(); + + if (rows == null) rows = new LinkedList<>(); + 
rows.add(new Tuple(rowKey, resultCells)); + + rowsPulled++; + + // bulkSize controls number of records per flow file. + if (bulkSize>0 && rowsPulled >= bulkSize) { + FlowFile flowFile = session.create(origFF); + final Map attributes = new HashMap<>(); + attributes.put(HBASE_TABLE_ATTR, tableName); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + attributes.put(HBASE_ROWS_COUNT_ATTR, Long.toString(rowsPulled)); + flowFile = session.putAllAttributes(flowFile, attributes); + + final AtomicReference ioe = new AtomicReference<>(null); + session.write(flowFile, (out) -> { + for (Iterator> iter = rows.iterator(); iter.hasNext();){ + Tuple r = iter.next(); + serializer.serialize(r.getKey(), r.getValue(), out); + if (iter.hasNext()){ + out.write(nl); + } + } + }); + + Relationship rel = REL_SUCCESS; + IOException error = ioe.get(); + if (error != null){ + flowFile = session.putAttribute(flowFile, "scanhbase.error", error.toString()); + rel = REL_FAILURE; + } + + session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString)); + session.transfer(flowFile, rel); + rowsPulledHolder.set(0L); + rows.clear(); + + // we could potentially have a huge number of rows. If we get to batchSize, go ahead and commit the + // session so that we can avoid buffering tons of FlowFiles without ever sending any out. 
+ if (getBatchSize()>0 && ffUncommittedCount*bulkSize > getBatchSize()) { + session.commit(); + ffCountHolder.set(0L); + }else{ + ffCountHolder.set(ffUncommittedCount++); + } + }else{ + rowsPulledHolder.set(rowsPulled); + } + } + + public LinkedList> getHangingRows(){ + return rows; + } + + public boolean isHandledAny(){ + return handledAny; + } + } +} diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index f62102a8e1c3..a549452f93db 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -16,6 +16,11 @@ */ package org.apache.nifi.hbase; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.hbase.put.PutColumn; @@ -33,6 +38,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService { @@ -119,6 +125,24 @@ public void scan(String tableName, Collection columns, String filterExpr numScans++; } + @Override + public void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, + Long timerangeMax, Integer limitRows, Boolean isReversed, Collection columns, ResultHandler handler) + throws IOException { + if (throwException) { + throw new IOException("exception"); + } + + // pass all the 
staged data to the handler + for (final Map.Entry entry : results.entrySet()) { + handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()); + } + + // delegate to the handler + + numScans++; + } + public void addResult(final String rowKey, final Map cells, final long timestamp) { final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8); diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java new file mode 100644 index 000000000000..694b34a57199 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestScanHBase { + + private ScanHBase proc; + private MockHBaseClientService hBaseClientService; + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + proc = new ScanHBase(); + runner = TestRunners.newTestRunner(proc); + + hBaseClientService = new MockHBaseClientService(); + runner.addControllerService("hbaseClient", hBaseClientService); + runner.enableControllerService(hBaseClientService); + runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient"); + } + + @Test + public void testColumnsValidation() { + runner.setProperty(ScanHBase.TABLE_NAME, "table1"); + runner.setProperty(ScanHBase.START_ROW, "row1"); + runner.setProperty(ScanHBase.END_ROW, "row1"); + runner.assertValid(); + + runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1"); + runner.assertValid(); + + runner.setProperty(ScanHBase.COLUMNS, "cf1"); + runner.assertValid(); + + runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3"); + runner.assertValid(); + + runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3"); + runner.assertValid(); + + runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3"); + runner.assertNotValid(); + + runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3"); + runner.assertNotValid(); + + runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,"); + runner.assertNotValid(); + } + + @Test + public void testNoIncomingFlowFile() { + runner.setProperty(ScanHBase.TABLE_NAME, "table1"); + runner.setProperty(ScanHBase.START_ROW, "row1"); + runner.setProperty(ScanHBase.END_ROW, "row1"); + + runner.run(); + runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); 
+ runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + + Assert.assertEquals(0, hBaseClientService.getNumScans()); + } + + @Test + public void testInvalidTableName() { + runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); + runner.setProperty(ScanHBase.START_ROW, "row1"); + runner.setProperty(ScanHBase.END_ROW, "row1"); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 1); + runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + + Assert.assertEquals(0, hBaseClientService.getNumScans()); + } + + @Test + public void testResultsNotFound() { + runner.setProperty(ScanHBase.TABLE_NAME, "table1"); + runner.setProperty(ScanHBase.START_ROW, "row1"); + runner.setProperty(ScanHBase.END_ROW, "row1"); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); + runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_ORIGINAL).get(0); + flowFile.assertAttributeEquals("scanhbase.results.found", Boolean.FALSE.toString()); + + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testScanToContentWithStringValues() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + hBaseClientService.addResult("row2", cells, ts1); + + runner.setProperty(ScanHBase.TABLE_NAME, "table1"); + runner.setProperty(ScanHBase.START_ROW, "row1"); + runner.setProperty(ScanHBase.END_ROW, "row2"); + runner.setProperty(ScanHBase.TIME_RANGE_MIN, "0"); + runner.setProperty(ScanHBase.TIME_RANGE_MAX, "1111111110"); + runner.setProperty(ScanHBase.LIMIT_ROWS, "10"); + 
runner.setProperty(ScanHBase.REVERSED_SCAN, "false"); + runner.setProperty(ScanHBase.BULK_SIZE, "10"); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); + runner.assertTransferCount(ScanHBase.REL_SUCCESS, 1); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0); + flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [" + + "{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" + ts1 + "}, " + + "{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}\n" + + "{\"row\":\"row2\", \"cells\": [" + + "{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" + ts1 + "}, " + + "{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}"); + flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "2"); + + flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_ORIGINAL).get(0); + flowFile.assertAttributeEquals("scanhbase.results.found", Boolean.TRUE.toString()); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testScanBulkSize(){ + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + for (int i = 0; i < 15; i++){ + hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); + } + + + runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); + runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1"); + runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2"); + runner.setProperty(ScanHBase.COLUMNS, "${hbase.cols}"); + runner.setProperty(ScanHBase.TIME_RANGE_MIN, "${tr_min}"); + runner.setProperty(ScanHBase.TIME_RANGE_MAX, "${tr_max}"); + runner.setProperty(ScanHBase.LIMIT_ROWS, "${limit}"); + runner.setProperty(ScanHBase.BULK_SIZE, "${bulk.size}"); + + final Map attributes = new HashMap<>(); + attributes.put("hbase.table", "table1"); + 
attributes.put("hbase.row", "row"); + attributes.put("hbase.cols", "nifi:cq2"); + attributes.put("tr_min", "10000000"); + attributes.put("tr_max", "10000001"); + attributes.put("limit", "1000"); + attributes.put("bulk.size", "10"); + + runner.enqueue("trigger flow file", attributes); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); + runner.assertTransferCount(ScanHBase.REL_SUCCESS, 2); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); + + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "10"); + + flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(1); + flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "5"); + } + + @Test + public void testScanBatchSizeTimesOfBulkSize(){ + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + for (int i = 0; i < 1000; i++){ + hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); + } + + + runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); + runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1"); + runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2"); + runner.setProperty(ScanHBase.COLUMNS, "${hbase.cols}"); + runner.setProperty(ScanHBase.TIME_RANGE_MIN, "${tr_min}"); + runner.setProperty(ScanHBase.TIME_RANGE_MAX, "${tr_max}"); + runner.setProperty(ScanHBase.LIMIT_ROWS, "${limit}"); + runner.setProperty(ScanHBase.BULK_SIZE, "${bulk.size}"); + + final Map attributes = new HashMap<>(); + attributes.put("hbase.table", "table1"); + attributes.put("hbase.row", "row"); + attributes.put("hbase.cols", "nifi:cq2"); + attributes.put("tr_min", "10000000"); + attributes.put("tr_max", "10000001"); + attributes.put("limit", "1000"); + attributes.put("bulk.size", "100"); + + runner.enqueue("trigger flow file", attributes); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); + 
runner.assertTransferCount(ScanHBase.REL_SUCCESS, 10); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); + + + runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).forEach(ff ->{ + ff.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "100"); + Assert.assertNotEquals(0, ff.getId()); // since total amount of rows is a multiplication of bulkSize, original FF (with id=0) shouldn't be present on output. + }); + } + + @Test + public void testScanBatchSizeTimesCutBulkSize(){ + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + for (int i = 0; i < 1102; i++){ + hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); + } + + + runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); + runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1"); + runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2"); + runner.setProperty(ScanHBase.COLUMNS, "${hbase.cols}"); + runner.setProperty(ScanHBase.TIME_RANGE_MIN, "${tr_min}"); + runner.setProperty(ScanHBase.TIME_RANGE_MAX, "${tr_max}"); + runner.setProperty(ScanHBase.LIMIT_ROWS, "${limit}"); + runner.setProperty(ScanHBase.BULK_SIZE, "${bulk.size}"); + + final Map attributes = new HashMap<>(); + attributes.put("hbase.table", "table1"); + attributes.put("hbase.row", "row"); + attributes.put("hbase.cols", "nifi:cq2"); + attributes.put("tr_min", "10000000"); + attributes.put("tr_max", "10000001"); + attributes.put("limit", "1000"); + attributes.put("bulk.size", "110"); + + runner.enqueue("trigger flow file", attributes); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); + runner.assertTransferCount(ScanHBase.REL_SUCCESS, 11); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); + + + List ffs = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS); + int i = 0; + for (MockFlowFile ff : ffs) + ff.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, new String (i++ < 10 ? 
"110" : "2")); //last ff should have only 2 + } + + @Test + public void testScanToContentWithQualifierAndValueJSON() { + final Map cells = new HashMap<>(); + cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + hBaseClientService.addResult("row1", cells, System.currentTimeMillis()); + + runner.setProperty(ScanHBase.TABLE_NAME, "table1"); + runner.setProperty(ScanHBase.START_ROW, "row1"); + runner.setProperty(ScanHBase.END_ROW, "row1"); + runner.setProperty(ScanHBase.JSON_FORMAT, ScanHBase.JSON_FORMAT_QUALIFIER_AND_VALUE); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); + runner.assertTransferCount(ScanHBase.REL_SUCCESS, 1); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0); + flowFile.assertContentEquals("{\"cq1\":\"val1\", \"cq2\":\"val2\"}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testScanWithExpressionLanguage() { + final Map cells = new HashMap<>(); +// cells.put("cq1", "val1"); + cells.put("cq2", "val2"); + + final long ts1 = 123456789; + hBaseClientService.addResult("row1", cells, ts1); + + runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); + runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1"); + runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2"); + runner.setProperty(ScanHBase.COLUMNS, "${hbase.cols}"); + runner.setProperty(ScanHBase.TIME_RANGE_MIN, "${tr_min}"); + runner.setProperty(ScanHBase.TIME_RANGE_MAX, "${tr_max}"); + runner.setProperty(ScanHBase.LIMIT_ROWS, "${limit}"); + runner.setProperty(ScanHBase.BULK_SIZE, "${bulk.size}"); + + final Map attributes = new HashMap<>(); + attributes.put("hbase.table", "table1"); + attributes.put("hbase.row", "row"); + attributes.put("hbase.cols", "nifi:cq2"); + attributes.put("tr_min", "10000000"); + attributes.put("tr_max", "10000001"); + attributes.put("limit", "1000"); + 
attributes.put("bulk.size", "10"); + + runner.enqueue("trigger flow file", attributes); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); + runner.assertTransferCount(ScanHBase.REL_SUCCESS, 1); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); + + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0); + flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}"); + + Assert.assertEquals(1, hBaseClientService.getNumScans()); + } + + @Test + public void testScanWhenScanThrowsException() { + hBaseClientService.setThrowException(true); + + runner.setProperty(ScanHBase.TABLE_NAME, "table1"); + runner.setProperty(ScanHBase.START_ROW, "row1"); + runner.setProperty(ScanHBase.END_ROW, "row1"); + + runner.enqueue("trigger flow file"); + runner.run(); + + runner.assertTransferCount(ScanHBase.REL_FAILURE, 1); + runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); + runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0); + + Assert.assertEquals(0, hBaseClientService.getNumScans()); + } + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 0c2a131fd013..f27949afe88e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -140,6 +140,24 @@ public interface HBaseClientService extends ControllerService { */ void scan(String tableName, byte[] startRow, byte[] endRow, Collection columns, ResultHandler handler) throws IOException; + /** + * Scans the given table for the given range of row 
keys or time range and passes the result to a handler.
+ * + * @param tableName the name of an HBase table to scan + * @param startRow the row identifier to start scanning at + * @param endRow the row identifier to end scanning at + * @param filterExpression optional filter expression, if not specified no filtering is performed + * @param timerangeMin the minimum timestamp of cells to return, passed to the HBase scanner timeRange + * @param timerangeMax the maximum timestamp of cells to return, passed to the HBase scanner timeRange + * @param limitRows the maximum number of rows to be returned by scanner + * @param isReversed whether this scan is a reversed one. + * @param columns optional columns to return, if not specified all columns are returned + * @param bulkSize number of records per flow file. If total number of records exceeds this value, there will be multiple flow files created. + * @param handler a handler to process rows of the result + */ + void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows, + Boolean isReversed, Collection columns, ResultHandler handler) throws IOException; + /** + * Converts the given boolean to its byte representation. 
* diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 12309d669395..fb090b4e5b6c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -416,8 +416,80 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } } + + @Override + public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, + final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, + final Collection columns, final ResultHandler handler) throws IOException { + + try (final Table table = connection.getTable(TableName.valueOf(tableName)); + final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, + timerangeMax, limitRows, isReversed, columns)) { + + int cnt = 0; + final int lim = limitRows != null ? 
limitRows : 0; + for (final Result result : scanner) { + + if (lim > 0 && cnt++ > lim) break; + + final byte[] rowKey = result.getRow(); + final Cell[] cells = result.rawCells(); + + if (cells == null) { + continue; + } + + // convert HBase cells to NiFi cells + final ResultCell[] resultCells = new ResultCell[cells.length]; + for (int i = 0; i < cells.length; i++) { + final Cell cell = cells[i]; + final ResultCell resultCell = getResultCell(cell); + resultCells[i] = resultCell; + } + + // delegate to the handler + handler.handle(rowKey, resultCells); + } + } + + } + + // + protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, + final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { + final Scan scan = new Scan(); + if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); + if (!StringUtils.isBlank(endRow)) scan.setStopRow ( endRow.getBytes(StandardCharsets.UTF_8)); + - // protected and extracted into separate method for testing + Filter filter = null; + if (columns != null) { + for (Column col : columns) { + if (col.getQualifier() == null) { + scan.addFamily(col.getFamily()); + } else { + scan.addColumn(col.getFamily(), col.getQualifier()); + } + } + }else if (!StringUtils.isBlank(filterExpression)) { + ParseFilter parseFilter = new ParseFilter(); + filter = parseFilter.parseFilterString(filterExpression); + } + if (filter != null) scan.setFilter(filter); + + if (timerangeMin != null && timerangeMax != null) scan.setTimeRange(timerangeMin, timerangeMax); + + // ->>> reserved for HBase v 2 or later + //if (limitRows != null && limitRows > 0){ + // scan.setLimit(limitRows) + //} + + if (isReversed != null) scan.setReversed(isReversed); + + return table.getScanner(scan); + } + + // protected and extracted into separate method for testing protected ResultScanner 
getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection columns) throws IOException { final Scan scan = new Scan(); scan.setStartRow(startRow); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index 8a04e5164f58..e4c43b9afd81 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -155,6 +155,14 @@ protected ResultScanner getResults(Table table, Collection columns, Filt Mockito.when(scanner.iterator()).thenReturn(results.iterator()); return scanner; } + + @Override + protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, + final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { + final ResultScanner scanner = Mockito.mock(ResultScanner.class); + Mockito.when(scanner.iterator()).thenReturn(results.iterator()); + return scanner; + } @Override protected Connection createConnection(ConfigurationContext context) throws IOException { From 4f651e6471b1a00bfa4ce5b282bd371a5c3783a0 Mon Sep 17 00:00:00 2001 From: Ed Date: Thu, 1 Feb 2018 14:27:52 -0500 Subject: [PATCH 2/3] NIFI-4833 Add ScanHBase processor minor fixes --- .../additionalDetails.html | 2 + .../java/org/apache/nifi/hbase/ScanHBase.java | 360 +++++++++--------- 
.../org.apache.nifi.processor.Processor | 1 + .../nifi/hbase/MockHBaseClientService.java | 39 +- .../org/apache/nifi/hbase/TestScanHBase.java | 44 +-- .../nifi/hbase/MockHBaseClientService.java | 2 +- 6 files changed, 218 insertions(+), 230 deletions(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html index c8485102a3f6..38ad684f432d 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html @@ -431,6 +431,7 @@

Supported DataSets and Processors

PutHBaseCell
PutHBaseJSON
PutHBaseRecord
+ ScanHBase
FETCH
@@ -438,6 +439,7 @@

Supported DataSets and Processors

SEND
SEND
SEND
+ RECEIVE
hbase://hmaster.example.com:16000/tableA/rowX hbase_table diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java index cb6c5756d8b7..be0e04fd236e 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java @@ -70,10 +70,11 @@ @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."), @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"), @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"), - @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions.
Could be null (not present) if transfered to FAILURE") + @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions.
" + + "Could be null (not present) if transfered to FAILURE") }) public class ScanHBase extends AbstractProcessor { - //enhanced regex for columns to allow "-" in column qualifier names + //enhanced regex for columns to allow "-" in column qualifier names static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*"); static final byte[] nl = System.lineSeparator().getBytes(); @@ -102,7 +103,7 @@ public class ScanHBase extends AbstractProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + static final PropertyDescriptor END_ROW = new PropertyDescriptor.Builder() .displayName("End rowkey") .name("scanhbase-end-rowkey") @@ -111,7 +112,7 @@ public class ScanHBase extends AbstractProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + static final PropertyDescriptor TIME_RANGE_MIN = new PropertyDescriptor.Builder() .displayName("Time range min") .name("scanhbase-time-range-min") @@ -120,7 +121,7 @@ public class ScanHBase extends AbstractProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.LONG_VALIDATOR) .build(); - + static final PropertyDescriptor TIME_RANGE_MAX = new PropertyDescriptor.Builder() .displayName("Time range max") .name("scanhbase-time-range-max") @@ -129,7 +130,7 @@ public class ScanHBase extends AbstractProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.LONG_VALIDATOR) .build(); - + static final PropertyDescriptor LIMIT_ROWS = new PropertyDescriptor.Builder() .displayName("Limit rows") .name("scanhbase-limit") @@ -138,7 +139,7 @@ public class ScanHBase extends AbstractProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.INTEGER_VALIDATOR) .build(); - + static final PropertyDescriptor BULK_SIZE = new PropertyDescriptor.Builder() .displayName("Max rows per flow file") .name("scanhbase-bulk-size") @@ -148,8 +149,7 @@ public 
class ScanHBase extends AbstractProcessor { .defaultValue("0") .addValidator(StandardValidators.INTEGER_VALIDATOR) .build(); - - + static final PropertyDescriptor REVERSED_SCAN = new PropertyDescriptor.Builder() .displayName("Reversed order") .name("scanhbase-reversed-order") @@ -158,13 +158,14 @@ public class ScanHBase extends AbstractProcessor { .allowableValues("true", "false") .required(false) .defaultValue("false") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); - + static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder() .displayName("Filter expression") .name("scanhbase-filter-expression") - .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.") + .description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.
" + + "Example: \"ValueFilter( =, 'binaryprefix:commit' )\"") .required(false) .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -261,21 +262,20 @@ public class ScanHBase extends AbstractProcessor { private volatile Charset decodeCharset; private volatile Charset encodeCharset; private RowSerializer serializer = null; - @OnScheduled public void onScheduled(ProcessContext context) { this.decodeCharset = Charset.forName(context.getProperty(DECODE_CHARSET).getValue()); this.encodeCharset = Charset.forName(context.getProperty(ENCODE_CHARSET).getValue()); - - final String jsonFormat = context.getProperty(JSON_FORMAT).getValue(); + + final String jsonFormat = context.getProperty(JSON_FORMAT).getValue(); if (jsonFormat.equals(JSON_FORMAT_FULL_ROW.getValue())) { this.serializer = new JsonFullRowSerializer(decodeCharset, encodeCharset); } else { this.serializer = new JsonQualifierAndValueRowSerializer(decodeCharset, encodeCharset); } } - + @Override protected List getSupportedPropertyDescriptors() { return properties; @@ -298,10 +298,9 @@ protected Collection customValidate(ValidationContext validati problems.add(new ValidationResult.Builder() .subject(FILTER_EXPRESSION.getDisplayName()) .input(filter).valid(false) - .explanation("a filter expression can not be used in conjunction with the Columns property") + .explanation("A filter expression can not be used in conjunction with the Columns property") .build()); } - return problems; } @@ -312,123 +311,122 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro if (flowFile == null) { return; } - - try{ - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - if (StringUtils.isBlank(tableName)) { - getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - return; - } - - final String startRow 
= context.getProperty(START_ROW).evaluateAttributeExpressions(flowFile).getValue(); - final String endRow = context.getProperty(END_ROW).evaluateAttributeExpressions(flowFile).getValue(); - - final String filterExpression = context.getProperty(FILTER_EXPRESSION).evaluateAttributeExpressions(flowFile).getValue(); - - //evaluate and validate time range min and max values. They both should be either empty or provided. - Long timerangeMin = null; - Long timerangeMax = null; - - try{ - timerangeMin = context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).asLong(); - }catch(Exception e){ - getLogger().error("Time range min value is not a number ({}) for {}, transferring to failure", - new Object[] {context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).getValue(), flowFile}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - return; - } - try{ - timerangeMax = context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).asLong(); - }catch(Exception e){ - getLogger().error("Time range max value is not a number ({}) for {}, transferring to failure", - new Object[] {context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).getValue(), flowFile}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - return; - } - if (timerangeMin == null && timerangeMax != null) { - getLogger().error("Time range min value cannot be blank when max value provided for {}, transferring to failure", new Object[] {flowFile}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - return; - }else if (timerangeMin != null && timerangeMax == null) { - getLogger().error("Time range max value cannot be blank when min value provided for {}, transferring to failure", new Object[] {flowFile}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - return; - } - - final Integer limitRows = context.getProperty(LIMIT_ROWS).evaluateAttributeExpressions(flowFile).asInteger(); - - final Boolean 
isReversed = context.getProperty(REVERSED_SCAN).asBoolean(); - - final Integer bulkSize = context.getProperty(BULK_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); - - final List columns = getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue()); - final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); - - final AtomicReference rowsPulledHolder = new AtomicReference<>(0L); - final AtomicReference ffCountHolder = new AtomicReference<>(0L); - ScanHBaseResultHandler handler = new ScanHBaseResultHandler(context, session, flowFile, rowsPulledHolder, ffCountHolder, hBaseClientService, tableName, bulkSize); - - try { - hBaseClientService.scan(tableName, - filterExpression, - startRow, endRow, - timerangeMin, timerangeMax, - limitRows, - isReversed, - columns, - handler); - } catch (IOException e) { - getLogger().error("Unable to fetch rows from HBase table {} due to {}", new Object[] {tableName, e}); - flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); - session.transfer(flowFile, REL_FAILURE); - return; - } - - LinkedList> hangingRows = handler.getHangingRows(); - if (!handler.isHandledAny() || // no rows found in hbase - (handler.isHandledAny() && (hangingRows == null || hangingRows.isEmpty())) // all the rows are flushed to FF inside handlers - ){ - flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); - session.transfer(flowFile, REL_ORIGINAL); - session.commit(); - return; - } - - if (hangingRows != null && !hangingRows.isEmpty()) { - FlowFile lastFF = session.create(flowFile); - final Map attributes = new HashMap<>(); - attributes.put(HBASE_TABLE_ATTR, tableName); - attributes.put(HBASE_ROWS_COUNT_ATTR, Long.toString(rowsPulledHolder.get())); - attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + + try{ + final 
String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isBlank(tableName)) { + getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final String startRow = context.getProperty(START_ROW).evaluateAttributeExpressions(flowFile).getValue(); + final String endRow = context.getProperty(END_ROW).evaluateAttributeExpressions(flowFile).getValue(); + + final String filterExpression = context.getProperty(FILTER_EXPRESSION).evaluateAttributeExpressions(flowFile).getValue(); + + //evaluate and validate time range min and max values. They both should be either empty or provided. + Long timerangeMin = null; + Long timerangeMax = null; + + try{ + timerangeMin = context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).asLong(); + }catch(Exception e){ + getLogger().error("Time range min value is not a number ({}) for {}, transferring to failure", + new Object[] {context.getProperty(TIME_RANGE_MIN).evaluateAttributeExpressions(flowFile).getValue(), flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + try{ + timerangeMax = context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).asLong(); + }catch(Exception e){ + getLogger().error("Time range max value is not a number ({}) for {}, transferring to failure", + new Object[] {context.getProperty(TIME_RANGE_MAX).evaluateAttributeExpressions(flowFile).getValue(), flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + if (timerangeMin == null && timerangeMax != null) { + getLogger().error("Time range min value cannot be blank when max value provided for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + }else if (timerangeMin != null && timerangeMax == null) { + 
getLogger().error("Time range max value cannot be blank when min value provided for {}, transferring to failure", new Object[] {flowFile}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final Integer limitRows = context.getProperty(LIMIT_ROWS).evaluateAttributeExpressions(flowFile).asInteger(); + + final Boolean isReversed = context.getProperty(REVERSED_SCAN).asBoolean(); + + final Integer bulkSize = context.getProperty(BULK_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); + + final List columns = getColumns(context.getProperty(COLUMNS).evaluateAttributeExpressions(flowFile).getValue()); + final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + + final AtomicReference rowsPulledHolder = new AtomicReference<>(0L); + final AtomicReference ffCountHolder = new AtomicReference<>(0L); + ScanHBaseResultHandler handler = new ScanHBaseResultHandler(context, session, flowFile, rowsPulledHolder, ffCountHolder, hBaseClientService, tableName, bulkSize); + + try { + hBaseClientService.scan(tableName, + filterExpression, + startRow, endRow, + timerangeMin, timerangeMax, + limitRows, + isReversed, + columns, + handler); + } catch (IOException e) { + getLogger().error("Unable to fetch rows from HBase table {} due to {}", new Object[] {tableName, e}); + flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_FAILURE); + return; + } + + LinkedList> hangingRows = handler.getHangingRows(); + if (!handler.isHandledAny() || // no rows found in hbase + (handler.isHandledAny() && (hangingRows == null || hangingRows.isEmpty())) // all the rows are flushed to FF inside handlers + ){ + flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_ORIGINAL); + session.commit(); + return; + } + + if 
(hangingRows != null && !hangingRows.isEmpty()) { + FlowFile lastFF = session.create(flowFile); + final Map attributes = new HashMap<>(); + attributes.put(HBASE_TABLE_ATTR, tableName); + attributes.put(HBASE_ROWS_COUNT_ATTR, Long.toString(rowsPulledHolder.get())); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); attributes.put(HBASE_ROWS_COUNT_ATTR, Long.toString(hangingRows.size())); lastFF = session.putAllAttributes(lastFF, attributes); - + final AtomicReference ioe = new AtomicReference<>(null); - session.write(lastFF, (out) -> { - for (Iterator> iter = hangingRows.iterator(); iter.hasNext();){ - Tuple r = iter.next(); - serializer.serialize(r.getKey(), r.getValue(), out); - if (iter.hasNext()){ - out.write(nl); - } - } + session.write(lastFF, (out) -> { + for (Iterator> iter = hangingRows.iterator(); iter.hasNext();){ + Tuple r = iter.next(); + serializer.serialize(r.getKey(), r.getValue(), out); + if (iter.hasNext()){ + out.write(nl); + } + } }); - - + Relationship rel = REL_SUCCESS; IOException error = ioe.get(); if (error != null){ - lastFF = session.putAttribute(lastFF, "scanhbase.error", error.toString()); - rel = REL_FAILURE; + lastFF = session.putAttribute(lastFF, "scanhbase.error", error.toString()); + rel = REL_FAILURE; } - session.transfer(lastFF, rel); - flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); - session.transfer(flowFile, REL_ORIGINAL); - } - session.commit(); - - }catch (final Exception e) { + session.transfer(lastFF, rel); + flowFile = session.putAttribute(flowFile, "scanhbase.results.found", Boolean.toString(handler.isHandledAny())); + session.transfer(flowFile, REL_ORIGINAL); + } + session.commit(); + + }catch (final Exception e) { getLogger().error("Failed to receive data from HBase due to {}", e); session.rollback(); // if we failed, we want to yield so that we don't hammer hbase. 
@@ -459,106 +457,106 @@ private List getColumns(final String columnsValue) { return columnsList; } - + /** * @return number of rows to be committed to session. */ protected int getBatchSize(){ - return 500; + return 500; } /** - * Result Handler for Scan operation + * Result Handler for Scan operation */ private class ScanHBaseResultHandler implements ResultHandler { - - final private ProcessSession session; - final private FlowFile origFF; - final private AtomicReference rowsPulledHolder; - final private AtomicReference ffCountHolder; - final private HBaseClientService hBaseClientService; - final private String tableName; - final private Integer bulkSize; - - private boolean handledAny = false; - private LinkedList> rows = null; - - ScanHBaseResultHandler(final ProcessContext context, final ProcessSession session, - final FlowFile origFF, final AtomicReference rowsPulledHolder, final AtomicReference ffCountHolder, final HBaseClientService hBaseClientService, - final String tableName, final Integer bulkSize){ - this.session = session; - this.rowsPulledHolder = rowsPulledHolder; - this.ffCountHolder = ffCountHolder; - this.hBaseClientService = hBaseClientService; - this.tableName = tableName; - this.bulkSize = bulkSize; - this.origFF = origFF; - } - + + final private ProcessSession session; + final private FlowFile origFF; + final private AtomicReference rowsPulledHolder; + final private AtomicReference ffCountHolder; + final private HBaseClientService hBaseClientService; + final private String tableName; + final private Integer bulkSize; + + private boolean handledAny = false; + private LinkedList> rows = null; + + ScanHBaseResultHandler(final ProcessContext context, final ProcessSession session, + final FlowFile origFF, final AtomicReference rowsPulledHolder, final AtomicReference ffCountHolder, final HBaseClientService hBaseClientService, + final String tableName, final Integer bulkSize){ + this.session = session; + this.rowsPulledHolder = rowsPulledHolder; + 
this.ffCountHolder = ffCountHolder; + this.hBaseClientService = hBaseClientService; + this.tableName = tableName; + this.bulkSize = bulkSize; + this.origFF = origFF; + } + @Override public void handle(final byte[] rowKey, final ResultCell[] resultCells) { - handledAny = true; + handledAny = true; final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8); long rowsPulled = rowsPulledHolder.get(); long ffUncommittedCount = ffCountHolder.get(); - + if (rows == null) rows = new LinkedList<>(); rows.add(new Tuple(rowKey, resultCells)); rowsPulled++; - - // bulkSize controls number of records per flow file. + + // bulkSize controls number of records per flow file. if (bulkSize>0 && rowsPulled >= bulkSize) { - FlowFile flowFile = session.create(origFF); + FlowFile flowFile = session.create(origFF); final Map attributes = new HashMap<>(); attributes.put(HBASE_TABLE_ATTR, tableName); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); attributes.put(HBASE_ROWS_COUNT_ATTR, Long.toString(rowsPulled)); flowFile = session.putAllAttributes(flowFile, attributes); - + final AtomicReference ioe = new AtomicReference<>(null); - session.write(flowFile, (out) -> { - for (Iterator> iter = rows.iterator(); iter.hasNext();){ - Tuple r = iter.next(); - serializer.serialize(r.getKey(), r.getValue(), out); - if (iter.hasNext()){ - out.write(nl); - } - } + session.write(flowFile, (out) -> { + for (Iterator> iter = rows.iterator(); iter.hasNext();){ + Tuple r = iter.next(); + serializer.serialize(r.getKey(), r.getValue(), out); + if (iter.hasNext()){ + out.write(nl); + } + } }); - + Relationship rel = REL_SUCCESS; IOException error = ioe.get(); if (error != null){ - flowFile = session.putAttribute(flowFile, "scanhbase.error", error.toString()); - rel = REL_FAILURE; + flowFile = session.putAttribute(flowFile, "scanhbase.error", error.toString()); + rel = REL_FAILURE; } - + session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, 
rowKeyString)); session.transfer(flowFile, rel); rowsPulledHolder.set(0L); rows.clear(); - + // we could potentially have a huge number of rows. If we get to batchSize, go ahead and commit the // session so that we can avoid buffering tons of FlowFiles without ever sending any out. if (getBatchSize()>0 && ffUncommittedCount*bulkSize > getBatchSize()) { - session.commit(); - ffCountHolder.set(0L); + session.commit(); + ffCountHolder.set(0L); }else{ - ffCountHolder.set(ffUncommittedCount++); + ffCountHolder.set(ffUncommittedCount++); } }else{ - rowsPulledHolder.set(rowsPulled); + rowsPulledHolder.set(rowsPulled); } } - + public LinkedList> getHangingRows(){ - return rows; + return rows; } - + public boolean isHandledAny(){ - return handledAny; + return handledAny; } } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 21c827cc7ada..ce87b73a421d 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -18,3 +18,4 @@ org.apache.nifi.hbase.PutHBaseCell org.apache.nifi.hbase.PutHBaseJSON org.apache.nifi.hbase.PutHBaseRecord org.apache.nifi.hbase.FetchHBaseRow +org.apache.nifi.hbase.ScanHBase diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index a549452f93db..beae6276f084 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -16,20 +16,6 @@ */ package org.apache.nifi.hbase; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.hbase.put.PutColumn; -import org.apache.nifi.hbase.put.PutFlowFile; -import org.apache.nifi.hbase.scan.Column; -import org.apache.nifi.hbase.scan.ResultCell; -import org.apache.nifi.hbase.scan.ResultHandler; - - import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -38,7 +24,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; +import org.apache.nifi.hbase.scan.Column; +import org.apache.nifi.hbase.scan.ResultCell; +import org.apache.nifi.hbase.scan.ResultHandler; public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService { @@ -125,10 +118,10 @@ public void scan(String tableName, Collection columns, String filterExpr numScans++; } - @Override - public void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, - Long timerangeMax, Integer limitRows, Boolean isReversed, Collection columns, ResultHandler handler) - throws IOException { + @Override + public void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, + Long timerangeMax, Integer limitRows, Boolean isReversed, Collection columns, ResultHandler handler) + 
throws IOException { if (throwException) { throw new IOException("exception"); } @@ -138,10 +131,10 @@ public void scan(String tableName, String startRow, String endRow, String filter handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()); } - // delegate to the handler - - numScans++; - } + // delegate to the handler + + numScans++; + } public void addResult(final String rowKey, final Map cells, final long timestamp) { final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8); diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java index 694b34a57199..7c06c4db3b90 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java @@ -116,14 +116,13 @@ public void testResultsNotFound() { runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0); runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); - + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_ORIGINAL).get(0); flowFile.assertAttributeEquals("scanhbase.results.found", Boolean.FALSE.toString()); - Assert.assertEquals(1, hBaseClientService.getNumScans()); } - + @Test public void testScanToContentWithStringValues() { final Map cells = new HashMap<>(); @@ -154,17 +153,17 @@ public void testScanToContentWithStringValues() { flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [" + "{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" + ts1 + "}, " + "{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}\n" - + "{\"row\":\"row2\", \"cells\": [" + + + "{\"row\":\"row2\", \"cells\": [" + 
"{\"fam\":\"nifi\",\"qual\":\"cq1\",\"val\":\"val1\",\"ts\":" + ts1 + "}, " + "{\"fam\":\"nifi\",\"qual\":\"cq2\",\"val\":\"val2\",\"ts\":" + ts1 + "}]}"); flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "2"); - + flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_ORIGINAL).get(0); flowFile.assertAttributeEquals("scanhbase.results.found", Boolean.TRUE.toString()); Assert.assertEquals(1, hBaseClientService.getNumScans()); } - + @Test public void testScanBulkSize(){ final Map cells = new HashMap<>(); @@ -172,10 +171,9 @@ public void testScanBulkSize(){ cells.put("cq2", "val2"); for (int i = 0; i < 15; i++){ - hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); + hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); } - - + runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1"); runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2"); @@ -200,10 +198,10 @@ public void testScanBulkSize(){ runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); runner.assertTransferCount(ScanHBase.REL_SUCCESS, 2); runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); - + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0); flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "10"); - + flowFile = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(1); flowFile.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "5"); } @@ -215,10 +213,9 @@ public void testScanBatchSizeTimesOfBulkSize(){ cells.put("cq2", "val2"); for (int i = 0; i < 1000; i++){ - hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); + hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); } - - + runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1"); runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2"); @@ -243,11 +240,10 @@ public 
void testScanBatchSizeTimesOfBulkSize(){ runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); runner.assertTransferCount(ScanHBase.REL_SUCCESS, 10); runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); - - + runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).forEach(ff ->{ - ff.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "100"); - Assert.assertNotEquals(0, ff.getId()); // since total amount of rows is a multiplication of bulkSize, original FF (with id=0) shouldn't be present on output. + ff.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, "100"); + Assert.assertNotEquals(0, ff.getId()); // since total amount of rows is a multiplication of bulkSize, original FF (with id=0) shouldn't be present on output. }); } @@ -258,10 +254,9 @@ public void testScanBatchSizeTimesCutBulkSize(){ cells.put("cq2", "val2"); for (int i = 0; i < 1102; i++){ - hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); + hBaseClientService.addResult("row"+i, cells, System.currentTimeMillis()); } - - + runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}"); runner.setProperty(ScanHBase.START_ROW, "${hbase.row}1"); runner.setProperty(ScanHBase.END_ROW, "${hbase.row}2"); @@ -286,12 +281,11 @@ public void testScanBatchSizeTimesCutBulkSize(){ runner.assertTransferCount(ScanHBase.REL_FAILURE, 0); runner.assertTransferCount(ScanHBase.REL_SUCCESS, 11); runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1); - - + List ffs = runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS); int i = 0; - for (MockFlowFile ff : ffs) - ff.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, new String (i++ < 10 ? "110" : "2")); //last ff should have only 2 + for (MockFlowFile ff : ffs) + ff.assertAttributeEquals(ScanHBase.HBASE_ROWS_COUNT_ATTR, new String(i++ < 10 ? 
"110" : "2")); //last ff should have only 2 } @Test diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index e4c43b9afd81..222cedc5674d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -158,7 +158,7 @@ protected ResultScanner getResults(Table table, Collection columns, Filt @Override protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, - final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { + final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { final ResultScanner scanner = Mockito.mock(ResultScanner.class); Mockito.when(scanner.iterator()).thenReturn(results.iterator()); return scanner; From 34dba5a35d3ac1492c7b49e2d62b67027d896318 Mon Sep 17 00:00:00 2001 From: Ed Date: Thu, 1 Feb 2018 15:28:47 -0500 Subject: [PATCH 3/3] NIFI-4833 Formatting fixes --- .../apache/nifi/hbase/HBaseClientService.java | 9 +- .../nifi/hbase/HBase_1_1_2_ClientService.java | 98 +++++++++---------- .../nifi/hbase/MockHBaseClientService.java | 2 +- 3 files changed, 54 insertions(+), 55 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index f27949afe88e..f96e8f4c9c5a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -142,7 +142,7 @@ public interface HBaseClientService extends ControllerService { /** * Scans the given table for the given range of row keys or time rage and passes the result to a handler.
- * + * * @param tableName the name of an HBase table to scan * @param startRow the row identifier to start scanning at * @param endRow the row identifier to end scanning at @@ -150,13 +150,12 @@ public interface HBaseClientService extends ControllerService { * @param timerangeMin the minimum timestamp of cells to return, passed to the HBase scanner timeRange * @param timerangeMax the maximum timestamp of cells to return, passed to the HBase scanner timeRange * @param limitRows the maximum number of rows to be returned by scanner - * @param isReversed whether this scan is a reversed one. + * @param isReversed whether this scan is a reversed one. * @param columns optional columns to return, if not specified all columns are returned - * @param bulkSize number of records per flow file. If total number of records exceed this value, there will be multiple flowfiles created. * @param handler a handler to process rows of the result */ - void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows, - Boolean isReversed, Collection columns, ResultHandler handler) throws IOException; + void scan(String tableName, String startRow, String endRow, String filterExpression, Long timerangeMin, Long timerangeMax, Integer limitRows, + Boolean isReversed, Collection columns, ResultHandler handler) throws IOException; /** * Converts the given boolean to it's byte representation. 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index fb090b4e5b6c..f69a94b802ed 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -416,51 +416,51 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end } } } - - @Override - public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, - final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, - final Collection columns, final ResultHandler handler) throws IOException { - - try (final Table table = connection.getTable(TableName.valueOf(tableName)); - final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, - timerangeMax, limitRows, isReversed, columns)) { - - int cnt = 0; - final int lim = limitRows != null ? 
limitRows : 0; - for (final Result result : scanner) { - - if (lim > 0 && cnt++ > lim) break; - - final byte[] rowKey = result.getRow(); - final Cell[] cells = result.rawCells(); - - if (cells == null) { - continue; - } - - // convert HBase cells to NiFi cells - final ResultCell[] resultCells = new ResultCell[cells.length]; - for (int i = 0; i < cells.length; i++) { - final Cell cell = cells[i]; - final ResultCell resultCell = getResultCell(cell); - resultCells[i] = resultCell; - } - - // delegate to the handler - handler.handle(rowKey, resultCells); - } - } - - } - - // + + @Override + public void scan(final String tableName, final String startRow, final String endRow, String filterExpression, + final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, + final Collection columns, final ResultHandler handler) throws IOException { + + try (final Table table = connection.getTable(TableName.valueOf(tableName)); + final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin, + timerangeMax, limitRows, isReversed, columns)) { + + int cnt = 0; + final int lim = limitRows != null ? 
limitRows : 0; + for (final Result result : scanner) { + + if (lim > 0 && cnt++ > lim) break; + + final byte[] rowKey = result.getRow(); + final Cell[] cells = result.rawCells(); + + if (cells == null) { + continue; + } + + // convert HBase cells to NiFi cells + final ResultCell[] resultCells = new ResultCell[cells.length]; + for (int i = 0; i < cells.length; i++) { + final Cell cell = cells[i]; + final ResultCell resultCell = getResultCell(cell); + resultCells[i] = resultCell; + } + + // delegate to the handler + handler.handle(rowKey, resultCells); + } + } + + } + + // protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, - final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { + final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException { final Scan scan = new Scan(); if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8)); - if (!StringUtils.isBlank(endRow)) scan.setStopRow ( endRow.getBytes(StandardCharsets.UTF_8)); - + if (!StringUtils.isBlank(endRow)) scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8)); + Filter filter = null; if (columns != null) { @@ -476,20 +476,20 @@ protected ResultScanner getResults(final Table table, final String startRow, fin filter = parseFilter.parseFilterString(filterExpression); } if (filter != null) scan.setFilter(filter); - + if (timerangeMin != null && timerangeMax != null) scan.setTimeRange(timerangeMin, timerangeMax); - + // ->>> reserved for HBase v 2 or later //if (limitRows != null && limitRows > 0){ - // scan.setLimit(limitRows) + // scan.setLimit(limitRows) //} - + if (isReversed != null) scan.setReversed(isReversed); - + return table.getScanner(scan); - } + } - // protected and extracted into separate method for testing + // protected and extracted into separate method for 
testing protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection columns) throws IOException { final Scan scan = new Scan(); scan.setStartRow(startRow); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index 222cedc5674d..e4b9280f6445 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -155,7 +155,7 @@ protected ResultScanner getResults(Table table, Collection columns, Filt Mockito.when(scanner.iterator()).thenReturn(results.iterator()); return scanner; } - + @Override protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed, final Collection columns) throws IOException {