From 015836adc53fe532c3ede3dda122624a6d7c4980 Mon Sep 17 00:00:00 2001 From: Jeff Storck Date: Thu, 26 Jan 2017 14:30:32 -0500 Subject: [PATCH 1/2] NIFI-3300 Implemented usage of ZooKeeper chroot capability in the connect string Updated ZooKeeper connect string parsing tests Updated admin doc for ZooKeeper Migrator migration of nifi root nodes, updated source and destination ZK check by servers in the connection string instead of the entire connection string --- .../main/asciidoc/administration-guide.adoc | 6 +- .../nifi-toolkit-zookeeper-migrator/pom.xml | 6 + .../zkmigrator/ZooKeeperEndpointConfig.java | 26 ++++- .../toolkit/zkmigrator/ZooKeeperMigrator.java | 51 ++++----- .../zkmigrator/ZooKeeperMigratorTest.groovy | 104 +++++++++--------- .../test/resources/test-data-user-pass.json | 24 ++-- .../src/test/resources/test-data.json | 81 +++++++------- 7 files changed, 158 insertions(+), 140 deletions(-) diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index f970232a7b27..25b2f1190607 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -1678,7 +1678,7 @@ You can use the following command line options with the ZooKeeper Migrator: * `-k,--krb-conf ` Allows the specification of a JAAS configuration file to allow authentication with a ZooKeeper configured to use Kerberos. This option is mutually exclusive with the `-a,--auth` option. * `-r,--receive` Receives data from ZooKeeper and writes to the given filename (if the `-f,--file` option is provided) or standard output. The data received will contain the full path to each node read from ZooKeeper. This option is mutually exclusive with the `-s,--send` option. * `-s,--send` Sends data to ZooKeeper that is read from the given filename (if the `-f,--file` option is provided) or standard input. The paths for each node in the data being sent to ZooKeeper are absolute paths, and will be stored in ZooKeeper under the *path* portion of the `-z,--zookeeper` argument. Typically, the *path* portion of the argument can be omitted, which will store the nodes at their absolute paths. This option is mutually exclusive with the `-r,--receive` option. -* `-z,--zookeeper ` The ZooKeeper server to use, specified by a connection string with path, in the format of _host:port/znode/path_. +* `-z,--zookeeper ` The ZooKeeper server(s) to use, specified by a connect string, comprised of one or more comma-separated host:port pairs followed by a path, in the format of _host:port[,host2:port...,hostn:port]/znode/path_. [[migrating_between_source_destination_zookeepers]] ==== Migrating Between Source and Destination ZooKeepers @@ -1737,9 +1737,9 @@ If the state-management.xml specifies Open, no authentication is required. 5. Migrate the ZooKeeper data to the destination ZooKeeper. If the source and destination ZooKeepers are the same, the `--ignore-source` option can be added to the following examples. * For an open ZooKeeper: -** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/ -f /*path*/*to*/*export*/*zk-source-data.json* +** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/*destinationRootPath*/components -f /*path*/*to*/*export*/*zk-source-data.json* * For a ZooKeeper using Kerberos for authentication: -** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/ -k /*path*/*to*/*jaasconfig*/*jaas-config.conf* -f /*path*/*to*/*export*/*zk-source-data.json* +** zk-migrator.sh -s -z *destinationHostname:destinationClientPort*/*destinationRootPath*/components -k /*path*/*to*/*jaasconfig*/*jaas-config.conf* -f /*path*/*to*/*export*/*zk-source-data.json* 6. Once the migration has completed successfully, start the processors in the NiFi flow. Processing should continue from the point at which it was stopped when the NiFi flow was stopped. diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/pom.xml b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/pom.xml index 5c7d06185fd1..c964910acdb5 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/pom.xml +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/pom.xml @@ -69,6 +69,12 @@ curator-test test + + org.apache.curator + curator-client + 2.11.0 + test + diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java index c5b40b2177ec..e1ea7cab7576 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java @@ -21,24 +21,37 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import java.util.List; import java.util.Objects; class ZooKeeperEndpointConfig { private final String connectString; + private final List servers; private final String path; - ZooKeeperEndpointConfig(String connectString, String path) { + ZooKeeperEndpointConfig(String connectString) { Preconditions.checkArgument(!Strings.isNullOrEmpty(connectString), "connectString can not be null or empty"); - Preconditions.checkArgument(!Strings.isNullOrEmpty(path), "path can not be null or empty"); this.connectString = connectString; - this.path = '/' + Joiner.on('/').join(Splitter.on('/').omitEmptyStrings().trimResults().split(path)); + + final String[] connectStringPath = connectString.split("/", 2); + this.servers = Lists.newArrayList(connectStringPath[0].split(",")); + if (connectStringPath.length == 2) { + this.path = '/' + Joiner.on('/').join(Splitter.on('/').omitEmptyStrings().trimResults().split(connectStringPath[1])); + } else { + path = "/"; + } } public String getConnectString() { return connectString; } + public List getServers() { + return servers; + } + public String getPath() { return path; } @@ -48,18 +61,21 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ZooKeeperEndpointConfig that = (ZooKeeperEndpointConfig) o; - return Objects.equals(connectString, that.connectString) && Objects.equals(path, that.path); + return Objects.equals(connectString, that.connectString) + && Objects.equals(servers, that.servers) + && Objects.equals(path, that.path); } @Override public int hashCode() { - return Objects.hash(connectString, path); + return Objects.hash(connectString, servers, path); } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("connectString", connectString) + .add("servers", servers) .add("path", path) .toString(); } diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java index fc7f6473705f..438c0cf9ac8c 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java @@ -43,6 +43,7 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Spliterators; import java.util.concurrent.CompletableFuture; @@ -66,23 +67,14 @@ enum AuthMode {OPEN, DIGEST, SASL} private final ZooKeeperEndpointConfig zooKeeperEndpointConfig; - ZooKeeperMigrator(String zookeeperEndpoint) { - LOGGER.debug("ZooKeeper endpoint parameter: {}", zookeeperEndpoint); - Preconditions.checkArgument(!Strings.isNullOrEmpty(zookeeperEndpoint), "zookeeper endpoint must not be null"); - final String[] connectStringPath = zookeeperEndpoint.split("/", 2); - Preconditions.checkArgument(connectStringPath.length >= 1, "invalid ZooKeeper endpoint: %s", zookeeperEndpoint); - final String connectString = connectStringPath[0]; - final String path; - if (connectStringPath.length == 2) { - path = connectStringPath[1]; - } else { - path = ""; - } - this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(connectString, "/" + path); + ZooKeeperMigrator(String zooKeeperConnectString) { + LOGGER.debug("ZooKeeper connect string parameter: {}", zooKeeperConnectString); + Preconditions.checkArgument(!Strings.isNullOrEmpty(zooKeeperConnectString), "ZooKeeper connect string must not be null"); + this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(zooKeeperConnectString); } void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData) throws IOException, KeeperException, InterruptedException, ExecutionException { - ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig, authMode, authData); + ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData); JsonWriter jsonWriter = new JsonWriter(new BufferedWriter(new OutputStreamWriter(zkData))); jsonWriter.setIndent(" "); JsonParser jsonParser = new JsonParser(); @@ -93,8 +85,8 @@ void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData) thro // persist source ZooKeeperEndpointConfig gson.toJson(jsonParser.parse(gson.toJson(zooKeeperEndpointConfig)).getAsJsonObject(), jsonWriter); - LOGGER.info("Persisting data from source ZooKeeper: {}", zooKeeperEndpointConfig); - final List> readFutures = streamPaths(getNode(zooKeeper, zooKeeperEndpointConfig.getPath())) + LOGGER.info("Retrieving data from source ZooKeeper: {}", zooKeeperEndpointConfig); + final List> readFutures = streamPaths(getNode(zooKeeper, "/")) .parallel() .map(node -> CompletableFuture.supplyAsync(() -> { @@ -125,7 +117,12 @@ void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData) thro } void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource) throws IOException, ExecutionException, InterruptedException { - ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig, authMode, authData); + // ensure that the chroot path exists + ZooKeeper zooKeeperRoot = getZooKeeper(Joiner.on(',').join(zooKeeperEndpointConfig.getServers()), authMode, authData); + ensureNodeExists(zooKeeperRoot, zooKeeperEndpointConfig.getPath(), CreateMode.PERSISTENT); + closeZooKeeper(zooKeeperRoot); + + ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData); JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(zkData))); Gson gson = new GsonBuilder().create(); @@ -134,9 +131,10 @@ void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, bool // determine source ZooKeeperEndpointConfig for this data final ZooKeeperEndpointConfig sourceZooKeeperEndpointConfig = gson.fromJson(jsonReader, ZooKeeperEndpointConfig.class); LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceZooKeeperEndpointConfig); - Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()), - "Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData); - Preconditions.checkArgument(!(zooKeeperEndpointConfig.equals(sourceZooKeeperEndpointConfig) && !ignoreSource), + Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()) + && sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid", + sourceZooKeeperEndpointConfig, zkData); + Preconditions.checkArgument(Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceZooKeeperEndpointConfig.getServers()) || ignoreSource, "Source ZooKeeper config %s for the data provided can not be the same as the configured destination ZooKeeper config %s", sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig); @@ -271,9 +269,8 @@ private String ensureNodeExists(ZooKeeper zooKeeper, String path, CreateMode cre } private DataStatAclNode transformNode(DataStatAclNode node, AuthMode destinationAuthMode) { - String migrationPath = '/' + Joiner.on('/').skipNulls().join(Splitter.on('/').omitEmptyStrings().trimResults().split(zooKeeperEndpointConfig.getPath() + node.getPath())); // For the NiFi use case, all nodes will be migrated to CREATOR_ALL_ACL - final DataStatAclNode migratedNode = new DataStatAclNode(migrationPath, node.getData(), node.getStat(), + final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(), destinationAuthMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL, node.getEphemeralOwner()); LOGGER.info("transformed original node {} to {}", node, migratedNode); @@ -298,11 +295,11 @@ private Stat transmitNode(ZooKeeper zooKeeper, DataStatAclNode node) { return node.getStat(); } - private ZooKeeper getZooKeeper(ZooKeeperEndpointConfig zooKeeperEndpointConfig, AuthMode authMode, byte[] authData) throws IOException { + private ZooKeeper getZooKeeper(String zooKeeperConnectString, AuthMode authMode, byte[] authData) throws IOException { CountDownLatch connectionLatch = new CountDownLatch(1); - ZooKeeper zooKeeper = new ZooKeeper(zooKeeperEndpointConfig.getConnectString(), 3000, watchedEvent -> { + ZooKeeper zooKeeper = new ZooKeeper(zooKeeperConnectString, 3000, watchedEvent -> { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperEndpointConfig); + LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperConnectString); } if (watchedEvent.getType().equals(Watcher.Event.EventType.None) && watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) { connectionLatch.countDown(); @@ -315,12 +312,12 @@ private ZooKeeper getZooKeeper(ZooKeeperEndpointConfig zooKeeperEndpointConfig, } catch (InterruptedException e) { closeZooKeeper(zooKeeper); Thread.currentThread().interrupt(); - throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperEndpointConfig), e); + throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperConnectString), e); } if (!connected) { closeZooKeeper(zooKeeper); - throw new IOException(String.format("unable to connect to %s", zooKeeperEndpointConfig)); + throw new IOException(String.format("unable to connect to %s", zooKeeperConnectString)); } if (authMode.equals(AuthMode.DIGEST)) { diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy index 556273488976..7289374dfbc0 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy @@ -19,8 +19,10 @@ package org.apache.nifi.toolkit.zkmigrator import com.google.gson.Gson import com.google.gson.stream.JsonReader import org.apache.curator.test.TestingServer +import org.apache.curator.utils.ZKPaths import org.apache.zookeeper.CreateMode import org.apache.zookeeper.WatchedEvent +import org.apache.zookeeper.ZKUtil import org.apache.zookeeper.ZooDefs import org.apache.zookeeper.ZooKeeper import spock.lang.Ignore @@ -49,21 +51,22 @@ class ZooKeeperMigratorTest extends Specification { noExceptionThrown() } - def "Receive from open ZooKeeper without ACL migration"() { + def "Receive from open ZooKeeper"() { given: def server = new TestingServer() def client = new ZooKeeper(server.connectString, 3000, { WatchedEvent watchedEvent -> }) - def migrationPathRoot = '/nifi' - client.create(migrationPathRoot, 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) - def subPath = '1' - client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) - subPath = '1/a' - client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) - subPath = '2' - client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) - subPath = '3' - client.create("$migrationPathRoot/$subPath", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + def migrationPathRoot = '/nifi/components' + ZKPaths.mkdirs(client, migrationPathRoot) + client.setData(migrationPathRoot, 'some data'.bytes, 0) + def componentName = '1' + client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + componentName = '1/a' + client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL) + componentName = '2' + client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + componentName = '3' + client.create("$migrationPathRoot/$componentName", 'some data'.bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) def outputFilePath = 'target/test-data.json' when: @@ -72,7 +75,7 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List - 6 == persistedData.size(); + persistedData.size() == 6 } def "Send to open ZooKeeper without ACL migration"() { @@ -87,8 +90,8 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() - def nodes = getChildren(client, migrationPathRoot, []) - 6 == nodes.size() + def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten() + nodes.size() == 6 } def "Send to open ZooKeeper without ACL migration with new multi-node parent"() { @@ -103,8 +106,8 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() - def nodes = getChildren(client, migrationPathRoot, []) - 6 == nodes.size() + def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten() + nodes.size() == 7 } def "Receive all nodes from ZooKeeper root"() { @@ -123,7 +126,7 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List - 5 == persistedData.size(); + persistedData.size() == 5 } def "Receive Zookeeper node created with username and password"() { @@ -144,7 +147,7 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() def persistedData = new Gson().fromJson(new JsonReader(new FileReader(outputFilePath)), List) as List - 2 == persistedData.size(); + persistedData.size() == 2 } def "Send to Zookeeper a node created with username and password"() { @@ -162,48 +165,59 @@ class ZooKeeperMigratorTest extends Specification { then: noExceptionThrown() - def nodes = getChildren(client, migrationPathRoot, []) - 2 == nodes.size() + def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten() + nodes.size() == 3 } - def "Send to open Zookeeper root"() { + def "Send to open Zookeeper with ACL migration"() { given: def server = new TestingServer() def client = new ZooKeeper(server.connectString, 3000, { WatchedEvent watchedEvent -> }) - def migrationPathRoot = '/' + def migrationPathRoot = '/nifi-open' when: ZooKeeperMigratorMain.main(['-s', '-z', "$server.connectString$migrationPathRoot", '-f', 'src/test/resources/test-data-user-pass.json'] as String[]) then: noExceptionThrown() - def nodes = getChildren(client, migrationPathRoot, []) - 4 == nodes.size() + def nodes = ZKPaths.getSortedChildren(client, '/').collect { ZKUtil.listSubTreeBFS(client, "/$it") }.flatten() + nodes.size() == 3 } def "Parse Zookeeper connect string and path"() { when: - def zooKeeperMigrator = new ZooKeeperMigrator("$connectStringAndPath") - def tokens = connectStringAndPath.split('/', 2) as List - def connectString = tokens[0] - def path = '/' + (tokens.size() > 1 ? tokens[1] : '') + def zooKeeperMigrator = new ZooKeeperMigrator("$connectString") then: - connectString == zooKeeperMigrator.getZooKeeperEndpointConfig().connectString - path == zooKeeperMigrator.getZooKeeperEndpointConfig().path + zooKeeperMigrator.zooKeeperEndpointConfig.connectString == connectString + zooKeeperMigrator.zooKeeperEndpointConfig.servers == servers.split(',').collect() + zooKeeperMigrator.zooKeeperEndpointConfig.path == path where: - connectStringAndPath || _ - '127.0.0.1' || _ - '127.0.0.1/' || _ - '127.0.0.1:2181' || _ - '127.0.0.1:2181/' || _ - '127.0.0.1/path' || _ - '127.0.0.1/path/node' || _ - '127.0.0.1:2181/' || _ - '127.0.0.1:2181/path' || _ - '127.0.0.1:2181/path/node' || _ + connectString | path | servers || _ + '127.0.0.1' | '/' | '127.0.0.1' || _ + '127.0.0.1,127.0.0.2' | '/' | '127.0.0.1,127.0.0.2' || _ + '127.0.0.1/' | '/' | '127.0.0.1' || _ + '127.0.0.1,127.0.0.2/' | '/' | '127.0.0.1,127.0.0.2' || _ + '127.0.0.1:2181' | '/' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181' | '/' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1:2181/' | '/' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181/' | '/' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1/path' | '/path' | '127.0.0.1' || _ + '127.0.0.1,127.0.0.2/path' | '/path' | '127.0.0.1,127.0.0.2' || _ + '127.0.0.1/path/node' | '/path/node' | '127.0.0.1' || _ + '127.0.0.1,127.0.0.2/path/node' | '/path/node' | '127.0.0.1,127.0.0.2' || _ + '127.0.0.1:2181/' | '/' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181/' | '/' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1:2181/path' | '/path' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181/path' | '/path' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1:2181/path/node' | '/path/node' | '127.0.0.1:2181' || _ + '127.0.0.1,127.0.0.2:2181/path/node' | '/path/node' | '127.0.0.1,127.0.0.2:2181' || _ + '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' | '/' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _ + '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/' | '/' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _ + '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/path' | '/path' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _ + '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183/path/node' | '/path/node' | '127.0.0.1,127.0.0.2:2182,127.0.0.3:2183' || _ } def "Test ignore source"() { @@ -230,14 +244,4 @@ class ZooKeeperMigratorTest extends Specification { then: "no exceptions are thrown" noExceptionThrown() } - - def List getChildren(ZooKeeper client, String path, List ag) { - def children = client.getChildren(path, null) - ag.add path - children.forEach { - def childPath = "/${(path.tokenize('/') + it).join('/')}" - getChildren(client, childPath, ag) - } - ag - } } diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data-user-pass.json b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data-user-pass.json index e38bd91b187b..87cb130d2415 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data-user-pass.json +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data-user-pass.json @@ -1,21 +1,13 @@ [ { - "connectString": "127.0.0.1:62406", - "path": "/nifi", - "auth": [ - 110, - 105, - 102, - 105, - 58, - 110, - 105, - 102, - 105 - ] + "connectString": "127.0.0.1:62317/nifi", + "servers": [ + "127.0.0.1:62317" + ], + "path": "/nifi" }, { - "path": "/nifi", + "path": "/", "data": [ 115, 111, @@ -30,8 +22,8 @@ "stat": { "czxid": 2, "mzxid": 2, - "ctime": 1478010596964, - "mtime": 1478010596964, + "ctime": 1485792794977, + "mtime": 1485792794977, "version": 0, "cversion": 0, "aversion": 0, diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data.json b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data.json index 758270a0f6ca..a7e5edc5d177 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data.json +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/resources/test-data.json @@ -1,10 +1,13 @@ [ { - "connectString": "127.0.0.1:0", - "path": "/nifi" + "connectString": "127.0.0.1:62295/nifi/components", + "servers": [ + "127.0.0.1:62295" + ], + "path": "/nifi/components" }, { - "path": "/nifi", + "path": "/1/a", "data": [ 115, 111, @@ -17,16 +20,16 @@ 97 ], "stat": { - "czxid": 2, - "mzxid": 2, - "ctime": 1477602095884, - "mtime": 1477602095884, + "czxid": 6, + "mzxid": 6, + "ctime": 1485792790772, + "mtime": 1485792790772, "version": 0, - "cversion": 3, + "cversion": 0, "aversion": 0, - "ephemeralOwner": 0, + "ephemeralOwner": 97372916257193984, "dataLength": 9, - "numChildren": 3, + "numChildren": 0, "pzxid": 6 }, "acls": [ @@ -38,10 +41,10 @@ } } ], - "ephemeralOwner": 0 + "ephemeralOwner": 97372916257193984 }, { - "path": "/nifi/1/a", + "path": "/1", "data": [ 115, 111, @@ -54,17 +57,17 @@ 97 ], "stat": { - "czxid": 4, - "mzxid": 4, - "ctime": 1477602095888, - "mtime": 1477602095888, + "czxid": 5, + "mzxid": 5, + "ctime": 1485792790771, + "mtime": 1485792790771, "version": 0, - "cversion": 0, + "cversion": 1, "aversion": 0, "ephemeralOwner": 0, "dataLength": 9, - "numChildren": 0, - "pzxid": 4 + "numChildren": 1, + "pzxid": 6 }, "acls": [ { @@ -78,7 +81,7 @@ "ephemeralOwner": 0 }, { - "path": "/nifi/2", + "path": "/3", "data": [ 115, 111, @@ -91,17 +94,17 @@ 97 ], "stat": { - "czxid": 5, - "mzxid": 5, - "ctime": 1477602095889, - "mtime": 1477602095889, + "czxid": 8, + "mzxid": 8, + "ctime": 1485792790773, + "mtime": 1485792790773, "version": 0, "cversion": 0, "aversion": 0, "ephemeralOwner": 0, "dataLength": 9, "numChildren": 0, - "pzxid": 5 + "pzxid": 8 }, "acls": [ { @@ -115,7 +118,7 @@ "ephemeralOwner": 0 }, { - "path": "/nifi/3", + "path": "/2", "data": [ 115, 111, @@ -128,17 +131,17 @@ 97 ], "stat": { - "czxid": 6, - "mzxid": 6, - "ctime": 1477602095890, - "mtime": 1477602095890, + "czxid": 7, + "mzxid": 7, + "ctime": 1485792790772, + "mtime": 1485792790772, "version": 0, "cversion": 0, "aversion": 0, "ephemeralOwner": 0, "dataLength": 9, "numChildren": 0, - "pzxid": 6 + "pzxid": 7 }, "acls": [ { @@ -152,7 +155,7 @@ "ephemeralOwner": 0 }, { - "path": "/nifi/1", + "path": "/", "data": [ 115, 111, @@ -166,16 +169,16 @@ ], "stat": { "czxid": 3, - "mzxid": 3, - "ctime": 1477602095888, - "mtime": 1477602095888, - "version": 0, - "cversion": 1, + "mzxid": 4, + "ctime": 1485792790757, + "mtime": 1485792790762, + "version": 1, + "cversion": 3, "aversion": 0, "ephemeralOwner": 0, "dataLength": 9, - "numChildren": 1, - "pzxid": 4 + "numChildren": 3, + "pzxid": 8 }, "acls": [ { From f4a84c10264af417b9d64a48e6e31bfa351c18a1 Mon Sep 17 00:00:00 2001 From: Jeff Storck Date: Wed, 1 Feb 2017 16:26:41 -0500 Subject: [PATCH 2/2] NIFI-3300 Added check between source and destination ZooKeeper paths to allow data to be written to the same ZooKeeper with a different path Added test for writing to the same ZooKeeper with a different path Added type parameter to server list in for ZooKeeperEndpointConfig --- .../zkmigrator/ZooKeeperEndpointConfig.java | 2 +- .../toolkit/zkmigrator/ZooKeeperMigrator.java | 5 +++-- .../zkmigrator/ZooKeeperMigratorTest.groovy | 20 ++++++++++++++++++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java index e1ea7cab7576..8578a587ebc7 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperEndpointConfig.java @@ -28,7 +28,7 @@ class ZooKeeperEndpointConfig { private final String connectString; - private final List servers; + private final List servers; private final String path; ZooKeeperEndpointConfig(String connectString) { diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java index 438c0cf9ac8c..fa71ae0d8fa5 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/main/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigrator.java @@ -134,8 +134,9 @@ void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, bool Preconditions.checkArgument(!Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getConnectString()) && !Strings.isNullOrEmpty(sourceZooKeeperEndpointConfig.getPath()) && sourceZooKeeperEndpointConfig.getServers() != null && sourceZooKeeperEndpointConfig.getServers().size() > 0, "Source ZooKeeper %s from %s is invalid", sourceZooKeeperEndpointConfig, zkData); - Preconditions.checkArgument(Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceZooKeeperEndpointConfig.getServers()) || ignoreSource, - "Source ZooKeeper config %s for the data provided can not be the same as the configured destination ZooKeeper config %s", + Preconditions.checkArgument(Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceZooKeeperEndpointConfig.getServers()) + || !zooKeeperEndpointConfig.getPath().equals(sourceZooKeeperEndpointConfig.getPath()) || ignoreSource, + "Source ZooKeeper config %s for the data provided can not contain the same server and path as the configured destination ZooKeeper config %s", sourceZooKeeperEndpointConfig, zooKeeperEndpointConfig); // stream through each node read from the json input diff --git a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy index 7289374dfbc0..299fda521be9 100644 --- a/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy +++ b/nifi-toolkit/nifi-toolkit-zookeeper-migrator/src/test/java/org/apache/nifi/toolkit/zkmigrator/ZooKeeperMigratorTest.groovy @@ -229,7 +229,7 @@ class ZooKeeperMigratorTest extends Specification { when: "data is read from the source zookeeper" ZooKeeperMigratorMain.main(['-r', '-z', connectString, '-f', dataPath] as String[]) - then: "verify the data has been written the output file" + then: "verify the data has been written to the output file" new File(dataPath).exists() when: "data is sent to the same zookeeper as the the source zookeeper without ignore source" @@ -244,4 +244,22 @@ class ZooKeeperMigratorTest extends Specification { then: "no exceptions are thrown" noExceptionThrown() } + + def "Send to same ZooKeeper with different path"() { + def server = new TestingServer() + def connectString = "$server.connectString" + def dataPath = 'target/test-data-different-path.json' + + when: "data is read from the source zookeeper" + ZooKeeperMigratorMain.main(['-r', '-z', connectString, '-f', dataPath] as String[]) + + then: "verify the data has been written to the output file" + new File(dataPath).exists() + + when: "data is sent to the same zookeeper as the the source zookeeper with a different path" + ZooKeeperMigratorMain.main(['-s', '-z', "$connectString/new-path", '-f', dataPath] as String[]) + + then: "no exceptions are thrown" + noExceptionThrown() + } }