From fd6a473fb36612d48b5a90ba1ca04c72de91c8a2 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 24 Apr 2026 18:08:53 +0530 Subject: [PATCH 1/5] HDDS-15104. Refactor code related to container space management. --- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 5 + .../scm/container/ContainerManagerImpl.java | 12 +- .../hadoop/hdds/scm/node/DatanodeInfo.java | 106 +++++++++- .../hadoop/hdds/scm/node/NodeManager.java | 13 ++ .../hdds/scm/node/NodeStateManager.java | 5 +- .../scm/node/PendingContainerTracker.java | 191 +++--------------- .../hadoop/hdds/scm/node/SCMNodeManager.java | 29 +++ .../hdds/scm/pipeline/PipelineManager.java | 21 +- .../scm/pipeline/PipelineManagerImpl.java | 17 +- .../hdds/scm/container/MockNodeManager.java | 33 ++- .../scm/container/SimpleMockNodeManager.java | 12 +- .../container/TestContainerManagerImpl.java | 7 +- .../TestContainerPlacementFactory.java | 3 +- .../TestSCMContainerPlacementCapacity.java | 3 +- .../TestSCMContainerPlacementRackAware.java | 6 +- .../TestSCMContainerPlacementRackScatter.java | 6 +- .../TestSCMContainerPlacementRandom.java | 6 +- .../hdds/scm/node/TestContainerPlacement.java | 3 +- .../scm/node/TestPendingContainerTracker.java | 139 +++++++------ .../scm/node/states/TestNodeStateMap.java | 3 +- .../scm/pipeline/MockPipelineManager.java | 11 +- .../scm/pipeline/TestPipelineManagerImpl.java | 56 ++--- .../TestPipelinePlacementFactory.java | 3 +- .../api/TestStorageDistributionEndpoint.java | 3 +- 24 files changed, 375 insertions(+), 318 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 2c44fa881c41..01f8e2f56179 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -269,6 +269,11 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_STALENODE_INTERVAL_DEFAULT = "5m"; + public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL = + "ozone.scm.pending.container.roll.interval"; + public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT = + "5m"; + public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT = "ozone.scm.heartbeat.rpc-timeout"; public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT = diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 7780c0839462..15a566f6421b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -31,7 +31,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -39,7 +38,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; @@ -79,8 +77,6 @@ public class ContainerManagerImpl implements ContainerManager { @SuppressWarnings("java:S2245") // no need for secure random private final Random random = new Random(); - private final long maxContainerSize; - /** * */ @@ -106,9 +102,6 @@ public ContainerManagerImpl( .setContainerReplicaPendingOps(containerReplicaPendingOps) .build(); - maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); } @@ -245,9 +238,8 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner) private ContainerInfo allocateContainer(final Pipeline pipeline, final String owner) throws IOException { - if (!pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) { - LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.", - pipeline, maxContainerSize); + if (!pipelineManager.hasEnoughSpace(pipeline)) { + LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline); return null; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java index 8f8753a4991b..dd262d2edb5a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -17,21 +17,29 @@ package org.apache.hadoop.hdds.scm.node; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto; import com.google.common.annotations.VisibleForTesting; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +58,11 @@ public class DatanodeInfo extends DatanodeDetails { private long lastStatsUpdatedTime; private int failedVolumeCount; + /** + * Two-window tumbling bucket for tracking pending container allocations on this datanode. + */ + private final TwoWindowBucket twoWindowBucket; + private List storageReports; private List metadataStorageReports; private LayoutVersionProto lastKnownLayoutVersion; @@ -61,10 +74,12 @@ public class DatanodeInfo extends DatanodeDetails { * Constructs DatanodeInfo from DatanodeDetails. * * @param datanodeDetails Details about the datanode + * @param nodeStatus initial node status * @param layoutInfo Details about the LayoutVersionProto + * @param conf configuration source */ public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus, - LayoutVersionProto layoutInfo) { + LayoutVersionProto layoutInfo, ConfigurationSource conf) { super(datanodeDetails); this.lock = new ReentrantReadWriteLock(); this.lastHeartbeatTime = Time.monotonicNow(); @@ -75,6 +90,10 @@ public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus, this.nodeStatus = nodeStatus; this.metadataStorageReports = Collections.emptyList(); this.commandCounts = new HashMap<>(); + this.twoWindowBucket = new TwoWindowBucket(this.getID(), conf.getTimeDuration( + OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL, + OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS)); } /** @@ -353,6 +372,13 @@ public int getCommandCount(SCMCommandProto.Type cmd) { } } + /** + * Returns the {@link TwoWindowBucket} for this datanode. + */ + public TwoWindowBucket getTwoWindowBucket() { + return twoWindowBucket; + } + @Override public int hashCode() { return super.hashCode(); @@ -362,4 +388,82 @@ public int hashCode() { public boolean equals(Object obj) { return super.equals(obj); } + + /** + * Two-window tumbling bucket for a single DataNode. + * + *

New allocations go into {@code currentWindow}. Every {@code rollIntervalMs}, the current + * window shifts to {@code previousWindow} and a fresh empty set becomes current. After two + * intervals without confirmation a container ID is automatically discarded (aged out). + * Pending count is the union of both windows. + */ + static class TwoWindowBucket { + private Set currentWindow = new HashSet<>(); + private Set previousWindow = new HashSet<>(); + private long lastRollTime = Time.monotonicNow(); + private final long rollIntervalMs; + private final DatanodeID datanodeID; + + TwoWindowBucket(DatanodeID datanodeID, long rollIntervalMs) { + this.datanodeID = datanodeID; + this.rollIntervalMs = rollIntervalMs; + } + + /** + * Roll one or both windows based on elapsed time. + */ + synchronized void rollIfNeeded() { + long now = Time.monotonicNow(); + long elapsed = now - lastRollTime; + + if (elapsed >= 2 * rollIntervalMs) { + int dropped = getCount(); + previousWindow.clear(); + currentWindow.clear(); + lastRollTime = now; + LOG.debug("Double roll interval elapsed ({}ms): dropped {} pending containers", elapsed, dropped); + } else if (elapsed >= rollIntervalMs) { + previousWindow.clear(); + final Set tmp = previousWindow; + previousWindow = currentWindow; + currentWindow = tmp; + lastRollTime = now; + LOG.debug("Rolled window. Previous window size: {} elapsed: ({}ms), Current window reset to empty", + previousWindow.size(), elapsed); + } + } + + synchronized boolean contains(ContainerID containerID) { + return currentWindow.contains(containerID) || previousWindow.contains(containerID); + } + + /** + * Add container to current window. + */ + synchronized boolean add(ContainerID containerID) { + boolean added = currentWindow.add(containerID); + LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}", + containerID, datanodeID, added, getCount()); + return added; + } + + /** + * Remove container from both windows. + */ + synchronized boolean remove(ContainerID containerID) { + boolean removedFromCurrent = currentWindow.remove(containerID); + boolean removedFromPrevious = previousWindow.remove(containerID); + boolean removed = removedFromCurrent || removedFromPrevious; + LOG.debug("Removed pending container {} from DataNode {}. Removed={}, Remaining={}", + containerID, datanodeID, removed, getCount()); + return removed; + } + + /** + * Count of pending containers in both windows. + */ + synchronized int getCount() { + return currentWindow.size() + previousWindow.size(); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e9a019945c1f..4fb7f84394f3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -184,6 +184,19 @@ default int getAllNodeCount() { @Nullable DatanodeInfo getDatanodeInfo(DatanodeDetails dn); + /** + * True if the node can accept another container of the given size. + */ + boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID); + + /** + * Records a pending container allocation for a single DataNode identified by its ID. + * + * @param datanodeID the ID of the DataNode receiving the allocation + * @param containerID the container being allocated + */ + void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID); + /** * Return the node stat of the specified datanode. * @param datanodeDetails DatanodeDetails. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index fb065a0086b9..8ce4af67e9bd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -134,6 +134,8 @@ public class NodeStateManager implements Runnable, Closeable { */ private boolean checkPaused; + private final ConfigurationSource conf; + /** * timestamp of the latest heartbeat check process. */ @@ -174,6 +176,7 @@ public NodeStateManager(ConfigurationSource conf, this.nodeHealthSM = new StateMachine<>(NodeState.HEALTHY, finalStates); initializeStateMachines(); + this.conf = conf; heartbeatCheckerIntervalMs = HddsServerUtil .getScmheartbeatCheckerInterval(conf); staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf); @@ -310,7 +313,7 @@ public void addNode(DatanodeDetails datanodeDetails, private DatanodeInfo newDatanodeInfo(DatanodeDetails datanode, LayoutVersionProto layout) { final NodeStatus status = newNodeStatus(datanode, layout); - return new DatanodeInfo(datanode, status, layout); + return new DatanodeInfo(datanode, status, layout, conf); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java index 0902529f1792..b2bddd911259 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java @@ -18,18 +18,12 @@ package org.apache.hadoop.hdds.scm.node; import com.google.common.annotations.VisibleForTesting; -import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo.TwoWindowBucket; import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; -import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,8 +64,6 @@ public class PendingContainerTracker { private static final Logger LOG = LoggerFactory.getLogger(PendingContainerTracker.class); - private final DatanodeBuckets datanodeBuckets; - /** * Maximum container size in bytes. */ @@ -82,108 +74,14 @@ public class PendingContainerTracker { */ private final SCMNodeMetrics metrics; - /** - * Two-window bucket for a single DataNode. - * Contains current and previous window sets, plus last roll timestamp. - */ - private static class TwoWindowBucket { - private Set currentWindow = new HashSet<>(); - private Set previousWindow = new HashSet<>(); - private long lastRollTime = Time.monotonicNow(); - private final long rollIntervalMs; - - TwoWindowBucket(long rollIntervalMs) { - this.rollIntervalMs = rollIntervalMs; - } - - /** - * Roll one or both windows based on elapsed time. - */ - synchronized void rollIfNeeded() { - long now = Time.monotonicNow(); - long elapsed = now - lastRollTime; - - if (elapsed >= 2 * rollIntervalMs) { - int dropped = getCount(); - previousWindow.clear(); - currentWindow.clear(); - lastRollTime = now; - LOG.debug("Double roll interval elapsed ({}ms): dropped {} pending containers", elapsed, dropped); - } else if (elapsed >= rollIntervalMs) { - previousWindow.clear(); - final Set tmp = previousWindow; - previousWindow = currentWindow; - currentWindow = tmp; - lastRollTime = now; - LOG.debug("Rolled window. Previous window size: {} elapsed: ({}ms), Current window reset to empty", - previousWindow.size(), elapsed); - } - } - - synchronized boolean contains(ContainerID containerID) { - return currentWindow.contains(containerID) || previousWindow.contains(containerID); - } - - /** - * Add container to current window. - */ - synchronized boolean add(ContainerID containerID, DatanodeID dnID) { - boolean added = currentWindow.add(containerID); - LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}", - containerID, dnID, added, getCount()); - return added; - } - - /** - * Remove container from both windows. - */ - synchronized boolean remove(ContainerID containerID, DatanodeID dnID) { - boolean removedFromCurrent = currentWindow.remove(containerID); - boolean removedFromPrevious = previousWindow.remove(containerID); - boolean removed = removedFromCurrent || removedFromPrevious; - LOG.debug("Removed pending container {} from DataNode {}. Removed={}, Remaining={}", - containerID, dnID, removed, getCount()); - return removed; - } - - /** - * Count of pending containers in both windows. - */ - synchronized int getCount() { - return currentWindow.size() + previousWindow.size(); - } - } - - /** - * Per-datanode two-window buckets. - */ - private static class DatanodeBuckets { - private final ConcurrentHashMap map = new ConcurrentHashMap<>(); - private final long rollIntervalMs; - - DatanodeBuckets(long rollIntervalMs) { - this.rollIntervalMs = rollIntervalMs; - } - - TwoWindowBucket get(DatanodeID id) { - final TwoWindowBucket bucket = map.compute(id, (k, b) -> b != null ? b : new TwoWindowBucket(rollIntervalMs)); - bucket.rollIfNeeded(); - return bucket; - } - - TwoWindowBucket get(DatanodeDetails dn) { - Objects.requireNonNull(dn, "dn == null"); - return get(dn.getID()); - } - } - - public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, - SCMNodeMetrics metrics) { - this.datanodeBuckets = new DatanodeBuckets(rollIntervalMs); + public PendingContainerTracker(long maxContainerSize, SCMNodeMetrics metrics) { this.maxContainerSize = maxContainerSize; this.metrics = metrics; - LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms", - maxContainerSize, rollIntervalMs); + LOG.info("PendingContainerTracker initialized with maxContainerSize={}B", maxContainerSize); + } + + private TwoWindowBucket getBucket(DatanodeInfo datanodeInfo) { + return datanodeInfo.getTwoWindowBucket(); } /** @@ -191,26 +89,23 @@ public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, * Call on periodic paths (node report) so windows age even when there are no new * allocations or container reports touching this tracker. */ - public void rollWindowsIfNeeded(DatanodeDetails node) { - Objects.requireNonNull(node, "node == null"); - datanodeBuckets.get(node.getID()); + public void rollWindowsIfNeeded(DatanodeInfo datanodeInfo) { + Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); + getBucket(datanodeInfo).rollIfNeeded(); } /** * Whether the datanode can fit another container of {@link #maxContainerSize} after accounting for - * SCM pending allocations for {@code node} (this tracker) and usable space across volumes on - * {@code datanodeInfo}. Pending bytes are {@link #getPendingContainerCount} × {@code maxContainerSize}; + * SCM pending allocations for {@code datanodeInfo} (this tracker) and usable space across volumes. + * Pending bytes are {@link #getPendingContainerCount} × {@code maxContainerSize}; * effective allocatable space sums full-container slots per storage report. * - * @param node identity used to look up pending allocations (same DN as {@code datanodeInfo}) - * @param datanodeInfo storage reports for the datanode + * @param datanodeInfo storage reports and pending-allocation bucket for the datanode */ - public boolean hasEffectiveAllocatableSpaceForNewContainer( - DatanodeDetails node, DatanodeInfo datanodeInfo) { - Objects.requireNonNull(node, "node == null"); + public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanodeInfo) { Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); - long pendingAllocationSize = getPendingContainerCount(node) * maxContainerSize; + long pendingAllocationSize = getPendingContainerCount(datanodeInfo) * maxContainerSize; List storageReports = datanodeInfo.getStorageReports(); Objects.requireNonNull(storageReports, "storageReports == null"); if (storageReports.isEmpty()) { @@ -231,89 +126,63 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer( return false; } - /** - * Record a pending container allocation for all DataNodes in the pipeline. - * Container is added to the current window. - * - * @param pipeline The pipeline where container is allocated - * @param containerID The container being allocated - */ - public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { - Objects.requireNonNull(pipeline, "pipeline == null"); - Objects.requireNonNull(containerID, "containerID == null"); - - for (DatanodeDetails node : pipeline.getNodes()) { - recordPendingAllocationForDatanode(node, containerID); - } - } - /** * Record a pending container allocation for a single DataNode. * Container is added to the current window. * - * @param node The DataNode where container is being allocated/replicated + * @param datanodeInfo The DataNode where container is being allocated/replicated * @param containerID The container being allocated/replicated */ - public void recordPendingAllocationForDatanode(DatanodeDetails node, ContainerID containerID) { - Objects.requireNonNull(node, "node == null"); + public void recordPendingAllocationForDatanode(DatanodeInfo datanodeInfo, ContainerID containerID) { + Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); Objects.requireNonNull(containerID, "containerID == null"); - DatanodeID dnID = node.getID(); - boolean added = addContainerToBucket(containerID, dnID); + boolean added = getBucket(datanodeInfo).add(containerID); if (added && metrics != null) { metrics.incNumPendingContainersAdded(); } } - private boolean addContainerToBucket(ContainerID containerID, DatanodeID dnID) { - return datanodeBuckets.get(dnID).add(containerID, dnID); - } - /** * Remove a pending container allocation from a specific DataNode. * Removes from both current and previous windows. * Called when container is confirmed. * - * @param node The DataNode + * @param datanodeInfo The DataNode * @param containerID The container to remove from pending */ - public void removePendingAllocation(DatanodeDetails node, ContainerID containerID) { - Objects.requireNonNull(node, "node == null"); + public void removePendingAllocation(DatanodeInfo datanodeInfo, ContainerID containerID) { + Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); Objects.requireNonNull(containerID, "containerID == null"); - DatanodeID dnID = node.getID(); - boolean removed = removeContainerFromBucket(containerID, dnID); + boolean removed = getBucket(datanodeInfo).remove(containerID); if (removed && metrics != null) { metrics.incNumPendingContainersRemoved(); } } - private boolean removeContainerFromBucket(ContainerID containerID, DatanodeID dnID) { - return datanodeBuckets.get(dnID).remove(containerID, dnID); - } - /** * Number of pending container allocations for this datanode (union of current and previous * windows). This call may advance the internal tumbling window if the roll interval has elapsed. * - * @param node The DataNode + * @param datanodeInfo The DataNode * @return Pending container count */ - public long getPendingContainerCount(DatanodeDetails node) { - Objects.requireNonNull(node, "node == null"); - return datanodeBuckets.get(node).getCount(); + public long getPendingContainerCount(DatanodeInfo datanodeInfo) { + Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); + return getBucket(datanodeInfo).getCount(); } /** * Whether container is in the current or previous window for this datanode. */ @VisibleForTesting - public boolean containsPendingContainer(DatanodeDetails node, ContainerID containerID) { - Objects.requireNonNull(node, "node == null"); + public boolean containsPendingContainer(DatanodeInfo datanodeInfo, ContainerID containerID) { + Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); Objects.requireNonNull(containerID, "containerID == null"); - return datanodeBuckets.get(node).contains(containerID); + return getBucket(datanodeInfo).contains(containerID); } @VisibleForTesting diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 3289e7b312a8..dd0d6a3cffc5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -126,6 +126,7 @@ public class SCMNodeManager implements NodeManager { private final VersionInfo version; private final CommandQueue commandQueue; private final SCMNodeMetrics metrics; + private final PendingContainerTracker pendingContainerTracker; // Node manager MXBean private ObjectName nmInfoBean; private final SCMStorageConfig scmStorageConfig; @@ -188,6 +189,10 @@ public SCMNodeManager( LOG.info("Entering startup safe mode."); registerMXBean(); this.metrics = SCMNodeMetrics.create(this); + this.pendingContainerTracker = new PendingContainerTracker( + (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES), + this.metrics); this.clusterMap = networkTopology; this.nodeResolver = nodeResolver; this.useHostname = conf.getBoolean( @@ -1065,6 +1070,30 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { } } + /** + * Effective space check aligned with container allocation: per-disk slot model minus + * SCM pending allocations. + */ + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) { + DatanodeInfo datanodeInfo = getNode(datanodeID); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", datanodeID); + return false; + } + return pendingContainerTracker.hasEffectiveAllocatableSpaceForNewContainer(datanodeInfo); + } + + @Override + public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + DatanodeInfo datanodeInfo = getNode(datanodeID); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", datanodeID); + return; + } + pendingContainerTracker.recordPendingAllocationForDatanode(datanodeInfo, containerID); + } + /** * Return the node stat of the specified datanode. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 6a448d6c88df..739b0c058ec8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -94,6 +94,17 @@ int getPipelineCount( void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws PipelineNotFoundException, InvalidPipelineStateException; + /** + * Records a pending container allocation for every DataNode in the pipeline. + * The allocation is tracked in each node's two-window tumbling bucket so that + * {@code hasEnoughSpace} can account for in-flight allocations before a container + * report arrives from the DataNode. + * + * @param pipeline the pipeline whose nodes will receive the pending record + * @param containerID the container being allocated + */ + void recordPendingAllocation(Pipeline pipeline, ContainerID containerID); + /** * Add container to pipeline during SCM Start. * @@ -213,13 +224,13 @@ void reinitialize(Table pipelineStore) void releaseWriteLock(); /** - * Checks whether all Datanodes in the specified pipeline have greater than the specified space, containerSize. + * Checks whether all Datanodes in the specified pipeline have enough space to store a new container. + * * @param pipeline pipeline to check - * @param containerSize the required amount of space - * @return false if all the volumes on any Datanode in the pipeline have space less than equal to the specified - * containerSize, otherwise true + * @return false if any Datanode in the pipeline has no volume with space greater than the configured + * container size, otherwise true */ - boolean hasEnoughSpace(Pipeline pipeline, long containerSize); + boolean hasEnoughSpace(Pipeline pipeline); int openContainerLimit(List datanodes); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 9c529e22e7e1..c4375e3f20ea 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -54,7 +53,6 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; -import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -636,19 +634,22 @@ private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) { } @Override - public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) { + public boolean hasEnoughSpace(Pipeline pipeline) { for (DatanodeDetails node : pipeline.getNodes()) { - if (!(node instanceof DatanodeInfo)) { - node = nodeManager.getDatanodeInfo(node); - } - if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize)) { + if (!nodeManager.hasSpaceForNewContainerAllocation(node.getID())) { return false; } } - return true; } + @Override + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { + for (DatanodeDetails dn : pipeline.getNodes()) { + nodeManager.recordPendingAllocationForDatanode(dn.getID(), containerID); + } + } + /** * Schedules a fixed interval job to create pipelines. */ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 013f14b16504..fd8544eaf83a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -113,6 +114,7 @@ public class MockNodeManager implements NodeManager { private ConcurrentMap> dnsToUuidMap; private int numHealthyDisksPerDatanode; private int numPipelinePerDatanode; + private PendingContainerTracker pendingContainerTracker; { this.healthyNodes = new LinkedList<>(); @@ -124,6 +126,7 @@ public class MockNodeManager implements NodeManager { this.dnsToUuidMap = new ConcurrentHashMap<>(); this.aggregateStat = new SCMNodeStat(); this.clusterMap = new NetworkTopologyImpl(new OzoneConfiguration()); + this.pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, null); } public MockNodeManager(NetworkTopologyImpl clusterMap, @@ -263,7 +266,7 @@ public List getNodes( List healthyNodesWithInfo = new ArrayList<>(); for (DatanodeDetails dd : healthyNodes) { DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), new OzoneConfiguration()); long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); @@ -342,7 +345,7 @@ public List getAllNodes() { nodeStatus = NodeStatus.inServiceDead(); } DatanodeInfo di = new DatanodeInfo(entry.getKey(), nodeStatus, - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), new OzoneConfiguration()); long capacity = entry.getValue().getCapacity().get(); long used = entry.getValue().getScmUsed().get(); @@ -431,7 +434,7 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) { } DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), new OzoneConfiguration()); long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); long remaining = nodeMetricMap.get(dd).getRemaining().get(); @@ -448,6 +451,15 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) { return di; } + @Override + public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + DatanodeDetails dd = nodeMetricMap.keySet().stream() + .filter(d -> d.getID().equals(datanodeID)) + .findFirst().orElse(null); + DatanodeInfo info = getDatanodeInfo(dd); + pendingContainerTracker.recordPendingAllocationForDatanode(info, containerID); + } + /** * Return the node stat of the specified datanode. * @param datanodeDetails - datanode details. @@ -750,7 +762,8 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails, NodeReportProto nodeReport, PipelineReportsProto pipelineReportsProto, LayoutVersionProto layoutInfo) { - final DatanodeInfo info = new DatanodeInfo(datanodeDetails, NodeStatus.inServiceHealthy(), layoutInfo); + final DatanodeInfo info = new DatanodeInfo(datanodeDetails, + NodeStatus.inServiceHealthy(), layoutInfo, new OzoneConfiguration()); try { node2ContainerMap.addNode(info); addEntryTodnsToUuidMap(datanodeDetails.getIpAddress(), @@ -941,6 +954,18 @@ public void setNumHealthyVolumes(int value) { numHealthyDisksPerDatanode = value; } + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) { + DatanodeDetails dd = nodeMetricMap.keySet().stream() + .filter(d -> d.getID().equals(datanodeID)) + .findFirst().orElse(null); + DatanodeInfo info = getDatanodeInfo(dd); + if (info == null) { + return false; + } + return pendingContainerTracker.hasEffectiveAllocatableSpaceForNewContainer(info); + } + /** * A class to declare some values for the nodes so that our tests * won't fail. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index 4f0679470eab..d9ec257a4f06 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -66,7 +67,7 @@ public class SimpleMockNodeManager implements NodeManager { public void register(DatanodeDetails dd, NodeStatus status) { dd.setPersistedOpState(status.getOperationalState()); dd.setPersistedOpStateExpiryEpochSec(status.getOpStateExpiryEpochSeconds()); - nodeMap.put(dd.getID(), new DatanodeInfo(dd, status, null)); + nodeMap.put(dd.getID(), new DatanodeInfo(dd, status, null, new OzoneConfiguration())); } public void setNodeStatus(DatanodeDetails dd, NodeStatus status) { @@ -248,6 +249,15 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { return null; } + @Override + public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + } + + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) { + return true; + } + @Override public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { return null; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index 218a2137e3e6..68dcc634a5e3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -103,7 +102,7 @@ void setUp() throws Exception { pipelineManager = spy(base); // Default: allow allocation in tests unless a test overrides it. - doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class), anyLong()); + doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class)); pipelineManager.createPipeline(RatisReplicationConfig.getInstance( ReplicationFactor.THREE)); @@ -141,7 +140,7 @@ void testAllocateContainer() throws Exception { */ @Test public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() throws IOException { - doReturn(false).when(pipelineManager).hasEnoughSpace(any(), anyLong()); + doReturn(false).when(pipelineManager).hasEnoughSpace(any()); long sizeRequired = 256 * 1024 * 1024; // 256 MB Pipeline pipeline = pipelineManager.getPipelines().iterator().next(); @@ -166,7 +165,7 @@ public void testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() // create a spy to mock hasEnoughSpace to always return true PipelineManager spyPipelineManager = spy(pipelineManager); doReturn(true).when(spyPipelineManager) - .hasEnoughSpace(any(Pipeline.class), anyLong()); + .hasEnoughSpace(any(Pipeline.class)); // create a new ContainerManager using the spy File tempDir = new File(testDir, "tempDir"); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java index d67320974918..32d43cdff5a4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java @@ -99,7 +99,8 @@ public void testRackAwarePolicy() throws IOException { .createDatanodeDetails(hostname + i, rack + (i / 5)); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java index 5b940fc543f1..0229673c2248 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java @@ -64,7 +64,8 @@ public void chooseDatanodes() throws SCMException { DatanodeInfo datanodeInfo = new DatanodeInfo( MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java index 94cdc33e869a..5e2f8992ca41 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java @@ -112,7 +112,8 @@ private void setup(int datanodeCount) { cluster.add(datanodeDetails); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -455,7 +456,8 @@ public void testDatanodeWithDefaultNetworkLocation(int datanodeCount) hostname + i, null); DatanodeInfo dnInfo = new DatanodeInfo( dn, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageReportProto storage1 = HddsTestUtils.createStorageReport( dnInfo.getID(), "/data1-" + dnInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java index ce66bf9f68c7..db491c501e53 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java @@ -180,7 +180,8 @@ private void setupDatanode(DatanodeDetails datanodeDetails) { cluster.add(datanodeDetails); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -490,7 +491,8 @@ public void testDatanodeWithDefaultNetworkLocation(int datanodeCount) hostname + i, null); DatanodeInfo dnInfo = new DatanodeInfo( dn, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageReportProto storage1 = HddsTestUtils.createStorageReport( dnInfo.getID(), "/data1-" + dnInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java index 3171df95db4a..eb3f7fc8f494 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java @@ -61,7 +61,8 @@ public void chooseDatanodes() throws SCMException { DatanodeInfo datanodeInfo = new DatanodeInfo( MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -167,7 +168,8 @@ public void testIsValidNode() throws SCMException { DatanodeInfo datanodeInfo = new DatanodeInfo( MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 9b7c5c77b2cd..4dbe79fc1351 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -160,7 +159,7 @@ SCMNodeManager createNodeManager(OzoneConfiguration config) { ContainerManager createContainerManager() throws IOException { pipelineManager = spy(pipelineManager); - doReturn(true).when(pipelineManager).hasEnoughSpace(any(), anyLong()); + doReturn(true).when(pipelineManager).hasEnoughSpace(any()); return new ContainerManagerImpl(conf, scmhaManager, sequenceIdGen, pipelineManager, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java index 00a0c565b2ca..a04f0dda0a71 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java @@ -17,21 +17,19 @@ package org.apache.hadoop.hdds.scm.node; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -42,18 +40,15 @@ public class TestPendingContainerTracker { private static final long MAX_CONTAINER_SIZE = 5L * 1024 * 1024 * 1024; // 5GB - private static final long DEFAULT_ROLL_INTERVAL_MS = 5 * 60 * 1000; private static final int NUM_DATANODES = 1000; - private static final int NUM_PIPELINES = 1000; private static final int NUM_CONTAINERS = 10_000; - private List datanodes; - private List pipelines; + private List datanodes; private List containers; + private final OzoneConfiguration conf = new OzoneConfiguration(); private PendingContainerTracker tracker; - private Pipeline pipeline; - private DatanodeDetails dn1; - private DatanodeDetails dn2; + private DatanodeInfo dn1; + private DatanodeInfo dn2; /** First three container IDs. */ private ContainerID container1; @@ -61,16 +56,13 @@ public class TestPendingContainerTracker { @BeforeEach public void setUp() throws IOException { - tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, DEFAULT_ROLL_INTERVAL_MS, null); + tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, null); datanodes = new ArrayList<>(NUM_DATANODES); for (int i = 0; i < NUM_DATANODES; i++) { - datanodes.add(MockDatanodeDetails.randomLocalDatanodeDetails()); - } - - pipelines = new ArrayList<>(NUM_PIPELINES); - for (int i = 0; i < NUM_PIPELINES; i++) { - pipelines.add(MockPipeline.createPipeline(Collections.singletonList(datanodes.get(i)))); + datanodes.add(new DatanodeInfo( + MockDatanodeDetails.randomLocalDatanodeDetails(), NodeStatus.inServiceHealthy(), null, + conf)); } containers = new ArrayList<>(NUM_CONTAINERS); @@ -78,7 +70,6 @@ public void setUp() throws IOException { containers.add(ContainerID.valueOf(id)); } - pipeline = MockPipeline.createPipeline(datanodes.subList(0, 3)); dn1 = datanodes.get(0); dn2 = datanodes.get(1); @@ -88,9 +79,9 @@ public void setUp() throws IOException { @Test public void testRecordPendingAllocation() { - // Allocate first 100 containers across first 100 pipelines (1 DN each) + // Allocate first 100 containers, one per datanode for (int i = 0; i < 100; i++) { - tracker.recordPendingAllocation(pipelines.get(i), containers.get(i)); + tracker.recordPendingAllocationForDatanode(datanodes.get(i), containers.get(i)); } // Each of the first 100 DNs should have 1 pending container @@ -107,9 +98,9 @@ public void testRecordPendingAllocation() { @Test public void testRemovePendingAllocation() { - // Allocate containers 0-99 to first 100 pipelines + // Allocate containers 0-99, one per datanode for (int i = 0; i < 100; i++) { - tracker.recordPendingAllocation(pipelines.get(i), containers.get(i)); + tracker.recordPendingAllocationForDatanode(datanodes.get(i), containers.get(i)); } // Remove from first 50 DNs @@ -137,29 +128,34 @@ public void testRemovePendingAllocation() { @Timeout(30) public void testTwoWindowRollAgesOutContainerAfterTwoIntervals() throws InterruptedException { long rollMs = 200L; - PendingContainerTracker shortRollTracker = - new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, null); + OzoneConfiguration shortConf = new OzoneConfiguration(); + shortConf.set(OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL, rollMs + "ms"); + DatanodeInfo shortDn = new DatanodeInfo( + MockDatanodeDetails.randomLocalDatanodeDetails(), NodeStatus.inServiceHealthy(), null, + shortConf); + + PendingContainerTracker shortRollTracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, null); - shortRollTracker.recordPendingAllocationForDatanode(dn1, container1); - assertEquals(1, shortRollTracker.getPendingContainerCount(dn1)); - assertTrue(shortRollTracker.containsPendingContainer(dn1, container1)); + shortRollTracker.recordPendingAllocationForDatanode(shortDn, container1); + assertEquals(1, shortRollTracker.getPendingContainerCount(shortDn)); + assertTrue(shortRollTracker.containsPendingContainer(shortDn, container1)); // First roll: C1 moves from currentWindow to previousWindow; union still includes C1 Thread.sleep(rollMs + 80); - shortRollTracker.rollWindowsIfNeeded(dn1); - assertEquals(1, shortRollTracker.getPendingContainerCount(dn1)); - assertTrue(shortRollTracker.containsPendingContainer(dn1, container1)); + shortRollTracker.rollWindowsIfNeeded(shortDn); + assertEquals(1, shortRollTracker.getPendingContainerCount(shortDn)); + assertTrue(shortRollTracker.containsPendingContainer(shortDn, container1)); // Second roll: prior previousWindow (holding C1) is dropped; C1 is no longer pending Thread.sleep(rollMs + 80); - shortRollTracker.rollWindowsIfNeeded(dn1); - assertEquals(0, shortRollTracker.getPendingContainerCount(dn1)); - assertEquals(0L, shortRollTracker.getPendingContainerCount(dn1) * MAX_CONTAINER_SIZE); + shortRollTracker.rollWindowsIfNeeded(shortDn); + assertEquals(0, shortRollTracker.getPendingContainerCount(shortDn)); + assertEquals(0L, shortRollTracker.getPendingContainerCount(shortDn) * MAX_CONTAINER_SIZE); } @Test public void testRemoveNonExistentContainer() { - tracker.recordPendingAllocation(pipeline, container1); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); // Remove a container that was never added - should not throw exception tracker.removePendingAllocation(dn1, container2); @@ -170,7 +166,9 @@ public void testRemoveNonExistentContainer() { @Test public void testUnknownDatanodeHasZeroPendingCount() { - DatanodeDetails unknownDN = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeInfo unknownDN = new DatanodeInfo( + MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), null, + conf); assertEquals(0, tracker.getPendingContainerCount(unknownDN)); } @@ -187,7 +185,7 @@ public void testConcurrentModification() throws InterruptedException { threads[i] = new Thread(() -> { for (int j = 0; j < operationsPerThread; j++) { ContainerID cid = ContainerID.valueOf(threadId * 1000L + j); - tracker.recordPendingAllocation(pipeline, cid); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, cid)); if (j % 2 == 0) { tracker.removePendingAllocation(dn1, cid); @@ -209,7 +207,7 @@ public void testConcurrentModification() throws InterruptedException { @Test public void testBucketsRetainedWhenEmpty() { - tracker.recordPendingAllocation(pipeline, container1); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); assertEquals(1, tracker.getPendingContainerCount(dn1)); @@ -230,8 +228,8 @@ public void testRemoveFromBothWindows() { // In general, a container could be in previous window after a roll // Add containers - tracker.recordPendingAllocation(pipeline, container1); - tracker.recordPendingAllocation(pipeline, container2); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container2)); assertEquals(2, tracker.getPendingContainerCount(dn1)); @@ -247,7 +245,7 @@ public void testRemoveFromBothWindows() { @Test public void testManyContainersOnSingleDatanode() { // Allocate first 1000 containers to the first datanode - DatanodeDetails dn = datanodes.get(0); + DatanodeInfo dn = datanodes.get(0); for (int i = 0; i < 1000; i++) { tracker.recordPendingAllocationForDatanode(dn, containers.get(i)); } @@ -274,7 +272,7 @@ public void testManyContainersOnSingleDatanode() { public void testAllDatanodesWithMultipleContainers() { // Allocate 10 containers to each of the 1000 datanodes for (int dnIdx = 0; dnIdx < NUM_DATANODES; dnIdx++) { - DatanodeDetails dn = datanodes.get(dnIdx); + DatanodeInfo dn = datanodes.get(dnIdx); for (int cIdx = 0; cIdx < 10; cIdx++) { int containerIdx = dnIdx * 10 + cIdx; tracker.recordPendingAllocationForDatanode(dn, containers.get(containerIdx)); @@ -290,7 +288,7 @@ public void testAllDatanodesWithMultipleContainers() { // Remove all containers from every 10th DN for (int dnIdx = 0; dnIdx < NUM_DATANODES; dnIdx += 10) { - DatanodeDetails dn = datanodes.get(dnIdx); + DatanodeInfo dn = datanodes.get(dnIdx); for (int cIdx = 0; cIdx < 10; cIdx++) { int containerIdx = dnIdx * 10 + cIdx; tracker.removePendingAllocation(dn, containers.get(containerIdx)); @@ -311,7 +309,7 @@ public void testAllDatanodesWithMultipleContainers() { @Test public void testIdempotentRecording() { // Allocate same 100 containers multiple times to first 100 DNs - DatanodeDetails dn = datanodes.get(0); + DatanodeInfo dn = datanodes.get(0); for (int round = 0; round < 5; round++) { for (int i = 0; i < 100; i++) { @@ -325,63 +323,64 @@ public void testIdempotentRecording() { @Test public void testMultiVolumeAccumulatedSpaceIsNotEnough() { - DatanodeDetails dn = datanodes.get(0); long containerSize = MAX_CONTAINER_SIZE; - + + // Use the same DatanodeInfo object for both recording and checking. + DatanodeInfo dnInfo = datanodes.get(0); List reports = new ArrayList<>(); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 4, 0)); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 4, 0)); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 2, 0)); - DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize / 4, 0)); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize / 4, 0)); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize / 2, 0)); dnInfo.updateStorageReports(reports); - assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); + assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); } @Test public void testMultiVolumeWithPendingAllocation() { - DatanodeDetails dn = datanodes.get(0); long containerSize = MAX_CONTAINER_SIZE; + // Use the same DatanodeInfo object for recording pending allocations and checking space. + DatanodeInfo dnInfo = datanodes.get(0); + // Remaining space available for 3 containers across all the volumes - tracker.recordPendingAllocationForDatanode(dn, containers.get(0)); - tracker.recordPendingAllocationForDatanode(dn, containers.get(1)); + tracker.recordPendingAllocationForDatanode(dnInfo, containers.get(0)); + tracker.recordPendingAllocationForDatanode(dnInfo, containers.get(1)); List reports = new ArrayList<>(); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize, 0)); - reports.add(createStorageReport(dn, 50 * containerSize, containerSize, 0)); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize, 0)); - DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize, 0)); + reports.add(createStorageReport(dnInfo, 50 * containerSize, containerSize, 0)); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize, 0)); dnInfo.updateStorageReports(reports); // Remaining space available for 1 container across all the volume after 2 container allocation - assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); + assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); - tracker.recordPendingAllocationForDatanode(dn, containers.get(2)); + tracker.recordPendingAllocationForDatanode(dnInfo, containers.get(2)); // Remaining space available for 0 container across all the volume - assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); + assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); } @Test public void testMultiVolumeWithCommittedBytes() { - DatanodeDetails dn = datanodes.get(0); long containerSize = MAX_CONTAINER_SIZE; - + + // Use the same DatanodeInfo object for recording pending allocations and checking space. + DatanodeInfo dnInfo = datanodes.get(0); List reports = new ArrayList<>(); - reports.add(createStorageReport(dn, 100 * containerSize, 6 * containerSize, 5 * containerSize)); - reports.add(createStorageReport(dn, 50 * containerSize, 3 * containerSize, 3 * containerSize)); - DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null); + reports.add(createStorageReport(dnInfo, 100 * containerSize, 6 * containerSize, 5 * containerSize)); + reports.add(createStorageReport(dnInfo, 50 * containerSize, 3 * containerSize, 3 * containerSize)); dnInfo.updateStorageReports(reports); // Remaining space available for 1 container across all the volume considering committed bytes - assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); - tracker.recordPendingAllocationForDatanode(dn, containers.get(0)); + assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); + tracker.recordPendingAllocationForDatanode(dnInfo, containers.get(0)); // Remaining space available for 0 container across all the volume considering // committed bytes and container allocation - assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); + assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); } - private StorageReportProto createStorageReport(DatanodeDetails dn, long capacity, long remaining, long committed) { + private StorageReportProto createStorageReport(DatanodeInfo dn, long capacity, long remaining, long committed) { return HddsTestUtils.createStorageReports(dn.getID(), capacity, remaining, committed).get(0); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java index c079c509ad8a..8a689f56843f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java @@ -24,6 +24,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; @@ -52,7 +53,7 @@ void addNode(NodeStatus status) throws NodeAlreadyExistsException { } void addNode(DatanodeDetails datanode, NodeStatus status) throws NodeAlreadyExistsException { - map.addNode(new DatanodeInfo(datanode, status, null)); + map.addNode(new DatanodeInfo(datanode, status, null, new OzoneConfiguration())); } @BeforeEach diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index d6a3fc546352..69b1e24dfe3b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -50,9 +50,11 @@ public class MockPipelineManager implements PipelineManager { private final PipelineStateManager stateManager; + private final NodeManager nodeManager; public MockPipelineManager(DBStore dbStore, SCMHAManager scmhaManager, NodeManager nodeManager) throws RocksDatabaseException, CodecException, DuplicatedPipelineIdException { + this.nodeManager = nodeManager; stateManager = PipelineStateManagerImpl .newBuilder().setNodeManager(nodeManager) .setRatisServer(scmhaManager.getRatisServer()) @@ -332,10 +334,17 @@ public boolean isPipelineCreationFrozen() { } @Override - public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) { + public boolean hasEnoughSpace(Pipeline pipeline) { return false; } + @Override + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { + for (DatanodeDetails dn : pipeline.getNodes()) { + nodeManager.recordPendingAllocationForDatanode(dn.getID(), containerID); + } + } + @Override public int openContainerLimit(List datanodes) { // For tests that do not care about this limit, return a large value. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index e7fc6f14f9b6..4e53f6d629ae 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -92,7 +92,6 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; -import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy; @@ -937,12 +936,11 @@ public void testCreatePipelineForRead() throws IOException { } /** - * {@link PipelineManager#hasEnoughSpace(Pipeline, long)} should return false if all the - * volumes on any Datanode in the pipeline have less than equal to the space required for creating a new container. + * {@link PipelineManager#hasEnoughSpace(Pipeline)} should return false if all the + * volumes on any Datanode in the pipeline have space less than or equal to the configured container size. */ @Test public void testHasEnoughSpace() throws IOException { - // create a Mock NodeManager, the MockNodeManager class doesn't work for this test NodeManager mockedNodeManager = Mockito.mock(NodeManager.class); PipelineManagerImpl pipelineManager = PipelineManagerImpl.newPipelineManager(conf, SCMHAManagerStub.getInstance(true), @@ -953,50 +951,30 @@ public void testHasEnoughSpace() throws IOException { serviceManager, testClock); + DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails dn3 = MockDatanodeDetails.randomDatanodeDetails(); Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setNodes(ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails(), - MockDatanodeDetails.randomDatanodeDetails(), - MockDatanodeDetails.randomDatanodeDetails())) + .setNodes(ImmutableList.of(dn1, dn2, dn3)) .setState(OPEN) .setReplicationConfig(ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, THREE)) .build(); - List nodes = pipeline.getNodes(); - assertEquals(3, nodes.size()); - - long containerSize = 100L; // Case 1: All nodes have enough space. - List datanodeInfoList = new ArrayList<>(); - for (DatanodeDetails dn : nodes) { - // the method being tested needs NodeManager to return DatanodeInfo because DatanodeInfo has storage - // information (it extends DatanodeDetails) - DatanodeInfo info = new DatanodeInfo(dn, null, null); - info.updateStorageReports(HddsTestUtils.createStorageReports(dn.getID(), 200L, 200L, 10L)); - doReturn(info).when(mockedNodeManager).getDatanodeInfo(dn); - datanodeInfoList.add(info); - } - assertTrue(pipelineManager.hasEnoughSpace(pipeline, containerSize)); - - // Case 2: One node does not have enough space. - /* - Interestingly, SCMCommonPlacementPolicy#hasEnoughSpace returns false if exactly the required amount of space - is available. Which means it won't allow creating a pipeline on a node if all volumes have exactly 5 GB - available. We follow the same behavior here in the case of a new replica. - - So here, remaining - committed == containerSize, and hasEnoughSpace returns false. - TODO should this return true instead? - */ - DatanodeInfo datanodeInfo = datanodeInfoList.get(0); - datanodeInfo.updateStorageReports(HddsTestUtils.createStorageReports(datanodeInfo.getID(), 200L, 120L, - 20L)); - assertFalse(pipelineManager.hasEnoughSpace(pipeline, containerSize)); + doReturn(true).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn1.getID()); + doReturn(true).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn2.getID()); + doReturn(true).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn3.getID()); + assertTrue(pipelineManager.hasEnoughSpace(pipeline)); + + // Case 2: One node does not have enough space — pipeline should be rejected. + doReturn(false).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn1.getID()); + assertFalse(pipelineManager.hasEnoughSpace(pipeline)); // Case 3: All nodes do not have enough space. - for (DatanodeInfo info : datanodeInfoList) { - info.updateStorageReports(HddsTestUtils.createStorageReports(info.getID(), 200L, 100L, 20L)); - } - assertFalse(pipelineManager.hasEnoughSpace(pipeline, containerSize)); + doReturn(false).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn2.getID()); + doReturn(false).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn3.getID()); + assertFalse(pipelineManager.hasEnoughSpace(pipeline)); } private Set createContainerReplicasList( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java index e548e45de849..c2236e008621 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java @@ -103,7 +103,8 @@ private void setupRacks(int datanodeCount, int nodesPerRack, cluster.add(datanodeDetails); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + conf); StorageContainerDatanodeProtocolProtos.StorageReportProto storage1 = HddsTestUtils.createStorageReport( diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java index 8d21c2465a8f..03cd04f54883 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java @@ -36,6 +36,7 @@ import java.util.UUID; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -225,7 +226,7 @@ private List mockStorageDistributionData(int numNodes) throws Exception .build(); pendingDeletionMetrics.add(new DatanodePendingDeletionMetrics(hostName, uuid.toString(), PENDING_DELETION_SIZE)); - dataNodes.add(new DatanodeInfo(datanode, NodeStatus.inServiceHealthy(), null)); + dataNodes.add(new DatanodeInfo(datanode, NodeStatus.inServiceHealthy(), null, new OzoneConfiguration())); when(nodeManager.getNodeStat(datanode)) .thenReturn(new SCMNodeMetric(OZONE_CAPACITY, OZONE_USED, OZONE_REMAINING, COMMITTED, MIN_FREE_SPACE, RESERVED)); From 954c232bb38fb9418cfc0cf0ec61745fef19bd05 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 24 Apr 2026 19:37:46 +0530 Subject: [PATCH 2/5] Add config in default xml --- .../common/src/main/resources/ozone-default.xml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index e159eb6948b8..3e32c4fe715c 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1055,6 +1055,17 @@ The number of stripes created for the container state manager lock. + + ozone.scm.pending.container.roll.interval + 5m + OZONE, SCM, PERFORMANCE, MANAGEMENT + + The interval at which the two-window tumbling bucket for pending + container allocations rolls over per DataNode. Pending containers + that have not been confirmed within two intervals are automatically + aged out. Default is 5 minutes. + + ozone.scm.datanode.address From 72f7200fdce29606778b5ee11a3998cefa58a3b8 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 24 Apr 2026 23:45:15 +0530 Subject: [PATCH 3/5] Update mocknode manager conf initialization --- .../hadoop/hdds/scm/container/MockNodeManager.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index fd8544eaf83a..05d0ec94652e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -115,6 +115,7 @@ public class MockNodeManager implements NodeManager { private int numHealthyDisksPerDatanode; private int numPipelinePerDatanode; private PendingContainerTracker pendingContainerTracker; + private final OzoneConfiguration conf = new OzoneConfiguration(); { this.healthyNodes = new LinkedList<>(); @@ -125,7 +126,7 @@ public class MockNodeManager implements NodeManager { this.node2ContainerMap = new NodeStateMap(); this.dnsToUuidMap = new ConcurrentHashMap<>(); this.aggregateStat = new SCMNodeStat(); - this.clusterMap = new NetworkTopologyImpl(new OzoneConfiguration()); + this.clusterMap = new NetworkTopologyImpl(conf); this.pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, null); } @@ -266,7 +267,7 @@ public List getNodes( List healthyNodesWithInfo = new ArrayList<>(); for (DatanodeDetails dd : healthyNodes) { DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto(), new OzoneConfiguration()); + UpgradeUtils.defaultLayoutVersionProto(), conf); long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); @@ -345,7 +346,7 @@ public List getAllNodes() { nodeStatus = NodeStatus.inServiceDead(); } DatanodeInfo di = new DatanodeInfo(entry.getKey(), nodeStatus, - UpgradeUtils.defaultLayoutVersionProto(), new OzoneConfiguration()); + UpgradeUtils.defaultLayoutVersionProto(), conf); long capacity = entry.getValue().getCapacity().get(); long used = entry.getValue().getScmUsed().get(); @@ -434,7 +435,7 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) { } DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto(), new OzoneConfiguration()); + UpgradeUtils.defaultLayoutVersionProto(), conf); long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); long remaining = nodeMetricMap.get(dd).getRemaining().get(); @@ -763,7 +764,7 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails, PipelineReportsProto pipelineReportsProto, LayoutVersionProto layoutInfo) { final DatanodeInfo info = new DatanodeInfo(datanodeDetails, - NodeStatus.inServiceHealthy(), layoutInfo, new OzoneConfiguration()); + NodeStatus.inServiceHealthy(), layoutInfo, conf); try { node2ContainerMap.addNode(info); addEntryTodnsToUuidMap(datanodeDetails.getIpAddress(), From df0f2a4804a4f2a863d05310dadba71d2fa8131e Mon Sep 17 00:00:00 2001 From: ashishk Date: Mon, 27 Apr 2026 12:28:18 +0530 Subject: [PATCH 4/5] Fix review suggestions --- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 5 - .../src/main/resources/ozone-default.xml | 11 -- .../hadoop/hdds/scm/node/DatanodeInfo.java | 103 +---------- .../hdds/scm/node/NodeStateManager.java | 7 +- .../scm/node/PendingContainerTracker.java | 170 ++++++++++++------ .../hadoop/hdds/scm/node/SCMNodeManager.java | 3 +- .../apache/hadoop/hdds/scm/HddsTestUtils.java | 1 + .../hdds/scm/container/MockNodeManager.java | 11 +- .../scm/container/SimpleMockNodeManager.java | 4 +- .../TestContainerPlacementFactory.java | 3 +- .../TestSCMContainerPlacementCapacity.java | 2 +- .../TestSCMContainerPlacementRackAware.java | 4 +- .../TestSCMContainerPlacementRackScatter.java | 4 +- .../TestSCMContainerPlacementRandom.java | 4 +- .../scm/node/TestPendingContainerTracker.java | 107 ++++++----- .../scm/node/states/TestNodeStateMap.java | 4 +- .../TestPipelinePlacementFactory.java | 2 +- .../api/TestStorageDistributionEndpoint.java | 3 +- 18 files changed, 197 insertions(+), 251 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 01f8e2f56179..2c44fa881c41 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -269,11 +269,6 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_STALENODE_INTERVAL_DEFAULT = "5m"; - public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL = - "ozone.scm.pending.container.roll.interval"; - public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT = - "5m"; - public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT = "ozone.scm.heartbeat.rpc-timeout"; public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT = diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 3e32c4fe715c..e159eb6948b8 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1055,17 +1055,6 @@ The number of stripes created for the container state manager lock. - - ozone.scm.pending.container.roll.interval - 5m - OZONE, SCM, PERFORMANCE, MANAGEMENT - - The interval at which the two-window tumbling bucket for pending - container allocations rolls over per DataNode. Pending containers - that have not been confirmed within two intervals are automatically - aged out. Default is 5 minutes. - - ozone.scm.datanode.address diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java index dd262d2edb5a..cb4cb1db6967 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -17,29 +17,22 @@ package org.apache.hadoop.hdds.scm.node; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto; import com.google.common.annotations.VisibleForTesting; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; -import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker.TwoWindowBucket; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +54,7 @@ public class DatanodeInfo extends DatanodeDetails { /** * Two-window tumbling bucket for tracking pending container allocations on this datanode. */ - private final TwoWindowBucket twoWindowBucket; + private final TwoWindowBucket pendingContainerAllocations; private List storageReports; private List metadataStorageReports; @@ -74,12 +67,10 @@ public class DatanodeInfo extends DatanodeDetails { * Constructs DatanodeInfo from DatanodeDetails. * * @param datanodeDetails Details about the datanode - * @param nodeStatus initial node status * @param layoutInfo Details about the LayoutVersionProto - * @param conf configuration source */ public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus, - LayoutVersionProto layoutInfo, ConfigurationSource conf) { + LayoutVersionProto layoutInfo, long containerRollIntervalMs) { super(datanodeDetails); this.lock = new ReentrantReadWriteLock(); this.lastHeartbeatTime = Time.monotonicNow(); @@ -90,10 +81,7 @@ public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus, this.nodeStatus = nodeStatus; this.metadataStorageReports = Collections.emptyList(); this.commandCounts = new HashMap<>(); - this.twoWindowBucket = new TwoWindowBucket(this.getID(), conf.getTimeDuration( - OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL, - OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS)); + this.pendingContainerAllocations = new TwoWindowBucket(this.getID(), containerRollIntervalMs); } /** @@ -375,8 +363,9 @@ public int getCommandCount(SCMCommandProto.Type cmd) { /** * Returns the {@link TwoWindowBucket} for this datanode. */ - public TwoWindowBucket getTwoWindowBucket() { - return twoWindowBucket; + public TwoWindowBucket getPendingContainerAllocations() { + pendingContainerAllocations.rollIfNeeded(); + return pendingContainerAllocations; } @Override @@ -388,82 +377,4 @@ public int hashCode() { public boolean equals(Object obj) { return super.equals(obj); } - - /** - * Two-window tumbling bucket for a single DataNode. - * - *

