Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,31 @@ public class ScmConfig extends ReconfigurableConfig {
)
private int transactionToDNsCommitMapLimit = 5000000;

// Length of one tumbling window for tracking pending container allocations.
// Two windows are kept, so an unconfirmed allocation ages out after roughly
// 2x this duration (per the description below). Mutable via the setter.
@Config(key = "hdds.scm.container.pending.allocation.roll.interval",
defaultValue = "5m",
type = ConfigType.TIME,
tags = { ConfigTag.SCM, ConfigTag.CONTAINER },
description =
"Time interval for rolling the pending container allocation window. " +
"Pending container allocations are tracked in a two-window tumbling bucket " +
"pattern. Each window has this duration. " +
"After 2x this interval, allocations that haven't been confirmed via " +
"container reports will automatically age out. Default is 5 minutes."
)
private Duration pendingContainerAllocationRollInterval = Duration.ofMinutes(5);

/**
 * Returns the configured cap on the number of entries held in the
 * transaction-to-DataNodes commit map.
 *
 * @return the commit map size limit
 */
public int getTransactionToDNsCommitMapLimit() {
  return this.transactionToDNsCommitMapLimit;
}

/**
 * Returns the duration of one pending-container-allocation tracking window.
 *
 * @return the configured roll interval
 */
public Duration getPendingContainerAllocationRollInterval() {
  return this.pendingContainerAllocationRollInterval;
}

/**
 * Updates the pending-container-allocation roll interval.
 *
 * @param duration the new window duration
 */
public void setPendingContainerAllocationRollInterval(Duration duration) {
  pendingContainerAllocationRollInterval = duration;
}

/**
 * Returns the configured interval between block deletion runs.
 *
 * @return the block deletion interval
 */
public Duration getBlockDeletionInterval() {
  return this.blockDeletionInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.Lock;
Expand All @@ -44,6 +45,8 @@
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.Table;
Expand Down Expand Up @@ -80,6 +83,7 @@ public class ContainerManagerImpl implements ContainerManager {
private final Random random = new Random();

private final long maxContainerSize;
private final PendingContainerTracker pendingContainerTracker;

/**
*
Expand All @@ -90,7 +94,8 @@ public ContainerManagerImpl(
final SequenceIdGenerator sequenceIdGen,
final PipelineManager pipelineManager,
final Table<ContainerID, ContainerInfo> containerStore,
final ContainerReplicaPendingOps containerReplicaPendingOps)
final ContainerReplicaPendingOps containerReplicaPendingOps,
final NodeManager nodeManager)
throws IOException {
// Introduce builder for this class?
this.lock = new ReentrantLock();
Expand All @@ -110,6 +115,8 @@ public ContainerManagerImpl(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);

this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
this.pendingContainerTracker = Objects.requireNonNull(
nodeManager.getPendingContainerTracker(), "pendingContainerTracker");
}

@Override
Expand Down Expand Up @@ -278,6 +285,11 @@ private ContainerInfo allocateContainer(final Pipeline pipeline,

containerStateManager.addContainer(containerInfoBuilder.build());
scmContainerManagerMetrics.incNumSuccessfulCreateContainers();
// Record pending allocation - tracks containers scheduled but not yet written
pendingContainerTracker.recordPendingAllocation(pipeline, containerID);
LOG.debug("Allocated container {} on pipeline {}. Recorded as pending on {} DataNodes",
containerID, pipeline.getId(), pipeline.getNodes().size());

return containerStateManager.getContainer(containerID);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
Expand Down Expand Up @@ -175,6 +176,17 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
getNodeManager().addContainer(datanodeDetails, cid);

// Remove from pending tracker when container is added to DN
// This container was just confirmed for the first time on this DN
// No need to remove on subsequent reports (it's already been removed)
if (container != null) {
PendingContainerTracker tracker =
getNodeManager().getPendingContainerTracker();
if (tracker != null) {
tracker.removePendingAllocation(datanodeDetails, cid);
}
}
}
if (container == null || ContainerReportValidator
.validate(container, datanodeDetails, replica)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
Expand Down Expand Up @@ -103,6 +104,12 @@ protected void processICR(IncrementalContainerReportFromDatanode report,
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging);

PendingContainerTracker tracker =
getNodeManager().getPendingContainerTracker();
if (tracker != null) {
tracker.removePendingAllocation(dd, id);
}
}
success = true;
} catch (ContainerNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public void onMessage(final DatanodeDetails datanodeDetails,
* action.
*/
LOG.info("A dead datanode is detected. {}", datanodeDetails);
PendingContainerTracker pending = nodeManager.getPendingContainerTracker();
if (pending != null) {
pending.clearPendingForDatanode(datanodeDetails);
}
closeContainers(datanodeDetails, publisher);
destroyPipelines(datanodeDetails);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,15 @@ default void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExce
}

/**
 * Returns the open-container limit for the given datanodes.
 * NOTE(review): exact semantics (per-node vs. aggregate limit) are not
 * visible in this view — confirm against the implementation.
 *
 * @param datanodes the datanodes to evaluate
 * @return the open container limit
 */
int openContainerLimit(List<DatanodeDetails> datanodes);

/**
 * SCM-side tracker for container allocations that have been scheduled on
 * datanodes but not yet reported back by them.
 *
 * @return the pending-container tracker; callers in this change null-check
 *         the result, so implementations may return {@code null}
 */
PendingContainerTracker getPendingContainerTracker();

/**
 * True if the node can accept another container of the given size, accounting for
 * {@link #getPendingContainerTracker()}.
 *
 * @param node the candidate datanode
 * @param containerSize size of the prospective container — presumably in
 *        bytes; TODO confirm units against callers
 * @return {@code true} if the node has capacity for a new container
 */
boolean hasSpaceForNewContainerAllocation(DatanodeDetails node, long containerSize);
}
Loading