Skip to content

Commit

Permalink
HDDS-2995. Add integration test for Recon's Passive SCM state.
Browse files Browse the repository at this point in the history
  • Loading branch information
Aravindan Vijayan committed Feb 24, 2020
1 parent bb79045 commit 9654ed8
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 40 deletions.
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -75,10 +76,10 @@ public class StateContext {
private final DatanodeStateMachine parent;
private final AtomicLong stateExecutionCount;
private final Configuration conf;
private final Set<String> endpoints;
private final Map<String, List<GeneratedMessage>> reports;
private final Map<String, Queue<ContainerAction>> containerActions;
private final Map<String, Queue<PipelineAction>> pipelineActions;
private final Set<InetSocketAddress> endpoints;
private final Map<InetSocketAddress, List<GeneratedMessage>> reports;
private final Map<InetSocketAddress, Queue<ContainerAction>> containerActions;
private final Map<InetSocketAddress, Queue<PipelineAction>> pipelineActions;
private DatanodeStateMachine.DatanodeStates state;
private boolean shutdownOnError = false;

Expand Down Expand Up @@ -185,7 +186,7 @@ public boolean getShutdownOnError() {
public void addReport(GeneratedMessage report) {
if (report != null) {
synchronized (reports) {
for (String endpoint : endpoints) {
for (InetSocketAddress endpoint : endpoints) {
reports.get(endpoint).add(report);
}
}
Expand All @@ -200,7 +201,7 @@ public void addReport(GeneratedMessage report) {
* heartbeat.
*/
public void putBackReports(List<GeneratedMessage> reportsToPutBack,
String endpoint) {
InetSocketAddress endpoint) {
synchronized (reports) {
if (reports.containsKey(endpoint)){
reports.get(endpoint).addAll(0, reportsToPutBack);
Expand All @@ -214,7 +215,8 @@ public void putBackReports(List<GeneratedMessage> reportsToPutBack,
*
* @return List of reports
*/
public List<GeneratedMessage> getAllAvailableReports(String endpoint) {
public List<GeneratedMessage> getAllAvailableReports(
InetSocketAddress endpoint) {
return getReports(endpoint, Integer.MAX_VALUE);
}

Expand All @@ -224,7 +226,8 @@ public List<GeneratedMessage> getAllAvailableReports(String endpoint) {
*
* @return List of reports
*/
public List<GeneratedMessage> getReports(String endpoint, int maxLimit) {
public List<GeneratedMessage> getReports(InetSocketAddress endpoint,
int maxLimit) {
List<GeneratedMessage> reportsToReturn = new LinkedList<>();
synchronized (reports) {
List<GeneratedMessage> reportsForEndpoint = reports.get(endpoint);
Expand All @@ -246,7 +249,7 @@ public List<GeneratedMessage> getReports(String endpoint, int maxLimit) {
*/
public void addContainerAction(ContainerAction containerAction) {
synchronized (containerActions) {
for (String endpoint : endpoints) {
for (InetSocketAddress endpoint : endpoints) {
containerActions.get(endpoint).add(containerAction);
}
}
Expand All @@ -259,7 +262,7 @@ public void addContainerAction(ContainerAction containerAction) {
*/
public void addContainerActionIfAbsent(ContainerAction containerAction) {
synchronized (containerActions) {
for (String endpoint : endpoints) {
for (InetSocketAddress endpoint : endpoints) {
if (!containerActions.get(endpoint).contains(containerAction)) {
containerActions.get(endpoint).add(containerAction);
}
Expand All @@ -273,7 +276,8 @@ public void addContainerActionIfAbsent(ContainerAction containerAction) {
*
* @return {@literal List<ContainerAction>}
*/
public List<ContainerAction> getAllPendingContainerActions(String endpoint) {
public List<ContainerAction> getAllPendingContainerActions(
InetSocketAddress endpoint) {
return getPendingContainerAction(endpoint, Integer.MAX_VALUE);
}

Expand All @@ -283,8 +287,9 @@ public List<ContainerAction> getAllPendingContainerActions(String endpoint) {
*
* @return {@literal List<ContainerAction>}
*/
public List<ContainerAction> getPendingContainerAction(String endpoint,
int maxLimit) {
public List<ContainerAction> getPendingContainerAction(
InetSocketAddress endpoint,
int maxLimit) {
List<ContainerAction> containerActionList = new ArrayList<>();
synchronized (containerActions) {
if (!containerActions.isEmpty() &&
Expand Down Expand Up @@ -319,7 +324,7 @@ public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
* action remains same on the given pipeline, it will end up adding it
* multiple times here.
*/
for (String endpoint : endpoints) {
for (InetSocketAddress endpoint : endpoints) {
Queue<PipelineAction> actionsForEndpoint =
this.pipelineActions.get(endpoint);
for (PipelineAction pipelineActionIter : actionsForEndpoint) {
Expand All @@ -342,8 +347,9 @@ public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
*
* @return {@literal List<ContainerAction>}
*/
public List<PipelineAction> getPendingPipelineAction(String endpoint,
int maxLimit) {
public List<PipelineAction> getPendingPipelineAction(
InetSocketAddress endpoint,
int maxLimit) {
List<PipelineAction> pipelineActionList = new ArrayList<>();
synchronized (pipelineActions) {
if (!pipelineActions.isEmpty() &&
Expand Down Expand Up @@ -531,7 +537,7 @@ public long getHeartbeatFrequency() {
return heartbeatFrequency.get();
}

public void addEndpoint(String endpoint) {
public void addEndpoint(InetSocketAddress endpoint) {
if (!endpoints.contains(endpoint)) {
this.endpoints.add(endpoint);
this.containerActions.put(endpoint, new LinkedList<>());
Expand Down
Expand Up @@ -105,12 +105,12 @@ public DatanodeStateMachine.DatanodeStates call() throws Exception {
}
for (InetSocketAddress addr : addresses) {
connectionManager.addSCMServer(addr);
this.context.addEndpoint(addr.toString());
this.context.addEndpoint(addr);
}
InetSocketAddress reconAddress = getReconAddresses(conf);
if (reconAddress != null) {
connectionManager.addReconServer(reconAddress);
this.context.addEndpoint(reconAddress.toString());
this.context.addEndpoint(reconAddress);
}
}

Expand Down
Expand Up @@ -173,7 +173,7 @@ private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
if (requestBuilder.getIncrementalContainerReportCount() != 0) {
reports.addAll(requestBuilder.getIncrementalContainerReportList());
}
context.putBackReports(reports, rpcEndpoint.getAddressString());
context.putBackReports(reports, rpcEndpoint.getAddress());
}

/**
Expand All @@ -183,7 +183,7 @@ private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
*/
private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
for (GeneratedMessage report :
context.getAllAvailableReports(rpcEndpoint.getAddressString())) {
context.getAllAvailableReports(rpcEndpoint.getAddress())) {
String reportName = report.getDescriptorForType().getFullName();
for (Descriptors.FieldDescriptor descriptor :
SCMHeartbeatRequestProto.getDescriptor().getFields()) {
Expand All @@ -207,7 +207,7 @@ private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
private void addContainerActions(
SCMHeartbeatRequestProto.Builder requestBuilder) {
List<ContainerAction> actions = context.getPendingContainerAction(
rpcEndpoint.getAddressString(), maxContainerActionsPerHB);
rpcEndpoint.getAddress(), maxContainerActionsPerHB);
if (!actions.isEmpty()) {
ContainerActionsProto cap = ContainerActionsProto.newBuilder()
.addAllContainerActions(actions)
Expand All @@ -224,7 +224,7 @@ private void addContainerActions(
private void addPipelineActions(
SCMHeartbeatRequestProto.Builder requestBuilder) {
List<PipelineAction> actions = context.getPendingPipelineAction(
rpcEndpoint.getAddressString(), maxPipelineActionsPerHB);
rpcEndpoint.getAddress(), maxPipelineActionsPerHB);
if (!actions.isEmpty()) {
PipelineActionsProto pap = PipelineActionsProto.newBuilder()
.addAllPipelineActions(actions)
Expand Down
Expand Up @@ -24,6 +24,7 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.net.InetSocketAddress;
import java.util.List;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand All @@ -47,8 +48,8 @@ public void testReportAPIs() {
StateContext stateContext = new StateContext(conf,
DatanodeStates.getInitState(), datanodeStateMachineMock);

String scm1 = "scm1:9001";
String scm2 = "scm2:9001";
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);

// Try to add report with endpoint. Should not be stored.
stateContext.addReport(mock(GeneratedMessage.class));
Expand Down Expand Up @@ -81,8 +82,8 @@ public void testActionAPIs() {
StateContext stateContext = new StateContext(conf,
DatanodeStates.getInitState(), datanodeStateMachineMock);

String scm1 = "scm1:9001";
String scm2 = "scm2:9001";
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);

// Try to get containerActions for endpoint which is not yet added.
List<ContainerAction> containerActions =
Expand Down
Expand Up @@ -48,14 +48,16 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.net.InetSocketAddress;
import java.util.UUID;

/**
* This class tests the functionality of HeartbeatEndpointTask.
*/
public class TestHeartbeatEndpointTask {

private static final String TEST_SCM_ENDPOINT = "test-scm-1:9861";
private static final InetSocketAddress TEST_SCM_ENDPOINT =
new InetSocketAddress("test-scm-1", 9861);

@Test
public void testheartbeatWithoutReports() throws Exception {
Expand Down Expand Up @@ -283,7 +285,7 @@ private HeartbeatEndpointTask getHeartbeatEndpointTask(
EndpointStateMachine endpointStateMachine = Mockito
.mock(EndpointStateMachine.class);
Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
Mockito.when(endpointStateMachine.getAddressString())
Mockito.when(endpointStateMachine.getAddress())
.thenReturn(TEST_SCM_ENDPOINT);
return HeartbeatEndpointTask.newBuilder()
.setConfig(conf)
Expand Down
3 changes: 1 addition & 2 deletions hadoop-ozone/dist/src/main/compose/ozone/docker-config
Expand Up @@ -23,8 +23,7 @@ OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data
OZONE-SITE.XML_ozone.scm.block.client.address=scm
OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
OZONE-SITE.XML_ozone.recon.om.db.dir=/data/metadata/recon
OZONE-SITE.XML_ozone.scm.client.address=scm
OZONE-SITE.XML_hdds.datanode.dir=/data/hdds

OZONE-SITE.XML_ozone.recon.address=recon:9891
no_proxy=om,scm,s3g,kdc,localhost,127.0.0.1
Expand Up @@ -19,7 +19,6 @@

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT;
Expand Down Expand Up @@ -756,16 +755,11 @@ private void configureRecon() throws IOException {
File tempNewFolder = tempFolder.newFolder();
conf.set(OZONE_RECON_DB_DIR,
tempNewFolder.getAbsolutePath());

File reconOmDbDir = tempFolder.newFolder();
conf.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, reconOmDbDir
conf.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, tempNewFolder
.getAbsolutePath());

conf.set(OZONE_RECON_SQL_DB_JDBC_URL, "jdbc:sqlite:" +
tempNewFolder.getAbsolutePath() + "/ozone_recon_sqlite.db");

conf.set(OZONE_RECON_DATANODE_ADDRESS_KEY, "0.0.0.0:0");

ConfigurationProvider.setConfiguration(conf);
}
}
Expand Down
Expand Up @@ -191,7 +191,7 @@ public void testOzoneContainerViaDataNode() throws Exception {
}
}

static void runTestOzoneContainerViaDataNode(
public static void runTestOzoneContainerViaDataNode(
long testContainerID, XceiverClientSpi client) throws Exception {
ContainerProtos.ContainerCommandRequestProto
request, writeChunkRequest, putBlockRequest,
Expand Down

0 comments on commit 9654ed8

Please sign in to comment.