From 9654ed892f7e8b7277d15c8c770a04a57071583d Mon Sep 17 00:00:00 2001 From: Aravindan Vijayan Date: Mon, 24 Feb 2020 10:01:19 -0800 Subject: [PATCH] HDDS-2995. Add integration test for Recon's Passive SCM state. --- .../common/statemachine/StateContext.java | 40 ++-- .../states/datanode/InitDatanodeState.java | 4 +- .../endpoint/HeartbeatEndpointTask.java | 8 +- .../common/statemachine/TestStateContext.java | 9 +- .../endpoint/TestHeartbeatEndpointTask.java | 6 +- .../dist/src/main/compose/ozone/docker-config | 3 +- .../hadoop/ozone/MiniOzoneClusterImpl.java | 8 +- .../ozoneimpl/TestOzoneContainer.java | 2 +- .../ozone/recon/TestReconAsPassiveScm.java | 213 ++++++++++++++++++ ...on.java => TestReconWithOzoneManager.java} | 4 +- 10 files changed, 257 insertions(+), 40 deletions(-) create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java rename hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/{TestRecon.java => TestReconWithOzoneManager.java} (98%) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 73b07a34cda..fcc8d999910 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -75,10 +76,10 @@ public class StateContext { private final DatanodeStateMachine parent; private final AtomicLong stateExecutionCount; private final Configuration conf; - private final Set endpoints; - private final Map> reports; - private final Map> containerActions; - private final Map> pipelineActions; + private final Set endpoints; + private final Map> reports; + private final Map> containerActions; + private final Map> pipelineActions; private DatanodeStateMachine.DatanodeStates state; private boolean shutdownOnError = false; @@ -185,7 +186,7 @@ public boolean getShutdownOnError() { public void addReport(GeneratedMessage report) { if (report != null) { synchronized (reports) { - for (String endpoint : endpoints) { + for (InetSocketAddress endpoint : endpoints) { reports.get(endpoint).add(report); } } @@ -200,7 +201,7 @@ public void addReport(GeneratedMessage report) { * heartbeat. */ public void putBackReports(List reportsToPutBack, - String endpoint) { + InetSocketAddress endpoint) { synchronized (reports) { if (reports.containsKey(endpoint)){ reports.get(endpoint).addAll(0, reportsToPutBack); @@ -214,7 +215,8 @@ public void putBackReports(List reportsToPutBack, * * @return List of reports */ - public List getAllAvailableReports(String endpoint) { + public List getAllAvailableReports( + InetSocketAddress endpoint) { return getReports(endpoint, Integer.MAX_VALUE); } @@ -224,7 +226,8 @@ public List getAllAvailableReports(String endpoint) { * * @return List of reports */ - public List getReports(String endpoint, int maxLimit) { + public List getReports(InetSocketAddress endpoint, + int maxLimit) { List reportsToReturn = new LinkedList<>(); synchronized (reports) { List reportsForEndpoint = reports.get(endpoint); @@ -246,7 +249,7 @@ public List getReports(String endpoint, int maxLimit) { */ public void addContainerAction(ContainerAction containerAction) { synchronized (containerActions) { - for (String endpoint : endpoints) { + for (InetSocketAddress endpoint : endpoints) { containerActions.get(endpoint).add(containerAction); } } @@ -259,7 +262,7 @@ public void addContainerAction(ContainerAction containerAction) { */ public void addContainerActionIfAbsent(ContainerAction containerAction) { synchronized (containerActions) { - for (String endpoint : endpoints) { + for (InetSocketAddress endpoint : endpoints) { if (!containerActions.get(endpoint).contains(containerAction)) { containerActions.get(endpoint).add(containerAction); } @@ -273,7 +276,8 @@ public void addContainerActionIfAbsent(ContainerAction containerAction) { * * @return {@literal List} */ - public List getAllPendingContainerActions(String endpoint) { + public List getAllPendingContainerActions( + InetSocketAddress endpoint) { return getPendingContainerAction(endpoint, Integer.MAX_VALUE); } @@ -283,8 +287,9 @@ public List getAllPendingContainerActions(String endpoint) { * * @return {@literal List} */ - public List getPendingContainerAction(String endpoint, - int maxLimit) { + public List getPendingContainerAction( + InetSocketAddress endpoint, + int maxLimit) { List containerActionList = new ArrayList<>(); synchronized (containerActions) { if (!containerActions.isEmpty() && @@ -319,7 +324,7 @@ public void addPipelineActionIfAbsent(PipelineAction pipelineAction) { * action remains same on the given pipeline, it will end up adding it * multiple times here. */ - for (String endpoint : endpoints) { + for (InetSocketAddress endpoint : endpoints) { Queue actionsForEndpoint = this.pipelineActions.get(endpoint); for (PipelineAction pipelineActionIter : actionsForEndpoint) { @@ -342,8 +347,9 @@ public void addPipelineActionIfAbsent(PipelineAction pipelineAction) { * * @return {@literal List} */ - public List getPendingPipelineAction(String endpoint, - int maxLimit) { + public List getPendingPipelineAction( + InetSocketAddress endpoint, + int maxLimit) { List pipelineActionList = new ArrayList<>(); synchronized (pipelineActions) { if (!pipelineActions.isEmpty() && @@ -531,7 +537,7 @@ public long getHeartbeatFrequency() { return heartbeatFrequency.get(); } - public void addEndpoint(String endpoint) { + public void addEndpoint(InetSocketAddress endpoint) { if (!endpoints.contains(endpoint)) { this.endpoints.add(endpoint); this.containerActions.put(endpoint, new LinkedList<>()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java index 86cc92c1925..a73f1c5054e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java @@ -105,12 +105,12 @@ public DatanodeStateMachine.DatanodeStates call() throws Exception { } for (InetSocketAddress addr : addresses) { connectionManager.addSCMServer(addr); - this.context.addEndpoint(addr.toString()); + this.context.addEndpoint(addr); } InetSocketAddress reconAddress = getReconAddresses(conf); if (reconAddress != null) { connectionManager.addReconServer(reconAddress); - this.context.addEndpoint(reconAddress.toString()); + this.context.addEndpoint(reconAddress); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index a9d2c2fb1d6..fb1d1af428e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -173,7 +173,7 @@ private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) { if (requestBuilder.getIncrementalContainerReportCount() != 0) { reports.addAll(requestBuilder.getIncrementalContainerReportList()); } - context.putBackReports(reports, rpcEndpoint.getAddressString()); + context.putBackReports(reports, rpcEndpoint.getAddress()); } /** @@ -183,7 +183,7 @@ private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) { */ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) { for (GeneratedMessage report : - context.getAllAvailableReports(rpcEndpoint.getAddressString())) { + context.getAllAvailableReports(rpcEndpoint.getAddress())) { String reportName = report.getDescriptorForType().getFullName(); for (Descriptors.FieldDescriptor descriptor : SCMHeartbeatRequestProto.getDescriptor().getFields()) { @@ -207,7 +207,7 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) { private void addContainerActions( SCMHeartbeatRequestProto.Builder requestBuilder) { List actions = context.getPendingContainerAction( - rpcEndpoint.getAddressString(), maxContainerActionsPerHB); + rpcEndpoint.getAddress(), maxContainerActionsPerHB); if (!actions.isEmpty()) { ContainerActionsProto cap = ContainerActionsProto.newBuilder() .addAllContainerActions(actions) @@ -224,7 +224,7 @@ private void addContainerActions( private void addPipelineActions( SCMHeartbeatRequestProto.Builder requestBuilder) { List actions = context.getPendingPipelineAction( - rpcEndpoint.getAddressString(), maxPipelineActionsPerHB); + rpcEndpoint.getAddress(), maxPipelineActionsPerHB); if (!actions.isEmpty()) { PipelineActionsProto pap = PipelineActionsProto.newBuilder() .addAllPipelineActions(actions) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index f01e89a0f18..c84795c89f4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import java.net.InetSocketAddress; import java.util.List; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -47,8 +48,8 @@ public void testReportAPIs() { StateContext stateContext = new StateContext(conf, DatanodeStates.getInitState(), datanodeStateMachineMock); - String scm1 = "scm1:9001"; - String scm2 = "scm2:9001"; + InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001); + InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001); // Try to add report with endpoint. Should not be stored. stateContext.addReport(mock(GeneratedMessage.class)); @@ -81,8 +82,8 @@ public void testActionAPIs() { StateContext stateContext = new StateContext(conf, DatanodeStates.getInitState(), datanodeStateMachineMock); - String scm1 = "scm1:9001"; - String scm2 = "scm2:9001"; + InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001); + InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001); // Try to get containerActions for endpoint which is not yet added. List containerActions = diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java index c416442a635..95ac87fba86 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -48,6 +48,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import java.net.InetSocketAddress; import java.util.UUID; /** @@ -55,7 +56,8 @@ */ public class TestHeartbeatEndpointTask { - private static final String TEST_SCM_ENDPOINT = "test-scm-1:9861"; + private static final InetSocketAddress TEST_SCM_ENDPOINT = + new InetSocketAddress("test-scm-1", 9861); @Test public void testheartbeatWithoutReports() throws Exception { @@ -283,7 +285,7 @@ private HeartbeatEndpointTask getHeartbeatEndpointTask( EndpointStateMachine endpointStateMachine = Mockito .mock(EndpointStateMachine.class); Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy); - Mockito.when(endpointStateMachine.getAddressString()) + Mockito.when(endpointStateMachine.getAddress()) .thenReturn(TEST_SCM_ENDPOINT); return HeartbeatEndpointTask.newBuilder() .setConfig(conf) diff --git a/hadoop-ozone/dist/src/main/compose/ozone/docker-config b/hadoop-ozone/dist/src/main/compose/ozone/docker-config index 3b8545f7a8d..cf23d75c25a 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone/docker-config @@ -23,8 +23,7 @@ OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data OZONE-SITE.XML_ozone.scm.block.client.address=scm OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon -OZONE-SITE.XML_ozone.recon.om.db.dir=/data/metadata/recon OZONE-SITE.XML_ozone.scm.client.address=scm OZONE-SITE.XML_hdds.datanode.dir=/data/hdds - +OZONE-SITE.XML_ozone.recon.address=recon:9891 no_proxy=om,scm,s3g,kdc,localhost,127.0.0.1 diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 06dac3c9e70..6160c8433bd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -19,7 +19,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; -import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT; @@ -756,16 +755,11 @@ private void configureRecon() throws IOException { File tempNewFolder = tempFolder.newFolder(); conf.set(OZONE_RECON_DB_DIR, tempNewFolder.getAbsolutePath()); - - File reconOmDbDir = tempFolder.newFolder(); - conf.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, reconOmDbDir + conf.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, tempNewFolder .getAbsolutePath()); conf.set(OZONE_RECON_SQL_DB_JDBC_URL, "jdbc:sqlite:" + tempNewFolder.getAbsolutePath() + "/ozone_recon_sqlite.db"); - - conf.set(OZONE_RECON_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); - ConfigurationProvider.setConfiguration(conf); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index 05e7837c192..0fd5c089e61 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -191,7 +191,7 @@ public void testOzoneContainerViaDataNode() throws Exception { } } - static void runTestOzoneContainerViaDataNode( + public static void runTestOzoneContainerViaDataNode( long testContainerID, XceiverClientSpi client) throws Exception { ContainerProtos.ContainerCommandRequestProto request, writeChunkRequest, putBlockRequest, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java new file mode 100644 index 00000000000..6456e100c65 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.recon; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS; +import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY; +import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer.runTestOzoneContainerViaDataNode; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.Optional; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.XceiverClientGrpc; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; +import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Recon's passive SCM integration tests. + */ +public class TestReconAsPassiveScm { + + private MiniOzoneCluster cluster = null; + private OzoneConfiguration conf; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void init() throws Exception { + File dir = GenericTestUtils.getRandomizedTestDir(); + conf = new OzoneConfiguration(); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString()); + String reconHTTPAddress = "localhost:" + NetUtils.getFreeSocketPort(); + conf.set(OZONE_RECON_HTTP_ADDRESS_KEY, reconHTTPAddress); + + int reconDnPort = NetUtils.getFreeSocketPort(); + conf.set(OZONE_RECON_DATANODE_ADDRESS_KEY, "0.0.0.0:" + reconDnPort); + conf.set(OZONE_RECON_ADDRESS_KEY, "0.0.0.0:" + reconDnPort); + conf.set(OZONE_RECON_SCM_DB_DIR, + temporaryFolder.newFolder().getAbsolutePath()); + conf.set(HDDS_CONTAINER_REPORT_INTERVAL, "10s"); + conf.set(HDDS_PIPELINE_REPORT_INTERVAL, "10s"); + cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build(); + cluster.waitForClusterToBeReady(); + } + + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test(timeout = 120000) + public void testDatanodeRegistrationAndReports() throws Exception { + ReconStorageContainerManagerFacade reconScm = + (ReconStorageContainerManagerFacade) + cluster.getReconServer().getReconStorageContainerManager(); + StorageContainerManager scm = cluster.getStorageContainerManager(); + PipelineManager reconPipelineManager = reconScm.getPipelineManager(); + PipelineManager scmPipelineManager = scm.getPipelineManager(); + while (scmPipelineManager.getPipelines().size() < 4 || + scmPipelineManager.getPipelines().size() > + reconPipelineManager.getPipelines().size()) { + Thread.sleep(5000); + } + + // Verify if Recon has all the pipelines from SCM. + scmPipelineManager.getPipelines().forEach(p -> { + try { + assertNotNull(reconPipelineManager.getPipeline(p.getId())); + } catch (PipelineNotFoundException e) { + Assert.fail(); + } + }); + + // Verify we can never create a pipeline in Recon. + LambdaTestUtils.intercept(UnsupportedOperationException.class, + "Trying to create pipeline in Recon, which is prohibited!", + () -> reconPipelineManager.createPipeline(RATIS, ONE)); + + ContainerManager scmContainerManager = scm.getContainerManager(); + assertTrue(scmContainerManager.getContainerIDs().isEmpty()); + + // Verify if Recon registered all the nodes. + NodeManager reconNodeManager = reconScm.getScmNodeManager(); + NodeManager scmNodeManager = scm.getScmNodeManager(); + assertEquals(scmNodeManager.getAllNodes().size(), + reconNodeManager.getAllNodes().size()); + + // Create container + ContainerManager reconContainerManager = reconScm.getContainerManager(); + ContainerInfo containerInfo = + scmContainerManager.allocateContainer(RATIS, ONE, "test"); + long containerID = containerInfo.getContainerID(); + Pipeline pipeline = + scmPipelineManager.getPipeline(containerInfo.getPipelineID()); + XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf); + runTestOzoneContainerViaDataNode(containerID, client); + + // Verify Recon picked up the new container that was created. + assertEquals(scmContainerManager.getContainerIDs(), + reconContainerManager.getContainerIDs()); + } + + @Test(timeout = 120000) + public void testReconRestart() throws Exception { + OzoneStorageContainerManager reconScm = + cluster.getReconServer().getReconStorageContainerManager(); + StorageContainerManager scm = cluster.getStorageContainerManager(); + + // Stop Recon + ContainerManager scmContainerManager = scm.getContainerManager(); + assertTrue(scmContainerManager.getContainerIDs().isEmpty()); + ContainerManager reconContainerManager = reconScm.getContainerManager(); + assertTrue(reconContainerManager.getContainerIDs().isEmpty()); + + cluster.getReconServer().stop(); + cluster.getReconServer().join(); + + // Create container in SCM. + ContainerInfo containerInfo = + scmContainerManager.allocateContainer(RATIS, ONE, "test"); + long containerID = containerInfo.getContainerID(); + PipelineManager scmPipelineManager = scm.getPipelineManager(); + Pipeline pipeline = + scmPipelineManager.getPipeline(containerInfo.getPipelineID()); + XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf); + runTestOzoneContainerViaDataNode(containerID, client); + assertFalse(scmContainerManager.getContainerIDs().isEmpty()); + + // Close a pipeline + Optional pipelineToClose = scmPipelineManager + .getPipelines(RATIS, ONE) + .stream() + .filter(p -> !p.getId().equals(containerInfo.getPipelineID())) + .findFirst(); + assertTrue(pipelineToClose.isPresent()); + scmPipelineManager.finalizeAndDestroyPipeline(pipelineToClose.get(), false); + + // Start Recon + // Using restart since there is no start API in MiniOzoneCluster. + cluster.restartReconServer(); + + // Verify if Recon has all the nodes on restart (even if heartbeats are + // not yet received). + NodeManager reconNodeManager = reconScm.getScmNodeManager(); + NodeManager scmNodeManager = scm.getScmNodeManager(); + assertEquals(scmNodeManager.getAllNodes().size(), + reconNodeManager.getAllNodes().size()); + + // Verify Recon picks up new container, close pipeline SCM actions. + reconScm = cluster.getReconServer().getReconStorageContainerManager(); + PipelineManager reconPipelineManager = reconScm.getPipelineManager(); + assertFalse( + reconPipelineManager.containsPipeline(pipelineToClose.get().getId())); + + long startTime = System.currentTimeMillis(); + long endTime = startTime + 60000L; + boolean containerPresentInRecon = + reconScm.getContainerManager().exists(ContainerID.valueof(containerID)); + while (endTime > System.currentTimeMillis() && !containerPresentInRecon) { + containerPresentInRecon = reconScm.getContainerManager() + .exists(ContainerID.valueof(containerID)); + Thread.sleep(5000); + } + assertTrue(containerPresentInRecon); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestRecon.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java similarity index 98% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestRecon.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java index f833d2b2cf1..1817e989e18 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestRecon.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconWithOzoneManager.java @@ -18,6 +18,7 @@ import static java.net.HttpURLConnection.HTTP_CREATED; import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_REQUEST_TIMEOUT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT; @@ -73,7 +74,7 @@ /** * Test Ozone Recon. */ -public class TestRecon { +public class TestReconWithOzoneManager { private static MiniOzoneCluster cluster = null; private static OzoneConfiguration conf; private static OMMetadataManager metadataManager; @@ -102,6 +103,7 @@ public static void init() throws Exception { String reconHTTPAddress = "localhost:" + NetUtils.getFreeSocketPort(); conf.set(OZONE_RECON_HTTP_ADDRESS_KEY, reconHTTPAddress); + conf.set(OZONE_RECON_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); containerKeyServiceURL = "http://" + reconHTTPAddress + "/api/containers"; taskStatusURL = "http://" + reconHTTPAddress + "/api/task/status";