HDDS-8494. Adjust replication queue limits for out-of-service nodes #4645

Merged 7 commits on May 4, 2023
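
For orientation, the sketch below condenses how the pieces changed in this PR fit together: the datanode builds its ReplicationSupervisor through the new builder and hands the supervisor's nodeStateUpdated method to the SetNodeOperationalStateCommandHandler, so the queue and thread limits can react when SCM moves the node out of service. This is an illustrative fragment only; conf, context and clock are assumed to be in scope as they are in DatanodeStateMachine, and it is not a drop-in replacement for the real wiring.

// Illustrative fragment mirroring the DatanodeStateMachine changes below.
// `conf`, `context` and `clock` are assumed to be supplied by the datanode.
ReplicationConfig replicationConfig = conf.getObject(ReplicationConfig.class);
DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);

ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
    .stateContext(context)
    .datanodeConfig(dnConf)
    .replicationConfig(replicationConfig)
    .clock(clock)
    .build();

// The command handler now notifies the supervisor whenever SCM changes the
// node's operational state (decommission, maintenance, back to in-service).
SetNodeOperationalStateCommandHandler handler =
    new SetNodeOperationalStateCommandHandler(conf, supervisor::nodeStateUpdated);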
@@ -238,14 +238,27 @@ public HddsProtos.NodeOperationalState getPersistedOpState() {
}

/**
* Checks if the OperationalState is Node is Decomissioned or Decomissioning.
* @return True if OperationalState is Decommissioned or Decomissioning.
*/
public boolean isDecomissioned() {
return this.getPersistedOpState() ==
HddsProtos.NodeOperationalState.DECOMMISSIONED ||
this.getPersistedOpState() ==
HddsProtos.NodeOperationalState.DECOMMISSIONING;
* @return true if the node is decommissioned or decommissioning
*/
public boolean isDecommissioned() {
return isDecommission(getPersistedOpState());
}

public static boolean isDecommission(HddsProtos.NodeOperationalState state) {
return state == HddsProtos.NodeOperationalState.DECOMMISSIONED ||
state == HddsProtos.NodeOperationalState.DECOMMISSIONING;
}

/**
* @return true if node is in or entering maintenance
*/
public boolean isMaintenance() {
return isMaintenance(getPersistedOpState());
}

public static boolean isMaintenance(HddsProtos.NodeOperationalState state) {
return state == HddsProtos.NodeOperationalState.IN_MAINTENANCE ||
state == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
}

/**
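The two static helpers added to DatanodeDetails above make the "out of service" check reusable outside the class itself. A minimal, hypothetical utility (not part of this PR) that combines them:

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;

/** Hypothetical example, not part of this PR. */
final class OutOfServiceCheck {
  private OutOfServiceCheck() { }

  /** True for DECOMMISSIONED/DECOMMISSIONING and IN_MAINTENANCE/ENTERING_MAINTENANCE. */
  static boolean isOutOfService(NodeOperationalState state) {
    return DatanodeDetails.isDecommission(state)
        || DatanodeDetails.isMaintenance(state);
  }
}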
@@ -196,8 +196,12 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,

ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);
supervisor = new ReplicationSupervisor(context, replicationConfig, clock,
dnConf.getCommandQueueLimit());
supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.datanodeConfig(dnConf)
.replicationConfig(replicationConfig)
.clock(clock)
.build();

replicationSupervisorMetrics =
ReplicationSupervisorMetrics.create(supervisor);
@@ -227,7 +231,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
dnConf.getCommandQueueLimit()))
.addHandler(new ClosePipelineCommandHandler())
.addHandler(new CreatePipelineCommandHandler(conf))
.addHandler(new SetNodeOperationalStateCommandHandler(conf))
.addHandler(new SetNodeOperationalStateCommandHandler(conf,
supervisor::nodeStateUpdated))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
.addHandler(new RefreshVolumeUsageCommandHandler())
.setConnectionManager(connectionManager)
@@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
@@ -39,6 +40,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;


/**
@@ -50,6 +52,7 @@ public class SetNodeOperationalStateCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(SetNodeOperationalStateCommandHandler.class);
private final ConfigurationSource conf;
private final Consumer<HddsProtos.NodeOperationalState> replicationSupervisor;
private final AtomicInteger invocationCount = new AtomicInteger(0);
private final AtomicLong totalTime = new AtomicLong(0);

@@ -58,8 +61,10 @@ public class SetNodeOperationalStateCommandHandler implements CommandHandler {
*
* @param conf - Configuration for the datanode.
*/
public SetNodeOperationalStateCommandHandler(ConfigurationSource conf) {
public SetNodeOperationalStateCommandHandler(ConfigurationSource conf,
Consumer<HddsProtos.NodeOperationalState> replicationSupervisor) {
this.conf = conf;
this.replicationSupervisor = replicationSupervisor;
}

/**
@@ -88,7 +93,9 @@ public void handle(SCMCommand command, OzoneContainer container,
(SetNodeOperationalStateCommand) command;
setNodeCmdProto = setNodeCmd.getProto();
DatanodeDetails dni = context.getParent().getDatanodeDetails();
dni.setPersistedOpState(setNodeCmdProto.getNodeOperationalState());
HddsProtos.NodeOperationalState state =
setNodeCmdProto.getNodeOperationalState();
dni.setPersistedOpState(state);
dni.setPersistedOpStateExpiryEpochSec(
setNodeCmd.getStateExpiryEpochSeconds());
try {
@@ -98,6 +105,7 @@ public void handle(SCMCommand command, OzoneContainer container,
// TODO - this should probably be raised, but it will break the command
// handler interface.
}
replicationSupervisor.accept(state);
totalTime.addAndGet(Time.monotonicNow() - startTime);
}

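Note that the handler above only depends on a Consumer<HddsProtos.NodeOperationalState>, not on ReplicationSupervisor itself; in production the callback is supervisor::nodeStateUpdated (see the DatanodeStateMachine hunk earlier). A hedged sketch of how a test could capture whatever state the handler forwards; the capture idea is illustrative and not taken from this PR:

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

// Hypothetical test wiring: record the state the handler pushes to its callback.
AtomicReference<HddsProtos.NodeOperationalState> observed = new AtomicReference<>();
Consumer<HddsProtos.NodeOperationalState> callback = observed::set;

SetNodeOperationalStateCommandHandler handler =
    new SetNodeOperationalStateCommandHandler(conf, callback);
// After handler.handle(...) runs, observed.get() holds the state from the
// SetNodeOperationalStateCommand, since handle() calls callback.accept(state).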
@@ -19,6 +19,7 @@

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
@@ -31,10 +32,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
@@ -44,10 +50,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.protocol.DatanodeDetails.isDecommission;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.isMaintenance;

/**
* Single point to schedule the downloading tasks based on priorities.
*/
public class ReplicationSupervisor {
public final class ReplicationSupervisor {

private static final Logger LOG =
LoggerFactory.getLogger(ReplicationSupervisor.class);
@@ -74,45 +83,131 @@ public class ReplicationSupervisor {
*/
private final Set<AbstractReplicationTask> inFlight;

private final Map<Class, AtomicInteger> taskCounter =
private final Map<Class<?>, AtomicInteger> taskCounter =
new ConcurrentHashMap<>();
private int maxQueueSize;

@VisibleForTesting
ReplicationSupervisor(
StateContext context, ExecutorService executor, Clock clock,
int maxQueueSize) {
this.inFlight = ConcurrentHashMap.newKeySet();
this.executor = executor;
this.context = context;
this.clock = clock;
this.maxQueueSize = maxQueueSize;
}
private final AtomicReference<HddsProtos.NodeOperationalState> state
= new AtomicReference<>();
private final IntConsumer executorThreadUpdater;
private final ReplicationConfig replicationConfig;
private final DatanodeConfiguration datanodeConfig;

/**
* Builder for {@link ReplicationSupervisor}.
*/
public static class Builder {
private StateContext context;
private ReplicationConfig replicationConfig;
private DatanodeConfiguration datanodeConfig;
private ExecutorService executor;
private Clock clock;
private IntConsumer executorThreadUpdater = threadCount -> { };

public Builder clock(Clock newClock) {
clock = newClock;
return this;
}

public Builder executor(ExecutorService newExecutor) {
executor = newExecutor;
return this;
}

public Builder replicationConfig(ReplicationConfig newReplicationConfig) {
replicationConfig = newReplicationConfig;
return this;
}

public Builder datanodeConfig(DatanodeConfiguration newDatanodeConfig) {
datanodeConfig = newDatanodeConfig;
return this;
}

public Builder stateContext(StateContext newContext) {
context = newContext;
return this;
}

public Builder executorThreadUpdater(IntConsumer newUpdater) {
executorThreadUpdater = newUpdater;
return this;
}

public ReplicationSupervisor build() {
if (replicationConfig == null || datanodeConfig == null) {
ConfigurationSource conf = new OzoneConfiguration();
if (replicationConfig == null) {
replicationConfig =
conf.getObject(ReplicationServer.ReplicationConfig.class);
}
if (datanodeConfig == null) {
datanodeConfig = conf.getObject(DatanodeConfiguration.class);
}
}

if (clock == null) {
clock = Clock.system(ZoneId.systemDefault());
}

public ReplicationSupervisor(
StateContext context, ReplicationConfig replicationConfig, Clock clock,
int maxQueueSize) {
this(context,
new ThreadPoolExecutor(
if (executor == null) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
replicationConfig.getReplicationMaxStreams(),
replicationConfig.getReplicationMaxStreams(),
replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS,
60, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ContainerReplicationThread-%d")
.build()),
clock, maxQueueSize);
.build());
executor = tpe;
executorThreadUpdater = threadCount -> {
tpe.setMaximumPoolSize(threadCount);
tpe.setCorePoolSize(threadCount);
};
}

return new ReplicationSupervisor(context, executor, replicationConfig,
datanodeConfig, clock, executorThreadUpdater);
}
}

public static Builder newBuilder() {
return new Builder();
}

private ReplicationSupervisor(StateContext context, ExecutorService executor,
ReplicationConfig replicationConfig, DatanodeConfiguration datanodeConfig,
Clock clock, IntConsumer executorThreadUpdater) {
this.inFlight = ConcurrentHashMap.newKeySet();
this.context = context;
this.executor = executor;
this.replicationConfig = replicationConfig;
this.datanodeConfig = datanodeConfig;
maxQueueSize = datanodeConfig.getCommandQueueLimit();
this.clock = clock;
this.executorThreadUpdater = executorThreadUpdater;

// set initial state
if (context != null) {
DatanodeDetails dn = context.getParent().getDatanodeDetails();
if (dn != null) {
nodeStateUpdated(dn.getPersistedOpState());
}
}
}

/**
* Queue an asynchronous download of the given container.
*/
public void addTask(AbstractReplicationTask task) {
if (getTotalInFlightReplications() >= maxQueueSize) {
final int max = maxQueueSize;
if (getTotalInFlightReplications() >= max) {
LOG.warn("Ignored {} command for container {} in Replication Supervisor"
+ "as queue reached max size of {}.",
task.getClass(), task.getContainerId(), maxQueueSize);
task.getClass(), task.getContainerId(), max);
return;
}

if (inFlight.add(task)) {
if (task.getPriority() != ReplicationCommandPriority.LOW) {
// Low priority tasks are not included in the replication queue sizes
@@ -178,10 +273,32 @@ public int getTotalInFlightReplications() {
return inFlight.size();
}

public int getMaxQueueSize() {
return maxQueueSize;
}

public void nodeStateUpdated(HddsProtos.NodeOperationalState newState) {
if (state.getAndSet(newState) != newState) {
int threadCount = replicationConfig.getReplicationMaxStreams();
int newMaxQueueSize = datanodeConfig.getCommandQueueLimit();

if (isMaintenance(newState) || isDecommission(newState)) {
threadCount *= 2;
newMaxQueueSize *= 2;
}

LOG.info("Node state updated to {}, scaling executor pool size to {}",
newState, threadCount);

maxQueueSize = newMaxQueueSize;
executorThreadUpdater.accept(threadCount);
}
}

/**
* An executable form of a replication task with status handling.
*/
public final class TaskRunner implements Comparable, Runnable {
public final class TaskRunner implements Comparable<TaskRunner>, Runnable {
private final AbstractReplicationTask task;

public TaskRunner(AbstractReplicationTask task) {
@@ -257,8 +374,8 @@ public long getTaskQueueTime() {
}

@Override
public int compareTo(Object o) {
return TASK_RUNNER_COMPARATOR.compare(this, (TaskRunner) o);
public int compareTo(TaskRunner o) {
return TASK_RUNNER_COMPARATOR.compare(this, o);
}

@Override
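The heart of the change is nodeStateUpdated above: when the persisted operational state flips to a decommission or maintenance state, the executor pool size and the command queue limit are doubled relative to the configured values, and they drop back to the base values when the node returns to service. A hypothetical restatement of the rule for illustration, with assumed base values that are not taken from this PR:

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;

/** Hypothetical restatement of the scaling rule, for illustration only. */
final class QueueScaling {
  private QueueScaling() { }

  /** Returns {threads, queueLimit} for a state and the configured base values. */
  static int[] limitsFor(NodeOperationalState state, int baseThreads, int baseQueueLimit) {
    boolean outOfService = DatanodeDetails.isDecommission(state)
        || DatanodeDetails.isMaintenance(state);
    int factor = outOfService ? 2 : 1;
    return new int[] {baseThreads * factor, baseQueueLimit * factor};
  }
}

// Example with assumed base values of 10 replication streams and a
// 5000-command queue limit:
//   limitsFor(IN_SERVICE, 10, 5000)           -> {10, 5000}
//   limitsFor(DECOMMISSIONING, 10, 5000)      -> {20, 10000}
//   limitsFor(ENTERING_MAINTENANCE, 10, 5000) -> {20, 10000}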
@@ -17,16 +17,13 @@
*/
package org.apache.hadoop.ozone.container.replication;

import java.time.Clock;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;

@@ -47,9 +44,6 @@ public class ReplicationSupervisorScheduling {

@Test
public void test() throws InterruptedException {
OzoneConfiguration conf = new OzoneConfiguration();
ReplicationServer.ReplicationConfig replicationConfig
= conf.getObject(ReplicationServer.ReplicationConfig.class);
List<DatanodeDetails> datanodes = new ArrayList<>();
datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
@@ -110,8 +104,7 @@ public void test() throws InterruptedException {
}
};

ReplicationSupervisor rs = new ReplicationSupervisor(null,
replicationConfig, Clock.system(ZoneId.systemDefault()), 1000);
ReplicationSupervisor rs = ReplicationSupervisor.newBuilder().build();

final long start = System.currentTimeMillis();

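As the test change above shows, ReplicationSupervisor.newBuilder().build() is all a test needs: build() falls back to a fresh OzoneConfiguration for the replication and datanode configs and to the system clock when nothing is supplied. A test that wants tighter control can still inject its own executor through the builder; the single-thread choice below is a hypothetical example, not something this PR does:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical test setup: limit replication work to one task at a time.
ExecutorService executor = Executors.newSingleThreadExecutor();
ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
    .executor(executor)
    .build();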