New allocations go into {@code currentWindow}. Every {@code rollIntervalMs}, the current - * window shifts to {@code previousWindow} and a fresh empty set becomes current. After two - * intervals without confirmation a container ID is automatically discarded (aged out). - * Pending count is the union of both windows. - */ - static class TwoWindowBucket { - private Set currentWindow = new HashSet<>(); - private Set previousWindow = new HashSet<>(); - private long lastRollTime = Time.monotonicNow(); - private final long rollIntervalMs; - private final DatanodeID datanodeID; - - TwoWindowBucket(DatanodeID datanodeID, long rollIntervalMs) { - this.datanodeID = datanodeID; - this.rollIntervalMs = rollIntervalMs; - } - - /** - * Roll one or both windows based on elapsed time. - */ - synchronized void rollIfNeeded() { - long now = Time.monotonicNow(); - long elapsed = now - lastRollTime; - - if (elapsed >= 2 * rollIntervalMs) { - int dropped = getCount(); - previousWindow.clear(); - currentWindow.clear(); - lastRollTime = now; - LOG.debug("Double roll interval elapsed ({}ms): dropped {} pending containers", elapsed, dropped); - } else if (elapsed >= rollIntervalMs) { - previousWindow.clear(); - final Set tmp = previousWindow; - previousWindow = currentWindow; - currentWindow = tmp; - lastRollTime = now; - LOG.debug("Rolled window. Previous window size: {} elapsed: ({}ms), Current window reset to empty", - previousWindow.size(), elapsed); - } - } - - synchronized boolean contains(ContainerID containerID) { - return currentWindow.contains(containerID) || previousWindow.contains(containerID); - } - - /** - * Add container to current window. - */ - synchronized boolean add(ContainerID containerID) { - boolean added = currentWindow.add(containerID); - LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}", - containerID, datanodeID, added, getCount()); - return added; - } - - /** - * Remove container from both windows. - */ - synchronized boolean remove(ContainerID containerID) { - boolean removedFromCurrent = currentWindow.remove(containerID); - boolean removedFromPrevious = previousWindow.remove(containerID); - boolean removed = removedFromCurrent || removedFromPrevious; - LOG.debug("Removed pending container {} from DataNode {}. Removed={}, Remaining={}", - containerID, datanodeID, removed, getCount()); - return removed; - } - - /** - * Count of pending containers in both windows. - */ - synchronized int getCount() { - return currentWindow.size() + previousWindow.size(); - } - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 8ce4af67e9bd..9e4b96999df0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -124,6 +124,8 @@ public class NodeStateManager implements Runnable, Closeable { */ private final long deadNodeIntervalMs; + private final long containerRollIntervalMs = 5 * 60 * 1000; //TODO + /** * The future is used to pause/unpause the scheduled checks. */ @@ -134,8 +136,6 @@ public class NodeStateManager implements Runnable, Closeable { */ private boolean checkPaused; - private final ConfigurationSource conf; - /** * timestamp of the latest heartbeat check process. */ @@ -176,7 +176,6 @@ public NodeStateManager(ConfigurationSource conf, this.nodeHealthSM = new StateMachine<>(NodeState.HEALTHY, finalStates); initializeStateMachines(); - this.conf = conf; heartbeatCheckerIntervalMs = HddsServerUtil .getScmheartbeatCheckerInterval(conf); staleNodeIntervalMs = HddsServerUtil.getStaleNodeInterval(conf); @@ -313,7 +312,7 @@ public void addNode(DatanodeDetails datanodeDetails, private DatanodeInfo newDatanodeInfo(DatanodeDetails datanode, LayoutVersionProto layout) { final NodeStatus status = newNodeStatus(datanode, layout); - return new DatanodeInfo(datanode, status, layout, conf); + return new DatanodeInfo(datanode, status, layout, containerRollIntervalMs); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java index b2bddd911259..86038a1ca994 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java @@ -17,13 +17,18 @@ package org.apache.hadoop.hdds.scm.node; -import com.google.common.annotations.VisibleForTesting; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.node.DatanodeInfo.TwoWindowBucket; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,38 +79,103 @@ public class PendingContainerTracker { */ private final SCMNodeMetrics metrics; - public PendingContainerTracker(long maxContainerSize, SCMNodeMetrics metrics) { - this.maxContainerSize = maxContainerSize; - this.metrics = metrics; - LOG.info("PendingContainerTracker initialized with maxContainerSize={}B", maxContainerSize); - } - - private TwoWindowBucket getBucket(DatanodeInfo datanodeInfo) { - return datanodeInfo.getTwoWindowBucket(); - } + private final Function getBucket; /** - * Advances the two-window tumbling bucket for this datanode when the roll interval has elapsed. - * Call on periodic paths (node report) so windows age even when there are no new - * allocations or container reports touching this tracker. + * Two-window bucket for a single DataNode. + * Contains current and previous window sets, plus last roll timestamp. */ - public void rollWindowsIfNeeded(DatanodeInfo datanodeInfo) { - Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); - getBucket(datanodeInfo).rollIfNeeded(); + public static class TwoWindowBucket { + private Set currentWindow = new HashSet<>(); + private Set previousWindow = new HashSet<>(); + private long lastRollTime = Time.monotonicNow(); + private final long rollIntervalMs; + private final DatanodeID datanodeID; + + TwoWindowBucket(DatanodeID id, long rollIntervalMs) { + this.datanodeID = id; + this.rollIntervalMs = rollIntervalMs; + } + + /** + * Roll one or both windows based on elapsed time. + */ + synchronized void rollIfNeeded() { + long now = Time.monotonicNow(); + long elapsed = now - lastRollTime; + + if (elapsed >= 2 * rollIntervalMs) { + int dropped = getCount(); + previousWindow.clear(); + currentWindow.clear(); + lastRollTime = now; + LOG.debug("Double roll interval elapsed ({}ms): dropped {} pending containers", elapsed, dropped); + } else if (elapsed >= rollIntervalMs) { + previousWindow.clear(); + final Set tmp = previousWindow; + previousWindow = currentWindow; + currentWindow = tmp; + lastRollTime = now; + LOG.debug("Rolled window. Previous window size: {} elapsed: ({}ms), Current window reset to empty", + previousWindow.size(), elapsed); + } + } + + synchronized boolean contains(ContainerID containerID) { + return currentWindow.contains(containerID) || previousWindow.contains(containerID); + } + + /** + * Add container to current window. + */ + synchronized boolean add(ContainerID containerID) { + boolean added = currentWindow.add(containerID); + LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}", + containerID, datanodeID, added, getCount()); + return added; + } + + /** + * Remove container from both windows. + */ + synchronized boolean remove(ContainerID containerID) { + boolean removedFromCurrent = currentWindow.remove(containerID); + boolean removedFromPrevious = previousWindow.remove(containerID); + boolean removed = removedFromCurrent || removedFromPrevious; + LOG.debug("Removed pending container {} from DataNode {}. Removed={}, Remaining={}", + containerID, datanodeID, removed, getCount()); + return removed; + } + + /** + * Count of pending containers in both windows. + */ + synchronized int getCount() { + return currentWindow.size() + previousWindow.size(); + } + } + + public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, SCMNodeMetrics metrics, + Function getBucket) { + this.maxContainerSize = maxContainerSize; + this.metrics = metrics; + this.getBucket = getBucket; + LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms", + maxContainerSize, rollIntervalMs); } /** * Whether the datanode can fit another container of {@link #maxContainerSize} after accounting for - * SCM pending allocations for {@code datanodeInfo} (this tracker) and usable space across volumes. - * Pending bytes are {@link #getPendingContainerCount} × {@code maxContainerSize}; + * SCM pending allocations for {@code node} (this tracker) and usable space across volumes on + * {@code datanodeInfo}. Pending bytes are count × {@code maxContainerSize}; * effective allocatable space sums full-container slots per storage report. * - * @param datanodeInfo storage reports and pending-allocation bucket for the datanode + * @param datanodeInfo storage reports for the datanode */ public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanodeInfo) { Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); - long pendingAllocationSize = getPendingContainerCount(datanodeInfo) * maxContainerSize; + long pendingAllocationSize = datanodeInfo.getPendingContainerAllocations().getCount() * maxContainerSize; List storageReports = datanodeInfo.getStorageReports(); Objects.requireNonNull(storageReports, "storageReports == null"); if (storageReports.isEmpty()) { @@ -126,19 +196,34 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanode return false; } + /** + * Record a pending container allocation for all DataNodes in the pipeline. + * Container is added to the current window. + * + * @param pipeline The pipeline where container is allocated + * @param containerID The container being allocated + */ + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { + Objects.requireNonNull(pipeline, "pipeline == null"); + Objects.requireNonNull(containerID, "containerID == null"); + + for (DatanodeDetails node : pipeline.getNodes()) { + recordPendingAllocationForDatanode(getBucket.apply(node.getID()), containerID); + } + } + /** * Record a pending container allocation for a single DataNode. * Container is added to the current window. * - * @param datanodeInfo The DataNode where container is being allocated/replicated * @param containerID The container being allocated/replicated */ public void recordPendingAllocationForDatanode(DatanodeInfo datanodeInfo, ContainerID containerID) { - Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); Objects.requireNonNull(containerID, "containerID == null"); - - boolean added = getBucket(datanodeInfo).add(containerID); - + if (datanodeInfo == null) { + return; + } + final boolean added = datanodeInfo.getPendingContainerAllocations().add(containerID); if (added && metrics != null) { metrics.incNumPendingContainersAdded(); } @@ -149,44 +234,15 @@ public void recordPendingAllocationForDatanode(DatanodeInfo datanodeInfo, Contai * Removes from both current and previous windows. * Called when container is confirmed. * - * @param datanodeInfo The DataNode * @param containerID The container to remove from pending */ - public void removePendingAllocation(DatanodeInfo datanodeInfo, ContainerID containerID) { - Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); + public void removePendingAllocation(TwoWindowBucket bucket, ContainerID containerID) { Objects.requireNonNull(containerID, "containerID == null"); - boolean removed = getBucket(datanodeInfo).remove(containerID); + boolean removed = bucket.remove(containerID); if (removed && metrics != null) { metrics.incNumPendingContainersRemoved(); } } - - /** - * Number of pending container allocations for this datanode (union of current and previous - * windows). This call may advance the internal tumbling window if the roll interval has elapsed. - * - * @param datanodeInfo The DataNode - * @return Pending container count - */ - public long getPendingContainerCount(DatanodeInfo datanodeInfo) { - Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); - return getBucket(datanodeInfo).getCount(); - } - - /** - * Whether container is in the current or previous window for this datanode. - */ - @VisibleForTesting - public boolean containsPendingContainer(DatanodeInfo datanodeInfo, ContainerID containerID) { - Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); - Objects.requireNonNull(containerID, "containerID == null"); - return getBucket(datanodeInfo).contains(containerID); - } - - @VisibleForTesting - public SCMNodeMetrics getMetrics() { - return metrics; - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index dd0d6a3cffc5..8126e834a473 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -192,7 +192,8 @@ public SCMNodeManager( this.pendingContainerTracker = new PendingContainerTracker( (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES), - this.metrics); + 5 * 60 * 1000, // TODO + this.metrics, this::getNode); this.clusterMap = networkTopology; this.nodeResolver = nodeResolver; this.useHostname = conf.getBoolean( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java index 55f5610cd7e9..a0f5a14725fa 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java @@ -89,6 +89,7 @@ public final class HddsTestUtils { public static final long CONTAINER_USED_BYTES_DEFAULT = 100L; public static final long CONTAINER_NUM_KEYS_DEFAULT = 2L; + public static final long ROLL_INTERVAL_MS_DEFAULT = 5 * 60 * 1000L; //TODO private HddsTestUtils() { } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 05d0ec94652e..74da571301fb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -127,7 +127,8 @@ public class MockNodeManager implements NodeManager { this.dnsToUuidMap = new ConcurrentHashMap<>(); this.aggregateStat = new SCMNodeStat(); this.clusterMap = new NetworkTopologyImpl(conf); - this.pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, null); + this.pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, null, null); } public MockNodeManager(NetworkTopologyImpl clusterMap, @@ -267,7 +268,7 @@ public List getNodes( List healthyNodesWithInfo = new ArrayList<>(); for (DatanodeDetails dd : healthyNodes) { DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto(), conf); + UpgradeUtils.defaultLayoutVersionProto(), HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); @@ -346,7 +347,7 @@ public List getAllNodes() { nodeStatus = NodeStatus.inServiceDead(); } DatanodeInfo di = new DatanodeInfo(entry.getKey(), nodeStatus, - UpgradeUtils.defaultLayoutVersionProto(), conf); + UpgradeUtils.defaultLayoutVersionProto(), HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); long capacity = entry.getValue().getCapacity().get(); long used = entry.getValue().getScmUsed().get(); @@ -435,7 +436,7 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) { } DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto(), conf); + UpgradeUtils.defaultLayoutVersionProto(), HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); long remaining = nodeMetricMap.get(dd).getRemaining().get(); @@ -764,7 +765,7 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails, PipelineReportsProto pipelineReportsProto, LayoutVersionProto layoutInfo) { final DatanodeInfo info = new DatanodeInfo(datanodeDetails, - NodeStatus.inServiceHealthy(), layoutInfo, conf); + NodeStatus.inServiceHealthy(), layoutInfo, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); try { node2ContainerMap.addNode(info); addEntryTodnsToUuidMap(datanodeDetails.getIpAddress(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index d9ec257a4f06..f2da8fd2878b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -36,6 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -67,7 +67,7 @@ public class SimpleMockNodeManager implements NodeManager { public void register(DatanodeDetails dd, NodeStatus status) { dd.setPersistedOpState(status.getOperationalState()); dd.setPersistedOpStateExpiryEpochSec(status.getOpStateExpiryEpochSeconds()); - nodeMap.put(dd.getID(), new DatanodeInfo(dd, status, null, new OzoneConfiguration())); + nodeMap.put(dd.getID(), new DatanodeInfo(dd, status, null, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT)); } public void setNodeStatus(DatanodeDetails dd, NodeStatus status) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java index 32d43cdff5a4..ed4e96b8de56 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java @@ -99,8 +99,7 @@ public void testRackAwarePolicy() throws IOException { .createDatanodeDetails(hostname + i, rack + (i / 5)); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto(), - conf); + UpgradeUtils.defaultLayoutVersionProto(), HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java index 0229673c2248..1fb3f53504d3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java @@ -65,7 +65,7 @@ public void chooseDatanodes() throws SCMException { MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), UpgradeUtils.defaultLayoutVersionProto(), - conf); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java index 5e2f8992ca41..dd068f55cdfe 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java @@ -113,7 +113,7 @@ private void setup(int datanodeCount) { DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), UpgradeUtils.defaultLayoutVersionProto(), - conf); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -457,7 +457,7 @@ public void testDatanodeWithDefaultNetworkLocation(int datanodeCount) DatanodeInfo dnInfo = new DatanodeInfo( dn, NodeStatus.inServiceHealthy(), UpgradeUtils.defaultLayoutVersionProto(), - conf); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( dnInfo.getID(), "/data1-" + dnInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java index db491c501e53..e015b93c1e31 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java @@ -181,7 +181,7 @@ private void setupDatanode(DatanodeDetails datanodeDetails) { DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), UpgradeUtils.defaultLayoutVersionProto(), - conf); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -492,7 +492,7 @@ public void testDatanodeWithDefaultNetworkLocation(int datanodeCount) DatanodeInfo dnInfo = new DatanodeInfo( dn, NodeStatus.inServiceHealthy(), UpgradeUtils.defaultLayoutVersionProto(), - conf); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( dnInfo.getID(), "/data1-" + dnInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java index eb3f7fc8f494..47602a385fd6 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java @@ -62,7 +62,7 @@ public void chooseDatanodes() throws SCMException { MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), UpgradeUtils.defaultLayoutVersionProto(), - conf); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -169,7 +169,7 @@ public void testIsValidNode() throws SCMException { MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), UpgradeUtils.defaultLayoutVersionProto(), - conf); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java index a04f0dda0a71..6cd178551b58 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java @@ -17,7 +17,6 @@ package org.apache.hadoop.hdds.scm.node; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -25,7 +24,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; @@ -45,7 +43,6 @@ public class TestPendingContainerTracker { private List datanodes; private List containers; - private final OzoneConfiguration conf = new OzoneConfiguration(); private PendingContainerTracker tracker; private DatanodeInfo dn1; private DatanodeInfo dn2; @@ -56,13 +53,13 @@ public class TestPendingContainerTracker { @BeforeEach public void setUp() throws IOException { - tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, null); + tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, null, null); datanodes = new ArrayList<>(NUM_DATANODES); for (int i = 0; i < NUM_DATANODES; i++) { datanodes.add(new DatanodeInfo( MockDatanodeDetails.randomLocalDatanodeDetails(), NodeStatus.inServiceHealthy(), null, - conf)); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT)); } containers = new ArrayList<>(NUM_CONTAINERS); @@ -86,14 +83,14 @@ public void testRecordPendingAllocation() { // Each of the first 100 DNs should have 1 pending container for (int i = 0; i < 100; i++) { - assertEquals(1, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(1, datanodes.get(i).getPendingContainerAllocations().getCount()); assertEquals(MAX_CONTAINER_SIZE, - tracker.getPendingContainerCount(datanodes.get(i)) * MAX_CONTAINER_SIZE); + datanodes.get(i).getPendingContainerAllocations().getCount() * MAX_CONTAINER_SIZE); } // DNs beyond index 100 should have 0 pending - assertEquals(0, tracker.getPendingContainerCount(datanodes.get(500))); - assertEquals(0, tracker.getPendingContainerCount(datanodes.get(999))); + assertEquals(0, datanodes.get(500).getPendingContainerAllocations().getCount()); + assertEquals(0, datanodes.get(999).getPendingContainerAllocations().getCount()); } @Test @@ -105,17 +102,17 @@ public void testRemovePendingAllocation() { // Remove from first 50 DNs for (int i = 0; i < 50; i++) { - tracker.removePendingAllocation(datanodes.get(i), containers.get(i)); + tracker.removePendingAllocation(datanodes.get(i).getPendingContainerAllocations(), containers.get(i)); } // First 50 DNs should have 0 pending for (int i = 0; i < 50; i++) { - assertEquals(0, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(0, datanodes.get(i).getPendingContainerAllocations().getCount()); } // DNs 50-99 should still have 1 pending for (int i = 50; i < 100; i++) { - assertEquals(1, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(1, datanodes.get(i).getPendingContainerAllocations().getCount()); } } @@ -128,29 +125,27 @@ public void testRemovePendingAllocation() { @Timeout(30) public void testTwoWindowRollAgesOutContainerAfterTwoIntervals() throws InterruptedException { long rollMs = 200L; - OzoneConfiguration shortConf = new OzoneConfiguration(); - shortConf.set(OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL, rollMs + "ms"); DatanodeInfo shortDn = new DatanodeInfo( MockDatanodeDetails.randomLocalDatanodeDetails(), NodeStatus.inServiceHealthy(), null, - shortConf); + rollMs); - PendingContainerTracker shortRollTracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, null); + PendingContainerTracker shortRollTracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, null, null); shortRollTracker.recordPendingAllocationForDatanode(shortDn, container1); - assertEquals(1, shortRollTracker.getPendingContainerCount(shortDn)); - assertTrue(shortRollTracker.containsPendingContainer(shortDn, container1)); + assertEquals(1, shortDn.getPendingContainerAllocations().getCount()); + assertTrue(shortDn.getPendingContainerAllocations().contains(container1)); // First roll: C1 moves from currentWindow to previousWindow; union still includes C1 Thread.sleep(rollMs + 80); - shortRollTracker.rollWindowsIfNeeded(shortDn); - assertEquals(1, shortRollTracker.getPendingContainerCount(shortDn)); - assertTrue(shortRollTracker.containsPendingContainer(shortDn, container1)); + shortDn.getPendingContainerAllocations(); // triggers rollIfNeeded + assertEquals(1, shortDn.getPendingContainerAllocations().getCount()); + assertTrue(shortDn.getPendingContainerAllocations().contains(container1)); // Second roll: prior previousWindow (holding C1) is dropped; C1 is no longer pending Thread.sleep(rollMs + 80); - shortRollTracker.rollWindowsIfNeeded(shortDn); - assertEquals(0, shortRollTracker.getPendingContainerCount(shortDn)); - assertEquals(0L, shortRollTracker.getPendingContainerCount(shortDn) * MAX_CONTAINER_SIZE); + shortDn.getPendingContainerAllocations(); // triggers rollIfNeeded + assertEquals(0, shortDn.getPendingContainerAllocations().getCount()); + assertEquals(0L, shortDn.getPendingContainerAllocations().getCount() * MAX_CONTAINER_SIZE); } @Test @@ -158,18 +153,18 @@ public void testRemoveNonExistentContainer() { datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); // Remove a container that was never added - should not throw exception - tracker.removePendingAllocation(dn1, container2); + tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), container2); // DN1 should still have container1 - assertEquals(1, tracker.getPendingContainerCount(dn1)); + assertEquals(1, dn1.getPendingContainerAllocations().getCount()); } @Test public void testUnknownDatanodeHasZeroPendingCount() { DatanodeInfo unknownDN = new DatanodeInfo( MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), null, - conf); - assertEquals(0, tracker.getPendingContainerCount(unknownDN)); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); + assertEquals(0, unknownDN.getPendingContainerAllocations().getCount()); } @Test @@ -188,7 +183,7 @@ public void testConcurrentModification() throws InterruptedException { datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, cid)); if (j % 2 == 0) { - tracker.removePendingAllocation(dn1, cid); + tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), cid); } } }); @@ -209,17 +204,17 @@ public void testConcurrentModification() throws InterruptedException { public void testBucketsRetainedWhenEmpty() { datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); - assertEquals(1, tracker.getPendingContainerCount(dn1)); + assertEquals(1, dn1.getPendingContainerAllocations().getCount()); // Remove the only pending container from DN1 - tracker.removePendingAllocation(dn1, container1); + tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), container1); - assertEquals(0, tracker.getPendingContainerCount(dn1)); - assertEquals(1, tracker.getPendingContainerCount(dn2)); + assertEquals(0, dn1.getPendingContainerAllocations().getCount()); + assertEquals(1, dn2.getPendingContainerAllocations().getCount()); // Empty bucket for DN1 is still usable for new allocations tracker.recordPendingAllocationForDatanode(dn1, container2); - assertEquals(1, tracker.getPendingContainerCount(dn1)); + assertEquals(1, dn1.getPendingContainerAllocations().getCount()); } @Test @@ -231,15 +226,15 @@ public void testRemoveFromBothWindows() { datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container2)); - assertEquals(2, tracker.getPendingContainerCount(dn1)); + assertEquals(2, dn1.getPendingContainerAllocations().getCount()); // Remove container1 - should work regardless of which window it's in - tracker.removePendingAllocation(dn1, container1); + tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), container1); - assertEquals(1, tracker.getPendingContainerCount(dn1)); + assertEquals(1, dn1.getPendingContainerAllocations().getCount()); - assertFalse(tracker.containsPendingContainer(dn1, container1)); - assertTrue(tracker.containsPendingContainer(dn1, container2)); + assertFalse(dn1.getPendingContainerAllocations().contains(container1)); + assertTrue(dn1.getPendingContainerAllocations().contains(container2)); } @Test @@ -250,22 +245,22 @@ public void testManyContainersOnSingleDatanode() { tracker.recordPendingAllocationForDatanode(dn, containers.get(i)); } - assertEquals(1000, tracker.getPendingContainerCount(dn)); - assertEquals(1000 * MAX_CONTAINER_SIZE, tracker.getPendingContainerCount(dn) * MAX_CONTAINER_SIZE); + assertEquals(1000, dn.getPendingContainerAllocations().getCount()); + assertEquals(1000 * MAX_CONTAINER_SIZE, dn.getPendingContainerAllocations().getCount() * MAX_CONTAINER_SIZE); // Verify specific containers are present - assertTrue(tracker.containsPendingContainer(dn, containers.get(0))); - assertTrue(tracker.containsPendingContainer(dn, containers.get(500))); - assertTrue(tracker.containsPendingContainer(dn, containers.get(999))); + assertTrue(dn.getPendingContainerAllocations().contains(containers.get(0))); + assertTrue(dn.getPendingContainerAllocations().contains(containers.get(500))); + assertTrue(dn.getPendingContainerAllocations().contains(containers.get(999))); // Remove half of them for (int i = 0; i < 500; i++) { - tracker.removePendingAllocation(dn, containers.get(i)); + tracker.removePendingAllocation(dn.getPendingContainerAllocations(), containers.get(i)); } - assertEquals(500, tracker.getPendingContainerCount(dn)); - assertFalse(tracker.containsPendingContainer(dn, containers.get(0))); - assertTrue(tracker.containsPendingContainer(dn, containers.get(999))); + assertEquals(500, dn.getPendingContainerAllocations().getCount()); + assertFalse(dn.getPendingContainerAllocations().contains(containers.get(0))); + assertTrue(dn.getPendingContainerAllocations().contains(containers.get(999))); } @Test @@ -281,9 +276,9 @@ public void testAllDatanodesWithMultipleContainers() { // Each DN should have 10 pending containers for (int i = 0; i < NUM_DATANODES; i++) { - assertEquals(10, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(10, datanodes.get(i).getPendingContainerAllocations().getCount()); assertEquals(10 * MAX_CONTAINER_SIZE, - tracker.getPendingContainerCount(datanodes.get(i)) * MAX_CONTAINER_SIZE); + datanodes.get(i).getPendingContainerAllocations().getCount() * MAX_CONTAINER_SIZE); } // Remove all containers from every 10th DN @@ -291,19 +286,19 @@ public void testAllDatanodesWithMultipleContainers() { DatanodeInfo dn = datanodes.get(dnIdx); for (int cIdx = 0; cIdx < 10; cIdx++) { int containerIdx = dnIdx * 10 + cIdx; - tracker.removePendingAllocation(dn, containers.get(containerIdx)); + tracker.removePendingAllocation(dn.getPendingContainerAllocations(), containers.get(containerIdx)); } } // Every 10th DN should have 0 pending for (int i = 0; i < NUM_DATANODES; i += 10) { - assertEquals(0, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(0, datanodes.get(i).getPendingContainerAllocations().getCount()); } // Other DNs should still have 10 pending - assertEquals(10, tracker.getPendingContainerCount(datanodes.get(1))); - assertEquals(10, tracker.getPendingContainerCount(datanodes.get(15))); - assertEquals(10, tracker.getPendingContainerCount(datanodes.get(999))); + assertEquals(10, datanodes.get(1).getPendingContainerAllocations().getCount()); + assertEquals(10, datanodes.get(15).getPendingContainerAllocations().getCount()); + assertEquals(10, datanodes.get(999).getPendingContainerAllocations().getCount()); } @Test @@ -318,7 +313,7 @@ public void testIdempotentRecording() { } // Should still only have 100 containers - assertEquals(100, tracker.getPendingContainerCount(dn)); + assertEquals(100, dn.getPendingContainerAllocations().getCount()); } @Test diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java index 8a689f56843f..e9f13ee82c8c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java @@ -24,12 +24,12 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeStatus; @@ -53,7 +53,7 @@ void addNode(NodeStatus status) throws NodeAlreadyExistsException { } void addNode(DatanodeDetails datanode, NodeStatus status) throws NodeAlreadyExistsException { - map.addNode(new DatanodeInfo(datanode, status, null, new OzoneConfiguration())); + map.addNode(new DatanodeInfo(datanode, status, null, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT)); } @BeforeEach diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java index c2236e008621..84afb74f0a5a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java @@ -104,7 +104,7 @@ private void setupRacks(int datanodeCount, int nodesPerRack, DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), UpgradeUtils.defaultLayoutVersionProto(), - conf); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageContainerDatanodeProtocolProtos.StorageReportProto storage1 = HddsTestUtils.createStorageReport( diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java index 03cd04f54883..d62aae159139 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java @@ -36,7 +36,6 @@ import java.util.UUID; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; @@ -226,7 +225,7 @@ private List mockStorageDistributionData(int numNodes) throws Exception .build(); pendingDeletionMetrics.add(new DatanodePendingDeletionMetrics(hostName, uuid.toString(), PENDING_DELETION_SIZE)); - dataNodes.add(new DatanodeInfo(datanode, NodeStatus.inServiceHealthy(), null, new OzoneConfiguration())); + dataNodes.add(new DatanodeInfo(datanode, NodeStatus.inServiceHealthy(), null, 5 * 60 * 1000)); when(nodeManager.getNodeStat(datanode)) .thenReturn(new SCMNodeMetric(OZONE_CAPACITY, OZONE_USED, OZONE_REMAINING, COMMITTED, MIN_FREE_SPACE, RESERVED)); From abbef1b07f753c4333d84d53021f9b17f81f6701 Mon Sep 17 00:00:00 2001 From: ashishk Date: Mon, 27 Apr 2026 13:25:28 +0530 Subject: [PATCH 5/5] Remove getBucket and related code --- .../scm/node/PendingContainerTracker.java | 25 +------------------ .../hadoop/hdds/scm/node/SCMNodeManager.java | 2 +- .../hdds/scm/container/MockNodeManager.java | 2 +- .../scm/node/TestPendingContainerTracker.java | 4 +-- 4 files changed, 5 insertions(+), 28 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java index 86038a1ca994..fc7bbc238192 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java @@ -21,12 +21,9 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.function.Function; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -79,8 +76,6 @@ public class PendingContainerTracker { */ private final SCMNodeMetrics metrics; - private final Function getBucket; - /** * Two-window bucket for a single DataNode. * Contains current and previous window sets, plus last roll timestamp. @@ -155,11 +150,9 @@ synchronized int getCount() { } } - public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, SCMNodeMetrics metrics, - Function getBucket) { + public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, SCMNodeMetrics metrics) { this.maxContainerSize = maxContainerSize; this.metrics = metrics; - this.getBucket = getBucket; LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms", maxContainerSize, rollIntervalMs); } @@ -196,22 +189,6 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanode return false; } - /** - * Record a pending container allocation for all DataNodes in the pipeline. - * Container is added to the current window. - * - * @param pipeline The pipeline where container is allocated - * @param containerID The container being allocated - */ - public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { - Objects.requireNonNull(pipeline, "pipeline == null"); - Objects.requireNonNull(containerID, "containerID == null"); - - for (DatanodeDetails node : pipeline.getNodes()) { - recordPendingAllocationForDatanode(getBucket.apply(node.getID()), containerID); - } - } - /** * Record a pending container allocation for a single DataNode. * Container is added to the current window. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 8126e834a473..ad392a247d53 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -193,7 +193,7 @@ public SCMNodeManager( (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES), 5 * 60 * 1000, // TODO - this.metrics, this::getNode); + this.metrics); this.clusterMap = networkTopology; this.nodeResolver = nodeResolver; this.useHostname = conf.getBoolean( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 74da571301fb..57d38ece3dd6 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -128,7 +128,7 @@ public class MockNodeManager implements NodeManager { this.aggregateStat = new SCMNodeStat(); this.clusterMap = new NetworkTopologyImpl(conf); this.pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, - HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, null, null); + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, null); } public MockNodeManager(NetworkTopologyImpl clusterMap, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java index 6cd178551b58..c747dc7d60ae 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java @@ -53,7 +53,7 @@ public class TestPendingContainerTracker { @BeforeEach public void setUp() throws IOException { - tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, null, null); + tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, null); datanodes = new ArrayList<>(NUM_DATANODES); for (int i = 0; i < NUM_DATANODES; i++) { @@ -129,7 +129,7 @@ public void testTwoWindowRollAgesOutContainerAfterTwoIntervals() throws Interrup MockDatanodeDetails.randomLocalDatanodeDetails(), NodeStatus.inServiceHealthy(), null, rollMs); - PendingContainerTracker shortRollTracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, null, null); + PendingContainerTracker shortRollTracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, null); shortRollTracker.recordPendingAllocationForDatanode(shortDn, container1); assertEquals(1, shortDn.getPendingContainerAllocations().getCount());