From cf36ec224f52af64323e18d5d0befbfcfc15a176 Mon Sep 17 00:00:00 2001 From: "Yolanda M. Davis" Date: Tue, 8 Aug 2017 00:03:42 -0400 Subject: [PATCH] NIFI-4255 - added flag to allow migration of existing (source) acls to destination, update to documentation --- .../main/asciidoc/administration-guide.adoc | 1 + .../toolkit/zkmigrator/ZooKeeperMigrator.java | 45 ++++++++++++------- .../zkmigrator/ZooKeeperMigratorMain.java | 8 +++- .../zkmigrator/ZooKeeperMigratorTest.groovy | 25 +++++++++++ 4 files changed, 63 insertions(+), 16 deletions(-) diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index c2e9f8a5fc95..80d453516da3 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -2246,6 +2246,7 @@ You can use the following command line options with the ZooKeeper Migrator: * `-k`,`--krb-conf ` Allows the specification of a JAAS configuration file to allow authentication with a ZooKeeper configured to use Kerberos. This option is mutually exclusive with the `-a`,`--auth` option. * `-r`,`--receive` Receives data from ZooKeeper and writes to the given filename (if the `-f`,`--file` option is provided) or standard output. The data received will contain the full path to each node read from ZooKeeper. This option is mutually exclusive with the `-s`,`--send` option. * `-s`,`--send` Sends data to ZooKeeper that is read from the given filename (if the `-f`,`--file` option is provided) or standard input. The paths for each node in the data being sent to ZooKeeper are absolute paths, and will be stored in ZooKeeper under the *path* portion of the `-z`,`--zookeeper` argument. Typically, the *path* portion of the argument can be omitted, which will store the nodes at their absolute paths. This option is mutually exclusive with the `-r`,`--receive` option. +* `--use-existing-acl` Allows the Zookeeper Migrator to write ACL values retrieved from the source Zookeeper server to destination server. Default action will apply Open rights for unsecured destinations or Creator Only rights for secured destinations. * `-z`,`--zookeeper ` The ZooKeeper server(s) to use, specified by a connect string, comprised of one or more comma-separated host:port pairs followed by a path, in the format of _host:port[,host2:port...,hostn:port]/znode/path_. [[migrating_between_source_destination_zookeepers]] diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java index fa71ae0d8fa5..733321561441 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java @@ -25,6 +25,7 @@ import com.google.gson.JsonParser; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; + import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -116,7 +117,7 @@ void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData) thro closeZooKeeper(zooKeeper); } - void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource) throws IOException, ExecutionException, InterruptedException { + void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource, boolean useExistingACL) throws IOException, ExecutionException, InterruptedException { // ensure that the chroot path exists ZooKeeper zooKeeperRoot = getZooKeeper(Joiner.on(',').join(zooKeeperEndpointConfig.getServers()), authMode, authData); ensureNodeExists(zooKeeperRoot, zooKeeperEndpointConfig.getPath(), CreateMode.PERSISTENT); @@ -132,7 +133,7 @@ void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, bool final ZooKeeperEndpointConfig sourceZooKeeperEndpointConfig = gson.fromJson(jsonReader, ZooKeeperEndpointConfig.class); LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig); Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()) - && sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid", + && sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData); Preconditions.checkArgument(Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceZooKeeperEndpointConfig.getServers()) || !zooKeeperEndpointConfig.getPath().equals(sourceZooKeeperEndpointConfig.getPath()) || ignoreSource, @@ -160,17 +161,23 @@ public boolean tryAdvance(Consumer action) { }, false); final List> writeFutures = stream.parallel().map(node -> { + /* - * create stage to migrate paths and ACLs based on the migration parent path plus the node path and the given AuthMode, - * this stage must be run first + * create stage to determine the acls that should be applied to the node. + * this stage will be used to initialize the chain */ - final CompletableFuture transformNodeStage = CompletableFuture.supplyAsync(() -> transformNode(node, authMode)); + final CompletableFuture> determineACLStage = CompletableFuture.supplyAsync(() -> determineACLs(node, authMode, useExistingACL)); + /* + * create stage to apply acls to nodes and transform node to DataStatAclNode object + */ + final Function, CompletableFuture> transformNodeStage = acls -> CompletableFuture.supplyAsync(() -> transformNode(node, acls)); /* * create stage to ensure that nodes exist for the entire path of the zookeeper node, must be invoked after the transformNode stage to * ensure that the node will exist after path migration */ - final Function ensureNodeExistsStage = dataStatAclNode -> - ensureNodeExists(zooKeeper, dataStatAclNode.getPath(), dataStatAclNode.getEphemeralOwner() == 0 ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL); + final Function> ensureNodeExistsStage = dataStatAclNode -> + CompletableFuture.supplyAsync(() -> ensureNodeExists(zooKeeper, dataStatAclNode.getPath(), + dataStatAclNode.getEphemeralOwner() == 0 ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL)); /* * create stage that waits for both the transformNode and ensureNodeExists stages complete, and also provides that the given transformed node is * available to the next stage @@ -180,12 +187,16 @@ public boolean tryAdvance(Consumer action) { * create stage to transmit the node to the destination zookeeper endpoint, must be invoked after the node has been transformed and its path * has been created (or already exists) in the destination zookeeper */ - final Function> transmitNodeStage = dataStatNode -> - CompletableFuture.supplyAsync(() -> transmitNode(zooKeeper, dataStatNode)); + final Function> transmitNodeStage = dataStatNode -> CompletableFuture.supplyAsync(() -> transmitNode(zooKeeper, dataStatNode)); /* * submit the stages chained together in the proper order to perform the processing on the given node */ - return transformNodeStage.thenApply(ensureNodeExistsStage).thenCombine(transformNodeStage, combineEnsureNodeAndTransferNodeStage).thenCompose(transmitNodeStage); + final CompletableFuture dataStatAclNodeCompletableFuture = determineACLStage.thenCompose(transformNodeStage); + return dataStatAclNodeCompletableFuture.thenCompose(ensureNodeExistsStage) + .thenCombine(dataStatAclNodeCompletableFuture, combineEnsureNodeAndTransferNodeStage) + .thenCompose(transmitNodeStage); + + }).collect(Collectors.toList()); CompletableFuture allWritesFuture = CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[writeFutures.size()])); @@ -269,11 +280,15 @@ private String ensureNodeExists(ZooKeeper zooKeeper, String path, CreateMode cre } } - private DataStatAclNode transformNode(DataStatAclNode node, AuthMode destinationAuthMode) { - // For the NiFi use case, all nodes will be migrated to CREATOR_ALL_ACL - final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(), - destinationAuthMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL, - node.getEphemeralOwner()); + private List determineACLs(DataStatAclNode node, AuthMode authMode, Boolean useExistingACL) { + return useExistingACL ? node.getAcls() : + (authMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL); + + } + + private DataStatAclNode transformNode(DataStatAclNode node, List acls) { + + final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(), acls, node.getEphemeralOwner()); LOGGER.info("transformed original node {} to {}", node, migratedNode); return migratedNode; } diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorMain.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorMain.java index 8d5886661269..dd2f54bbb2ef 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorMain.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorMain.java @@ -90,6 +90,10 @@ enum Mode {READ, WRITE} .longOpt("ignore-source") .desc("ignores the source ZooKeeper endpoint specified in the exported data") .build(); + private static final Option OPTION_USE_EXISTING_ACL = Option.builder() + .longOpt("use-existing-acl") + .desc("allow write of existing acl data to destination") + .build(); private static Options createOptions() { final Options options = new Options(); @@ -98,6 +102,7 @@ private static Options createOptions() { options.addOption(OPTION_ZK_AUTH_INFO); options.addOption(OPTION_FILE); options.addOption(OPTION_IGNORE_SOURCE); + options.addOption(OPTION_USE_EXISTING_ACL); final OptionGroup optionGroupAuth = new OptionGroup().addOption(OPTION_ZK_AUTH_INFO).addOption(OPTION_ZK_KRB_CONF_FILE); optionGroupAuth.setRequired(false); options.addOptionGroup(optionGroupAuth); @@ -136,6 +141,7 @@ public static void main(String[] args) throws IOException { final String auth = commandLine.getOptionValue(OPTION_ZK_AUTH_INFO.getOpt()); final String jaasFilename = commandLine.getOptionValue(OPTION_ZK_KRB_CONF_FILE.getOpt()); final boolean ignoreSource = commandLine.hasOption(OPTION_IGNORE_SOURCE.getLongOpt()); + final boolean useExistingACL = commandLine.hasOption(OPTION_USE_EXISTING_ACL.getLongOpt()); final AuthMode authMode; final byte[] authData; if (auth != null) { @@ -157,7 +163,7 @@ public static void main(String[] args) throws IOException { } } else { try (InputStream zkData = filename != null ? new FileInputStream(Paths.get(filename).toFile()) : System.in) { - zookeeperMigrator.writeZooKeeper(zkData, authMode, authData, ignoreSource); + zookeeperMigrator.writeZooKeeper(zkData, authMode, authData, ignoreSource, useExistingACL); } } } diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy index 299fda521be9..2b9bc00f48dd 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy @@ -25,6 +25,8 @@ import org.apache.zookeeper.WatchedEvent import org.apache.zookeeper.ZKUtil import org.apache.zookeeper.ZooDefs import org.apache.zookeeper.ZooKeeper +import org.apache.zookeeper.data.Stat +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider import spock.lang.Ignore import spock.lang.Specification import spock.lang.Unroll @@ -185,6 +187,29 @@ class ZooKeeperMigratorTest extends Specification { nodes.size() == 3 } + def "Send to open Zookeeper using existing ACL"() { + given: + def server = new TestingServer() + def securedClient = new ZooKeeper(server.connectString, 3000, { WatchedEvent watchedEvent -> }) + def userPass = "nifi:nifi" + securedClient.addAuthInfo("digest",userPass.getBytes(StandardCharsets.UTF_8)) + def digest = DigestAuthenticationProvider.generateDigest(userPass) + def migrationPathRoot = '/nifi' + def stat = new Stat() + + when: + ZooKeeperMigratorMain.main(['-s', '-z', "$server.connectString$migrationPathRoot", '-f', 'src/test/resources/test-data-user-pass.json','--use-existing-acl'] as String[]) + + then: + noExceptionThrown() + def acl = securedClient.getACL("/nifi",stat) + acl.get(0).id.scheme == "digest" + acl.get(0).id.id == digest + def nodes = ZKPaths.getSortedChildren(securedClient, '/nifi').collect { ZKUtil.listSubTreeBFS(securedClient, "/$it") }.flatten() + nodes.size() == 0 + } + + def "Parse Zookeeper connect string and path"() { when: def zooKeeperMigrator = new ZooKeeperMigrator("$connectString")