Skip to content

Commit

Permalink
HDDS-8169. Delay Starting ContainerBalancer after SCM failover (#4458)
Browse files Browse the repository at this point in the history
  • Loading branch information
siddhantsangwan committed Mar 27, 2023
1 parent 01b7dc6 commit 31cc0bd
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdds.scm.container.balancer;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.fs.DUFactory;
Expand Down Expand Up @@ -227,7 +228,7 @@ public void start() throws IllegalContainerBalancerStateException,
ozoneConfiguration);
validateConfiguration(configuration);
this.config = configuration;
startBalancingThread(proto.getNextIterationIndex());
startBalancingThread(proto.getNextIterationIndex(), true);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -258,7 +259,7 @@ public void startBalancer(ContainerBalancerConfiguration configuration)
this.config = configuration;

//start balancing task
startBalancingThread(0);
startBalancingThread(0, false);
} finally {
lock.unlock();
}
Expand All @@ -267,9 +268,10 @@ public void startBalancer(ContainerBalancerConfiguration configuration)
/**
* Starts a new balancing thread asynchronously.
*/
private void startBalancingThread(int nextIterationIndex) {
private void startBalancingThread(int nextIterationIndex,
boolean delayStart) {
task = new ContainerBalancerTask(scm, nextIterationIndex, this, metrics,
config);
config, delayStart);
currentBalancingThread = new Thread(task);
currentBalancingThread.setName("ContainerBalancerTask");
currentBalancingThread.setDaemon(true);
Expand Down Expand Up @@ -323,6 +325,7 @@ public void stop() {
"stopping");
return;
}
LOG.info("Trying to stop ContainerBalancer in this SCM.");
task.stop();
balancingThread = currentBalancingThread;
} finally {
Expand Down Expand Up @@ -358,6 +361,7 @@ public void stopBalancer()
try {
validateState(true);
saveConfiguration(config, false, 0);
LOG.info("Trying to stop ContainerBalancer service.");
task.stop();
balancingThread = currentBalancingThread;
} finally {
Expand Down Expand Up @@ -427,11 +431,16 @@ public ContainerBalancerMetrics getMetrics() {
return metrics;
}

/**
 * Exposes the thread most recently assigned to
 * {@code currentBalancingThread} so that tests can inspect its state.
 *
 * @return the current balancing thread reference held by this balancer
 */
@VisibleForTesting
Thread getCurrentBalancingThread() {
  return this.currentBalancingThread;
}

@Override
public String toString() {
String status = String.format("%nContainer Balancer status:%n" +
"%-30s %s%n" +
"%-30s %b%n", "Key", "Value", "Running", getBalancerStatus());
"%-30s %b%n", "Key", "Value", "Running", isBalancerRunning());
return status + config.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container.balancer;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
Expand All @@ -41,6 +42,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -116,6 +118,7 @@ public class ContainerBalancerTask implements Runnable {
moveSelectionToFutureMap;
private IterationResult iterationResult;
private int nextIterationIndex;
private boolean delayStart;

/**
* Constructs ContainerBalancerTask with the specified arguments.
Expand All @@ -130,14 +133,16 @@ public ContainerBalancerTask(StorageContainerManager scm,
int nextIterationIndex,
ContainerBalancer containerBalancer,
ContainerBalancerMetrics metrics,
ContainerBalancerConfiguration config) {
ContainerBalancerConfiguration config,
boolean delayStart) {
this.nodeManager = scm.getScmNodeManager();
this.containerManager = scm.getContainerManager();
this.replicationManager = scm.getReplicationManager();
this.moveManager = scm.getMoveManager();
this.moveManager.setMoveTimeout(config.getMoveTimeout().toMillis());
this.moveManager.setReplicationTimeout(
config.getMoveReplicationTimeout().toMillis());
this.delayStart = delayStart;
this.ozoneConfiguration = scm.getConfiguration();
this.containerBalancer = containerBalancer;
this.config = config;
Expand All @@ -162,6 +167,15 @@ public ContainerBalancerTask(StorageContainerManager scm,
*/
public void run() {
try {
if (delayStart) {
long delayDuration = ozoneConfiguration.getTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
TimeUnit.SECONDS);
LOG.info("ContainerBalancer will sleep for {} seconds before starting" +
" balancing.", delayDuration);
Thread.sleep(Duration.ofSeconds(delayDuration).toMillis());
}
balance();
} catch (Exception e) {
LOG.error("Container Balancer is stopped abnormally, ", e);
Expand Down Expand Up @@ -211,7 +225,10 @@ private void balance() {
// Wait three node-report intervals for refreshed usage information:
// one for the command to be sent, one for du to run, and one for the
// result to be reported back. The multiplier is a provisional choice;
// a more suitable value can be set in the future if needed.
long sleepTime = 3 * nodeReportInterval;
LOG.info("ContainerBalancer will sleep for {} ms while waiting " +
    "for updated usage information from Datanodes.", sleepTime);
// Sleep for exactly the duration that was just logged.
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
LOG.info("Container Balancer was interrupted while waiting for " +
    "datanodes refreshing volume usage info");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
Expand Down Expand Up @@ -56,14 +57,15 @@ public class TestContainerBalancer {
private ContainerBalancerConfiguration balancerConfiguration;
private Map<String, ByteString> serviceToConfigMap = new HashMap<>();
private StatefulServiceStateManager serviceStateManager;
private OzoneConfiguration conf;

/**
* Sets up configuration values and creates a mock cluster.
*/
@BeforeEach
public void setup() throws IOException, NodeNotFoundException,
TimeoutException {
OzoneConfiguration conf = new OzoneConfiguration();
conf = new OzoneConfiguration();
scm = Mockito.mock(StorageContainerManager.class);
serviceStateManager = Mockito.mock(StatefulServiceStateManagerImpl.class);
balancerConfiguration =
Expand Down Expand Up @@ -199,7 +201,7 @@ public void testNotifyStateChangeStopStart() throws Exception {
*/
@Test
public void testValidationOfConfigurations() {
OzoneConfiguration conf = new OzoneConfiguration();
conf = new OzoneConfiguration();

conf.setTimeDuration(
"hdds.container.balancer.move.replication.timeout", 60,
Expand All @@ -217,6 +219,55 @@ public void testValidationOfConfigurations() {
"be less than hdds.container.balancer.move.timeout.");
}

/**
 * Verifies that a balancer restarted through
 * {@link ContainerBalancer#notifyStatusChanged()} announces a start-up delay
 * of "hdds.scm.wait.time.after.safemode.exit" before balancing. The check is
 * made by looking for the corresponding log line emitted by
 * ContainerBalancerTask while its thread is in a timed wait.
 */
@Test
public void testDelayedStartOnSCMStatusChange()
    throws IllegalContainerBalancerStateException, IOException,
    InvalidContainerBalancerConfigurationException, TimeoutException,
    InterruptedException {
  final long delaySeconds = 10;
  final String sleepMessage = "ContainerBalancer will sleep for " +
      delaySeconds + " seconds before starting balancing.";

  conf.setTimeDuration("hdds.scm.wait.time.after.safemode.exit",
      delaySeconds, TimeUnit.SECONDS);
  balancerConfiguration =
      conf.getObject(ContainerBalancerConfiguration.class);

  // Bring the balancer service up and confirm that it reports running.
  containerBalancer.startBalancer(balancerConfiguration);
  GenericTestUtils.waitFor(() -> containerBalancer.isBalancerRunning(), 1,
      20);
  Assertions.assertTrue(containerBalancer.isBalancerRunning());

  // Update the SCM context and notify the balancer: the current balancing
  // thread is expected to be stopped.
  scm.getScmContext().updateLeaderAndTerm(false, 1);
  containerBalancer.notifyStatusChanged();
  Assertions.assertFalse(containerBalancer.isBalancerRunning());

  // Capture task logs from here on; only the delayed restart matters.
  GenericTestUtils.LogCapturer capturedLogs =
      GenericTestUtils.LogCapturer.captureLogs(ContainerBalancerTask.LOG);

  // Update the SCM context again, mark the leader ready and notify: the
  // balancer should be running again, sleeping through the configured delay.
  scm.getScmContext().updateLeaderAndTerm(true, 2);
  scm.getScmContext().setLeaderReady();
  containerBalancer.notifyStatusChanged();
  Assertions.assertTrue(containerBalancer.isBalancerRunning());

  Thread restartedThread = containerBalancer.getCurrentBalancingThread();
  GenericTestUtils.waitFor(
      () -> Thread.State.TIMED_WAITING == restartedThread.getState(), 2, 20);
  String output = capturedLogs.getOutput();
  Assertions.assertTrue(output.contains(sleepMessage));
  stopBalancer();
}

private void startBalancer(ContainerBalancerConfiguration config)
throws IllegalContainerBalancerStateException, IOException,
InvalidContainerBalancerConfigurationException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void setup() throws IOException, NodeNotFoundException,
.register(Mockito.any(SCMService.class));
ContainerBalancer sb = new ContainerBalancer(scm);
containerBalancerTask = new ContainerBalancerTask(scm, 0, sb,
sb.getMetrics(), balancerConfiguration);
sb.getMetrics(), balancerConfiguration, false);
}

@Test
Expand Down Expand Up @@ -261,7 +261,7 @@ public void testCalculationOfUtilization() {

balancerConfiguration.setThreshold(randomThreshold);
containerBalancerTask = new ContainerBalancerTask(scm, 0, sb,
sb.getMetrics(), balancerConfiguration);
sb.getMetrics(), balancerConfiguration, false);
containerBalancerTask.run();

unBalancedNodesAccordingToBalancer =
Expand Down Expand Up @@ -580,7 +580,7 @@ public void balancerShouldObeyMaxSizeEnteringTargetLimit()
cbc.setBalancingInterval(1);
ContainerBalancer sb = new ContainerBalancer(scm);
containerBalancerTask = new ContainerBalancerTask(scm, 0, sb,
sb.getMetrics(), cbc);
sb.getMetrics(), cbc, false);
containerBalancerTask.run();

stopBalancer();
Expand Down Expand Up @@ -619,7 +619,7 @@ public void balancerShouldObeyMaxSizeLeavingSourceLimit()
cbc.setBalancingInterval(1);
ContainerBalancer sb = new ContainerBalancer(scm);
containerBalancerTask = new ContainerBalancerTask(scm, 0, sb,
sb.getMetrics(), cbc);
sb.getMetrics(), cbc, false);
containerBalancerTask.run();

stopBalancer();
Expand Down Expand Up @@ -980,6 +980,41 @@ public void checkIterationResultException()
stopBalancer();
}

/**
 * Runs a ContainerBalancerTask constructed with {@code delayStart = true} on
 * its own thread and checks that it reports RUNNING, enters a timed wait,
 * and reports STOPPED with the thread terminated once it is interrupted.
 */
@Test
public void testDelayedStart() throws InterruptedException, TimeoutException {
  conf.setTimeDuration("hdds.scm.wait.time.after.safemode.exit", 10,
      TimeUnit.SECONDS);
  ContainerBalancer delayedBalancer = new ContainerBalancer(scm);
  containerBalancerTask = new ContainerBalancerTask(scm, 2, delayedBalancer,
      delayedBalancer.getMetrics(), balancerConfiguration, true);
  Thread taskThread = new Thread(containerBalancerTask);

  // Once the thread has been started the task must report RUNNING.
  taskThread.start();
  Assertions.assertEquals(ContainerBalancerTask.Status.RUNNING,
      containerBalancerTask.getBalancerStatus());

  // The start-up delay shows up as the thread entering a timed wait.
  GenericTestUtils.waitFor(
      () -> Thread.State.TIMED_WAITING == taskThread.getState(), 1, 20);
  Assertions.assertEquals(Thread.State.TIMED_WAITING,
      taskThread.getState());

  // Interrupting the sleeping thread should move the task to STOPPED ...
  taskThread.interrupt();
  GenericTestUtils.waitFor(
      () -> ContainerBalancerTask.Status.STOPPED ==
          containerBalancerTask.getBalancerStatus(), 1, 20);
  Assertions.assertEquals(ContainerBalancerTask.Status.STOPPED,
      containerBalancerTask.getBalancerStatus());

  // ... and the thread itself has to terminate.
  GenericTestUtils.waitFor(() -> !taskThread.isAlive(), 1, 20);
  Assertions.assertFalse(taskThread.isAlive());
}

/**
* Determines unBalanced nodes, that is, over and under utilized nodes,
* according to the generated utilization values for nodes and the threshold.
Expand Down

0 comments on commit 31cc0bd

Please sign in to comment.