diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle index 3824a65c1c9a7..fccfb0eb42c25 100644 --- a/plugins/repository-hdfs/build.gradle +++ b/plugins/repository-hdfs/build.gradle @@ -18,6 +18,7 @@ */ import org.apache.tools.ant.taskdefs.condition.Os +import org.elasticsearch.gradle.test.ClusterConfiguration import org.elasticsearch.gradle.test.RestIntegTestTask import java.nio.file.Files @@ -51,6 +52,7 @@ dependencies { compile 'com.google.protobuf:protobuf-java:2.5.0' compile 'commons-logging:commons-logging:1.1.3' compile 'commons-cli:commons-cli:1.2' + compile 'commons-codec:commons-codec:1.10' compile 'commons-collections:commons-collections:3.2.2' compile 'commons-configuration:commons-configuration:1.6' compile 'commons-io:commons-io:2.4' @@ -66,14 +68,6 @@ dependencyLicenses { mapping from: /hadoop-.*/, to: 'hadoop' } -task hdfsFixture(type: org.elasticsearch.gradle.test.AntFixture) { - dependsOn project.configurations.hdfsFixture - executable = new File(project.javaHome, 'bin/java') - env 'CLASSPATH', "${ -> project.configurations.hdfsFixture.asPath }" - args 'hdfs.MiniHDFS', - baseDir -} - // MIT Kerberos Vagrant Testing Fixture String box = "krb5kdc" Map vagrantEnvVars = [ @@ -116,28 +110,106 @@ for (String principal : principals) { krb5AddPrincipals.dependsOn(create) } -task secureHdfsFixture(type: org.elasticsearch.gradle.test.AntFixture) { - dependsOn project.configurations.hdfsFixture, krb5kdcFixture, krb5AddPrincipals - executable = new File(project.javaHome, 'bin/java') - env 'CLASSPATH', "${ -> project.configurations.hdfsFixture.asPath }" +// Create HDFS File System Testing Fixtures for HA/Secure combinations +for (String fixtureName : ['hdfsFixture', 'haHdfsFixture', 'secureHdfsFixture', 'secureHaHdfsFixture']) { + project.tasks.create(fixtureName, org.elasticsearch.gradle.test.AntFixture) { + dependsOn project.configurations.hdfsFixture + executable = new File(project.javaHome, 'bin/java') + env 'CLASSPATH', "${ -> project.configurations.hdfsFixture.asPath }" + + final List miniHDFSArgs = [] + + // If it's a secure fixture, then depend on Kerberos Fixture and principals + add the krb5conf to the JVM options + if (fixtureName.equals('secureHdfsFixture') || fixtureName.equals('secureHaHdfsFixture')) { + dependsOn krb5kdcFixture, krb5AddPrincipals + Path krb5Config = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("conf").resolve("krb5.conf") + miniHDFSArgs.add("-Djava.security.krb5.conf=${krb5Config}"); + if (project.rootProject.ext.javaVersion == JavaVersion.VERSION_1_9) { + miniHDFSArgs.add('--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED') + } + } + + // If it's an HA fixture, set a nameservice to use in the JVM options + if (fixtureName.equals('haHdfsFixture') || fixtureName.equals('secureHaHdfsFixture')) { + miniHDFSArgs.add("-Dha-nameservice=ha-hdfs") + } - Path keytabPath = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("keytabs").resolve("hdfs_hdfs.build.elastic.co.keytab") - Path krb5Config = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("conf").resolve("krb5.conf") + // Common options + miniHDFSArgs.add('hdfs.MiniHDFS') + miniHDFSArgs.add(baseDir) - final List miniHDFSArgs = ["-Djava.security.krb5.conf=${krb5Config}"] + // If it's a secure fixture, then set the principal name and keytab locations to use for auth. + if (fixtureName.equals('secureHdfsFixture') || fixtureName.equals('secureHaHdfsFixture')) { + Path keytabPath = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("keytabs").resolve("hdfs_hdfs.build.elastic.co.keytab") + miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}") + miniHDFSArgs.add("${keytabPath}") + } - if (project.rootProject.ext.javaVersion == JavaVersion.VERSION_1_9) { - miniHDFSArgs.add('--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED') + args miniHDFSArgs.toArray() } +} + +// The following closure must execute before the afterEvaluate block in the constructor of the following integrationTest tasks: +project.afterEvaluate { + for (String integTestTaskName : ['integTestHa', 'integTestSecure', 'integTestSecureHa']) { + ClusterConfiguration cluster = project.extensions.getByName("${integTestTaskName}Cluster") as ClusterConfiguration + cluster.dependsOn(project.bundlePlugin) - miniHDFSArgs.add('hdfs.MiniHDFS') - miniHDFSArgs.add(baseDir) - miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}") - miniHDFSArgs.add("${keytabPath}") + Task restIntegTestTask = project.tasks.getByName(integTestTaskName) + restIntegTestTask.clusterConfig.plugin(project.path) + + // Default jvm arguments for all test clusters + String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '512m') + + " " + "-Xmx" + System.getProperty('tests.heap.size', '512m') + + " " + System.getProperty('tests.jvm.argline', '') + + // If it's a secure cluster, add the keytab as an extra config, and set the krb5 conf in the JVM options. + if (integTestTaskName.equals('integTestSecure') || integTestTaskName.equals('integTestSecureHa')) { + Path elasticsearchKT = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("keytabs").resolve("elasticsearch.keytab").toAbsolutePath() + Path krb5conf = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("conf").resolve("krb5.conf").toAbsolutePath() + + restIntegTestTask.clusterConfig.extraConfigFile("repository-hdfs/krb5.keytab", "${elasticsearchKT}") + jvmArgs = jvmArgs + " " + "-Djava.security.krb5.conf=${krb5conf}" + if (project.rootProject.ext.javaVersion == JavaVersion.VERSION_1_9) { + jvmArgs = jvmArgs + " " + '--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED' + } + + // If it's the HA + Secure tests then also set the Kerberos settings for the integration test JVM since we'll + // need to auth to HDFS to trigger namenode failovers. + if (integTestTaskName.equals('integTestSecureHa')) { + Task restIntegTestTaskRunner = project.tasks.getByName("${integTestTaskName}Runner") + restIntegTestTaskRunner.systemProperty "test.krb5.principal.es", "elasticsearch@${realm}" + restIntegTestTaskRunner.systemProperty "test.krb5.principal.hdfs", "hdfs/hdfs.build.elastic.co@${realm}" + restIntegTestTaskRunner.jvmArg "-Djava.security.krb5.conf=${krb5conf}" + if (project.rootProject.ext.javaVersion == JavaVersion.VERSION_1_9) { + restIntegTestTaskRunner.jvmArg '--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED' + } + + Path hdfsKT = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("keytabs").resolve("hdfs_hdfs.build.elastic.co.keytab").toAbsolutePath() + restIntegTestTaskRunner.systemProperty "test.krb5.keytab.hdfs", "${hdfsKT}" + } + } + + restIntegTestTask.clusterConfig.jvmArgs = jvmArgs + } +} + +// Create a Integration Test suite just for HA based tests +RestIntegTestTask integTestHa = project.tasks.create('integTestHa', RestIntegTestTask.class) { + description = "Runs rest tests against an elasticsearch cluster with HDFS configured with HA Namenode." +} + +// Create a Integration Test suite just for security based tests +RestIntegTestTask integTestSecure = project.tasks.create('integTestSecure', RestIntegTestTask.class) { + description = "Runs rest tests against an elasticsearch cluster with HDFS secured by MIT Kerberos." +} - args miniHDFSArgs.toArray() +// Create a Integration Test suite just for HA related security based tests +RestIntegTestTask integTestSecureHa = project.tasks.create('integTestSecureHa', RestIntegTestTask.class) { + description = "Runs rest tests against an elasticsearch cluster with HDFS configured with HA Namenode and secured by MIT Kerberos." } +// Determine HDFS Fixture compatibility for the current build environment. boolean fixtureSupported = false if (Os.isFamily(Os.FAMILY_WINDOWS)) { // hdfs fixture will not start without hadoop native libraries on windows @@ -145,9 +217,9 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { if (nativePath != null) { Path path = Paths.get(nativePath); if (Files.isDirectory(path) && - Files.exists(path.resolve("bin").resolve("winutils.exe")) && - Files.exists(path.resolve("bin").resolve("hadoop.dll")) && - Files.exists(path.resolve("bin").resolve("hdfs.dll"))) { + Files.exists(path.resolve("bin").resolve("winutils.exe")) && + Files.exists(path.resolve("bin").resolve("hadoop.dll")) && + Files.exists(path.resolve("bin").resolve("hdfs.dll"))) { fixtureSupported = true } else { throw new IllegalStateException("HADOOP_HOME: ${path} is invalid, does not contain hadoop native libraries in \$HADOOP_HOME/bin"); @@ -157,55 +229,63 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) { fixtureSupported = true } +// Always ignore HA integration tests in the normal integration test runner, they are included below as +// part of their own HA-specific integration test tasks. +integTestRunner.exclude('**/Ha*TestSuiteIT.class') + if (fixtureSupported) { + // Check depends on the HA test. Already depends on the standard test. + project.check.dependsOn(integTestHa) + + // Both standard and HA tests depend on their respective HDFS fixtures integTestCluster.dependsOn hdfsFixture + integTestHaCluster.dependsOn haHdfsFixture + + // The normal test runner only runs the standard hdfs rest tests integTestRunner.systemProperty 'tests.rest.suite', 'hdfs_repository' + + // Only include the HA integration tests for the HA test task + integTestHaRunner.patternSet.setIncludes(['**/Ha*TestSuiteIT.class']) } else { logger.warn("hdfsFixture unsupported, please set HADOOP_HOME and put HADOOP_HOME\\bin in PATH") - // just tests that the plugin loads + // The normal integration test runner will just test that the plugin loads integTestRunner.systemProperty 'tests.rest.suite', 'hdfs_repository/10_basic' + // HA fixture is unsupported. Don't run them. + integTestHa.setEnabled(false) } +// Secure HDFS testing relies on the Vagrant based Kerberos fixture. boolean secureFixtureSupported = false if (fixtureSupported) { secureFixtureSupported = project.rootProject.vagrantSupported } -// Create a Integration Test suite just for security based tests -// This must execute before the afterEvaluate block from integTestSecure -project.afterEvaluate { - Path elasticsearchKT = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("keytabs").resolve("elasticsearch.keytab").toAbsolutePath() - Path krb5conf = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("conf").resolve("krb5.conf").toAbsolutePath() - - project.integTestSecureCluster.dependsOn(project.bundlePlugin) - project.integTestSecure.clusterConfig.plugin(project.path) - project.integTestSecure.clusterConfig.extraConfigFile("repository-hdfs/krb5.keytab", "${elasticsearchKT}") - final String baseJvmArgs = "-Xms" + System.getProperty('tests.heap.size', '512m') + - " " + "-Xmx" + System.getProperty('tests.heap.size', '512m') + - " " + "-Djava.security.krb5.conf=${krb5conf}" + - " " + System.getProperty('tests.jvm.argline', '') - final String jvmArgs - if (project.rootProject.ext.javaVersion == JavaVersion.VERSION_1_9) { - jvmArgs = baseJvmArgs + " " + '--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED' - } else { - jvmArgs = baseJvmArgs - } - - project.integTestSecure.clusterConfig.jvmArgs = jvmArgs - } - -RestIntegTestTask integTestSecure = project.tasks.create('integTestSecure', RestIntegTestTask.class) { - description = "Runs rest tests against an elasticsearch cluster with HDFS secured by MIT Kerberos." -} - if (secureFixtureSupported) { project.check.dependsOn(integTestSecure) + project.check.dependsOn(integTestSecureHa) // Fixture dependencies integTestSecureCluster.dependsOn secureHdfsFixture, krb5kdcFixture + integTestSecureHaCluster.dependsOn secureHaHdfsFixture, krb5kdcFixture + + // Set the keytab files in the classpath so that we can access them from test code without the security manager + // freaking out. + Path hdfsKeytabPath = project(':test:fixtures:krb5kdc-fixture').buildDir.toPath().resolve("keytabs") + project.dependencies { + testRuntime fileTree(dir: hdfsKeytabPath.toString(), include: ['*.keytab']) + } + + // Run just the secure hdfs rest test suite. integTestSecureRunner.systemProperty 'tests.rest.suite', 'secure_hdfs_repository' + // Ignore HA integration Tests. They are included below as part of integTestSecureHa test runner. + integTestSecureRunner.exclude('**/Ha*TestSuiteIT.class') + + // Only include the HA integration tests for the HA test task + integTestSecureHaRunner.patternSet.setIncludes(['**/Ha*TestSuiteIT.class']) } else { + // Security tests unsupported. Don't run these tests. integTestSecure.enabled = false + integTestSecureHa.enabled = false } thirdPartyAudit.excludes = [ @@ -309,11 +389,7 @@ thirdPartyAudit.excludes = [ 'org.apache.commons.beanutils.DynaProperty', 'org.apache.commons.beanutils.PropertyUtils', 'org.apache.commons.compress.archivers.tar.TarArchiveEntry', - 'org.apache.commons.compress.archivers.tar.TarArchiveInputStream', - 'org.apache.commons.codec.DecoderException', - 'org.apache.commons.codec.binary.Base64', - 'org.apache.commons.codec.binary.Hex', - 'org.apache.commons.codec.digest.DigestUtils', + 'org.apache.commons.compress.archivers.tar.TarArchiveInputStream', 'org.apache.commons.daemon.Daemon', 'org.apache.commons.daemon.DaemonContext', 'org.apache.commons.digester.AbstractObjectCreationFactory', diff --git a/plugins/repository-hdfs/licenses/commons-codec-1.10.jar.sha1 b/plugins/repository-hdfs/licenses/commons-codec-1.10.jar.sha1 new file mode 100644 index 0000000000000..3fe8682a1b0f9 --- /dev/null +++ b/plugins/repository-hdfs/licenses/commons-codec-1.10.jar.sha1 @@ -0,0 +1 @@ +4b95f4897fa13f2cd904aee711aeafc0c5295cd8 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/commons-codec-LICENSE.txt b/plugins/repository-hdfs/licenses/commons-codec-LICENSE.txt new file mode 100644 index 0000000000000..75b52484ea471 --- /dev/null +++ b/plugins/repository-hdfs/licenses/commons-codec-LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/plugins/repository-hdfs/licenses/commons-codec-NOTICE.txt b/plugins/repository-hdfs/licenses/commons-codec-NOTICE.txt new file mode 100644 index 0000000000000..efc098ca3ee67 --- /dev/null +++ b/plugins/repository-hdfs/licenses/commons-codec-NOTICE.txt @@ -0,0 +1,17 @@ +Apache Commons Codec +Copyright 2002-2014 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java +contains test data from http://aspell.net/test/orig/batch0.tab. +Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + +=============================================================================== + +The content of package org.apache.commons.codec.language.bm has been translated +from the original php source code available at http://stevemorse.org/phoneticinfo.htm +with permission from the original authors. +Original source copyright: +Copyright (c) 2008 Alexander Beider & Stephen P. Morse. diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 290de5b873ef8..26a7ed56d8182 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -173,9 +173,9 @@ public Map listBlobs() throws IOException { } /** - * Exists to wrap underlying InputStream methods that might need to make connections or - * perform actions within doPrivileged blocks. The HDFS Client performs a lot underneath - * the FSInputStream, including making connections and executing reflection based RPC calls. + * Exists to wrap underlying InputStream methods that might make socket connections in + * doPrivileged blocks. This is due to the way that hdfs client libraries might open + * socket connections when you are reading from an InputStream. */ private static class HDFSPrivilegedInputSteam extends FilterInputStream { @@ -187,43 +187,30 @@ private static class HDFSPrivilegedInputSteam extends FilterInputStream { } public int read() throws IOException { - return doPrivilegedOrThrow(in::read); + return securityContext.doPrivilegedOrThrow(in::read); } public int read(byte b[]) throws IOException { - return doPrivilegedOrThrow(() -> in.read(b)); + return securityContext.doPrivilegedOrThrow(() -> in.read(b)); } public int read(byte b[], int off, int len) throws IOException { - return doPrivilegedOrThrow(() -> in.read(b, off, len)); + return securityContext.doPrivilegedOrThrow(() -> in.read(b, off, len)); } public long skip(long n) throws IOException { - return doPrivilegedOrThrow(() -> in.skip(n)); + return securityContext.doPrivilegedOrThrow(() -> in.skip(n)); } public int available() throws IOException { - return doPrivilegedOrThrow(() -> in.available()); + return securityContext.doPrivilegedOrThrow(() -> in.available()); } public synchronized void reset() throws IOException { - doPrivilegedOrThrow(() -> { + securityContext.doPrivilegedOrThrow(() -> { in.reset(); return null; }); } - - private T doPrivilegedOrThrow(PrivilegedExceptionAction action) throws IOException { - SecurityManager sm = System.getSecurityManager(); - if (sm != null) { - // unprivileged code such as scripts do not have SpecialPermission - sm.checkPermission(new SpecialPermission()); - } - try { - return AccessController.doPrivileged(action, null, securityContext.getRestrictedExecutionPermissions()); - } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); - } - } } } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java index 0556c17d08513..a09ce7ed6df86 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java @@ -29,9 +29,6 @@ import org.elasticsearch.common.blobstore.BlobStore; import java.io.IOException; -import java.security.AccessController; -import java.security.PrivilegedActionException; -import java.security.PrivilegedExceptionAction; final class HdfsBlobStore implements BlobStore { @@ -42,8 +39,14 @@ final class HdfsBlobStore implements BlobStore { private volatile boolean closed; HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException { + this(fileContext, path, bufferSize, false); + } + + HdfsBlobStore(FileContext fileContext, String path, int bufferSize, boolean haEnabled) throws IOException { this.fileContext = fileContext; - this.securityContext = new HdfsSecurityContext(fileContext.getUgi()); + // Only restrict permissions if not running with HA + boolean restrictPermissions = (haEnabled == false); + this.securityContext = new HdfsSecurityContext(fileContext.getUgi(), restrictPermissions); this.bufferSize = bufferSize; this.root = execute(new Operation() { @Override @@ -125,17 +128,10 @@ V execute(Operation operation) throws IOException { if (closed) { throw new AlreadyClosedException("HdfsBlobStore is closed: " + this); } - try { - return AccessController.doPrivileged((PrivilegedExceptionAction) - () -> { - securityContext.ensureLogin(); - return operation.run(fileContext); - }, - null, - securityContext.getRestrictedExecutionPermissions()); - } catch (PrivilegedActionException pae) { - throw (IOException) pae.getException(); - } + return securityContext.doPrivilegedOrThrow(() -> { + securityContext.ensureLogin(); + return operation.run(fileContext); + }); } @Override diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java index 9fd64b79d29a7..f1ad57f0595cf 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -85,12 +87,12 @@ protected void doStart() { } URI uri = URI.create(uriSetting); if ("hdfs".equalsIgnoreCase(uri.getScheme()) == false) { - throw new IllegalArgumentException( - String.format(Locale.ROOT, "Invalid scheme [%s] specified in uri [%s]; only 'hdfs' uri allowed for hdfs snapshot/restore", uri.getScheme(), uriSetting)); + throw new IllegalArgumentException(String.format(Locale.ROOT, + "Invalid scheme [%s] specified in uri [%s]; only 'hdfs' uri allowed for hdfs snapshot/restore", uri.getScheme(), uriSetting)); } if (Strings.hasLength(uri.getPath()) && uri.getPath().equals("/") == false) { throw new IllegalArgumentException(String.format(Locale.ROOT, - "Use 'path' option to specify a path [%s], not the uri [%s] for hdfs snapshot/restore", uri.getPath(), uriSetting)); + "Use 'path' option to specify a path [%s], not the uri [%s] for hdfs snapshot/restore", uri.getPath(), uriSetting)); } String pathSetting = getMetadata().settings().get("path"); @@ -99,48 +101,45 @@ protected void doStart() { throw new IllegalArgumentException("No 'path' defined for hdfs snapshot/restore"); } - int bufferSize = getMetadata().settings().getAsBytesSize("buffer_size", DEFAULT_BUFFER_SIZE).bytesAsInt(); - - try { - // initialize our filecontext - SecurityManager sm = System.getSecurityManager(); - if (sm != null) { - sm.checkPermission(new SpecialPermission()); - } - FileContext fileContext = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public FileContext run() { - return createContext(uri, getMetadata().settings()); - } - }); - blobStore = new HdfsBlobStore(fileContext, pathSetting, bufferSize); - logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), pathSetting); - } catch (IOException e) { - throw new UncheckedIOException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e); + // initialize our blobstore using elevated privileges. + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(new SpecialPermission()); } + blobStore = AccessController.doPrivileged((PrivilegedAction) () -> createBlobstore(uri, pathSetting, getMetadata().settings())); super.doStart(); } - // create hadoop filecontext - private FileContext createContext(URI uri, Settings repositorySettings) { + private HdfsBlobStore createBlobstore(URI uri, String path, Settings repositorySettings) { Configuration hadoopConfiguration = new Configuration(repositorySettings.getAsBoolean("load_defaults", true)); hadoopConfiguration.setClassLoader(HdfsRepository.class.getClassLoader()); hadoopConfiguration.reloadConfiguration(); Map map = repositorySettings.getByPrefix("conf.").getAsMap(); for (Entry entry : map.entrySet()) { + LOGGER.debug("Adding configuration to HDFS Client Configuration : {} = {}", entry.getKey(), entry.getValue()); hadoopConfiguration.set(entry.getKey(), entry.getValue()); } + // Disable FS cache + hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true); + // Create a hadoop user UserGroupInformation ugi = login(hadoopConfiguration, repositorySettings); - // Disable FS cache - hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true); + // Sense if HA is enabled + // HA requires elevated permissions during regular usage in the event that a failover operation + // occurs and a new connection is required. + String host = uri.getHost(); + String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + host; + Class ret = hadoopConfiguration.getClass(configKey, null, FailoverProxyProvider.class); + boolean haEnabled = ret != null; + + int bufferSize = repositorySettings.getAsBytesSize("buffer_size", DEFAULT_BUFFER_SIZE).bytesAsInt(); // Create the filecontext with our user information // This will correctly configure the filecontext to have our UGI as its internal user. - return ugi.doAs((PrivilegedAction) () -> { + FileContext fileContext = ugi.doAs((PrivilegedAction) () -> { try { AbstractFileSystem fs = AbstractFileSystem.get(uri, hadoopConfiguration); return FileContext.getFileContext(fs, hadoopConfiguration); @@ -148,6 +147,14 @@ private FileContext createContext(URI uri, Settings repositorySettings) { throw new UncheckedIOException(e); } }); + + logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), path); + + try { + return new HdfsBlobStore(fileContext, path, bufferSize, haEnabled); + } catch (IOException e) { + throw new UncheckedIOException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e); + } } private UserGroupInformation login(Configuration hadoopConfiguration, Settings repositorySettings) { diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsSecurityContext.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsSecurityContext.java index 2a63df6c8bc43..fe573d33cd809 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsSecurityContext.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsSecurityContext.java @@ -24,15 +24,17 @@ import java.net.SocketPermission; import java.nio.file.Files; import java.nio.file.Path; +import java.security.AccessController; import java.security.Permission; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; import java.util.Arrays; import javax.security.auth.AuthPermission; import javax.security.auth.PrivateCredentialPermission; import javax.security.auth.kerberos.ServicePermission; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.SpecialPermission; import org.elasticsearch.env.Environment; /** @@ -43,8 +45,6 @@ */ class HdfsSecurityContext { - private static final Logger LOGGER = Loggers.getLogger(HdfsSecurityContext.class); - private static final Permission[] SIMPLE_AUTH_PERMISSIONS; private static final Permission[] KERBEROS_AUTH_PERMISSIONS; static { @@ -102,10 +102,12 @@ static Path locateKeytabFile(Environment environment) { } private final UserGroupInformation ugi; + private final boolean restrictPermissions; private final Permission[] restrictedExecutionPermissions; - HdfsSecurityContext(UserGroupInformation ugi) { + HdfsSecurityContext(UserGroupInformation ugi, boolean restrictPermissions) { this.ugi = ugi; + this.restrictPermissions = restrictPermissions; this.restrictedExecutionPermissions = renderPermissions(ugi); } @@ -129,10 +131,27 @@ private Permission[] renderPermissions(UserGroupInformation ugi) { return permissions; } - Permission[] getRestrictedExecutionPermissions() { + private Permission[] getRestrictedExecutionPermissions() { return restrictedExecutionPermissions; } + T doPrivilegedOrThrow(PrivilegedExceptionAction action) throws IOException { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + // unprivileged code such as scripts do not have SpecialPermission + sm.checkPermission(new SpecialPermission()); + } + try { + if (restrictPermissions) { + return AccessController.doPrivileged(action, null, this.getRestrictedExecutionPermissions()); + } else { + return AccessController.doPrivileged(action); + } + } catch (PrivilegedActionException e) { + throw (IOException) e.getCause(); + } + } + void ensureLogin() { if (ugi.isFromKeytab()) { try { diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HaHdfsFailoverTestSuiteIT.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HaHdfsFailoverTestSuiteIT.java new file mode 100644 index 0000000000000..ce4fe9b6d3f42 --- /dev/null +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HaHdfsFailoverTestSuiteIT.java @@ -0,0 +1,283 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.hdfs; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.BadFencingConfigurationException; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceTarget; +import org.apache.hadoop.ha.NodeFencer; +import org.apache.hadoop.ha.ZKFCProtocol; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.tools.DFSHAAdmin; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.junit.Assert; + +/** + * Integration test that runs against an HA-Enabled HDFS instance + */ +public class HaHdfsFailoverTestSuiteIT extends ESRestTestCase { + + public void testHAFailoverWithRepository() throws Exception { + RestClient client = client(); + Map emptyParams = Collections.emptyMap(); + Header contentHeader = new BasicHeader("Content-Type", "application/json"); + + String esKerberosPrincipal = System.getProperty("test.krb5.principal.es"); + String hdfsKerberosPrincipal = System.getProperty("test.krb5.principal.hdfs"); + String kerberosKeytabLocation = System.getProperty("test.krb5.keytab.hdfs"); + boolean securityEnabled = hdfsKerberosPrincipal != null; + + Configuration hdfsConfiguration = new Configuration(); + hdfsConfiguration.set("dfs.nameservices", "ha-hdfs"); + hdfsConfiguration.set("dfs.ha.namenodes.ha-hdfs", "nn1,nn2"); + hdfsConfiguration.set("dfs.namenode.rpc-address.ha-hdfs.nn1", "localhost:10001"); + hdfsConfiguration.set("dfs.namenode.rpc-address.ha-hdfs.nn2", "localhost:10002"); + hdfsConfiguration.set( + "dfs.client.failover.proxy.provider.ha-hdfs", + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + + AccessController.doPrivileged((PrivilegedExceptionAction) () -> { + if (securityEnabled) { + // ensure that keytab exists + Path kt = PathUtils.get(kerberosKeytabLocation); + if (Files.exists(kt) == false) { + throw new IllegalStateException("Could not locate keytab at " + kerberosKeytabLocation); + } + if (Files.isReadable(kt) != true) { + throw new IllegalStateException("Could not read keytab at " + kerberosKeytabLocation); + } + logger.info("Keytab Length: " + Files.readAllBytes(kt).length); + + // set principal names + hdfsConfiguration.set("dfs.namenode.kerberos.principal", hdfsKerberosPrincipal); + hdfsConfiguration.set("dfs.datanode.kerberos.principal", hdfsKerberosPrincipal); + hdfsConfiguration.set("dfs.data.transfer.protection", "authentication"); + + SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hdfsConfiguration); + UserGroupInformation.setConfiguration(hdfsConfiguration); + UserGroupInformation.loginUserFromKeytab(hdfsKerberosPrincipal, kerberosKeytabLocation); + } else { + SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE, hdfsConfiguration); + UserGroupInformation.setConfiguration(hdfsConfiguration); + UserGroupInformation.getCurrentUser(); + } + return null; + }); + + // Create repository + { + Response response = client.performRequest("PUT", "/_snapshot/hdfs_ha_repo_read", emptyParams, new NStringEntity( + "{" + + "\"type\":\"hdfs\"," + + "\"settings\":{" + + "\"uri\": \"hdfs://ha-hdfs/\",\n" + + "\"path\": \"/user/elasticsearch/existing/readonly-repository\"," + + "\"readonly\": \"true\"," + + securityCredentials(securityEnabled, esKerberosPrincipal) + + "\"conf.dfs.nameservices\": \"ha-hdfs\"," + + "\"conf.dfs.ha.namenodes.ha-hdfs\": \"nn1,nn2\"," + + "\"conf.dfs.namenode.rpc-address.ha-hdfs.nn1\": \"localhost:10001\"," + + "\"conf.dfs.namenode.rpc-address.ha-hdfs.nn2\": \"localhost:10002\"," + + "\"conf.dfs.client.failover.proxy.provider.ha-hdfs\": " + + "\"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\"" + + "}" + + "}", + Charset.defaultCharset()), contentHeader); + + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + } + + // Get repository + { + Response response = client.performRequest("GET", "/_snapshot/hdfs_ha_repo_read/_all", emptyParams); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + } + + // Failover the namenode to the second. + failoverHDFS("nn1", "nn2", hdfsConfiguration); + + // Get repository again + { + Response response = client.performRequest("GET", "/_snapshot/hdfs_ha_repo_read/_all", emptyParams); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + } + } + + private String securityCredentials(boolean securityEnabled, String kerberosPrincipal) { + if (securityEnabled) { + return "\"security.principal\": \""+kerberosPrincipal+"\"," + + "\"conf.dfs.data.transfer.protection\": \"authentication\","; + } else { + return ""; + } + } + + /** + * Wraps an HAServiceTarget, keeping track of any HAServiceProtocol proxies it generates in order + * to close them at the end of the test lifecycle. + */ + private static class CloseableHAServiceTarget extends HAServiceTarget { + private final HAServiceTarget delegate; + private final List protocolsToClose = new ArrayList<>(); + + CloseableHAServiceTarget(HAServiceTarget delegate) { + this.delegate = delegate; + } + + @Override + public InetSocketAddress getAddress() { + return delegate.getAddress(); + } + + @Override + public InetSocketAddress getHealthMonitorAddress() { + return delegate.getHealthMonitorAddress(); + } + + @Override + public InetSocketAddress getZKFCAddress() { + return delegate.getZKFCAddress(); + } + + @Override + public NodeFencer getFencer() { + return delegate.getFencer(); + } + + @Override + public void checkFencingConfigured() throws BadFencingConfigurationException { + delegate.checkFencingConfigured(); + } + + @Override + public HAServiceProtocol getProxy(Configuration conf, int timeoutMs) throws IOException { + HAServiceProtocol proxy = delegate.getProxy(conf, timeoutMs); + protocolsToClose.add(proxy); + return proxy; + } + + @Override + public HAServiceProtocol getHealthMonitorProxy(Configuration conf, int timeoutMs) throws IOException { + return delegate.getHealthMonitorProxy(conf, timeoutMs); + } + + @Override + public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs) throws IOException { + return delegate.getZKFCProxy(conf, timeoutMs); + } + + @Override + public boolean isAutoFailoverEnabled() { + return delegate.isAutoFailoverEnabled(); + } + + private void close() { + for (HAServiceProtocol protocol : protocolsToClose) { + if (protocol instanceof HAServiceProtocolClientSideTranslatorPB) { + ((HAServiceProtocolClientSideTranslatorPB) protocol).close(); + } + } + } + } + + /** + * The default HAAdmin tool does not throw exceptions on failures, and does not close any client connection + * resources when it concludes. This subclass overrides the tool to allow for exception throwing, and to + * keep track of and clean up connection resources. + */ + private static class CloseableHAAdmin extends DFSHAAdmin { + private final List serviceTargets = new ArrayList<>(); + + @Override + protected HAServiceTarget resolveTarget(String nnId) { + CloseableHAServiceTarget target = new CloseableHAServiceTarget(super.resolveTarget(nnId)); + serviceTargets.add(target); + return target; + } + + @Override + public int run(String[] argv) throws Exception { + return runCmd(argv); + } + + public int transitionToStandby(String namenodeID) throws Exception { + return run(new String[]{"-transitionToStandby", namenodeID}); + } + + public int transitionToActive(String namenodeID) throws Exception { + return run(new String[]{"-transitionToActive", namenodeID}); + } + + public void close() { + for (CloseableHAServiceTarget serviceTarget : serviceTargets) { + serviceTarget.close(); + } + } + } + + /** + * Performs a two-phase leading namenode transition. + * @param from Namenode ID to transition to standby + * @param to Namenode ID to transition to active + * @param configuration Client configuration for HAAdmin tool + * @throws IOException In the event of a raised exception during namenode failover. + */ + private void failoverHDFS(String from, String to, Configuration configuration) throws IOException { + logger.info("Swapping active namenodes: [{}] to standby and [{}] to active", from, to); + try { + AccessController.doPrivileged((PrivilegedExceptionAction) () -> { + CloseableHAAdmin haAdmin = new CloseableHAAdmin(); + haAdmin.setConf(configuration); + try { + haAdmin.transitionToStandby(from); + haAdmin.transitionToActive(to); + } finally { + haAdmin.close(); + } + return null; + }); + } catch (PrivilegedActionException pae) { + throw new IOException("Unable to perform namenode failover", pae); + } + } +} diff --git a/test/fixtures/hdfs-fixture/src/main/java/hdfs/MiniHDFS.java b/test/fixtures/hdfs-fixture/src/main/java/hdfs/MiniHDFS.java index 73f4e443b0769..0ddf7af653391 100644 --- a/test/fixtures/hdfs-fixture/src/main/java/hdfs/MiniHDFS.java +++ b/test/fixtures/hdfs-fixture/src/main/java/hdfs/MiniHDFS.java @@ -41,6 +41,9 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.security.UserGroupInformation; /** @@ -91,6 +94,7 @@ public static void main(String[] args) throws Exception { cfg.set(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, "true"); cfg.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true"); cfg.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true"); + cfg.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, "true"); } UserGroupInformation.setConfiguration(cfg); @@ -102,12 +106,32 @@ public static void main(String[] args) throws Exception { } else { builder.nameNodePort(9999); } + + // Configure HA mode + String haNameService = System.getProperty("ha-nameservice"); + boolean haEnabled = haNameService != null; + if (haEnabled) { + MiniDFSNNTopology.NNConf nn1 = new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001); + MiniDFSNNTopology.NNConf nn2 = new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002); + MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(haNameService).addNN(nn1).addNN(nn2); + MiniDFSNNTopology namenodeTopology = new MiniDFSNNTopology().addNameservice(nameservice); + builder.nnTopology(namenodeTopology); + } + MiniDFSCluster dfs = builder.build(); // Configure contents of the filesystem org.apache.hadoop.fs.Path esUserPath = new org.apache.hadoop.fs.Path("/user/elasticsearch"); - try (FileSystem fs = dfs.getFileSystem()) { + FileSystem fs; + if (haEnabled) { + dfs.transitionToActive(0); + fs = HATestUtil.configureFailoverFs(dfs, cfg); + } else { + fs = dfs.getFileSystem(); + } + + try { // Set the elasticsearch user directory up fs.mkdirs(esUserPath); if (UserGroupInformation.isSecurityEnabled()) { @@ -133,6 +157,8 @@ public static void main(String[] args) throws Exception { FileUtils.deleteDirectory(tempDirectory.toFile()); } + } finally { + fs.close(); } // write our PID file @@ -142,8 +168,12 @@ public static void main(String[] args) throws Exception { Files.move(tmp, baseDir.resolve(PID_FILE_NAME), StandardCopyOption.ATOMIC_MOVE); // write our port file + String portFileContent = Integer.toString(dfs.getNameNodePort(0)); + if (haEnabled) { + portFileContent = portFileContent + "\n" + Integer.toString(dfs.getNameNodePort(1)); + } tmp = Files.createTempFile(baseDir, null, null); - Files.write(tmp, Integer.toString(dfs.getNameNodePort()).getBytes(StandardCharsets.UTF_8)); + Files.write(tmp, portFileContent.getBytes(StandardCharsets.UTF_8)); Files.move(tmp, baseDir.resolve(PORT_FILE_NAME), StandardCopyOption.ATOMIC_MOVE); } }