From 84876b40a3b4efc1d613db74c89734ad31180617 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Thu, 2 Aug 2018 21:39:48 -0400 Subject: [PATCH 1/2] NIFI-5484: Fixed PutHive3Streaming to use the Hive Metastore URI property (to include multiple URIs) --- .../processor/util/StandardValidators.java | 14 ++++++++ .../processors/hive/PutHive3Streaming.java | 29 ++++++++++----- .../hive/TestPutHive3Streaming.java | 36 +++++++++++++++++++ 3 files changed, 71 insertions(+), 8 deletions(-) diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index 0ea7c1769504..c31e72ad035b 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -36,6 +36,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -390,6 +391,19 @@ public ValidationResult validate(final String subject, final String input, final } }; + public static final Validator URI_LIST_VALIDATOR = (subject, input, context) -> { + + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); + } + Optional invalidUri = Arrays.stream(input.split(",")) + .filter(uri -> uri != null && !uri.trim().isEmpty()) + .map(String::trim) + .map((uri) -> StandardValidators.URI_VALIDATOR.validate(subject,uri,context)).filter((uri) -> !uri.isValid()).findFirst(); + + return invalidUri.orElseGet(() -> new ValidationResult.Builder().subject(subject).input(input).explanation("Valid URI(s)").valid(true).build()); + }; + public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false); public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java index 664915c380d4..6e87771f6a75 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.hive.streaming.ConnectionError; @@ -81,7 +82,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES; @@ -115,12 +115,12 @@ public class PutHive3Streaming extends AbstractProcessor { static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder() .name("hive3-stream-metastore-uri") .displayName("Hive Metastore URI") - .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the " - + "Hive metastore is 9043.") - .required(true) + .description("The URI location(s) for the Hive metastore. This is a comma-separated list of Hive metastore URIs; note that this is not the location of the Hive Server. " + + "The default port for the Hive metastore is 9043. If this field is not set, then the 'hive.metastore.uris' property from any provided configuration resources " + + "will be used, and if none are provided, then the default value from a default hive-site.xml will be used (usually thrift://localhost:9083).") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.URI_VALIDATOR) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with / + .addValidator(StandardValidators.URI_LIST_VALIDATOR) .build(); static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() @@ -354,13 +354,26 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final ComponentLog log = getLogger(); - final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue(); + String metastoreURIs = null; + if (context.getProperty(METASTORE_URI).isSet()) { + metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue(); + if (StringUtils.isEmpty(metastoreURIs)) { + // Shouldn't be empty at this point, log an error, penalize the flow file, and return + log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure"); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } + } final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue(); final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean(); final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean(); - HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName) + // Override the Hive Metastore URIs in the config if set by the user + if (metastoreURIs != null) { + hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs); + } + + HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName) .withHiveConf(hiveConfig) .withAutoCreatePartitions(autoCreatePartitions) .withCallTimeout(callTimeout) diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java index 6a657839d0ef..cfc6017521d5 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde.serdeConstants; @@ -288,6 +289,35 @@ public void onTrigger() throws Exception { assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); } + @Test + public void onTriggerMultipleURIs() throws Exception { + configure(processor, 1); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://host1:9083,thrift://host2:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + + @Test + public void onTriggerURIFromConfigFile() throws Exception { + configure(processor, 1); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + @Test public void onTriggerComplex() throws Exception { configureComplex(processor, 10, -1, null); @@ -662,6 +692,12 @@ private class MockPutHive3Streaming extends PutHive3Streaming { @Override StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException { + // Test here to ensure the 'hive.metastore.uris' property matches the options.getMetastoreUri() value (if it is set) + String userDefinedMetastoreURI = options.getMetaStoreURI(); + if (null != userDefinedMetastoreURI) { + assertEquals(userDefinedMetastoreURI, options.getHiveConf().get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName())); + } + if (generateConnectFailure) { throw new StubConnectionError("Unit Test - Connection Error"); } From 880c9ad6184e323ac32ebdd6f9169a72107ef788 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Fri, 3 Aug 2018 10:22:38 -0400 Subject: [PATCH 2/2] NIFI-5484: Incorporated review comments, added unit test for new validator --- .../processor/util/StandardValidators.java | 5 +++++ .../validator/TestStandardValidators.java | 20 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index c31e72ad035b..51e41db0c318 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -396,6 +396,11 @@ public ValidationResult validate(final String subject, final String input, final if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } + + if (input == null || input.isEmpty()) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URI, value is missing or empty").valid(false).build(); + } + Optional invalidUri = Arrays.stream(input.split(",")) .filter(uri -> uri != null && !uri.trim().isEmpty()) .map(String::trim) diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java index ffebb9d86345..f02946b74c3d 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java @@ -288,4 +288,24 @@ public void testiso8061InstantValidator() { vr = val.validate("foo", "2016-01-01T01:01:01.000Z", vc); assertTrue(vr.isValid()); } + + @Test + public void testURIListValidator() { + Validator val = StandardValidators.URI_LIST_VALIDATOR; + ValidationContext vc = mock(ValidationContext.class); + ValidationResult vr = val.validate("foo", null, vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "/no_scheme", vc); + assertTrue(vr.isValid()); + + vr = val.validate("foo", "http://localhost 8080, https://host2:8080 ", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "http://localhost , https://host2:8080 ", vc); + assertTrue(vr.isValid()); + } }