diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 7780c0839462..15a566f6421b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -31,7 +31,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -39,7 +38,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; @@ -79,8 +77,6 @@ public class ContainerManagerImpl implements ContainerManager { @SuppressWarnings("java:S2245") // no need for secure random private final Random random = new Random(); - private final long maxContainerSize; - /** * */ @@ -106,9 +102,6 @@ public ContainerManagerImpl( .setContainerReplicaPendingOps(containerReplicaPendingOps) .build(); - maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); } @@ -245,9 +238,8 @@ private ContainerInfo createContainer(Pipeline pipeline, 
String owner) private ContainerInfo allocateContainer(final Pipeline pipeline, final String owner) throws IOException { - if (!pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) { - LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.", - pipeline, maxContainerSize); + if (!pipelineManager.hasEnoughSpace(pipeline)) { + LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline); return null; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java index 8f8753a4991b..cb4cb1db6967 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker.TwoWindowBucket; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,11 @@ public class DatanodeInfo extends DatanodeDetails { private long lastStatsUpdatedTime; private int failedVolumeCount; + /** + * Two-window tumbling bucket for tracking pending container allocations on this datanode. 
+ */ + private final TwoWindowBucket pendingContainerAllocations; + private List storageReports; private List metadataStorageReports; private LayoutVersionProto lastKnownLayoutVersion; @@ -64,7 +70,7 @@ public class DatanodeInfo extends DatanodeDetails { * @param layoutInfo Details about the LayoutVersionProto */ public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus, - LayoutVersionProto layoutInfo) { + LayoutVersionProto layoutInfo, long containerRollIntervalMs) { super(datanodeDetails); this.lock = new ReentrantReadWriteLock(); this.lastHeartbeatTime = Time.monotonicNow(); @@ -75,6 +81,7 @@ public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus, this.nodeStatus = nodeStatus; this.metadataStorageReports = Collections.emptyList(); this.commandCounts = new HashMap<>(); + this.pendingContainerAllocations = new TwoWindowBucket(this.getID(), containerRollIntervalMs); } /** @@ -353,6 +360,14 @@ public int getCommandCount(SCMCommandProto.Type cmd) { } } + /** + * Returns the {@link TwoWindowBucket} for this datanode. + */ + public TwoWindowBucket getPendingContainerAllocations() { + pendingContainerAllocations.rollIfNeeded(); + return pendingContainerAllocations; + } + @Override public int hashCode() { return super.hashCode(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index e9a019945c1f..4fb7f84394f3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -184,6 +184,19 @@ default int getAllNodeCount() { @Nullable DatanodeInfo getDatanodeInfo(DatanodeDetails dn); + /** + * True if the node can accept another container of the configured maximum container size.
+ */ + boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID); + + /** + * Records a pending container allocation for a single DataNode identified by its ID. + * + * @param datanodeID the ID of the DataNode receiving the allocation + * @param containerID the container being allocated + */ + void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID); + /** * Return the node stat of the specified datanode. * @param datanodeDetails DatanodeDetails. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index fb065a0086b9..9e4b96999df0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -124,6 +124,8 @@ public class NodeStateManager implements Runnable, Closeable { */ private final long deadNodeIntervalMs; + private final long containerRollIntervalMs = 5 * 60 * 1000; //TODO + /** * The future is used to pause/unpause the scheduled checks. 
*/ @@ -310,7 +312,7 @@ public void addNode(DatanodeDetails datanodeDetails, private DatanodeInfo newDatanodeInfo(DatanodeDetails datanode, LayoutVersionProto layout) { final NodeStatus status = newNodeStatus(datanode, layout); - return new DatanodeInfo(datanode, status, layout); + return new DatanodeInfo(datanode, status, layout, containerRollIntervalMs); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java index 0902529f1792..fc7bbc238192 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java @@ -17,17 +17,13 @@ package org.apache.hadoop.hdds.scm.node; -import com.google.common.annotations.VisibleForTesting; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -70,8 +66,6 @@ public class PendingContainerTracker { private static final Logger LOG = LoggerFactory.getLogger(PendingContainerTracker.class); - private final DatanodeBuckets datanodeBuckets; - /** * Maximum container size in bytes. */ @@ -86,13 +80,15 @@ public class PendingContainerTracker { * Two-window bucket for a single DataNode. * Contains current and previous window sets, plus last roll timestamp. 
*/ - private static class TwoWindowBucket { + public static class TwoWindowBucket { private Set currentWindow = new HashSet<>(); private Set previousWindow = new HashSet<>(); private long lastRollTime = Time.monotonicNow(); private final long rollIntervalMs; + private final DatanodeID datanodeID; - TwoWindowBucket(long rollIntervalMs) { + TwoWindowBucket(DatanodeID id, long rollIntervalMs) { + this.datanodeID = id; this.rollIntervalMs = rollIntervalMs; } @@ -127,22 +123,22 @@ synchronized boolean contains(ContainerID containerID) { /** * Add container to current window. */ - synchronized boolean add(ContainerID containerID, DatanodeID dnID) { + synchronized boolean add(ContainerID containerID) { boolean added = currentWindow.add(containerID); LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}", - containerID, dnID, added, getCount()); + containerID, datanodeID, added, getCount()); return added; } /** * Remove container from both windows. */ - synchronized boolean remove(ContainerID containerID, DatanodeID dnID) { + synchronized boolean remove(ContainerID containerID) { boolean removedFromCurrent = currentWindow.remove(containerID); boolean removedFromPrevious = previousWindow.remove(containerID); boolean removed = removedFromCurrent || removedFromPrevious; LOG.debug("Removed pending container {} from DataNode {}. Removed={}, Remaining={}", - containerID, dnID, removed, getCount()); + containerID, datanodeID, removed, getCount()); return removed; } @@ -154,63 +150,25 @@ synchronized int getCount() { } } - /** - * Per-datanode two-window buckets. - */ - private static class DatanodeBuckets { - private final ConcurrentHashMap map = new ConcurrentHashMap<>(); - private final long rollIntervalMs; - - DatanodeBuckets(long rollIntervalMs) { - this.rollIntervalMs = rollIntervalMs; - } - - TwoWindowBucket get(DatanodeID id) { - final TwoWindowBucket bucket = map.compute(id, (k, b) -> b != null ? 
b : new TwoWindowBucket(rollIntervalMs)); - bucket.rollIfNeeded(); - return bucket; - } - - TwoWindowBucket get(DatanodeDetails dn) { - Objects.requireNonNull(dn, "dn == null"); - return get(dn.getID()); - } - } - - public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, - SCMNodeMetrics metrics) { - this.datanodeBuckets = new DatanodeBuckets(rollIntervalMs); + public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, SCMNodeMetrics metrics) { this.maxContainerSize = maxContainerSize; this.metrics = metrics; LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms", maxContainerSize, rollIntervalMs); } - /** - * Advances the two-window tumbling bucket for this datanode when the roll interval has elapsed. - * Call on periodic paths (node report) so windows age even when there are no new - * allocations or container reports touching this tracker. - */ - public void rollWindowsIfNeeded(DatanodeDetails node) { - Objects.requireNonNull(node, "node == null"); - datanodeBuckets.get(node.getID()); - } - /** * Whether the datanode can fit another container of {@link #maxContainerSize} after accounting for * SCM pending allocations for {@code node} (this tracker) and usable space across volumes on - * {@code datanodeInfo}. Pending bytes are {@link #getPendingContainerCount} × {@code maxContainerSize}; + * {@code datanodeInfo}. Pending bytes are count × {@code maxContainerSize}; * effective allocatable space sums full-container slots per storage report. 
* - * @param node identity used to look up pending allocations (same DN as {@code datanodeInfo}) * @param datanodeInfo storage reports for the datanode */ - public boolean hasEffectiveAllocatableSpaceForNewContainer( - DatanodeDetails node, DatanodeInfo datanodeInfo) { - Objects.requireNonNull(node, "node == null"); + public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanodeInfo) { Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); - long pendingAllocationSize = getPendingContainerCount(node) * maxContainerSize; + long pendingAllocationSize = datanodeInfo.getPendingContainerAllocations().getCount() * maxContainerSize; List storageReports = datanodeInfo.getStorageReports(); Objects.requireNonNull(storageReports, "storageReports == null"); if (storageReports.isEmpty()) { @@ -231,93 +189,37 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer( return false; } - /** - * Record a pending container allocation for all DataNodes in the pipeline. - * Container is added to the current window. - * - * @param pipeline The pipeline where container is allocated - * @param containerID The container being allocated - */ - public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { - Objects.requireNonNull(pipeline, "pipeline == null"); - Objects.requireNonNull(containerID, "containerID == null"); - - for (DatanodeDetails node : pipeline.getNodes()) { - recordPendingAllocationForDatanode(node, containerID); - } - } - /** * Record a pending container allocation for a single DataNode. * Container is added to the current window. 
* - * @param node The DataNode where container is being allocated/replicated * @param containerID The container being allocated/replicated */ - public void recordPendingAllocationForDatanode(DatanodeDetails node, ContainerID containerID) { - Objects.requireNonNull(node, "node == null"); + public void recordPendingAllocationForDatanode(DatanodeInfo datanodeInfo, ContainerID containerID) { Objects.requireNonNull(containerID, "containerID == null"); - - DatanodeID dnID = node.getID(); - boolean added = addContainerToBucket(containerID, dnID); - + if (datanodeInfo == null) { + return; + } + final boolean added = datanodeInfo.getPendingContainerAllocations().add(containerID); if (added && metrics != null) { metrics.incNumPendingContainersAdded(); } } - private boolean addContainerToBucket(ContainerID containerID, DatanodeID dnID) { - return datanodeBuckets.get(dnID).add(containerID, dnID); - } - /** * Remove a pending container allocation from a specific DataNode. * Removes from both current and previous windows. * Called when container is confirmed. * - * @param node The DataNode * @param containerID The container to remove from pending */ - public void removePendingAllocation(DatanodeDetails node, ContainerID containerID) { - Objects.requireNonNull(node, "node == null"); + public void removePendingAllocation(TwoWindowBucket bucket, ContainerID containerID) { Objects.requireNonNull(containerID, "containerID == null"); - DatanodeID dnID = node.getID(); - boolean removed = removeContainerFromBucket(containerID, dnID); + boolean removed = bucket.remove(containerID); if (removed && metrics != null) { metrics.incNumPendingContainersRemoved(); } } - - private boolean removeContainerFromBucket(ContainerID containerID, DatanodeID dnID) { - return datanodeBuckets.get(dnID).remove(containerID, dnID); - } - - /** - * Number of pending container allocations for this datanode (union of current and previous - * windows). 
This call may advance the internal tumbling window if the roll interval has elapsed. - * - * @param node The DataNode - * @return Pending container count - */ - public long getPendingContainerCount(DatanodeDetails node) { - Objects.requireNonNull(node, "node == null"); - return datanodeBuckets.get(node).getCount(); - } - - /** - * Whether container is in the current or previous window for this datanode. - */ - @VisibleForTesting - public boolean containsPendingContainer(DatanodeDetails node, ContainerID containerID) { - Objects.requireNonNull(node, "node == null"); - Objects.requireNonNull(containerID, "containerID == null"); - return datanodeBuckets.get(node).contains(containerID); - } - - @VisibleForTesting - public SCMNodeMetrics getMetrics() { - return metrics; - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index 3289e7b312a8..ad392a247d53 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -126,6 +126,7 @@ public class SCMNodeManager implements NodeManager { private final VersionInfo version; private final CommandQueue commandQueue; private final SCMNodeMetrics metrics; + private final PendingContainerTracker pendingContainerTracker; // Node manager MXBean private ObjectName nmInfoBean; private final SCMStorageConfig scmStorageConfig; @@ -188,6 +189,11 @@ public SCMNodeManager( LOG.info("Entering startup safe mode."); registerMXBean(); this.metrics = SCMNodeMetrics.create(this); + this.pendingContainerTracker = new PendingContainerTracker( + (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES), + 5 * 60 * 1000, // TODO + this.metrics); this.clusterMap = networkTopology; 
this.nodeResolver = nodeResolver; this.useHostname = conf.getBoolean( @@ -1065,6 +1071,30 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { } } + /** + * Effective space check aligned with container allocation: per-disk slot model minus + * SCM pending allocations. + */ + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) { + DatanodeInfo datanodeInfo = getNode(datanodeID); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", datanodeID); + return false; + } + return pendingContainerTracker.hasEffectiveAllocatableSpaceForNewContainer(datanodeInfo); + } + + @Override + public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + DatanodeInfo datanodeInfo = getNode(datanodeID); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", datanodeID); + return; + } + pendingContainerTracker.recordPendingAllocationForDatanode(datanodeInfo, containerID); + } + /** * Return the node stat of the specified datanode. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 6a448d6c88df..739b0c058ec8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -94,6 +94,17 @@ int getPipelineCount( void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) throws PipelineNotFoundException, InvalidPipelineStateException; + /** + * Records a pending container allocation for every DataNode in the pipeline. + * The allocation is tracked in each node's two-window tumbling bucket so that + * {@code hasEnoughSpace} can account for in-flight allocations before a container + * report arrives from the DataNode. 
+ * + * @param pipeline the pipeline whose nodes will receive the pending record + * @param containerID the container being allocated + */ + void recordPendingAllocation(Pipeline pipeline, ContainerID containerID); + /** * Add container to pipeline during SCM Start. * @@ -213,13 +224,13 @@ void reinitialize(Table pipelineStore) void releaseWriteLock(); /** - * Checks whether all Datanodes in the specified pipeline have greater than the specified space, containerSize. + * Checks whether all Datanodes in the specified pipeline have enough space to store a new container. + * * @param pipeline pipeline to check - * @param containerSize the required amount of space - * @return false if all the volumes on any Datanode in the pipeline have space less than equal to the specified - * containerSize, otherwise true + * @return false if any Datanode in the pipeline has no volume with space greater than the configured + * container size, otherwise true */ - boolean hasEnoughSpace(Pipeline pipeline, long containerSize); + boolean hasEnoughSpace(Pipeline pipeline); int openContainerLimit(List datanodes); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 9c529e22e7e1..c4375e3f20ea 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import 
org.apache.hadoop.hdds.scm.container.ContainerManager; @@ -54,7 +53,6 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; -import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager; import org.apache.hadoop.hdds.server.events.EventPublisher; @@ -636,19 +634,22 @@ private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) { } @Override - public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) { + public boolean hasEnoughSpace(Pipeline pipeline) { for (DatanodeDetails node : pipeline.getNodes()) { - if (!(node instanceof DatanodeInfo)) { - node = nodeManager.getDatanodeInfo(node); - } - if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize)) { + if (!nodeManager.hasSpaceForNewContainerAllocation(node.getID())) { return false; } } - return true; } + @Override + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { + for (DatanodeDetails dn : pipeline.getNodes()) { + nodeManager.recordPendingAllocationForDatanode(dn.getID(), containerID); + } + } + /** * Schedules a fixed interval job to create pipelines. 
*/ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java index 55f5610cd7e9..a0f5a14725fa 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java @@ -89,6 +89,7 @@ public final class HddsTestUtils { public static final long CONTAINER_USED_BYTES_DEFAULT = 100L; public static final long CONTAINER_NUM_KEYS_DEFAULT = 2L; + public static final long ROLL_INTERVAL_MS_DEFAULT = 5 * 60 * 1000L; //TODO private HddsTestUtils() { } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 013f14b16504..57d38ece3dd6 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; @@ -113,6 +114,8 @@ public class MockNodeManager implements NodeManager { private ConcurrentMap> dnsToUuidMap; private int numHealthyDisksPerDatanode; private int numPipelinePerDatanode; + private PendingContainerTracker pendingContainerTracker; + private final OzoneConfiguration conf = new OzoneConfiguration(); { this.healthyNodes = new LinkedList<>(); @@ -123,7 +126,9 @@ public class MockNodeManager implements 
NodeManager { this.node2ContainerMap = new NodeStateMap(); this.dnsToUuidMap = new ConcurrentHashMap<>(); this.aggregateStat = new SCMNodeStat(); - this.clusterMap = new NetworkTopologyImpl(new OzoneConfiguration()); + this.clusterMap = new NetworkTopologyImpl(conf); + this.pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, null); } public MockNodeManager(NetworkTopologyImpl clusterMap, @@ -263,7 +268,7 @@ public List getNodes( List healthyNodesWithInfo = new ArrayList<>(); for (DatanodeDetails dd : healthyNodes) { DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); @@ -342,7 +347,7 @@ public List getAllNodes() { nodeStatus = NodeStatus.inServiceDead(); } DatanodeInfo di = new DatanodeInfo(entry.getKey(), nodeStatus, - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); long capacity = entry.getValue().getCapacity().get(); long used = entry.getValue().getScmUsed().get(); @@ -431,7 +436,7 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) { } DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); long capacity = nodeMetricMap.get(dd).getCapacity().get(); long used = nodeMetricMap.get(dd).getScmUsed().get(); long remaining = nodeMetricMap.get(dd).getRemaining().get(); @@ -448,6 +453,15 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) { return di; } + @Override + public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + DatanodeDetails dd = 
nodeMetricMap.keySet().stream() + .filter(d -> d.getID().equals(datanodeID)) + .findFirst().orElse(null); + DatanodeInfo info = getDatanodeInfo(dd); + pendingContainerTracker.recordPendingAllocationForDatanode(info, containerID); + } + /** * Return the node stat of the specified datanode. * @param datanodeDetails - datanode details. @@ -750,7 +764,8 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails, NodeReportProto nodeReport, PipelineReportsProto pipelineReportsProto, LayoutVersionProto layoutInfo) { - final DatanodeInfo info = new DatanodeInfo(datanodeDetails, NodeStatus.inServiceHealthy(), layoutInfo); + final DatanodeInfo info = new DatanodeInfo(datanodeDetails, + NodeStatus.inServiceHealthy(), layoutInfo, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); try { node2ContainerMap.addNode(info); addEntryTodnsToUuidMap(datanodeDetails.getIpAddress(), @@ -941,6 +956,18 @@ public void setNumHealthyVolumes(int value) { numHealthyDisksPerDatanode = value; } + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) { + DatanodeDetails dd = nodeMetricMap.keySet().stream() + .filter(d -> d.getID().equals(datanodeID)) + .findFirst().orElse(null); + DatanodeInfo info = getDatanodeInfo(dd); + if (info == null) { + return false; + } + return pendingContainerTracker.hasEffectiveAllocatableSpaceForNewContainer(info); + } + /** * A class to declare some values for the nodes so that our tests * won't fail. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index 4f0679470eab..f2da8fd2878b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.net.NetworkTopology; @@ -66,7 +67,7 @@ public class SimpleMockNodeManager implements NodeManager { public void register(DatanodeDetails dd, NodeStatus status) { dd.setPersistedOpState(status.getOperationalState()); dd.setPersistedOpStateExpiryEpochSec(status.getOpStateExpiryEpochSeconds()); - nodeMap.put(dd.getID(), new DatanodeInfo(dd, status, null)); + nodeMap.put(dd.getID(), new DatanodeInfo(dd, status, null, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT)); } public void setNodeStatus(DatanodeDetails dd, NodeStatus status) { @@ -248,6 +249,15 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { return null; } + @Override + public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + } + + @Override + public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) { + return true; + } + @Override public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { return null; diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index 218a2137e3e6..68dcc634a5e3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -103,7 +102,7 @@ void setUp() throws Exception { pipelineManager = spy(base); // Default: allow allocation in tests unless a test overrides it. - doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class), anyLong()); + doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class)); pipelineManager.createPipeline(RatisReplicationConfig.getInstance( ReplicationFactor.THREE)); @@ -141,7 +140,7 @@ void testAllocateContainer() throws Exception { */ @Test public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() throws IOException { - doReturn(false).when(pipelineManager).hasEnoughSpace(any(), anyLong()); + doReturn(false).when(pipelineManager).hasEnoughSpace(any()); long sizeRequired = 256 * 1024 * 1024; // 256 MB Pipeline pipeline = pipelineManager.getPipelines().iterator().next(); @@ -166,7 +165,7 @@ public void testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() // create a spy to mock hasEnoughSpace to always return true PipelineManager spyPipelineManager = spy(pipelineManager); doReturn(true).when(spyPipelineManager) - .hasEnoughSpace(any(Pipeline.class), anyLong()); + 
.hasEnoughSpace(any(Pipeline.class)); // create a new ContainerManager using the spy File tempDir = new File(testDir, "tempDir"); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java index d67320974918..ed4e96b8de56 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java @@ -99,7 +99,7 @@ public void testRackAwarePolicy() throws IOException { .createDatanodeDetails(hostname + i, rack + (i / 5)); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java index 5b940fc543f1..1fb3f53504d3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java @@ -64,7 +64,8 @@ public void chooseDatanodes() throws SCMException { DatanodeInfo datanodeInfo = new DatanodeInfo( MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), - 
UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java index 94cdc33e869a..dd068f55cdfe 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java @@ -112,7 +112,8 @@ private void setup(int datanodeCount) { cluster.add(datanodeDetails); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -455,7 +456,8 @@ public void testDatanodeWithDefaultNetworkLocation(int datanodeCount) hostname + i, null); DatanodeInfo dnInfo = new DatanodeInfo( dn, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( dnInfo.getID(), "/data1-" + dnInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java index ce66bf9f68c7..e015b93c1e31 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackScatter.java @@ -180,7 +180,8 @@ private void setupDatanode(DatanodeDetails datanodeDetails) { cluster.add(datanodeDetails); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -490,7 +491,8 @@ public void testDatanodeWithDefaultNetworkLocation(int datanodeCount) hostname + i, null); DatanodeInfo dnInfo = new DatanodeInfo( dn, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( dnInfo.getID(), "/data1-" + dnInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java index 3171df95db4a..47602a385fd6 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java @@ -61,7 +61,8 @@ public void chooseDatanodes() 
throws SCMException { DatanodeInfo datanodeInfo = new DatanodeInfo( MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), @@ -167,7 +168,8 @@ public void testIsValidNode() throws SCMException { DatanodeInfo datanodeInfo = new DatanodeInfo( MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageReportProto storage1 = HddsTestUtils.createStorageReport( datanodeInfo.getID(), "/data1-" + datanodeInfo.getID(), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 9b7c5c77b2cd..4dbe79fc1351 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -160,7 +159,7 @@ SCMNodeManager createNodeManager(OzoneConfiguration config) { ContainerManager createContainerManager() throws IOException { pipelineManager = spy(pipelineManager); - doReturn(true).when(pipelineManager).hasEnoughSpace(any(), anyLong()); + doReturn(true).when(pipelineManager).hasEnoughSpace(any()); return new 
ContainerManagerImpl(conf, scmhaManager, sequenceIdGen, pipelineManager, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java index 00a0c565b2ca..c747dc7d60ae 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java @@ -23,15 +23,11 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -42,18 +38,14 @@ public class TestPendingContainerTracker { private static final long MAX_CONTAINER_SIZE = 5L * 1024 * 1024 * 1024; // 5GB - private static final long DEFAULT_ROLL_INTERVAL_MS = 5 * 60 * 1000; private static final int NUM_DATANODES = 1000; - private static final int NUM_PIPELINES = 1000; private static final int NUM_CONTAINERS = 10_000; - private List datanodes; - private List pipelines; + private List datanodes; private List containers; private PendingContainerTracker tracker; - private Pipeline pipeline; - private DatanodeDetails dn1; - private DatanodeDetails dn2; + private DatanodeInfo dn1; + private DatanodeInfo dn2; /** First three container IDs. 
*/ private ContainerID container1; @@ -61,16 +53,13 @@ public class TestPendingContainerTracker { @BeforeEach public void setUp() throws IOException { - tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, DEFAULT_ROLL_INTERVAL_MS, null); + tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT, null); datanodes = new ArrayList<>(NUM_DATANODES); for (int i = 0; i < NUM_DATANODES; i++) { - datanodes.add(MockDatanodeDetails.randomLocalDatanodeDetails()); - } - - pipelines = new ArrayList<>(NUM_PIPELINES); - for (int i = 0; i < NUM_PIPELINES; i++) { - pipelines.add(MockPipeline.createPipeline(Collections.singletonList(datanodes.get(i)))); + datanodes.add(new DatanodeInfo( + MockDatanodeDetails.randomLocalDatanodeDetails(), NodeStatus.inServiceHealthy(), null, + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT)); } containers = new ArrayList<>(NUM_CONTAINERS); @@ -78,7 +67,6 @@ public void setUp() throws IOException { containers.add(ContainerID.valueOf(id)); } - pipeline = MockPipeline.createPipeline(datanodes.subList(0, 3)); dn1 = datanodes.get(0); dn2 = datanodes.get(1); @@ -88,43 +76,43 @@ public void setUp() throws IOException { @Test public void testRecordPendingAllocation() { - // Allocate first 100 containers across first 100 pipelines (1 DN each) + // Allocate first 100 containers, one per datanode for (int i = 0; i < 100; i++) { - tracker.recordPendingAllocation(pipelines.get(i), containers.get(i)); + tracker.recordPendingAllocationForDatanode(datanodes.get(i), containers.get(i)); } // Each of the first 100 DNs should have 1 pending container for (int i = 0; i < 100; i++) { - assertEquals(1, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(1, datanodes.get(i).getPendingContainerAllocations().getCount()); assertEquals(MAX_CONTAINER_SIZE, - tracker.getPendingContainerCount(datanodes.get(i)) * MAX_CONTAINER_SIZE); + datanodes.get(i).getPendingContainerAllocations().getCount() * MAX_CONTAINER_SIZE); } // DNs 
beyond index 100 should have 0 pending - assertEquals(0, tracker.getPendingContainerCount(datanodes.get(500))); - assertEquals(0, tracker.getPendingContainerCount(datanodes.get(999))); + assertEquals(0, datanodes.get(500).getPendingContainerAllocations().getCount()); + assertEquals(0, datanodes.get(999).getPendingContainerAllocations().getCount()); } @Test public void testRemovePendingAllocation() { - // Allocate containers 0-99 to first 100 pipelines + // Allocate containers 0-99, one per datanode for (int i = 0; i < 100; i++) { - tracker.recordPendingAllocation(pipelines.get(i), containers.get(i)); + tracker.recordPendingAllocationForDatanode(datanodes.get(i), containers.get(i)); } // Remove from first 50 DNs for (int i = 0; i < 50; i++) { - tracker.removePendingAllocation(datanodes.get(i), containers.get(i)); + tracker.removePendingAllocation(datanodes.get(i).getPendingContainerAllocations(), containers.get(i)); } // First 50 DNs should have 0 pending for (int i = 0; i < 50; i++) { - assertEquals(0, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(0, datanodes.get(i).getPendingContainerAllocations().getCount()); } // DNs 50-99 should still have 1 pending for (int i = 50; i < 100; i++) { - assertEquals(1, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(1, datanodes.get(i).getPendingContainerAllocations().getCount()); } } @@ -137,41 +125,46 @@ public void testRemovePendingAllocation() { @Timeout(30) public void testTwoWindowRollAgesOutContainerAfterTwoIntervals() throws InterruptedException { long rollMs = 200L; - PendingContainerTracker shortRollTracker = - new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, null); + DatanodeInfo shortDn = new DatanodeInfo( + MockDatanodeDetails.randomLocalDatanodeDetails(), NodeStatus.inServiceHealthy(), null, + rollMs); + + PendingContainerTracker shortRollTracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, null); - shortRollTracker.recordPendingAllocationForDatanode(dn1, 
container1); - assertEquals(1, shortRollTracker.getPendingContainerCount(dn1)); - assertTrue(shortRollTracker.containsPendingContainer(dn1, container1)); + shortRollTracker.recordPendingAllocationForDatanode(shortDn, container1); + assertEquals(1, shortDn.getPendingContainerAllocations().getCount()); + assertTrue(shortDn.getPendingContainerAllocations().contains(container1)); // First roll: C1 moves from currentWindow to previousWindow; union still includes C1 Thread.sleep(rollMs + 80); - shortRollTracker.rollWindowsIfNeeded(dn1); - assertEquals(1, shortRollTracker.getPendingContainerCount(dn1)); - assertTrue(shortRollTracker.containsPendingContainer(dn1, container1)); + shortDn.getPendingContainerAllocations(); // triggers rollIfNeeded + assertEquals(1, shortDn.getPendingContainerAllocations().getCount()); + assertTrue(shortDn.getPendingContainerAllocations().contains(container1)); // Second roll: prior previousWindow (holding C1) is dropped; C1 is no longer pending Thread.sleep(rollMs + 80); - shortRollTracker.rollWindowsIfNeeded(dn1); - assertEquals(0, shortRollTracker.getPendingContainerCount(dn1)); - assertEquals(0L, shortRollTracker.getPendingContainerCount(dn1) * MAX_CONTAINER_SIZE); + shortDn.getPendingContainerAllocations(); // triggers rollIfNeeded + assertEquals(0, shortDn.getPendingContainerAllocations().getCount()); + assertEquals(0L, shortDn.getPendingContainerAllocations().getCount() * MAX_CONTAINER_SIZE); } @Test public void testRemoveNonExistentContainer() { - tracker.recordPendingAllocation(pipeline, container1); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); // Remove a container that was never added - should not throw exception - tracker.removePendingAllocation(dn1, container2); + tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), container2); // DN1 should still have container1 - assertEquals(1, tracker.getPendingContainerCount(dn1)); + assertEquals(1, 
dn1.getPendingContainerAllocations().getCount()); } @Test public void testUnknownDatanodeHasZeroPendingCount() { - DatanodeDetails unknownDN = MockDatanodeDetails.randomDatanodeDetails(); - assertEquals(0, tracker.getPendingContainerCount(unknownDN)); + DatanodeInfo unknownDN = new DatanodeInfo( + MockDatanodeDetails.randomDatanodeDetails(), NodeStatus.inServiceHealthy(), null, + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); + assertEquals(0, unknownDN.getPendingContainerAllocations().getCount()); } @Test @@ -187,10 +180,10 @@ public void testConcurrentModification() throws InterruptedException { threads[i] = new Thread(() -> { for (int j = 0; j < operationsPerThread; j++) { ContainerID cid = ContainerID.valueOf(threadId * 1000L + j); - tracker.recordPendingAllocation(pipeline, cid); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, cid)); if (j % 2 == 0) { - tracker.removePendingAllocation(dn1, cid); + tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), cid); } } }); @@ -209,19 +202,19 @@ public void testConcurrentModification() throws InterruptedException { @Test public void testBucketsRetainedWhenEmpty() { - tracker.recordPendingAllocation(pipeline, container1); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); - assertEquals(1, tracker.getPendingContainerCount(dn1)); + assertEquals(1, dn1.getPendingContainerAllocations().getCount()); // Remove the only pending container from DN1 - tracker.removePendingAllocation(dn1, container1); + tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), container1); - assertEquals(0, tracker.getPendingContainerCount(dn1)); - assertEquals(1, tracker.getPendingContainerCount(dn2)); + assertEquals(0, dn1.getPendingContainerAllocations().getCount()); + assertEquals(1, dn2.getPendingContainerAllocations().getCount()); // Empty bucket for DN1 is still usable for new allocations 
tracker.recordPendingAllocationForDatanode(dn1, container2); - assertEquals(1, tracker.getPendingContainerCount(dn1)); + assertEquals(1, dn1.getPendingContainerAllocations().getCount()); } @Test @@ -230,51 +223,51 @@ public void testRemoveFromBothWindows() { // In general, a container could be in previous window after a roll // Add containers - tracker.recordPendingAllocation(pipeline, container1); - tracker.recordPendingAllocation(pipeline, container2); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container1)); + datanodes.subList(0, 3).forEach(dn -> tracker.recordPendingAllocationForDatanode(dn, container2)); - assertEquals(2, tracker.getPendingContainerCount(dn1)); + assertEquals(2, dn1.getPendingContainerAllocations().getCount()); // Remove container1 - should work regardless of which window it's in - tracker.removePendingAllocation(dn1, container1); + tracker.removePendingAllocation(dn1.getPendingContainerAllocations(), container1); - assertEquals(1, tracker.getPendingContainerCount(dn1)); + assertEquals(1, dn1.getPendingContainerAllocations().getCount()); - assertFalse(tracker.containsPendingContainer(dn1, container1)); - assertTrue(tracker.containsPendingContainer(dn1, container2)); + assertFalse(dn1.getPendingContainerAllocations().contains(container1)); + assertTrue(dn1.getPendingContainerAllocations().contains(container2)); } @Test public void testManyContainersOnSingleDatanode() { // Allocate first 1000 containers to the first datanode - DatanodeDetails dn = datanodes.get(0); + DatanodeInfo dn = datanodes.get(0); for (int i = 0; i < 1000; i++) { tracker.recordPendingAllocationForDatanode(dn, containers.get(i)); } - assertEquals(1000, tracker.getPendingContainerCount(dn)); - assertEquals(1000 * MAX_CONTAINER_SIZE, tracker.getPendingContainerCount(dn) * MAX_CONTAINER_SIZE); + assertEquals(1000, dn.getPendingContainerAllocations().getCount()); + assertEquals(1000 * MAX_CONTAINER_SIZE, 
dn.getPendingContainerAllocations().getCount() * MAX_CONTAINER_SIZE); // Verify specific containers are present - assertTrue(tracker.containsPendingContainer(dn, containers.get(0))); - assertTrue(tracker.containsPendingContainer(dn, containers.get(500))); - assertTrue(tracker.containsPendingContainer(dn, containers.get(999))); + assertTrue(dn.getPendingContainerAllocations().contains(containers.get(0))); + assertTrue(dn.getPendingContainerAllocations().contains(containers.get(500))); + assertTrue(dn.getPendingContainerAllocations().contains(containers.get(999))); // Remove half of them for (int i = 0; i < 500; i++) { - tracker.removePendingAllocation(dn, containers.get(i)); + tracker.removePendingAllocation(dn.getPendingContainerAllocations(), containers.get(i)); } - assertEquals(500, tracker.getPendingContainerCount(dn)); - assertFalse(tracker.containsPendingContainer(dn, containers.get(0))); - assertTrue(tracker.containsPendingContainer(dn, containers.get(999))); + assertEquals(500, dn.getPendingContainerAllocations().getCount()); + assertFalse(dn.getPendingContainerAllocations().contains(containers.get(0))); + assertTrue(dn.getPendingContainerAllocations().contains(containers.get(999))); } @Test public void testAllDatanodesWithMultipleContainers() { // Allocate 10 containers to each of the 1000 datanodes for (int dnIdx = 0; dnIdx < NUM_DATANODES; dnIdx++) { - DatanodeDetails dn = datanodes.get(dnIdx); + DatanodeInfo dn = datanodes.get(dnIdx); for (int cIdx = 0; cIdx < 10; cIdx++) { int containerIdx = dnIdx * 10 + cIdx; tracker.recordPendingAllocationForDatanode(dn, containers.get(containerIdx)); @@ -283,35 +276,35 @@ public void testAllDatanodesWithMultipleContainers() { // Each DN should have 10 pending containers for (int i = 0; i < NUM_DATANODES; i++) { - assertEquals(10, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(10, datanodes.get(i).getPendingContainerAllocations().getCount()); assertEquals(10 * MAX_CONTAINER_SIZE, - 
tracker.getPendingContainerCount(datanodes.get(i)) * MAX_CONTAINER_SIZE); + datanodes.get(i).getPendingContainerAllocations().getCount() * MAX_CONTAINER_SIZE); } // Remove all containers from every 10th DN for (int dnIdx = 0; dnIdx < NUM_DATANODES; dnIdx += 10) { - DatanodeDetails dn = datanodes.get(dnIdx); + DatanodeInfo dn = datanodes.get(dnIdx); for (int cIdx = 0; cIdx < 10; cIdx++) { int containerIdx = dnIdx * 10 + cIdx; - tracker.removePendingAllocation(dn, containers.get(containerIdx)); + tracker.removePendingAllocation(dn.getPendingContainerAllocations(), containers.get(containerIdx)); } } // Every 10th DN should have 0 pending for (int i = 0; i < NUM_DATANODES; i += 10) { - assertEquals(0, tracker.getPendingContainerCount(datanodes.get(i))); + assertEquals(0, datanodes.get(i).getPendingContainerAllocations().getCount()); } // Other DNs should still have 10 pending - assertEquals(10, tracker.getPendingContainerCount(datanodes.get(1))); - assertEquals(10, tracker.getPendingContainerCount(datanodes.get(15))); - assertEquals(10, tracker.getPendingContainerCount(datanodes.get(999))); + assertEquals(10, datanodes.get(1).getPendingContainerAllocations().getCount()); + assertEquals(10, datanodes.get(15).getPendingContainerAllocations().getCount()); + assertEquals(10, datanodes.get(999).getPendingContainerAllocations().getCount()); } @Test public void testIdempotentRecording() { // Allocate same 100 containers multiple times to first 100 DNs - DatanodeDetails dn = datanodes.get(0); + DatanodeInfo dn = datanodes.get(0); for (int round = 0; round < 5; round++) { for (int i = 0; i < 100; i++) { @@ -320,68 +313,69 @@ public void testIdempotentRecording() { } // Should still only have 100 containers - assertEquals(100, tracker.getPendingContainerCount(dn)); + assertEquals(100, dn.getPendingContainerAllocations().getCount()); } @Test public void testMultiVolumeAccumulatedSpaceIsNotEnough() { - DatanodeDetails dn = datanodes.get(0); long containerSize = MAX_CONTAINER_SIZE; 
- + + // Use the same DatanodeInfo object for both recording and checking. + DatanodeInfo dnInfo = datanodes.get(0); List reports = new ArrayList<>(); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 4, 0)); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 4, 0)); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 2, 0)); - DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize / 4, 0)); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize / 4, 0)); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize / 2, 0)); dnInfo.updateStorageReports(reports); - assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); + assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); } @Test public void testMultiVolumeWithPendingAllocation() { - DatanodeDetails dn = datanodes.get(0); long containerSize = MAX_CONTAINER_SIZE; + // Use the same DatanodeInfo object for recording pending allocations and checking space. 
+ DatanodeInfo dnInfo = datanodes.get(0); + // Remaining space available for 3 containers across all the volumes - tracker.recordPendingAllocationForDatanode(dn, containers.get(0)); - tracker.recordPendingAllocationForDatanode(dn, containers.get(1)); + tracker.recordPendingAllocationForDatanode(dnInfo, containers.get(0)); + tracker.recordPendingAllocationForDatanode(dnInfo, containers.get(1)); List reports = new ArrayList<>(); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize, 0)); - reports.add(createStorageReport(dn, 50 * containerSize, containerSize, 0)); - reports.add(createStorageReport(dn, 100 * containerSize, containerSize, 0)); - DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize, 0)); + reports.add(createStorageReport(dnInfo, 50 * containerSize, containerSize, 0)); + reports.add(createStorageReport(dnInfo, 100 * containerSize, containerSize, 0)); dnInfo.updateStorageReports(reports); // Remaining space available for 1 container across all the volume after 2 container allocation - assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); + assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); - tracker.recordPendingAllocationForDatanode(dn, containers.get(2)); + tracker.recordPendingAllocationForDatanode(dnInfo, containers.get(2)); // Remaining space available for 0 container across all the volume - assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); + assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); } @Test public void testMultiVolumeWithCommittedBytes() { - DatanodeDetails dn = datanodes.get(0); long containerSize = MAX_CONTAINER_SIZE; - + + // Use the same DatanodeInfo object for recording pending allocations and checking space. 
+ DatanodeInfo dnInfo = datanodes.get(0); List reports = new ArrayList<>(); - reports.add(createStorageReport(dn, 100 * containerSize, 6 * containerSize, 5 * containerSize)); - reports.add(createStorageReport(dn, 50 * containerSize, 3 * containerSize, 3 * containerSize)); - DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null); + reports.add(createStorageReport(dnInfo, 100 * containerSize, 6 * containerSize, 5 * containerSize)); + reports.add(createStorageReport(dnInfo, 50 * containerSize, 3 * containerSize, 3 * containerSize)); dnInfo.updateStorageReports(reports); // Remaining space available for 1 container across all the volume considering committed bytes - assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); - tracker.recordPendingAllocationForDatanode(dn, containers.get(0)); + assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); + tracker.recordPendingAllocationForDatanode(dnInfo, containers.get(0)); // Remaining space available for 0 container across all the volume considering // committed bytes and container allocation - assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo)); + assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dnInfo)); } - private StorageReportProto createStorageReport(DatanodeDetails dn, long capacity, long remaining, long committed) { + private StorageReportProto createStorageReport(DatanodeInfo dn, long capacity, long remaining, long committed) { return HddsTestUtils.createStorageReports(dn.getID(), capacity, remaining, committed).get(0); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java index c079c509ad8a..e9f13ee82c8c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNodeStateMap.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeStatus; @@ -52,7 +53,7 @@ void addNode(NodeStatus status) throws NodeAlreadyExistsException { } void addNode(DatanodeDetails datanode, NodeStatus status) throws NodeAlreadyExistsException { - map.addNode(new DatanodeInfo(datanode, status, null)); + map.addNode(new DatanodeInfo(datanode, status, null, HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT)); } @BeforeEach diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index d6a3fc546352..69b1e24dfe3b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -50,9 +50,11 @@ public class MockPipelineManager implements PipelineManager { private final PipelineStateManager stateManager; + private final NodeManager nodeManager; public MockPipelineManager(DBStore dbStore, SCMHAManager scmhaManager, NodeManager nodeManager) throws RocksDatabaseException, CodecException, DuplicatedPipelineIdException { + this.nodeManager = nodeManager; stateManager = PipelineStateManagerImpl .newBuilder().setNodeManager(nodeManager) .setRatisServer(scmhaManager.getRatisServer()) @@ -332,10 +334,17 @@ public boolean isPipelineCreationFrozen() { } @Override - public boolean hasEnoughSpace(Pipeline pipeline, long 
containerSize) { + public boolean hasEnoughSpace(Pipeline pipeline) { return false; } + @Override + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { + for (DatanodeDetails dn : pipeline.getNodes()) { + nodeManager.recordPendingAllocationForDatanode(dn.getID(), containerID); + } + } + @Override public int openContainerLimit(List datanodes) { // For tests that do not care about this limit, return a large value. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java index e7fc6f14f9b6..4e53f6d629ae 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java @@ -92,7 +92,6 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; -import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy; @@ -937,12 +936,11 @@ public void testCreatePipelineForRead() throws IOException { } /** - * {@link PipelineManager#hasEnoughSpace(Pipeline, long)} should return false if all the - * volumes on any Datanode in the pipeline have less than equal to the space required for creating a new container. + * {@link PipelineManager#hasEnoughSpace(Pipeline)} should return false if all the + * volumes on any Datanode in the pipeline have space less than or equal to the configured container size. 
*/ @Test public void testHasEnoughSpace() throws IOException { - // create a Mock NodeManager, the MockNodeManager class doesn't work for this test NodeManager mockedNodeManager = Mockito.mock(NodeManager.class); PipelineManagerImpl pipelineManager = PipelineManagerImpl.newPipelineManager(conf, SCMHAManagerStub.getInstance(true), @@ -953,50 +951,30 @@ public void testHasEnoughSpace() throws IOException { serviceManager, testClock); + DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails(); + DatanodeDetails dn3 = MockDatanodeDetails.randomDatanodeDetails(); Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) - .setNodes(ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails(), - MockDatanodeDetails.randomDatanodeDetails(), - MockDatanodeDetails.randomDatanodeDetails())) + .setNodes(ImmutableList.of(dn1, dn2, dn3)) .setState(OPEN) .setReplicationConfig(ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, THREE)) .build(); - List nodes = pipeline.getNodes(); - assertEquals(3, nodes.size()); - - long containerSize = 100L; // Case 1: All nodes have enough space. - List datanodeInfoList = new ArrayList<>(); - for (DatanodeDetails dn : nodes) { - // the method being tested needs NodeManager to return DatanodeInfo because DatanodeInfo has storage - // information (it extends DatanodeDetails) - DatanodeInfo info = new DatanodeInfo(dn, null, null); - info.updateStorageReports(HddsTestUtils.createStorageReports(dn.getID(), 200L, 200L, 10L)); - doReturn(info).when(mockedNodeManager).getDatanodeInfo(dn); - datanodeInfoList.add(info); - } - assertTrue(pipelineManager.hasEnoughSpace(pipeline, containerSize)); - - // Case 2: One node does not have enough space. - /* - Interestingly, SCMCommonPlacementPolicy#hasEnoughSpace returns false if exactly the required amount of space - is available. 
Which means it won't allow creating a pipeline on a node if all volumes have exactly 5 GB - available. We follow the same behavior here in the case of a new replica. - - So here, remaining - committed == containerSize, and hasEnoughSpace returns false. - TODO should this return true instead? - */ - DatanodeInfo datanodeInfo = datanodeInfoList.get(0); - datanodeInfo.updateStorageReports(HddsTestUtils.createStorageReports(datanodeInfo.getID(), 200L, 120L, - 20L)); - assertFalse(pipelineManager.hasEnoughSpace(pipeline, containerSize)); + doReturn(true).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn1.getID()); + doReturn(true).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn2.getID()); + doReturn(true).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn3.getID()); + assertTrue(pipelineManager.hasEnoughSpace(pipeline)); + + // Case 2: One node does not have enough space — pipeline should be rejected. + doReturn(false).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn1.getID()); + assertFalse(pipelineManager.hasEnoughSpace(pipeline)); // Case 3: All nodes do not have enough space. 
- for (DatanodeInfo info : datanodeInfoList) { - info.updateStorageReports(HddsTestUtils.createStorageReports(info.getID(), 200L, 100L, 20L)); - } - assertFalse(pipelineManager.hasEnoughSpace(pipeline, containerSize)); + doReturn(false).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn2.getID()); + doReturn(false).when(mockedNodeManager).hasSpaceForNewContainerAllocation(dn3.getID()); + assertFalse(pipelineManager.hasEnoughSpace(pipeline)); } private Set createContainerReplicasList( diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java index e548e45de849..84afb74f0a5a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementFactory.java @@ -103,7 +103,8 @@ private void setupRacks(int datanodeCount, int nodesPerRack, cluster.add(datanodeDetails); DatanodeInfo datanodeInfo = new DatanodeInfo( datanodeDetails, NodeStatus.inServiceHealthy(), - UpgradeUtils.defaultLayoutVersionProto()); + UpgradeUtils.defaultLayoutVersionProto(), + HddsTestUtils.ROLL_INTERVAL_MS_DEFAULT); StorageContainerDatanodeProtocolProtos.StorageReportProto storage1 = HddsTestUtils.createStorageReport( diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java index 8d21c2465a8f..d62aae159139 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestStorageDistributionEndpoint.java @@ -225,7 +225,7 @@ private List mockStorageDistributionData(int numNodes) throws 
Exception .build(); pendingDeletionMetrics.add(new DatanodePendingDeletionMetrics(hostName, uuid.toString(), PENDING_DELETION_SIZE)); - dataNodes.add(new DatanodeInfo(datanode, NodeStatus.inServiceHealthy(), null)); + dataNodes.add(new DatanodeInfo(datanode, NodeStatus.inServiceHealthy(), null, 5 * 60 * 1000)); when(nodeManager.getNodeStat(datanode)) .thenReturn(new SCMNodeMetric(OZONE_CAPACITY, OZONE_USED, OZONE_REMAINING, COMMITTED, MIN_FREE_SPACE, RESERVED));