From 3ada739dc69aa60d344f8ba90066a019da25685c Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Thu, 11 May 2017 12:56:06 -0400 Subject: [PATCH] NIFI-3867: Add Expression Language support to HiveConnectionPool properties --- .../nifi/dbcp/hive/HiveConnectionPool.java | 28 ++++++----- .../org/apache/nifi/util/hive/HiveUtils.java | 3 ++ .../dbcp/hive/HiveConnectionPoolTest.java | 49 ++++++++++++++++++- 3 files changed, 68 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java index 64f302774971..c3724c3c0fff 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java @@ -75,6 +75,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv .defaultValue(null) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(true) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() @@ -83,7 +84,10 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop " + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication " + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.") - .required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build(); + .required(false) + .addValidator(HiveUtils.createMultipleFilesExistValidator()) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() .name("hive-db-user") @@ -91,6 +95,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv .description("Database user name") .defaultValue(null) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() @@ -101,6 +106,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv .required(false) .sensitive(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder() @@ -111,7 +117,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv .defaultValue("500 millis") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .sensitive(false) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder() @@ -122,7 +128,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv .defaultValue("8") .required(true) .addValidator(StandardValidators.INTEGER_VALIDATOR) - .sensitive(false) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder() @@ -183,7 +189,7 @@ protected Collection customValidate(ValidationContext validati final List problems = new ArrayList<>(); if (confFileProvided) { - final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue(); + final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); problems.addAll(hiveConfigurator.validate(configFiles, principal, keyTab, validationResourceHolder, getLogger())); @@ -211,7 +217,7 @@ public void onConfigured(final ConfigurationContext context) throws Initializati ComponentLog log = getLogger(); - final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue(); + final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles); final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); @@ -219,7 +225,7 @@ public void onConfigured(final ConfigurationContext context) throws Initializati for (final Map.Entry entry : context.getProperties().entrySet()) { final PropertyDescriptor descriptor = entry.getKey(); if (descriptor.isDynamic()) { - hiveConfig.set(descriptor.getName(), entry.getValue()); + hiveConfig.set(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue()); } } @@ -237,15 +243,15 @@ public void onConfigured(final ConfigurationContext context) throws Initializati getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab}); } - final String user = context.getProperty(DB_USER).getValue(); - final String passw = context.getProperty(DB_PASSWORD).getValue(); - final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); - final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); + final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); + final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); + final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); dataSource = new BasicDataSource(); dataSource.setDriverClassName(drv); - final String dburl = context.getProperty(DATABASE_URL).getValue(); + final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); dataSource.setMaxWait(maxWaitMillis); dataSource.setMaxActive(maxTotal); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java index 3e375f912148..2dc67f702f5d 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java @@ -57,6 +57,9 @@ public static void logAllHiveEndPoints(Map allWriters) */ public static Validator createMultipleFilesExistValidator() { return (subject, input, context) -> { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); + } final String[] files = input.split("\\s*,\\s*"); for (String filename : files) { try { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java index 0b5cd8f64a53..79bcb7afd7fe 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java @@ -19,9 +19,13 @@ import org.apache.commons.dbcp.BasicDataSource; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.registry.VariableDescriptor; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockVariableRegistry; import org.junit.Before; import org.junit.Test; @@ -30,6 +34,8 @@ import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.isA; @@ -51,7 +57,7 @@ public void setup() throws Exception { when(userGroupInformation.doAs(isA(PrivilegedExceptionAction.class))).thenAnswer(invocation -> { try { return ((PrivilegedExceptionAction) invocation.getArguments()[0]).run(); - } catch (IOException |Error|RuntimeException|InterruptedException e) { + } catch (IOException | Error | RuntimeException | InterruptedException e) { throw e; } catch (Throwable e) { throw new UndeclaredThrowableException(e); @@ -87,4 +93,45 @@ public void testGetConnectionSqlException() throws SQLException { throw e; } } + + @Test + public void testExpressionLanguageSupport() throws Exception { + final String URL = "jdbc:hive2://localhost:10000/default"; + final String USER = "user"; + final String PASS = "pass"; + final int MAX_CONN = 7; + final String MAX_WAIT = "10 sec"; // 10000 milliseconds + final String CONF = "/path/to/hive-site.xml"; + hiveConnectionPool = new HiveConnectionPool(); + + Map props = new HashMap() {{ + put(HiveConnectionPool.DATABASE_URL, "${url}"); + put(HiveConnectionPool.DB_USER, "${username}"); + put(HiveConnectionPool.DB_PASSWORD, "${password}"); + put(HiveConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}"); + put(HiveConnectionPool.MAX_WAIT_TIME, "${maxwait}"); + put(HiveConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}"); + }}; + + MockVariableRegistry registry = new MockVariableRegistry(); + registry.setVariable(new VariableDescriptor("url"), URL); + registry.setVariable(new VariableDescriptor("username"), USER); + registry.setVariable(new VariableDescriptor("password"), PASS); + registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN)); + registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT); + registry.setVariable(new VariableDescriptor("hiveconf"), CONF); + + + MockConfigurationContext context = new MockConfigurationContext(props, null, registry); + hiveConnectionPool.onConfigured(context); + + Field dataSourceField = HiveConnectionPool.class.getDeclaredField("dataSource"); + dataSourceField.setAccessible(true); + basicDataSource = (BasicDataSource) dataSourceField.get(hiveConnectionPool); + assertEquals(URL, basicDataSource.getUrl()); + assertEquals(USER, basicDataSource.getUsername()); + assertEquals(PASS, basicDataSource.getPassword()); + assertEquals(MAX_CONN, basicDataSource.getMaxActive()); + assertEquals(10000L, basicDataSource.getMaxWait()); + } }