From 2ba56d85e7418667a2166e875ad4c2054983cf2f Mon Sep 17 00:00:00 2001
From: Bryan Bende
Date: Thu, 17 Mar 2016 17:05:30 -0400
Subject: [PATCH] NIFI-1488 Refactoring HBase Kerberos support

- Storing UGI so we can support multiple HBaseClientServices with different configs
- Creating nifi-hadoop-utils to hold utility code shared between HDFS and HBase processors
- Incorporating KerberosProperties into existing hadoop processors
---
 nifi-commons/nifi-hadoop-utils/pom.xml | 60 +++++++
 .../nifi/hadoop/KerberosProperties.java | 146 ++++++++++++++++++
 .../nifi/hadoop/KerberosTicketRenewer.java | 81 ++++++++++
 .../org/apache/nifi/hadoop/SecurityUtil.java | 114 ++++++++++++++
 .../nifi/hadoop/TestKerberosProperties.java | 96 ++++++++++++
 .../src/test/resources/krb5.conf | 12 ++
 .../processor/util/StandardValidators.java | 4 -
 .../util/TestStandardValidators.java | 20 ---
 nifi-commons/pom.xml | 1 +
 .../nifi-hdfs-processors/pom.xml | 4 +
 .../hadoop/AbstractHadoopProcessor.java | 144 ++++++++---------
 .../hadoop/CreateHadoopSequenceFile.java | 23 ++-
 .../nifi/processors/hadoop/FetchHDFS.java | 28 ++--
 ...wFileStreamUnpackerSequenceFileWriter.java | 19 ++-
 .../nifi/processors/hadoop/GetHDFS.java | 63 ++++----
 .../hadoop/GetHDFSSequenceFile.java | 24 ++-
 .../processors/hadoop/KeyValueReader.java | 16 +-
 .../nifi/processors/hadoop/ListHDFS.java | 40 +++--
 .../nifi/processors/hadoop/PutHDFS.java | 43 +++---
 .../hadoop/SequenceFileWriterImpl.java | 29 ++--
 .../hadoop/TarUnpackerSequenceFileWriter.java | 13 +-
 .../nifi/processors/hadoop/ValueReader.java | 14 +-
 .../hadoop/ZipUnpackerSequenceFileWriter.java | 15 +-
 .../processors/hadoop/util/HDFSListing.java | 9 +-
 .../hadoop/util/InputStreamWritable.java | 4 +-
 .../processors/hadoop/util/LongSerDe.java | 10 +-
 .../hadoop/util/OutputStreamWritable.java | 11 +-
 .../hadoop/util/SequenceFileReader.java | 4 +-
 .../hadoop/util/SequenceFileWriter.java | 5 +-
 .../processors/hadoop/util/StringSerDe.java | 8 +-
 .../processors/hadoop/AbstractHadoopTest.java | 91 +++++++----
 .../nifi/processors/hadoop/GetHDFSTest.java | 65 ++++++--
 .../nifi/processors/hadoop/PutHDFSTest.java | 84 +++++++---
 .../hadoop/SimpleHadoopProcessor.java | 13 +-
 .../hadoop/TestCreateHadoopSequenceFile.java | 55 +++++--
 .../nifi/processors/hadoop/TestListHDFS.java | 52 +++++--
 .../src/test/resources/core-site-broken.xml | 2 +-
 .../src/test/resources/core-site-security.xml | 30 ++++
 .../src/test/resources/core-site.xml | 2 +-
 .../java/org/apache/nifi/hbase/GetHBase.java | 10 +-
 .../apache/nifi/hbase/HBaseClientService.java | 12 --
 .../nifi-hbase_1_1_2-client-service/pom.xml | 5 +-
 .../nifi/hbase/HBase_1_1_2_ClientService.java | 114 +++++++++-----
 .../hbase/TestHBase_1_1_2_ClientService.java | 94 ++++++++---
 .../src/test/resources/core-site-security.xml | 24 +--
 .../src/test/resources/core-site.xml | 8 +-
 .../test/resources/hbase-site-security.xml | 30 ++++
 .../src/test/resources/hbase-site.xml | 22 +++
 .../src/test/resources/krb5.conf | 0
 .../pom.xml | 1 +
 pom.xml | 5 +
 51 files changed, 1274 insertions(+), 505 deletions(-)
 create mode 100644 nifi-commons/nifi-hadoop-utils/pom.xml
 create mode 100644 nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
 create mode 100644 nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
 create mode 100644 nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
 create mode 100644 
nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java create mode 100644 nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-security.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site-security.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/krb5.conf diff --git a/nifi-commons/nifi-hadoop-utils/pom.xml b/nifi-commons/nifi-hadoop-utils/pom.xml new file mode 100644 index 000000000000..d0177cea575e --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/pom.xml @@ -0,0 +1,60 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-commons + 0.6.0-SNAPSHOT + + nifi-hadoop-utils + 0.6.0-SNAPSHOT + jar + + + + org.apache.nifi + nifi-api + provided + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-properties + + + org.apache.hadoop + hadoop-common + + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/krb5.conf + + + + + + diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java new file mode 100644 index 000000000000..5e8fb7dd889f --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StringUtils; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +/** + * All processors and controller services that need properties for Kerberos Principal and Keytab + * should obtain them through this class by calling: + * + * KerberosProperties props = KerberosProperties.create(NiFiProperties.getInstance()) + * + * The properties can be accessed from the resulting KerberosProperties instance. 
+ */ +public class KerberosProperties { + + private final File kerberosConfigFile; + private final Validator kerberosConfigValidator; + private final PropertyDescriptor kerberosPrincipal; + private final PropertyDescriptor kerberosKeytab; + + private KerberosProperties(final File kerberosConfigFile) { + this.kerberosConfigFile = kerberosConfigFile; + + if (this.kerberosConfigFile != null) { + System.setProperty("java.security.krb5.conf", kerberosConfigFile.getAbsolutePath()); + } + + this.kerberosConfigValidator = new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + // Check that the Kerberos configuration is set + if (kerberosConfigFile == null) { + return new ValidationResult.Builder() + .subject(subject).input(input).valid(false) + .explanation("you are missing the nifi.kerberos.krb5.file property which " + + "must be set in order to use Kerberos") + .build(); + } + + // Check that the Kerberos configuration is readable + if (!kerberosConfigFile.canRead()) { + return new ValidationResult.Builder().subject(subject).input(input).valid(false) + .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid " + + "and nifi has adequate permissions", kerberosConfigFile.getAbsoluteFile())) + .build(); + } + + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + }; + + this.kerberosPrincipal = new PropertyDescriptor.Builder() + .name("Kerberos Principal") + .required(false) + .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set in your nifi.properties") + .addValidator(kerberosConfigValidator) + .build(); + + this.kerberosKeytab = new PropertyDescriptor.Builder() + .name("Kerberos Keytab").required(false) + .description("Kerberos keytab associated with the principal. 
Requires nifi.kerberos.krb5.file to be set in your nifi.properties") + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .addValidator(kerberosConfigValidator) + .build(); + } + + public static KerberosProperties create(final NiFiProperties niFiProperties) { + if (niFiProperties == null) { + throw new IllegalArgumentException("NiFiProperties can not be null"); + } + return new KerberosProperties(niFiProperties.getKerberosConfigurationFile()); + } + + public File getKerberosConfigFile() { + return kerberosConfigFile; + } + + public Validator getKerberosConfigValidator() { + return kerberosConfigValidator; + } + + public PropertyDescriptor getKerberosPrincipal() { + return kerberosPrincipal; + } + + public PropertyDescriptor getKerberosKeytab() { + return kerberosKeytab; + } + + public static List validatePrincipalAndKeytab(final String subject, final Configuration config, final String principal, final String keytab, final ComponentLog logger) { + final List results = new ArrayList<>(); + + // if security is enabled then the keytab and principal are required + final boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled(config); + + if (isSecurityEnabled && StringUtils.isBlank(principal)) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(subject) + .explanation("Kerberos Principal must be provided when using a secure configuration") + .build()); + } + + if (isSecurityEnabled && StringUtils.isBlank(keytab)) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(subject) + .explanation("Kerberos Keytab must be provided when using a secure configuration") + .build()); + } + + if (!isSecurityEnabled && (!StringUtils.isBlank(principal) || !StringUtils.isBlank(keytab))) { + logger.warn("Configuration does not have security enabled, Keytab and Principal will be ignored"); + } + + return results; + } + +} diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java new file mode 100644 index 000000000000..d451535969b7 --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; + +/** + * Periodically attempts to renew the Kerberos user's ticket for the given UGI. + * + * This class will attempt to call ugi.checkTGTAndReloginFromKeytab() which + * will re-login the user if the ticket expired or is close to expiry. 
Between + * relogin attempts this thread will sleep for the provided amount of time. + * + */ +public class KerberosTicketRenewer implements Runnable { + + private final UserGroupInformation ugi; + private final long renewalPeriod; + private final ComponentLog logger; + + private volatile boolean stopped = false; + + /** + * @param ugi + * the user to renew the ticket for + * @param renewalPeriod + * the amount of time in milliseconds to wait between renewal attempts + * @param logger + * the logger from the component that started the renewer + */ + public KerberosTicketRenewer(final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) { + this.ugi = ugi; + this.renewalPeriod = renewalPeriod; + this.logger = logger; + } + + @Override + public void run() { + stopped = false; + while (!stopped) { + try { + logger.debug("Invoking renewal attempt for Kerberos ticket"); + // While we run this "frequently", the Hadoop implementation will only perform the login at 80% of ticket lifetime. + ugi.checkTGTAndReloginFromKeytab(); + } catch (IOException e) { + logger.error("Failed to renew Kerberos ticket", e); + } + + // Wait for a bit before checking again. + try { + Thread.sleep(renewalPeriod); + } catch (InterruptedException e) { + logger.error("Renewal thread interrupted", e); + Thread.currentThread().interrupt(); + return; + } + } + } + + public void stop() { + stopped = true; + } + +} diff --git a/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java new file mode 100644 index 000000000000..74197efa80ca --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.commons.lang3.Validate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; + +/** + * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from + * interfering with each other. + */ +public class SecurityUtil { + + /** + * Initializes UserGroupInformation with the given Configuration and performs the login for the given principal + * and keytab. All logins should happen through this class to ensure other threads are not concurrently modifying + * UserGroupInformation. 
+     *
+     * @param config the configuration instance
+     * @param principal the principal to authenticate as
+     * @param keyTab the keytab to authenticate with
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal, final String keyTab)
+            throws IOException {
+        Validate.notNull(config);
+        Validate.notNull(principal);
+        Validate.notNull(keyTab);
+
+        UserGroupInformation.setConfiguration(config);
+        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keyTab.trim());
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.getLoginUser().
+     * All logins should happen through this class to ensure other threads are not concurrently modifying
+     * UserGroupInformation.
+     *
+     * @param config the configuration instance
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException {
+        Validate.notNull(config);
+        UserGroupInformation.setConfiguration(config);
+        return UserGroupInformation.getLoginUser();
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.isSecurityEnabled().
+     *
+     * All checks for isSecurityEnabled() should happen through this method.
+     *
+     * @param config the given configuration
+     *
+     * @return true if Kerberos is enabled on the given configuration, false otherwise
+     *
+     */
+    public static synchronized boolean isSecurityEnabled(final Configuration config) {
+        Validate.notNull(config);
+        return "kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"));
+    }
+
+    /**
+     * Start a thread that periodically attempts to renew the current Kerberos user's ticket.
+     *
+     * Callers of this method should store the reference to the KerberosTicketRenewer and call stop() to stop the thread.
+     *
+     * @param id
+     *          The unique identifier to use for the thread; can be the class name that started the thread
+     *          (e.g. PutHDFS)
+     * @param ugi
+     *          The current Kerberos user.
+     * @param renewalPeriod
+     *          The amount of time between attempting renewals.
+     * @param logger
+     *          The logger to use within the renewer
+     *
+     * @return the KerberosTicketRenewer Runnable
+     */
+    public static KerberosTicketRenewer startTicketRenewalThread(final String id, final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) {
+        final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, renewalPeriod, logger);
+
+        final Thread t = new Thread(renewer);
+        t.setName("Kerberos Ticket Renewal [" + id + "]");
+        t.start();
+
+        return renewer;
+    }
+
+}
diff --git a/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
new file mode 100644
index 000000000000..131fe656e903
--- /dev/null
+++ b/nifi-commons/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.util.NiFiProperties; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.util.List; + +import static org.mockito.Mockito.when; + +public class TestKerberosProperties { + + @Test + public void testWithKerberosConfigFile() { + final File file = new File("src/test/resources/krb5.conf"); + final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class); + when(niFiProperties.getKerberosConfigurationFile()).thenReturn(file); + + final KerberosProperties kerberosProperties = KerberosProperties.create(niFiProperties); + Assert.assertNotNull(kerberosProperties); + + Assert.assertNotNull(kerberosProperties.getKerberosConfigFile()); + Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator()); + Assert.assertNotNull(kerberosProperties.getKerberosPrincipal()); + Assert.assertNotNull(kerberosProperties.getKerberosKeytab()); + + final ValidationResult result = kerberosProperties.getKerberosConfigValidator().validate("test", "principal", null); + Assert.assertTrue(result.isValid()); + } + + @Test + public void testWithoutKerberosConfigFile() { + final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class); + when(niFiProperties.getKerberosConfigurationFile()).thenReturn(null); + + final KerberosProperties kerberosProperties = KerberosProperties.create(niFiProperties); + Assert.assertNotNull(kerberosProperties); + + Assert.assertNull(kerberosProperties.getKerberosConfigFile()); + Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator()); + Assert.assertNotNull(kerberosProperties.getKerberosPrincipal()); + Assert.assertNotNull(kerberosProperties.getKerberosKeytab()); + + final ValidationResult result = kerberosProperties.getKerberosConfigValidator().validate("test", "principal", null); + Assert.assertFalse(result.isValid()); + } + + @Test + public void testValidatePrincipalAndKeytab() { + final ComponentLog log = Mockito.mock(ComponentLog.class); + final Configuration config = new Configuration(); + + // no security enabled in config so doesn't matter what principal and keytab are + List results = KerberosProperties.validatePrincipalAndKeytab( + "test", config, null, null, log); + Assert.assertEquals(0, results.size()); + + results = KerberosProperties.validatePrincipalAndKeytab( + "test", config, "principal", null, log); + Assert.assertEquals(0, results.size()); + + results = KerberosProperties.validatePrincipalAndKeytab( + "test", config, "principal", "keytab", log); + Assert.assertEquals(0, results.size()); + + // change the config to have kerberos turned on + config.set("hadoop.security.authentication", "kerberos"); + config.set("hadoop.security.authorization", "true"); + + results = 
KerberosProperties.validatePrincipalAndKeytab( + "test", config, null, null, log); + Assert.assertEquals(2, results.size()); + } + +} diff --git a/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf b/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf new file mode 100644 index 000000000000..814d5b2fb8f6 --- /dev/null +++ b/nifi-commons/nifi-hadoop-utils/src/test/resources/krb5.conf @@ -0,0 +1,12 @@ +[libdefaults] + default_realm = EXAMPLE.COM + +[realms] + EXAMPLE.COM = { + kdc = kdc1.example.com + kdc = kdc2.example.com + admin_server = kdc1.example.com + } + +[domain_realm] + .example.com = EXAMPLE.COM \ No newline at end of file diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index c29b1d2d5e99..8255781d3a79 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -333,10 +333,6 @@ public ValidationResult validate(final String subject, final String input, final public static final Validator FILE_EXISTS_VALIDATOR = new FileExistsValidator(true); - private static final String PRINCIPAL_CHAR_CLASS = "[A-Za-z0-9\\\\\\/\\.@]"; - - public static final Validator KERB_PRINC_VALIDATOR = createRegexMatchingValidator(Pattern.compile(PRINCIPAL_CHAR_CLASS + "+" + - "@" + PRINCIPAL_CHAR_CLASS + "+")); // // // FACTORY METHODS FOR VALIDATORS diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java index 8f2590abee09..13d2f4f7cc0e 100644 --- a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java +++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java @@ -84,25 +84,5 @@ public void testDataSizeBoundsValidator() { vr = val.validate("DataSizeBounds", "water", validationContext); assertFalse(vr.isValid()); - - } - - @Test - public void testKerbPrincipalValidator() { - Validator val = StandardValidators.KERB_PRINC_VALIDATOR; - ValidationResult vr; - - final ValidationContext validationContext = Mockito.mock(ValidationContext.class); - vr = val.validate("Kerberos Principal","jon@CDH.PROD", validationContext); - assertTrue(vr.isValid()); - - vr = val.validate("Kerberos Principal","jon@CDH", validationContext); - assertTrue(vr.isValid()); - - vr = val.validate("kerberos-principal","service/nifi@PROD", validationContext); - assertTrue(vr.isValid()); - - vr = val.validate("keberos-principal", "joewitt", validationContext); - assertFalse(vr.isValid()); } } diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml index 9d1448d27784..9a4542d916ba 100644 --- a/nifi-commons/pom.xml +++ b/nifi-commons/pom.xml @@ -37,5 +37,6 @@ nifi-write-ahead-log nifi-site-to-site-client nifi-hl7-query-language + nifi-hadoop-utils diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml index 39b1adbb085e..00b3b04ba955 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml @@ -34,6 +34,10 @@ 
org.apache.nifi nifi-processor-utils + + org.apache.nifi + nifi-hadoop-utils + org.apache.nifi nifi-flowfile-packager diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index b53699633060..cd9683bc7cc7 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -16,20 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.SocketFactory; - import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -48,12 +34,29 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StringUtils; + +import javax.net.SocketFactory; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * This is a base class that is helpful when building processors interacting with HDFS. @@ -86,82 +89,28 @@ public String toString() { } } - - private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - // Check that both the principal & keytab are set before checking the kerberos config - if (context.getProperty(KERBEROS_KEYTAB).getValue() == null || context.getProperty(KERBEROS_PRINCIPAL).getValue() == null) { - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("both keytab and principal must be set in order to use Kerberos authentication").build(); - } - - // Check that the Kerberos configuration is set - if (NIFI_PROPERTIES.getKerberosConfigurationFile() == null) { - return new ValidationResult.Builder().subject(subject).input(input).valid(false) - .explanation("you are missing the nifi.kerberos.krb5.file property in nifi.properties. 
" + "This must be set in order to use Kerberos").build(); - } - - // Check that the Kerberos configuration is readable - if (!NIFI_PROPERTIES.getKerberosConfigurationFile().canRead()) { - return new ValidationResult.Builder().subject(subject).input(input).valid(false) - .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid " + "and nifi has adequate permissions", - NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsoluteFile())) - .build(); - } - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - } - }; - // properties public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources") .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop " + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.") .required(false).addValidator(createMultipleFilesExistValidator()).build(); - public static NiFiProperties NIFI_PROPERTIES = null; - public static final String DIRECTORY_PROP_NAME = "Directory"; public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true) .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build(); - public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false) - .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) - .addValidator(KERBEROS_CONFIG_VALIDATOR).build(); - - public static final PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder().name("Kerberos Keytab").required(false) - .description("Kerberos keytab associated with the principal. 
Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build(); - public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false) .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - protected static final List properties; - private static final Object RESOURCES_LOCK = new Object(); private long kerberosReloginThreshold; private long lastKerberosReloginTime; - - static { - List props = new ArrayList<>(); - props.add(HADOOP_CONFIGURATION_RESOURCES); - props.add(KERBEROS_PRINCIPAL); - props.add(KERBEROS_KEYTAB); - props.add(KERBEROS_RELOGIN_PERIOD); - properties = Collections.unmodifiableList(props); - try { - NIFI_PROPERTIES = NiFiProperties.getInstance(); - } catch (Exception e) { - // This will happen during tests - NIFI_PROPERTIES = null; - } - if (NIFI_PROPERTIES != null && NIFI_PROPERTIES.getKerberosConfigurationFile() != null) { - System.setProperty("java.security.krb5.conf", NIFI_PROPERTIES.getKerberosConfigurationFile().getAbsolutePath()); - } - } + protected KerberosProperties kerberosProperties; + protected List properties; // variables shared by all threads of this processor // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) @@ -170,6 +119,18 @@ public ValidationResult validate(String subject, String input, ValidationContext @Override protected void init(ProcessorInitializationContext context) { hdfsResources.set(new HdfsResources(null, null, null)); + kerberosProperties = getKerberosProperties(); + + List props = new ArrayList<>(); + props.add(HADOOP_CONFIGURATION_RESOURCES); + props.add(kerberosProperties.getKerberosPrincipal()); + props.add(kerberosProperties.getKerberosKeytab()); + props.add(KERBEROS_RELOGIN_PERIOD); + properties = Collections.unmodifiableList(props); + } + + protected KerberosProperties getKerberosProperties() { + return KerberosProperties.create(NiFiProperties.getInstance()); } @Override @@ -177,9 +138,36 @@ protected List getSupportedPropertyDescriptors() { return properties; } + @Override + protected Collection customValidate(ValidationContext validationContext) { + final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); + final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + final List results = new ArrayList<>(); + + if (!StringUtils.isBlank(configResources)) { + Configuration conf = null; + try { + conf = getConfigurationFromResources(configResources); + + results.addAll(KerberosProperties.validatePrincipalAndKeytab( + this.getClass().getSimpleName(), conf, principal, keytab, getLogger())); + } catch (IOException e) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(this.getClass().getSimpleName()) + .explanation("Could not load Hadoop Configuration resources") + .build()); + } + } + + return results; + } + /* - * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) - */ + * If your subclass 
also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) + */ @OnScheduled public final void abstractOnScheduled(ProcessContext context) throws IOException { try { @@ -261,18 +249,16 @@ HdfsResources resetHDFSResources(String configResources, String dir, ProcessCont FileSystem fs; UserGroupInformation ugi; synchronized (RESOURCES_LOCK) { - if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) { - String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue(); - String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue(); - UserGroupInformation.setConfiguration(config); - ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab); + if (SecurityUtil.isSecurityEnabled(config)) { + String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + ugi = SecurityUtil.loginKerberos(config, principal, keyTab); fs = getFileSystemAsUser(config, ugi); lastKerberosReloginTime = System.currentTimeMillis() / 1000; } else { config.set("ipc.client.fallback-to-simple-auth-allowed", "true"); config.set("hadoop.security.authentication", "simple"); - UserGroupInformation.setConfiguration(config); - ugi = UserGroupInformation.getLoginUser(); + ugi = SecurityUtil.loginSimple(config); fs = getFileSystemAsUser(config, ugi); } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java index 385ac7394555..4b8f87e3835c 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java @@ -16,12 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import org.apache.hadoop.io.SequenceFile; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -38,6 +32,12 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + /** *

* This processor is used to create a Hadoop Sequence File, which essentially is a file of key/value pairs. The key will be a file name and the value will be the flow file content. The processor will @@ -93,13 +93,6 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor { .allowableValues(CompressionType.values()) .build(); - private static final List props; - - static { - List someProps = new ArrayList<>(properties); - someProps.add(COMPRESSION_TYPE); - props = Collections.unmodifiableList(someProps); - } // Default Values. public static final String DEFAULT_COMPRESSION_TYPE = "NONE"; @@ -110,7 +103,9 @@ public Set getRelationships() { @Override public List getSupportedPropertyDescriptors() { - return props; + List someProps = new ArrayList<>(properties); + someProps.add(COMPRESSION_TYPE); + return someProps; } @Override diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index 6b0491050b98..fdb2dcffdbd2 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -16,15 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,6 +36,15 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"}) @@ -80,13 +80,9 @@ public class FetchHDFS extends AbstractHadoopProcessor { @Override protected List getSupportedPropertyDescriptors() { - final List properties = new ArrayList<>(); - properties.add(HADOOP_CONFIGURATION_RESOURCES); - properties.add(FILENAME); - properties.add(KERBEROS_PRINCIPAL); - properties.add(KERBEROS_KEYTAB); - properties.add(KERBEROS_RELOGIN_PERIOD); - return properties; + final List props = new ArrayList<>(properties); + props.add(FILENAME); + return props; } @Override diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java index d3fb97fab1bb..0194e8775865 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FlowFileStreamUnpackerSequenceFileWriter.java @@ -16,6 +16,15 @@ */ package org.apache.nifi.processors.hadoop; +import 
org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.Text; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processors.hadoop.util.InputStreamWritable; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.util.FlowFilePackagerV3; +import org.slf4j.LoggerFactory; + import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -23,16 +32,6 @@ import java.util.HashMap; import java.util.Map; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.processors.hadoop.util.InputStreamWritable; -import org.apache.nifi.util.FlowFilePackagerV3; - -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.slf4j.LoggerFactory; - public class FlowFileStreamUnpackerSequenceFileWriter extends SequenceFileWriterImpl { static { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java index 4c9deeafa6c2..d18c13fd54ad 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java @@ -16,22 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Pattern; - import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -63,6 +47,22 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + @TriggerWhenEmpty @InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"}) @@ -178,28 +178,12 @@ public class GetHDFS extends AbstractHadoopProcessor { .build(); private static final Set relationships; - protected static final List localProperties; static { final Set rels = new HashSet<>(); rels.add(REL_SUCCESS); rels.add(REL_PASSTHROUGH); relationships = Collections.unmodifiableSet(rels); - - List props = new ArrayList<>(properties); - props.add(DIRECTORY); - props.add(RECURSE_SUBDIRS); - props.add(KEEP_SOURCE_FILE); - 
props.add(FILE_FILTER_REGEX); - props.add(FILTER_MATCH_NAME_ONLY); - props.add(IGNORE_DOTTED_FILES); - props.add(MIN_AGE); - props.add(MAX_AGE); - props.add(POLLING_INTERVAL); - props.add(BATCH_SIZE); - props.add(BUFFER_SIZE); - props.add(COMPRESSION_CODEC); - localProperties = Collections.unmodifiableList(props); } protected ProcessorConfiguration processorConfig; @@ -219,7 +203,20 @@ public Set getRelationships() { @Override protected List getSupportedPropertyDescriptors() { - return localProperties; + List props = new ArrayList<>(properties); + props.add(DIRECTORY); + props.add(RECURSE_SUBDIRS); + props.add(KEEP_SOURCE_FILE); + props.add(FILE_FILTER_REGEX); + props.add(FILTER_MATCH_NAME_ONLY); + props.add(IGNORE_DOTTED_FILES); + props.add(MIN_AGE); + props.add(MAX_AGE); + props.add(POLLING_INTERVAL); + props.add(BATCH_SIZE); + props.add(BUFFER_SIZE); + props.add(COMPRESSION_CODEC); + return props; } @Override diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java index f032ee41c1fe..7de728a16660 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java @@ -16,12 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,6 +32,12 @@ import org.apache.nifi.processors.hadoop.util.SequenceFileReader; import org.apache.nifi.util.StopWatch; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + /** * This processor is used to pull files from HDFS. The files being pulled in MUST be SequenceFile formatted files. The processor creates a flow file for each key/value entry in the ingested * SequenceFile. The created flow file's content depends on the value of the optional configuration property FlowFile Content. Currently, there are two choices: VALUE ONLY and KEY VALUE PAIR. 
With the @@ -64,17 +64,11 @@ public class GetHDFSSequenceFile extends GetHDFS { .required(true) .build(); - static final List props; - - static { - List someProps = new ArrayList<>(localProperties); - someProps.add(FLOWFILE_CONTENT); - props = Collections.unmodifiableList(someProps); - } - @Override protected List getSupportedPropertyDescriptors() { - return props; + List someProps = new ArrayList<>(super.getSupportedPropertyDescriptors()); + someProps.add(FLOWFILE_CONTENT); + return Collections.unmodifiableList(someProps); } @Override diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java index 896e4d881ef5..2aa77e1b6c9e 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/KeyValueReader.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.util.HashSet; -import java.util.Set; -import java.util.regex.Pattern; - import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -42,6 +34,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Pattern; + /** * This class reads a SequenceFile and generates FlowFiles, one per KeyValue pair in the SequenceFile. The FlowFile name is based on the the incoming file name with System nanotime appended; the * FlowFile content is the key/value pair serialized via Text. 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 2100f488d4a4..f5daef277224 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -16,18 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,13 +46,25 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.hadoop.util.HDFSListing; -import org.apache.nifi.processors.hadoop.util.StringSerDe; import org.apache.nifi.processors.hadoop.util.HDFSListing.StateKeys; +import org.apache.nifi.processors.hadoop.util.StringSerDe; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + @TriggerSerially @TriggerWhenEmpty @@ -143,15 +143,11 @@ protected File getPersistenceFile() { @Override protected List getSupportedPropertyDescriptors() { - final List properties = new ArrayList<>(); - properties.add(HADOOP_CONFIGURATION_RESOURCES); - properties.add(DISTRIBUTED_CACHE_SERVICE); - properties.add(DIRECTORY); - properties.add(RECURSE_SUBDIRS); - properties.add(KERBEROS_PRINCIPAL); - properties.add(KERBEROS_KEYTAB); - properties.add(KERBEROS_RELOGIN_PERIOD); - return properties; + final List props = new ArrayList<>(properties); + props.add(DISTRIBUTED_CACHE_SERVICE); + props.add(DIRECTORY); + props.add(RECURSE_SUBDIRS); + return props; } @Override diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 0c369284573f..7c9747873ba2 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -16,17 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,6 +47,17 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + /** * This processor copies FlowFiles to HDFS. */ @@ -144,14 +144,21 @@ public class PutHDFS extends AbstractHadoopProcessor { .build(); private static final Set relationships; - private static final List localProperties; static { final Set rels = new HashSet<>(); rels.add(REL_SUCCESS); rels.add(REL_FAILURE); relationships = Collections.unmodifiableSet(rels); + } + + @Override + public Set getRelationships() { + return relationships; + } + @Override + protected List getSupportedPropertyDescriptors() { List props = new ArrayList<>(properties); props.add(DIRECTORY); props.add(CONFLICT_RESOLUTION); @@ -162,17 +169,7 @@ public class PutHDFS extends AbstractHadoopProcessor { props.add(REMOTE_OWNER); props.add(REMOTE_GROUP); props.add(COMPRESSION_CODEC); - localProperties = Collections.unmodifiableList(props); - } - - @Override - public Set getRelationships() { - return relationships; - } - - @Override - protected List getSupportedPropertyDescriptors() { - return localProperties; + return props; } @OnScheduled diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java index 4bb9ca9054bb..a0d02f734640 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java @@ -16,21 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.StreamCallback; -import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream; -import org.apache.nifi.processors.hadoop.util.InputStreamWritable; -import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; -import org.apache.nifi.util.StopWatch; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -40,9 +25,23 @@ import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream; +import org.apache.nifi.processors.hadoop.util.InputStreamWritable; +import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; +import 
org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; + public class SequenceFileWriterImpl implements SequenceFileWriter { protected static Logger logger = LoggerFactory.getLogger(SequenceFileWriterImpl.class); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java index 82e1de2d73af..fbc1875f1387 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/TarUnpackerSequenceFileWriter.java @@ -16,19 +16,18 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.processors.hadoop.util.InputStreamWritable; - import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processors.hadoop.util.InputStreamWritable; +import org.apache.nifi.stream.io.BufferedInputStream; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; + public class TarUnpackerSequenceFileWriter extends SequenceFileWriterImpl { static { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java index a6f70054d271..0ef103d42132 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ValueReader.java @@ -16,13 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.util.HashSet; -import java.util.Set; -import java.util.regex.Pattern; - import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -41,6 +34,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Pattern; + /** * This class reads a SequenceFile and generates FlowFiles, one per each KeyValue Pair in the SequenceFile. The FlowFile name is the key, which is typically a file name but may not be; the FlowFile * content is the value. 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java index c986e9aed97a..8ce6a7e328f8 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ZipUnpackerSequenceFileWriter.java @@ -16,20 +16,19 @@ */ package org.apache.nifi.processors.hadoop; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.Text; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processors.hadoop.util.InputStreamWritable; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.processors.hadoop.util.InputStreamWritable; - -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.slf4j.LoggerFactory; - public class ZipUnpackerSequenceFileWriter extends SequenceFileWriterImpl { static { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java index 6786945d2b5a..2d1c01a59d8c 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.processors.hadoop.util; +import org.apache.hadoop.fs.Path; + +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.XmlType; import java.util.ArrayList; import java.util.Collection; import java.util.Date; @@ -24,11 +28,6 @@ import java.util.Map; import java.util.Set; -import javax.xml.bind.annotation.XmlTransient; -import javax.xml.bind.annotation.XmlType; - -import org.apache.hadoop.fs.Path; - /** * A simple POJO for maintaining state about the last HDFS Listing that was performed so that * we can avoid pulling the same file multiple times diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java index 4cb2e8ddee89..a463dfd45572 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/InputStreamWritable.java @@ -16,13 +16,13 @@ */ package org.apache.nifi.processors.hadoop.util; +import org.apache.hadoop.io.Writable; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.InputStream; -import 
org.apache.hadoop.io.Writable; - /** * Simple implementation of {@link Writable} that writes data from an InputStream. This class will throw an * UnsupportedOperationException if {@link #readFields(DataInput)} is called. diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java index 17cacd94d210..25e66f80687d 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java @@ -16,17 +16,17 @@ */ package org.apache.nifi.processors.hadoop.util; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.exception.DeserializationException; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; - public class LongSerDe implements Serializer, Deserializer { @Override diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java index e9547215d736..234ca2a49089 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/OutputStreamWritable.java @@ -16,17 +16,16 @@ */ package org.apache.nifi.processors.hadoop.util; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.DataOutputStream; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.OutputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.DataOutputStream; - -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.Writable; - /** * This class will write to an output stream, rather than an in-memory buffer, the fields being read. 
* diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java index e197426cf67a..fabf18d02144 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileReader.java @@ -16,12 +16,12 @@ */ package org.apache.nifi.processors.hadoop.util; -import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import java.io.IOException; + public interface SequenceFileReader { public T readSequenceFile(Path file, Configuration configuration, FileSystem fileSystem) throws IOException; diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java index 851afd842ba2..6ad6461834a1 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java @@ -16,11 +16,10 @@ */ package org.apache.nifi.processors.hadoop.util; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; public interface SequenceFileWriter { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java index 2a52c4da0674..e13f6b8625ed 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java @@ -16,15 +16,15 @@ */ package org.apache.nifi.processors.hadoop.util; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; - import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + public class StringSerDe implements Serializer, Deserializer { @Override diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java index c47c7a07d7f5..76fc15dd67e1 100644 --- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java @@ -17,12 +17,15 @@ package org.apache.nifi.processors.hadoop; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -40,17 +43,42 @@ public class AbstractHadoopTest { private static Logger logger; + private File temporaryFile; + private KerberosProperties kerberosProperties; + private NiFiProperties mockedProperties; + @BeforeClass - public static void setUpClass() { + public static void setUpClass() throws IOException { System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hadoop", "debug"); logger = LoggerFactory.getLogger(AbstractHadoopTest.class); } + @Before + public void setup() throws IOException { + // needed for calls to UserGroupInformation.setConfiguration() to work when passing in + // config with Kerberos authentication enabled + System.setProperty("java.security.krb5.realm", "nifi.com"); + System.setProperty("java.security.krb5.kdc", "nifi.kdc"); + + temporaryFile = File.createTempFile("hadoop-test", ".properties"); + + // mock properties and return a temporary file for the kerberos configuration + mockedProperties = mock(NiFiProperties.class); + when(mockedProperties.getKerberosConfigurationFile()).thenReturn(temporaryFile); + kerberosProperties = KerberosProperties.create(mockedProperties); + } + + @After + public void cleanUp() { + temporaryFile.delete(); + } + @Test public void testErrorConditions() { - TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class); + SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(processor); Collection results; ProcessContext pc; @@ -81,8 +109,8 @@ public void testErrorConditions() { @Test public void testTimeoutDetection() throws Exception { - TestRunner runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class); - SimpleHadoopProcessor processor = (SimpleHadoopProcessor) runner.getProcessor(); + SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(processor); try { processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target", runner.getProcessContext()); Assert.fail("Should have thrown SocketTimeoutException"); @@ -92,29 +120,36 @@ public void testTimeoutDetection() throws Exception { @Test public void testKerberosOptions() throws Exception { - File temporaryFile = File.createTempFile("hadoop-test", ".properties"); - try { - // mock properties and return a temporary file for the kerberos configuration - NiFiProperties mockedProperties = mock(NiFiProperties.class); - when(mockedProperties.getKerberosConfigurationFile()).thenReturn(temporaryFile); - SimpleHadoopProcessor.NIFI_PROPERTIES = mockedProperties; - TestRunner 
runner = TestRunners.newTestRunner(SimpleHadoopProcessor.class); - // should be valid since no kerberos options specified - runner.assertValid(); - // no longer valid since only the principal is provided - runner.setProperty(SimpleHadoopProcessor.KERBEROS_PRINCIPAL, "principal"); - runner.assertNotValid(); - // invalid since the keytab does not exist - runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, "BAD_KEYTAB_PATH"); - runner.assertNotValid(); - // valid since keytab is now a valid file location - runner.setProperty(SimpleHadoopProcessor.KERBEROS_KEYTAB, temporaryFile.getAbsolutePath()); - runner.assertValid(); - // invalid since the kerberos configuration was changed to a non-existent file - when(mockedProperties.getKerberosConfigurationFile()).thenReturn(new File("BAD_KERBEROS_PATH")); - runner.assertNotValid(); - } finally { - temporaryFile.delete(); - } + SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(processor); + // should be valid since no kerberos options specified + runner.assertValid(); + // no longer valid since only the principal is provided + runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml"); + runner.setProperty(kerberosProperties.getKerberosPrincipal(), "principal"); + runner.assertNotValid(); + // invalid since the keytab does not exist + runner.setProperty(kerberosProperties.getKerberosKeytab(), "BAD_KEYTAB_PATH"); + runner.assertNotValid(); + // valid since keytab is now a valid file location + runner.setProperty(kerberosProperties.getKerberosKeytab(), temporaryFile.getAbsolutePath()); + runner.assertValid(); + } + + @Test + public void testKerberosOptionsWithBadKerberosConfigFile() throws Exception { + // invalid since the kerberos configuration was changed to a non-existent file + when(mockedProperties.getKerberosConfigurationFile()).thenReturn(new File("BAD_KERBEROS_PATH")); + kerberosProperties = KerberosProperties.create(mockedProperties); + + SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(processor); + runner.assertValid(); + + runner.setProperty(AbstractHadoopProcessor.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml"); + runner.setProperty(kerberosProperties.getKerberosPrincipal(), "principal"); + runner.setProperty(kerberosProperties.getKerberosKeytab(), temporaryFile.getAbsolutePath()); + runner.assertNotValid(); + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java index e8714dd23eaf..64fe16fce5b2 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java @@ -16,28 +16,43 @@ */ package org.apache.nifi.processors.hadoop; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; - +import org.apache.hadoop.fs.Path; import org.apache.nifi.components.ValidationResult; import 
org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.apache.hadoop.fs.Path; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class GetHDFSTest { + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; + + @Before + public void setup() { + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = KerberosProperties.create(mockNiFiProperties); + } + @Test public void getPathDifferenceTest() { Assert.assertEquals("", GetHDFS.getPathDifference(new Path("/root"), new Path("/file"))); @@ -69,7 +84,8 @@ public void getPathDifferenceTest() { @Test public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); Collection results; ProcessContext pc; @@ -110,7 +126,8 @@ public void testValidators() { @Test public void testGetFilesWithFilter() { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*"); runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); @@ -124,7 +141,8 @@ public void testGetFilesWithFilter() { @Test public void testAutomaticDecompression() throws IOException { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz"); runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); @@ -142,7 +160,8 @@ public void testAutomaticDecompression() throws IOException { @Test public void testInferCompressionCodecDisabled() throws IOException { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz"); runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); @@ -160,7 +179,8 @@ public void testInferCompressionCodecDisabled() throws IOException { @Test public void testFileExtensionNotACompressionCodec() throws IOException { - TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); + GetHDFS proc = new TestableGetHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip"); 
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); @@ -175,4 +195,19 @@ public void testFileExtensionNotACompressionCodec() throws IOException { InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip"); flowFile.assertContentEquals(expected); } + + private static class TestableGetHDFS extends GetHDFS { + + private final KerberosProperties testKerberosProperties; + + public TestableGetHDFS(KerberosProperties testKerberosProperties) { + this.testKerberosProperties = testKerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } + } + } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index c524a44f0920..76970eddbd4e 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -16,41 +16,49 @@ */ package org.apache.nifi.processors.hadoop; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class PutHDFSTest { + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; + @BeforeClass - public static void setUp() throws Exception{ + public static void setUpClass() throws Exception{ /* * Running Hadoop on Windows requires a special build which will produce required binaries and native modules [1]. 
Since functionality * provided by this module and validated by these test does not have any native implication we do not distribute required binaries and native modules @@ -61,9 +69,17 @@ public static void setUp() throws Exception{ */ } + @Before + public void setup() { + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = KerberosProperties.create(mockNiFiProperties); + } + @Test public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); + PutHDFS proc = new TestablePutHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); Collection results; ProcessContext pc; @@ -100,7 +116,8 @@ public void testValidators() { assertTrue(vr.toString().contains("is invalid because short integer must be greater than zero")); } - runner = TestRunners.newTestRunner(PutHDFS.class); + proc = new TestablePutHDFS(kerberosProperties); + runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.REPLICATION_FACTOR, "0"); @@ -114,7 +131,8 @@ public void testValidators() { assertTrue(vr.toString().contains("is invalid because short integer must be greater than zero")); } - runner = TestRunners.newTestRunner(PutHDFS.class); + proc = new TestablePutHDFS(kerberosProperties); + runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.UMASK, "-1"); @@ -128,7 +146,8 @@ public void testValidators() { assertTrue(vr.toString().contains("is invalid because octal umask [-1] cannot be negative")); } - runner = TestRunners.newTestRunner(PutHDFS.class); + proc = new TestablePutHDFS(kerberosProperties); + runner = TestRunners.newTestRunner(proc); results = new HashSet<>(); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.UMASK, "18"); @@ -156,7 +175,8 @@ public void testValidators() { } results = new HashSet<>(); - runner = TestRunners.newTestRunner(PutHDFS.class); + proc = new TestablePutHDFS(kerberosProperties); + runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "/target"); runner.setProperty(PutHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName()); runner.enqueue(new byte[0]); @@ -175,7 +195,8 @@ public void testPutFile() throws IOException { // Refer to comment in the BeforeClass method for an explanation assumeTrue(isNotWindows()); - TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); + PutHDFS proc = new TestablePutHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); runner.setValidateExpressionUsage(false); @@ -208,11 +229,17 @@ public void testPutFileWithException() throws IOException { FileSystem fs = FileSystem.get(config); Path p = new Path(dirName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); + final KerberosProperties testKerberosProperties = kerberosProperties; TestRunner runner = TestRunners.newTestRunner(new PutHDFS() { @Override protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name) { throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling"); } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } }); runner.setProperty(PutHDFS.DIRECTORY, 
dirName); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); @@ -236,4 +263,19 @@ protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name) { private boolean isNotWindows() { return !System.getProperty("os.name").startsWith("Windows"); } + + private static class TestablePutHDFS extends PutHDFS { + + private KerberosProperties testKerberosProperties; + + public TestablePutHDFS(KerberosProperties testKerberosProperties) { + this.testKerberosProperties = testKerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } + } + } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java index 905460ff1af1..4d382ebc6059 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/SimpleHadoopProcessor.java @@ -16,15 +16,26 @@ */ package org.apache.nifi.processors.hadoop; -import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; public class SimpleHadoopProcessor extends AbstractHadoopProcessor { + private KerberosProperties testKerberosProperties; + + public SimpleHadoopProcessor(KerberosProperties kerberosProperties) { + this.testKerberosProperties = kerberosProperties; + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { } + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } + } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java index 5357dff705d6..e568dfb801c9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java @@ -16,25 +16,14 @@ */ package org.apache.nifi.processors.hadoop; -import org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; - -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; import 
org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -42,6 +31,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestCreateHadoopSequenceFile { private TestRunner controller; @@ -52,6 +54,9 @@ public class TestCreateHadoopSequenceFile { new File(testdata, "randombytes-2"), new File(testdata, "randombytes-3") }; + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; + @BeforeClass public static void setUpClass() { LOGGER = LoggerFactory.getLogger(TestCreateHadoopSequenceFile.class); @@ -61,7 +66,12 @@ public static void setUpClass() { @Before public void setUp() { - controller = TestRunners.newTestRunner(CreateHadoopSequenceFile.class); + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = KerberosProperties.create(mockNiFiProperties); + + CreateHadoopSequenceFile proc = new TestableCreateHadoopSequenceFile(kerberosProperties); + controller = TestRunners.newTestRunner(proc); } @After @@ -183,4 +193,17 @@ public void testMergedFlowfilePackagedData() throws IOException { // fos.close(); } + private static class TestableCreateHadoopSequenceFile extends CreateHadoopSequenceFile { + + private KerberosProperties testKerberosProperties; + + public TestableCreateHadoopSequenceFile(KerberosProperties testKerberosProperties) { + this.testKerberosProperties = testKerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProperties; + } + } } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index 7a77f068aa8e..6fcea9514952 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -16,21 +16,6 @@ */ package org.apache.nifi.processors.hadoop; -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -44,23 +29,48 @@ import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; +import 
org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestListHDFS { private TestRunner runner; private ListHDFSWithMockedFileSystem proc; private MockCacheClient service; + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; @Before public void setup() throws InitializationException { - proc = new ListHDFSWithMockedFileSystem(); + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = KerberosProperties.create(mockNiFiProperties); + + proc = new ListHDFSWithMockedFileSystem(kerberosProperties); runner = TestRunners.newTestRunner(proc); service = new MockCacheClient(); @@ -250,6 +260,16 @@ private FsPermission create777() { private class ListHDFSWithMockedFileSystem extends ListHDFS { private final MockFileSystem fileSystem = new MockFileSystem(); + private final KerberosProperties testKerberosProps; + + public ListHDFSWithMockedFileSystem(KerberosProperties kerberosProperties) { + this.testKerberosProps = kerberosProperties; + } + + @Override + protected KerberosProperties getKerberosProperties() { + return testKerberosProps; + } @Override protected FileSystem getFileSystem() { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml index e06a19335319..fd849e3faced 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-broken.xml @@ -1,4 +1,5 @@ + - diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-security.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-security.xml new file mode 100644 index 000000000000..2aca105f9b2b --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-security.xml @@ -0,0 +1,30 @@ + + + + + + fs.default.name + hdfs://hbase + + + hadoop.security.authentication + kerberos + + + hadoop.security.authorization + true + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml index 5e3b55cb0b2c..7f01a9f374a9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site.xml @@ -1,4 +1,5 @@ + - diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index fa4d80a17f0b..3cd81a3f5b01 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -453,10 +453,12 @@ private void clearState(final DistributedMapCacheClient client) { localState.delete(); } - try { - client.remove(getKey(), new StringSerDe()); - } catch (IOException e) { - getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e}); + if (client != null) { + try { + client.remove(getKey(), new StringSerDe()); + } catch (IOException e) { + getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e}); + } } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index d83e9d6b4b2d..2f5b6a5e5749 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -42,18 +42,6 @@ public interface HBaseClientService extends ControllerService { .addValidator(new ConfigFilesValidator()) .build(); - PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() - .name("kerberos-principal").displayName("Kerberos Principal") - .description("Principal of user writing to hbase").required(false) - .addValidator(StandardValidators.KERB_PRINC_VALIDATOR) - .build(); - - PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder() - .name("kerberos-keytab").displayName("Kerberos Keytab") - .description("Path to keytab file").required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder() .name("ZooKeeper Quorum") .description("Comma-separated list of ZooKeeper hosts for HBase. 
Required if Hadoop Configuration Files are not provided.") diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml index 5c2314f275b6..50b10059ceca 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml @@ -41,7 +41,10 @@ org.apache.nifi nifi-processor-utils - + + org.apache.nifi + nifi-hadoop-utils + org.apache.hbase hbase-client diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index e31a60ff596a..346513543a36 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -44,6 +44,9 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hadoop.KerberosTicketRenewer; +import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; @@ -51,9 +54,11 @@ import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NiFiProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -61,9 +66,6 @@ import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - @Tags({ "hbase", "client"}) @CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " + "a comma-separated list of configuration files, or by specifying values for the other properties. 
If configuration files " + @@ -73,23 +75,29 @@ @DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.", description="These properties will be set on the HBase configuration after loading any provided configuration files.") public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService { - private static final Logger LOG = LoggerFactory.getLogger(HBase_1_1_2_ClientService.class); + static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum"; static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort"; static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent"; static final String HBASE_CONF_CLIENT_RETRIES = "hbase.client.retries.number"; + static final long TICKET_RENEWAL_PERIOD = 60000; + private volatile Connection connection; - private List properties; + private volatile UserGroupInformation ugi; + private volatile KerberosTicketRenewer renewer; - protected boolean isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); + private List properties; + private KerberosProperties kerberosProperties; @Override protected void init(ControllerServiceInitializationContext config) throws InitializationException { + this.kerberosProperties = getKerberosProperties(); + List props = new ArrayList<>(); props.add(HADOOP_CONF_FILES); - props.add(KERBEROS_PRINCIPAL); - props.add(KERBEROS_KEYTAB); + props.add(kerberosProperties.getKerberosPrincipal()); + props.add(kerberosProperties.getKerberosKeytab()); props.add(ZOOKEEPER_QUORUM); props.add(ZOOKEEPER_CLIENT_PORT); props.add(ZOOKEEPER_ZNODE_PARENT); @@ -97,6 +105,10 @@ protected void init(ControllerServiceInitializationContext config) throws Initia this.properties = Collections.unmodifiableList(props); } + protected KerberosProperties getKerberosProperties() { + return KerberosProperties.create(NiFiProperties.getInstance()); + } + @Override protected List getSupportedPropertyDescriptors() { return properties; @@ -119,8 +131,6 @@ protected Collection customValidate(ValidationContext validati boolean zkPortProvided = validationContext.getProperty(ZOOKEEPER_CLIENT_PORT).isSet(); boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet(); boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet(); - boolean kerbprincProvided = validationContext.getProperty(KERBEROS_PRINCIPAL).isSet(); - boolean kerbkeytabProvided = validationContext.getProperty(KERBEROS_KEYTAB).isSet(); final List problems = new ArrayList<>(); @@ -133,19 +143,21 @@ protected Collection customValidate(ValidationContext validati .build()); } - if (isSecurityEnabled && (!kerbprincProvided || !kerbkeytabProvided)) { - problems.add(new ValidationResult.Builder().valid(false) - .subject(this.getClass().getSimpleName()).explanation("Kerberos" + - " principal and keytab must be provided when using a secure " + - "hbase") - .build()); + if (confFileProvided) { + final String configFiles = validationContext.getProperty(HADOOP_CONF_FILES).getValue(); + final Configuration hbaseConfig = getConfigurationFromFiles(configFiles); + final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + problems.addAll(KerberosProperties.validatePrincipalAndKeytab( + this.getClass().getSimpleName(), hbaseConfig, principal, keytab, getLogger())); } 
return problems; } @OnEnabled - public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException { + public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException { this.connection = createConnection(context); // connection check @@ -154,18 +166,18 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE if (admin != null) { admin.listTableNames(); } - } - } - - protected Connection createConnection(final ConfigurationContext context) throws IOException { - final Configuration hbaseConfig = HBaseConfiguration.create(); - // if conf files are provided, start with those - if (context.getProperty(HADOOP_CONF_FILES).isSet()) { - for (final String configFile : context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) { - hbaseConfig.addResource(new Path(configFile.trim())); + // if we got here then we have a successful connection, so if we have a ugi then start a renewer + if (ugi != null) { + final String id = getClass().getSimpleName(); + renewer = SecurityUtil.startTicketRenewalThread(id, ugi, TICKET_RENEWAL_PERIOD, getLogger()); } } + } + + protected Connection createConnection(final ConfigurationContext context) throws IOException, InterruptedException { + final String configFiles = context.getProperty(HADOOP_CONF_FILES).getValue(); + final Configuration hbaseConfig = getConfigurationFromFiles(configFiles); // override with any properties that are provided if (context.getProperty(ZOOKEEPER_QUORUM).isSet()) { @@ -188,23 +200,45 @@ protected Connection createConnection(final ConfigurationContext context) throws hbaseConfig.set(descriptor.getName(), entry.getValue()); } } - UserGroupInformation.setConfiguration(hbaseConfig); - isSecurityEnabled = UserGroupInformation.isSecurityEnabled(); - if (UserGroupInformation.isSecurityEnabled()) { - try{ - UserGroupInformation.loginUserFromKeytab(context.getProperty(KERBEROS_PRINCIPAL).getValue(), - context.getProperty(KERBEROS_KEYTAB).getValue()); - LOG.info("HBase Security Enabled, Logging in as User {}"); - } catch (Exception e) { - } + + if (SecurityUtil.isSecurityEnabled(hbaseConfig)) { + final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + getLogger().info("HBase Security Enabled, logging in as principal {} with keytab {}", new Object[] {principal, keyTab}); + ugi = SecurityUtil.loginKerberos(hbaseConfig, principal, keyTab); + getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {principal, keyTab}); + + return ugi.doAs(new PrivilegedExceptionAction() { + @Override + public Connection run() throws Exception { + return ConnectionFactory.createConnection(hbaseConfig); + } + }); + } else { - LOG.info("Simple Authentication"); - } - return ConnectionFactory.createConnection(hbaseConfig); + getLogger().info("Simple Authentication"); + return ConnectionFactory.createConnection(hbaseConfig); + } + + } + + protected Configuration getConfigurationFromFiles(final String configFiles) { + final Configuration hbaseConfig = HBaseConfiguration.create(); + if (StringUtils.isNotBlank(configFiles)) { + for (final String configFile : configFiles.split(",")) { + hbaseConfig.addResource(new Path(configFile.trim())); + } + } + return hbaseConfig; } @OnDisabled public void shutdown() { + if (renewer != null) { + renewer.stop(); + } + if (connection != null) { try { 
connection.close(); @@ -216,7 +250,6 @@ public void shutdown() { @Override public void put(final String tableName, final Collection puts) throws IOException { - UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab(); try (final Table table = connection.getTable(TableName.valueOf(tableName))) { // Create one Put per row.... final Map rowPuts = new HashMap<>(); @@ -241,7 +274,6 @@ public void put(final String tableName, final Collection puts) thro @Override public void put(final String tableName, final String rowId, final Collection columns) throws IOException { - UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab(); try (final Table table = connection.getTable(TableName.valueOf(tableName))) { Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8)); for (final PutColumn column : columns) { @@ -263,7 +295,7 @@ public void scan(final String tableName, final Collection columns, final ParseFilter parseFilter = new ParseFilter(); filter = parseFilter.parseFilterString(filterExpression); } - UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab(); + try (final Table table = connection.getTable(TableName.valueOf(tableName)); final ResultScanner scanner = getResults(table, columns, filter, minTime)) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java index 58d91946b856..0854d28175bf 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java @@ -25,18 +25,22 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -57,8 +61,27 @@ public class TestHBase_1_1_2_ClientService { + private KerberosProperties kerberosPropsWithFile; + private KerberosProperties kerberosPropsWithoutFile; + + @Before + public void setup() { + // needed for calls to UserGroupInformation.setConfiguration() to work when passing in + // config with Kerberos authentication enabled + System.setProperty("java.security.krb5.realm", "nifi.com"); + System.setProperty("java.security.krb5.kdc", "nifi.kdc"); + + NiFiProperties niFiPropertiesWithKerberos = Mockito.mock(NiFiProperties.class); + 
when(niFiPropertiesWithKerberos.getKerberosConfigurationFile()).thenReturn(new File("src/test/resources/krb5.conf")); + kerberosPropsWithFile = KerberosProperties.create(niFiPropertiesWithKerberos); + + NiFiProperties niFiPropertiesWithoutKerberos = Mockito.mock(NiFiProperties.class); + when(niFiPropertiesWithoutKerberos.getKerberosConfigurationFile()).thenReturn(null); + kerberosPropsWithoutFile = KerberosProperties.create(niFiPropertiesWithoutKerberos); + } + @Test - public void testCustomValidate() throws InitializationException { + public void testCustomValidate() throws InitializationException, IOException { final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); final String tableName = "nifi"; @@ -66,7 +89,7 @@ public void testCustomValidate() throws InitializationException { when(table.getName()).thenReturn(TableName.valueOf(tableName)); // no conf file or zk properties so should be invalid - MockHBaseClientService service = new MockHBaseClientService(table); + MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.enableControllerService(service); @@ -74,16 +97,16 @@ public void testCustomValidate() throws InitializationException { runner.removeControllerService(service); // conf file with no zk properties should be valid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); - runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml"); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.enableControllerService(service); runner.assertValid(service); runner.removeControllerService(service); // only quorum and no conf file should be invalid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.enableControllerService(service); @@ -92,7 +115,7 @@ public void testCustomValidate() throws InitializationException { runner.removeControllerService(service); // quorum and port, no znode, no conf file, should be invalid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); @@ -102,7 +125,7 @@ public void testCustomValidate() throws InitializationException { runner.removeControllerService(service); // quorum, port, and znode, no conf file, should be valid - service = new MockHBaseClientService(table); + service = new MockHBaseClientService(table, kerberosPropsWithFile); runner.addControllerService("hbaseClientService", service); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); @@ -113,44 +136,60 @@ public void testCustomValidate() throws InitializationException { runner.removeControllerService(service); // quorum and port with conf file should be valid - service = new MockHBaseClientService(table); + service = new 
+        service = new MockHBaseClientService(table, kerberosPropsWithFile);
         runner.addControllerService("hbaseClientService", service);
-        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml");
+        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
         runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
         runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
         runner.enableControllerService(service);
 
         runner.assertValid(service);
+        runner.removeControllerService(service);
 
-        // Kerberos - principal with non-set keytab
+        // Kerberos - principal with non-set keytab and only hbase-site-security - valid because we need core-site-security to turn on security
+        service = new MockHBaseClientService(table, kerberosPropsWithFile);
+        runner.addControllerService("hbaseClientService", service);
+        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site-security.xml");
+        runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        // Kerberos - principal with non-set keytab and both config files
         runner.disableControllerService(service);
-        service.setIsSecurityEnabled(true);
-        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site-security.xml");
-        runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "test@REALM");
+        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES,
+                "src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml");
         runner.enableControllerService(service);
         runner.assertNotValid(service);
 
         // Kerberos - add valid options
         runner.disableControllerService(service);
-        runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/fake.keytab");
-        runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "test@REALM");
+        runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
+        runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
         runner.enableControllerService(service);
         runner.assertValid(service);
 
         // Kerberos - add invalid non-existent keytab file
         runner.disableControllerService(service);
-        runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/missing.keytab");
+        runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/missing.keytab");
         runner.enableControllerService(service);
         runner.assertNotValid(service);
 
         // Kerberos - add invalid principal
         runner.disableControllerService(service);
-        runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/fake.keytab");
-        runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "invalid");
+        runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
+        runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "");
         runner.enableControllerService(service);
         runner.assertNotValid(service);
 
-        runner.removeControllerService(service);
+        // Kerberos - valid props but the KerberosProperties has a null Kerberos config file so it should be invalid
+        service = new MockHBaseClientService(table, kerberosPropsWithoutFile);
+        runner.addControllerService("hbaseClientService", service);
+        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES,
+                "src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml");
+        runner.setProperty(service, kerberosPropsWithoutFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
+        runner.setProperty(service, kerberosPropsWithoutFile.getKerberosPrincipal(), "test@REALM");
+        runner.enableControllerService(service);
+        runner.assertNotValid(service);
     }
 
     @Test
@@ -370,9 +409,9 @@ public void testScanWithInvalidFilter() throws InitializationException, IOExcept
     }
 
     private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
-        final MockHBaseClientService service = new MockHBaseClientService(table);
+        final MockHBaseClientService service = new MockHBaseClientService(table, kerberosPropsWithFile);
         runner.addControllerService("hbaseClient", service);
-        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml");
+        runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
         runner.enableControllerService(service);
         runner.setProperty(TestProcessor.HBASE_CLIENT_SERVICE, "hbaseClient");
         return service;
@@ -409,13 +448,20 @@ private class MockHBaseClientService extends HBase_1_1_2_ClientService {
 
         private Table table;
         private List results = new ArrayList<>();
+        private KerberosProperties kerberosProperties;
 
-        public MockHBaseClientService(final Table table) {
+        public MockHBaseClientService(final Table table, final KerberosProperties kerberosProperties) {
             this.table = table;
+            this.kerberosProperties = kerberosProperties;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties() {
+            return kerberosProperties;
         }
 
-        public void setIsSecurityEnabled(boolean value) {
-            this.isSecurityEnabled = value;
+        protected void setKerberosProperties(KerberosProperties properties) {
+            this.kerberosProperties = properties;
         }
 
         public void addResult(final String rowKey, final Map cells, final long timestamp) {
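The Kerberos cases above hinge on which site files are listed in the Hadoop Configuration Files property: hbase-site-security.xml alone still validates without a keytab, and the principal/keytab requirements only kick in once core-site-security.xml sets hadoop.security.authentication to kerberos. A minimal sketch of that distinction using plain Hadoop APIs; the class and method are illustrative, not the service's actual validation code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

class SiteFileSecurityCheck {

    // Load each file from the comma-separated property value and ask whether the
    // combined configuration actually enables Kerberos authentication.
    static boolean isKerberosEnabled(final String commaSeparatedFiles) {
        final Configuration config = new Configuration();
        for (final String file : commaSeparatedFiles.split(",")) {
            config.addResource(new Path(file.trim()));
        }
        // hadoop.security.authentication comes from core-site; hbase.security.authentication
        // in hbase-site does not, by itself, require a principal and keytab.
        return "kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"));
    }
}

With only hbase-site-security.xml this returns false, and with core-site-security.xml added it returns true, which matches the assertValid/assertNotValid pairs in the test above.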
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml
index 0875ea8d96f4..2aca105f9b2b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site-security.xml
@@ -15,16 +15,16 @@
 limitations under the License.
 -->
-
-    fs.default.name
-    hdfs://hbase
-
-
-    hbase.security.authentication
-    kerberos
-
-
-    hbase.security.authorization
-    true
-
+
+    fs.default.name
+    hdfs://hbase
+
+
+    hadoop.security.authentication
+    kerberos
+
+
+    hadoop.security.authorization
+    true
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml
index d022099f6c60..c044ee30dab1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/core-site.xml
@@ -15,8 +15,8 @@
 limitations under the License.
 -->
-
-    fs.default.name
-    hdfs://hbase
-
+
+    fs.default.name
+    hdfs://hbase
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site-security.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site-security.xml
new file mode 100644
index 000000000000..0875ea8d96f4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site-security.xml
@@ -0,0 +1,30 @@
+
+
+
+
+
+    fs.default.name
+    hdfs://hbase
+
+
+    hbase.security.authentication
+    kerberos
+
+
+    hbase.security.authorization
+    true
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site.xml
new file mode 100644
index 000000000000..d022099f6c60
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/hbase-site.xml
@@ -0,0 +1,22 @@
+
+
+
+
+
+    fs.default.name
+    hdfs://hbase
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/krb5.conf b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/resources/krb5.conf
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml
index 91cea1ac0809..3045a5c9c6fa 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/pom.xml
@@ -40,6 +40,7 @@
                         src/test/resources/fake.keytab
+                        src/test/resources/krb5.conf
diff --git a/pom.xml b/pom.xml
index ec2db7409fe1..63018bb2f702 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1126,6 +1126,11 @@ language governing permissions and limitations under the License.
 -->
             nifi-processor-utils
             0.6.0-SNAPSHOT
+
+            org.apache.nifi
+            nifi-hadoop-utils
+            0.6.0-SNAPSHOT
+
             org.apache.nifi
             nifi-mock