diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index d5bbdf046c2f5..b1073b2393091 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -107,6 +107,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -115,6 +116,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Module that implements all the RPC calls in {@link ClientProtocol} in the @@ -1204,16 +1206,39 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { @Override public ContentSummary getContentSummary(String path) throws IOException { + return getContentSummary(path, new HashMap>()); + } + + public ContentSummary getContentSummary(String path, + Map> excludeNamespace) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); // Get the summaries from regular files final Collection summaries = new ArrayList<>(); final List locations = rpcServer.getLocationsForPath(path, false, false); + + Set curExcludeNamespace = new HashSet<>(); + String destPath = subclusterResolver.getDestinationForPath(path).getDefaultLocation().getDest(); + List parentExistLocations = excludeNamespace.keySet().stream().filter(s -> destPath.startsWith(s + "/")) + .collect(Collectors.toList()); + boolean parentAlreadyComputed = parentExistLocations.size() > 0; + List filteredLoctions = + locations.stream().filter(remoteLocation -> 
excludeNamespace.isEmpty() || !parentAlreadyComputed || + !isParentPathNamespaceComputed(remoteLocation, excludeNamespace, parentExistLocations)) + .collect(Collectors.toList()); + filteredLoctions.forEach(remoteLocation -> { + curExcludeNamespace.add(remoteLocation.getNameserviceId()); + }); + if (excludeNamespace.get(destPath) != null) { + excludeNamespace.get(destPath).addAll(curExcludeNamespace); + } else { + excludeNamespace.put(destPath, curExcludeNamespace); + } final RemoteMethod method = new RemoteMethod("getContentSummary", new Class[] {String.class}, new RemoteParam()); final List> results = - rpcClient.invokeConcurrent(locations, method, + rpcClient.invokeConcurrent(filteredLoctions, method, false, -1, ContentSummary.class); FileNotFoundException notFoundException = null; for (RemoteResult result : results) { @@ -1236,7 +1261,7 @@ public ContentSummary getContentSummary(String path) throws IOException { Path childPath = new Path(path, child); try { ContentSummary mountSummary = getContentSummary( - childPath.toString()); + childPath.toString(), excludeNamespace); if (mountSummary != null) { summaries.add(mountSummary); } @@ -1255,6 +1280,19 @@ public ContentSummary getContentSummary(String path) throws IOException { return aggregateContentSummary(summaries); } + private boolean isParentPathNamespaceComputed(RemoteLocation location, + Map> excludeNamespace, List parentExistLocations) { + boolean isComputed = false; + String curNsId = location.getNameserviceId(); + for (String loc : parentExistLocations) { + Set parentPathNamespaces = excludeNamespace.get(loc); + if (parentPathNamespaces.contains(curNsId)) { + return true; + } + } + return isComputed; + } + @Override public void fsync(String src, long fileId, String clientName, long lastBlockLength) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index ac6ecd4398cba..292af8e5b3025 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -114,11 +114,11 @@ public class MiniRouterDFSCluster { private static final Random RND = new Random(); /** Nameservices in the federated cluster. */ - private List nameservices; + protected List nameservices; /** Namenodes in the federated cluster. */ private List namenodes; /** Routers in the federated cluster. */ - private List routers; + protected List routers; /** If the Namenodes are in high availability.*/ private boolean highAvailability; /** Number of datanodes per nameservice. */ @@ -136,14 +136,14 @@ public class MiniRouterDFSCluster { protected static final long DEFAULT_CACHE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); /** Heartbeat interval in milliseconds. */ - private long heartbeatInterval; + protected long heartbeatInterval; /** Cache flush interval in milliseconds. */ - private long cacheFlushInterval; + protected long cacheFlushInterval; /** Router configuration initializes. */ private Configuration routerConf; /** Router configuration overrides. */ - private Configuration routerOverrides; + protected Configuration routerOverrides; /** Namenode configuration overrides. */ private Configuration namenodeOverrides; @@ -1133,7 +1133,7 @@ public void deleteAllFiles() throws IOException { *
  • /ns1 -> ns1->/target-ns1. * */ - public void installMockLocations() { + public void installMockLocations() throws IOException { for (RouterContext r : routers) { MockResolver resolver = (MockResolver) r.router.getSubclusterResolver(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSClusterForGetContentSummary.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSClusterForGetContentSummary.java new file mode 100644 index 0000000000000..74261b9264df9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSClusterForGetContentSummary.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map.Entry; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY; +import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS; + +public class MiniRouterDFSClusterForGetContentSummary extends MiniRouterDFSCluster { + + public MiniRouterDFSClusterForGetContentSummary(boolean ha, int numNameservices) { + super(ha, numNameservices); + } + + /** + * Generate the configuration for a Router. + * + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + * @return New configuration for a Router. 
+ */ + public Configuration generateRouterConfiguration(String nsId, String nnId) { + + Configuration conf = new HdfsConfiguration(false); + conf.addResource(generateNamenodeConfiguration(nsId)); + + conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10); + conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0"); + + conf.set(DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0"); + + conf.set(DFS_ROUTER_HTTP_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_HTTPS_ADDRESS_KEY, "127.0.0.1:0"); + conf.set(DFS_ROUTER_HTTP_BIND_HOST_KEY, "0.0.0.0"); + + conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0)); + conf.setLong(DFS_ROUTER_HEARTBEAT_INTERVAL_MS, heartbeatInterval); + conf.setLong(DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, cacheFlushInterval); + + // Use mock resolver classes + conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MockResolver.class, ActiveNamenodeResolver.class); + conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MultipleDestinationMountTableResolver.class, FileSubclusterResolver.class); + + // Disable safemode on startup + conf.setBoolean(DFS_ROUTER_SAFEMODE_ENABLE, false); + + // Set the nameservice ID for the default NN monitor + conf.set(DFS_NAMESERVICE_ID, nsId); + if (nnId != null) { + conf.set(DFS_HA_NAMENODE_ID_KEY, nnId); + } + + // Namenodes to monitor + StringBuilder sb = new StringBuilder(); + for (String ns : this.nameservices) { + for (NamenodeContext context : getNamenodes(ns)) { + String suffix = context.getConfSuffix(); + if (sb.length() != 0) { + sb.append(","); + } + sb.append(suffix); + } + } + conf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString()); + + // Add custom overrides if available + if (this.routerOverrides != null) { + for (Entry entry : this.routerOverrides) { + String confKey = entry.getKey(); + String confValue = entry.getValue(); + conf.set(confKey, confValue); + } + } + return conf; + } + + /** + *

      + *
    • / -> [ns0->/]. + *
    • /ns0 -> ns0->/target-ns0. + *
    • /ns1 -> ns1->/target-ns1. + *
    + */ + public void installMockLocations() throws IOException { + for (RouterContext r : routers) { + MultipleDestinationMountTableResolver resolver = + (MultipleDestinationMountTableResolver) r.getRouter().getSubclusterResolver(); + // create table entries + for (String nsId : nameservices) { + // Direct path + String routerPath = getFederatedPathForNS(nsId); + String nnPath = getNamenodePathForNS(nsId); + MountTable entry = MountTable.newInstance(routerPath, Collections.singletonMap(nsId, nnPath)); + resolver.addEntry(entry); + } + + // Root path points to both first nameservice + String ns0 = nameservices.get(0); + MountTable entry = MountTable.newInstance("/", Collections.singletonMap(ns0, "/")); + resolver.addEntry(entry); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestGetContentSummaryRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestGetContentSummaryRpc.java new file mode 100644 index 0000000000000..22371307240ef --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestGetContentSummaryRpc.java @@ -0,0 +1,490 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

    + * http://www.apache.org/licenses/LICENSE-2.0 + *

    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSClusterForGetContentSummary; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RefreshMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.util.Time; +import org.junit.After; +import 
org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests the use of the getContentSummary RPC which implemented by + * {@link RouterRpcServer}. + */ +public class TestGetContentSummaryRpc { + + private static final Logger LOG = + LoggerFactory.getLogger(TestGetContentSummaryRpc.class); + + /** Federated HDFS cluster. */ + private static MiniRouterDFSClusterForGetContentSummary cluster; + + /** Random Router for this federated cluster. */ + private RouterContext router; + + /** Random nameservice in the federated cluster. */ + private String ns; + /** First namenode in the nameservice. */ + private NamenodeContext namenode; + + /** Client interface to the Router. */ + private ClientProtocol routerProtocol; + /** Client interface to the Namenode. */ + private ClientProtocol nnProtocol; + + /** NameNodeProtocol interface to the Router. */ + private NamenodeProtocol routerNamenodeProtocol; + /** NameNodeProtocol interface to the Namenode. */ + private NamenodeProtocol nnNamenodeProtocol; + + /** Filesystem interface to the Router. */ + private FileSystem routerFS; + /** Filesystem interface to the Namenode. 
*/ + private FileSystem nnFS; + + /** MountTableManager interface to the Router */ + private MountTableManager mountTableManager; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new MiniRouterDFSClusterForGetContentSummary(false, 2); + // We need 3 DNS to meets 3-replicas + cluster.setNumDatanodesPerNameservice(3); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Start routers with only an RPC service + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .stateStore() + .admin() + .heartbeat() + .refreshCache() + .build(); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + } + + @AfterClass + public static void tearDown() { + cluster.shutdown(); + } + + @After + public void testReset() throws Exception { + FederationStateStoreTestUtils.clearAllRecords(router.getRouter().getStateStore()); + } + + @Before + public void testSetup() throws Exception { + + // Create mock locations + cluster.installMockLocations(); + + // Delete all files via the NNs and verify + cluster.deleteAllFiles(); + + // Wait to ensure NN has fully created its test directories + Thread.sleep(100); + + // Random router for this test + RouterContext rndRouter = cluster.getRandomRouter(); + this.setRouter(rndRouter); + + // Pick a namenode for this test + String ns0 = cluster.getNameservices().get(0); + this.setNs(ns0); + this.setNamenode(cluster.getNamenode(ns0, null)); + + this.mountTableManager = router.getAdminClient().getMountTableManager(); + } + + @Test + public void testRpcService() throws IOException { + Router testRouter = new Router(); + List nss = cluster.getNameservices(); + String ns0 = nss.get(0); + 
Configuration routerConfig = cluster.generateRouterConfiguration(ns0, null); + RouterRpcServer server = new RouterRpcServer(routerConfig, testRouter, + testRouter.getNamenodeResolver(), testRouter.getSubclusterResolver()); + server.init(routerConfig); + assertEquals(STATE.INITED, server.getServiceState()); + server.start(); + assertEquals(STATE.STARTED, server.getServiceState()); + server.stop(); + assertEquals(STATE.STOPPED, server.getServiceState()); + server.close(); + testRouter.close(); + } + + protected void setRouter(RouterContext r) + throws IOException, URISyntaxException { + this.router = r; + this.routerProtocol = r.getClient().getNamenode(); + this.routerFS = r.getFileSystem(); + this.routerNamenodeProtocol = NameNodeProxies.createProxy(router.getConf(), + router.getFileSystem().getUri(), NamenodeProtocol.class).getProxy(); + } + + protected NamenodeContext getNamenode() { + return this.namenode; + } + + protected void setNamenode(NamenodeContext nn) + throws IOException, URISyntaxException { + this.namenode = nn; + this.nnProtocol = nn.getClient().getNamenode(); + this.nnFS = nn.getFileSystem(); + + // Namenode from the default namespace + String ns0 = cluster.getNameservices().get(0); + NamenodeContext nn0 = cluster.getNamenode(ns0, null); + this.nnNamenodeProtocol = NameNodeProxies.createProxy(nn0.getConf(), + nn0.getFileSystem().getUri(), NamenodeProtocol.class).getProxy(); + } + + protected String getNs() { + return this.ns; + } + + protected void setNs(String nameservice) { + this.ns = nameservice; + } + + /** + * test case below: + * /A ---- ns0 ---- /A + * /A/B ---- ns0,ns1 ----/A/B + */ + @Test + public void testGetContentSummaryNormalNested() throws IOException { + nnFS.mkdirs(new Path("/A/B/")); + assertTrue(nnFS.exists(new Path("/A/B/"))); + + String ns1 = cluster.getNameservices().get(1); + NamenodeContext namenode1 = cluster.getNamenode(ns1, null); + FileSystem namenode1FS = namenode1.getFileSystem(); + namenode1FS.mkdirs(new 
Path("/A/B/")); + assertTrue(namenode1FS.exists(new Path("/A/B/"))); + + DFSTestUtil.createFile(nnFS, new Path("/A/test1.txt"), 2, (short) 1, 0xFEED); + assertTrue(nnFS.exists(new Path("/A/test1.txt"))); + + DFSTestUtil.createFile(nnFS, new Path("/A/B/test2.txt"), 4, (short) 1, 0xFEED); + assertTrue(nnFS.exists(new Path("/A/B/test2.txt"))); + + DFSTestUtil.createFile(namenode1FS, new Path("/A/B/test3.txt"), false, 1024, 5, 32 * 1024 * 1024, (short) 1, + 0xFEED, true); + + assertTrue(namenode1FS.exists(new Path("/A/B/test3.txt"))); + assertEquals(5, namenode1FS.getContentSummary(new Path("/A/B/test3.txt")).getLength()); + + MountTable addEntryA = MountTable.newInstance("/A/", + Collections.singletonMap("ns0", "/A/"), Time.now(), Time.now()); + + AddMountTableEntryRequest request1 = AddMountTableEntryRequest.newInstance(addEntryA); + AddMountTableEntryResponse addMountTableEntryResponse1 = mountTableManager.addMountTableEntry(request1); + assertTrue(addMountTableEntryResponse1.getStatus()); + mountTableManager.refreshMountTableEntries(RefreshMountTableEntriesRequest.newInstance()); + + List entries1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/")).getEntries(); + assertEquals(1, entries1.size()); + + List entriesA1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/A")).getEntries(); + + MountTable mountTable = entriesA1.get(0); + + List destinations = mountTable.getDestinations(); + assertEquals(1, destinations.size()); + + assertEquals("/A", mountTable.getSourcePath()); + assertEquals("ns0", destinations.get(0).getNameserviceId()); + assertEquals("/A", destinations.get(0).getDest()); + + Map entryMap = new LinkedHashMap<>(); + entryMap.put("ns0", "/A/B/"); + entryMap.put("ns1", "/A/B/"); + + MountTable addEntryAB = MountTable.newInstance("/A/B/", entryMap, Time.now(), Time.now()); + addEntryAB.setDestOrder(DestinationOrder.HASH); + 
assertTrue(mountTableManager.addMountTableEntry(AddMountTableEntryRequest.newInstance(addEntryAB)).getStatus()); + mountTableManager.refreshMountTableEntries(RefreshMountTableEntriesRequest.newInstance()); + List entries2 = mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/")).getEntries(); + assertEquals(2, entries2.size()); + + + List entriesAB1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/A/B")).getEntries(); + + MountTable mountTableAB1 = entriesAB1.get(0); + + List destinationABs = mountTableAB1.getDestinations(); + assertEquals(2, destinationABs.size()); + + assertEquals("/A/B", mountTableAB1.getSourcePath()); + assertEquals("ns0", destinationABs.get(0).getNameserviceId()); + assertEquals("ns1", destinationABs.get(1).getNameserviceId()); + assertEquals("/A/B", destinationABs.get(0).getDest()); + assertEquals("/A/B", destinationABs.get(1).getDest()); + + ContentSummary contentSummary = routerProtocol.getContentSummary("/A"); + assertEquals(11, contentSummary.getLength()); + } + + /** + * test case below: + * /A ---- ns0 ---- /A + * /A/B ---- ns0,ns1 ---- /A/B + * /A/B123 ---- ns0,ns1 ----/A/B123 + */ + @Test + public void testGetContentSummaryNormalNestedAndSameSuffix() throws IOException { + nnFS.mkdirs(new Path("/A/B/")); + assertTrue(nnFS.exists(new Path("/A/B/"))); + nnFS.mkdirs(new Path("/A/B123/")); + assertTrue(nnFS.exists(new Path("/A/B123/"))); + + String ns1 = cluster.getNameservices().get(1); + NamenodeContext namenode1 = cluster.getNamenode(ns1, null); + FileSystem namenode1FS = namenode1.getFileSystem(); + namenode1FS.mkdirs(new Path("/A/B/")); + assertTrue(namenode1FS.exists(new Path("/A/B/"))); + namenode1FS.mkdirs(new Path("/A/B123/")); + assertTrue(namenode1FS.exists(new Path("/A/B123/"))); + + DFSTestUtil.createFile(nnFS, new Path("/A/test1.txt"), 2, (short) 1, 0xFEED); + assertTrue(nnFS.exists(new Path("/A/test1.txt"))); + + DFSTestUtil.createFile(nnFS, new 
Path("/A/B/test2.txt"), 4, (short) 1, 0xFEED); + assertTrue(nnFS.exists(new Path("/A/B/test2.txt"))); + + DFSTestUtil.createFile(namenode1FS, new Path("/A/B/test3.txt"), false, 1024, + 5, 32 * 1024 * 1024, (short) 1, 0xFEED, true); + + assertTrue(namenode1FS.exists(new Path("/A/B/test3.txt"))); + + DFSTestUtil.createFile(namenode1FS, new Path("/A/B123/test4.txt"), 7, (short) 1, 0xFEED); + assertTrue(namenode1FS.exists(new Path("/A/B123/test4.txt"))); + + MountTable addEntryA = MountTable.newInstance("/A/", + Collections.singletonMap("ns0", "/A/"), Time.now(), Time.now()); + + assertTrue(addMountTableEntry(addEntryA)); + + List entries1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/")).getEntries(); + assertEquals(1, entries1.size()); + + List entriesA1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/A")).getEntries(); + + MountTable mountTable = entriesA1.get(0); + + List destinations = mountTable.getDestinations(); + assertEquals(1, destinations.size()); + + assertEquals("/A", mountTable.getSourcePath()); + assertEquals("ns0", destinations.get(0).getNameserviceId()); + assertEquals("/A", destinations.get(0).getDest()); + + Map entryMap = new LinkedHashMap<>(); + entryMap.put("ns0", "/A/B/"); + entryMap.put("ns1", "/A/B/"); + + MountTable addEntryAB = MountTable.newInstance("/A/B/", entryMap, Time.now(), Time.now()); + addEntryAB.setDestOrder(DestinationOrder.HASH); + assertTrue(addMountTableEntry(addEntryAB)); + + List entries2 = mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/")).getEntries(); + assertEquals(2, entries2.size()); + + + List entriesAB1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/A/B")).getEntries(); + + MountTable mountTableAB1 = entriesAB1.get(0); + + List destinationABs = mountTableAB1.getDestinations(); + assertEquals(2, destinationABs.size()); + + assertEquals("/A/B", 
mountTableAB1.getSourcePath()); + assertEquals("ns0", destinationABs.get(0).getNameserviceId()); + assertEquals("ns1", destinationABs.get(1).getNameserviceId()); + assertEquals("/A/B", destinationABs.get(0).getDest()); + assertEquals("/A/B", destinationABs.get(1).getDest()); + + Map entryMap123 = new LinkedHashMap<>(); + entryMap123.put("ns0", "/A/B123/"); + entryMap123.put("ns1", "/A/B123/"); + MountTable addEntryAB123 = MountTable.newInstance("/A/B123/", entryMap123, Time.now(), Time.now()); + addEntryAB.setDestOrder(DestinationOrder.HASH); + assertTrue(addMountTableEntry(addEntryAB123)); + + ContentSummary contentSummary = routerProtocol.getContentSummary("/A"); + assertEquals(18, contentSummary.getLength()); + } + + /** test case below. + * /A ---- ns0 ---- /A + * /A/B ---- ns0,ns1 ----/B + */ + @Test + public void testGetContentSummaryNonNormalNested() throws IOException { + nnFS.mkdirs(new Path("/A/B/")); + assertTrue(nnFS.exists(new Path("/A/B/"))); + nnFS.mkdirs(new Path("/B/")); + assertTrue(nnFS.exists(new Path("/B/"))); + + String ns1 = cluster.getNameservices().get(1); + NamenodeContext namenode1 = cluster.getNamenode(ns1, null); + FileSystem namenode1FS = namenode1.getFileSystem(); + namenode1FS.mkdirs(new Path("/A/B/")); + assertTrue(namenode1FS.exists(new Path("/A/B/"))); + namenode1FS.mkdirs(new Path("/B/")); + assertTrue(namenode1FS.exists(new Path("/B/"))); + + DFSTestUtil.createFile(nnFS, new Path("/A/test1.txt"), 2, (short) 1, 0xFEED); + assertTrue(nnFS.exists(new Path("/A/test1.txt"))); + + DFSTestUtil.createFile(nnFS, new Path("/B/test2.txt"), 4, (short) 1, 0xFEED); + assertTrue(nnFS.exists(new Path("/B/test2.txt"))); + + DFSTestUtil.createFile(namenode1FS, new Path("/B/test3.txt"), false, 1024, + 5, 32 * 1024 * 1024, (short) 1, 0xFEED, true); + + assertTrue(namenode1FS.exists(new Path("/B/test3.txt"))); + + + MountTable addEntryA = MountTable.newInstance("/A/", + Collections.singletonMap("ns0", "/A/"), Time.now(), Time.now()); + + 
assertTrue(addMountTableEntry(addEntryA)); + + List entries1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/")).getEntries(); + assertEquals(1, entries1.size()); + + List entriesA1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/A")).getEntries(); + + MountTable mountTable = entriesA1.get(0); + + List destinations = mountTable.getDestinations(); + assertEquals(1, destinations.size()); + + assertEquals("/A", mountTable.getSourcePath()); + assertEquals("ns0", destinations.get(0).getNameserviceId()); + assertEquals("/A", destinations.get(0).getDest()); + + Map entryMap = new LinkedHashMap<>(); + entryMap.put("ns0", "/B/"); + entryMap.put("ns1", "/B/"); + + MountTable addEntryAB = MountTable.newInstance("/A/B/", entryMap, Time.now(), Time.now()); + addEntryAB.setDestOrder(DestinationOrder.HASH); + assertTrue(addMountTableEntry(addEntryAB)); + + List entries2 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/")).getEntries(); + assertEquals(2, entries2.size()); + + + List entriesAB1 = + mountTableManager.getMountTableEntries(GetMountTableEntriesRequest.newInstance("/A/B")).getEntries(); + + MountTable mountTableAB1 = entriesAB1.get(0); + + List destinationABs = mountTableAB1.getDestinations(); + assertEquals(2, destinationABs.size()); + + assertEquals("/A/B", mountTableAB1.getSourcePath()); + assertEquals("ns0", destinationABs.get(0).getNameserviceId()); + assertEquals("ns1", destinationABs.get(1).getNameserviceId()); + assertEquals("/B", destinationABs.get(0).getDest()); + assertEquals("/B", destinationABs.get(1).getDest()); + + ContentSummary contentSummary = routerProtocol.getContentSummary("/A"); + assertEquals(11, contentSummary.getLength()); + } + + /** + * Add a mount table entry to the mount table through the admin API. + * + * @param entry Mount table entry to add. + * @return If it was succesfully added. + * @throws IOException Problems adding entries. 
+ */ + private boolean addMountTableEntry(final MountTable entry) throws IOException { + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(entry); + AddMountTableEntryResponse addResponse = + mountTableManager.addMountTableEntry(addRequest); + + mountTableManager.refreshMountTableEntries(RefreshMountTableEntriesRequest.newInstance()); + return addResponse.getStatus(); + } +}