diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java
new file mode 100644
index 000000000000..0902529f1792
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeID;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.container.common.volume.VolumeUsage;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks per-datanode pending container allocations at SCM using a Two Window Tumbling Bucket
+ * pattern (similar to HDFS HADOOP-3707).
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ *
+ * Each DataNode has two sets: currentWindow and previousWindow
+ * New allocations go into currentWindow
+ * Every ROLL_INTERVAL (default 5 minutes):
+ *
+ * - previousWindow = currentWindow (shift)
+ * - currentWindow = new empty set (reset)
+ * - Old previousWindow is discarded (automatic aging)
+ *
+ *
+ * When checking pending: return union of currentWindow + previousWindow
+ *
+ *
+ * Example Timeline:
+ *
+ * Time | Action | CurrentWindow | PreviousWindow | Total Pending
+ * ------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1 | {C1} | {} | {C1}
+ * 00:02 | Allocate Container-2 | {C1, C2} | {} | {C1, C2}
+ * 00:05 | [ROLL] Window tumbles | {} | {C1, C2} | {C1, C2}
+ * 00:07 | Allocate Container-3 | {C3} | {C1, C2} | {C1, C2, C3}
+ * 00:08 | Report confirms C1 | {C3} | {C2} | {C2, C3}
+ * 00:10 | [ROLL] Window tumbles | {} | {C3} | {C3}
+ * | (C2 aged out if not reported)
+ *
+ *
+ */
+public class PendingContainerTracker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PendingContainerTracker.class);
+
+ private final DatanodeBuckets datanodeBuckets;
+
+ /**
+ * Maximum container size in bytes.
+ */
+ private final long maxContainerSize;
+
+ /**
+ * Metrics for tracking pending containers (same instance as {@link SCMNodeManager}'s node metrics).
+ */
+ private final SCMNodeMetrics metrics;
+
+  /**
+   * Two-window bucket for a single DataNode.
+   * Contains current and previous window sets, plus last roll timestamp.
+   */
+  private static class TwoWindowBucket {
+    private Set<ContainerID> currentWindow = new HashSet<>();
+    private Set<ContainerID> previousWindow = new HashSet<>();
+    private long lastRollTime = Time.monotonicNow();
+    private final long rollIntervalMs;
+
+    TwoWindowBucket(long rollIntervalMs) {
+      this.rollIntervalMs = rollIntervalMs;
+    }
+
+    /**
+     * Roll one or both windows based on elapsed time.
+     */
+    synchronized void rollIfNeeded() {
+      long now = Time.monotonicNow();
+      long elapsed = now - lastRollTime;
+
+      if (elapsed >= 2 * rollIntervalMs) {
+        int dropped = getCount();
+        previousWindow.clear();
+        currentWindow.clear();
+        lastRollTime = now;
+        LOG.debug("Double roll interval elapsed ({}ms): dropped {} pending containers", elapsed, dropped);
+      } else if (elapsed >= rollIntervalMs) {
+        previousWindow.clear();
+        final Set<ContainerID> tmp = previousWindow;
+        previousWindow = currentWindow;
+        currentWindow = tmp;
+        lastRollTime = now;
+        LOG.debug("Rolled window. Previous window size: {} elapsed: ({}ms), Current window reset to empty",
+            previousWindow.size(), elapsed);
+      }
+    }
+
+    synchronized boolean contains(ContainerID containerID) {
+      return currentWindow.contains(containerID) || previousWindow.contains(containerID);
+    }
+
+    /**
+     * Add container to current window.
+     */
+    synchronized boolean add(ContainerID containerID, DatanodeID dnID) {
+      boolean added = currentWindow.add(containerID);
+      LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}",
+          containerID, dnID, added, getCount());
+      return added;
+    }
+
+    /**
+     * Remove container from both windows.
+     */
+    synchronized boolean remove(ContainerID containerID, DatanodeID dnID) {
+      boolean removedFromCurrent = currentWindow.remove(containerID);
+      boolean removedFromPrevious = previousWindow.remove(containerID);
+      boolean removed = removedFromCurrent || removedFromPrevious;
+      LOG.debug("Removed pending container {} from DataNode {}. Removed={}, Remaining={}",
+          containerID, dnID, removed, getCount());
+      return removed;
+    }
+
+    /**
+     * Count of pending containers in both windows.
+     */
+    synchronized int getCount() {
+      return currentWindow.size() + previousWindow.size();
+    }
+  }
+
+  /**
+   * Per-datanode two-window buckets.
+   */
+  private static class DatanodeBuckets {
+    private final ConcurrentHashMap<DatanodeID, TwoWindowBucket> map = new ConcurrentHashMap<>();
+    private final long rollIntervalMs;
+
+    DatanodeBuckets(long rollIntervalMs) {
+      this.rollIntervalMs = rollIntervalMs;
+    }
+
+    TwoWindowBucket get(DatanodeID id) {
+      final TwoWindowBucket bucket = map.computeIfAbsent(id, k -> new TwoWindowBucket(rollIntervalMs));
+      bucket.rollIfNeeded();
+      return bucket;
+    }
+
+    TwoWindowBucket get(DatanodeDetails dn) {
+      Objects.requireNonNull(dn, "dn == null");
+      return get(dn.getID());
+    }
+  }
+
+ public PendingContainerTracker(long maxContainerSize, long rollIntervalMs,
+ SCMNodeMetrics metrics) {
+ this.datanodeBuckets = new DatanodeBuckets(rollIntervalMs);
+ this.maxContainerSize = maxContainerSize;
+ this.metrics = metrics;
+ LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms",
+ maxContainerSize, rollIntervalMs);
+ }
+
+ /**
+ * Advances the two-window tumbling bucket for this datanode when the roll interval has elapsed.
+ * Call on periodic paths (node report) so windows age even when there are no new
+ * allocations or container reports touching this tracker.
+ */
+ public void rollWindowsIfNeeded(DatanodeDetails node) {
+ Objects.requireNonNull(node, "node == null");
+ datanodeBuckets.get(node.getID());
+ }
+
+  /**
+   * Whether the datanode can fit another container of {@link #maxContainerSize} after accounting for
+   * SCM pending allocations for {@code node} (this tracker) and usable space across volumes on
+   * {@code datanodeInfo}. Pending bytes are {@link #getPendingContainerCount} × {@code maxContainerSize};
+   * effective allocatable space sums full-container slots per storage report.
+   *
+   * @param node identity used to look up pending allocations (same DN as {@code datanodeInfo})
+   * @param datanodeInfo storage reports for the datanode
+   */
+  public boolean hasEffectiveAllocatableSpaceForNewContainer(
+      DatanodeDetails node, DatanodeInfo datanodeInfo) {
+    Objects.requireNonNull(node, "node == null");
+    Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");
+
+    long pendingAllocationSize = getPendingContainerCount(node) * maxContainerSize;
+    List<StorageReportProto> storageReports = datanodeInfo.getStorageReports();
+    Objects.requireNonNull(storageReports, "storageReports == null");
+    if (storageReports.isEmpty()) {
+      return false;
+    }
+    long effectiveAllocatableSpace = 0L;
+    for (StorageReportProto report : storageReports) {
+      long usableSpace = VolumeUsage.getUsableSpace(report);
+      long containersOnThisDisk = usableSpace / maxContainerSize;
+      effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
+      if (effectiveAllocatableSpace - pendingAllocationSize >= maxContainerSize) {
+        return true;
+      }
+    }
+    if (metrics != null) {
+      metrics.incNumSkippedFullNodeContainerAllocation();
+    }
+    return false;
+  }
+
+ /**
+ * Record a pending container allocation for all DataNodes in the pipeline.
+ * Container is added to the current window.
+ *
+ * @param pipeline The pipeline where container is allocated
+ * @param containerID The container being allocated
+ */
+ public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) {
+ Objects.requireNonNull(pipeline, "pipeline == null");
+ Objects.requireNonNull(containerID, "containerID == null");
+
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ recordPendingAllocationForDatanode(node, containerID);
+ }
+ }
+
+ /**
+ * Record a pending container allocation for a single DataNode.
+ * Container is added to the current window.
+ *
+ * @param node The DataNode where container is being allocated/replicated
+ * @param containerID The container being allocated/replicated
+ */
+ public void recordPendingAllocationForDatanode(DatanodeDetails node, ContainerID containerID) {
+ Objects.requireNonNull(node, "node == null");
+ Objects.requireNonNull(containerID, "containerID == null");
+
+ DatanodeID dnID = node.getID();
+ boolean added = addContainerToBucket(containerID, dnID);
+
+ if (added && metrics != null) {
+ metrics.incNumPendingContainersAdded();
+ }
+ }
+
+ private boolean addContainerToBucket(ContainerID containerID, DatanodeID dnID) {
+ return datanodeBuckets.get(dnID).add(containerID, dnID);
+ }
+
+ /**
+ * Remove a pending container allocation from a specific DataNode.
+ * Removes from both current and previous windows.
+ * Called when container is confirmed.
+ *
+ * @param node The DataNode
+ * @param containerID The container to remove from pending
+ */
+ public void removePendingAllocation(DatanodeDetails node, ContainerID containerID) {
+ Objects.requireNonNull(node, "node == null");
+ Objects.requireNonNull(containerID, "containerID == null");
+
+ DatanodeID dnID = node.getID();
+ boolean removed = removeContainerFromBucket(containerID, dnID);
+
+ if (removed && metrics != null) {
+ metrics.incNumPendingContainersRemoved();
+ }
+ }
+
+ private boolean removeContainerFromBucket(ContainerID containerID, DatanodeID dnID) {
+ return datanodeBuckets.get(dnID).remove(containerID, dnID);
+ }
+
+ /**
+ * Number of pending container allocations for this datanode (union of current and previous
+ * windows). This call may advance the internal tumbling window if the roll interval has elapsed.
+ *
+ * @param node The DataNode
+ * @return Pending container count
+ */
+ public long getPendingContainerCount(DatanodeDetails node) {
+ Objects.requireNonNull(node, "node == null");
+ return datanodeBuckets.get(node).getCount();
+ }
+
+ /**
+ * Whether container is in the current or previous window for this datanode.
+ */
+ @VisibleForTesting
+ public boolean containsPendingContainer(DatanodeDetails node, ContainerID containerID) {
+ Objects.requireNonNull(node, "node == null");
+ Objects.requireNonNull(containerID, "containerID == null");
+ return datanodeBuckets.get(node).contains(containerID);
+ }
+
+ @VisibleForTesting
+ public SCMNodeMetrics getMetrics() {
+ return metrics;
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
index 0dfb1206bb41..51389b3bf1ac 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
@@ -50,6 +50,10 @@ public final class SCMNodeMetrics implements MetricsSource {
private @Metric MutableCounterLong numNodeCommandQueueReportProcessed;
private @Metric MutableCounterLong numNodeCommandQueueReportProcessingFailed;
private @Metric String textMetric;
+ // Pending container allocations at SCM (per-DN tracker), not yet on datanodes.
+ private @Metric MutableCounterLong numPendingContainersAdded;
+ private @Metric MutableCounterLong numPendingContainersRemoved;
+ private @Metric MutableCounterLong numSkippedFullNodeContainerAllocation;
private final MetricsRegistry registry;
private final NodeManagerMXBean managerMXBean;
@@ -124,6 +128,18 @@ void incNumNodeCommandQueueReportProcessingFailed() {
numNodeCommandQueueReportProcessingFailed.incr();
}
+ void incNumPendingContainersAdded() {
+ numPendingContainersAdded.incr();
+ }
+
+ void incNumPendingContainersRemoved() {
+ numPendingContainersRemoved.incr();
+ }
+
+ void incNumSkippedFullNodeContainerAllocation() {
+ numSkippedFullNodeContainerAllocation.incr();
+ }
+
/**
* Get aggregated counter and gauge metrics.
*/
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java
new file mode 100644
index 000000000000..00a0c565b2ca
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestPendingContainerTracker.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Tests for PendingContainerTracker.
+ */
+public class TestPendingContainerTracker {
+
+  private static final long MAX_CONTAINER_SIZE = 5L * 1024 * 1024 * 1024; // 5GB
+  private static final long DEFAULT_ROLL_INTERVAL_MS = 5 * 60 * 1000;
+  private static final int NUM_DATANODES = 1000;
+  private static final int NUM_PIPELINES = 1000;
+  private static final int NUM_CONTAINERS = 10_000;
+  private List<DatanodeDetails> datanodes;
+  private List<Pipeline> pipelines;
+  private List<ContainerID> containers;
+
+  private PendingContainerTracker tracker;
+  private Pipeline pipeline;
+  private DatanodeDetails dn1;
+  private DatanodeDetails dn2;
+
+  /** First three container IDs. */
+  private ContainerID container1;
+  private ContainerID container2;
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE, DEFAULT_ROLL_INTERVAL_MS, null);
+
+    datanodes = new ArrayList<>(NUM_DATANODES);
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      datanodes.add(MockDatanodeDetails.randomLocalDatanodeDetails());
+    }
+
+    pipelines = new ArrayList<>(NUM_PIPELINES);
+    for (int i = 0; i < NUM_PIPELINES; i++) {
+      pipelines.add(MockPipeline.createPipeline(Collections.singletonList(datanodes.get(i))));
+    }
+
+    containers = new ArrayList<>(NUM_CONTAINERS);
+    for (long id = 1; id <= NUM_CONTAINERS; id++) {
+      containers.add(ContainerID.valueOf(id));
+    }
+
+    pipeline = MockPipeline.createPipeline(datanodes.subList(0, 3));
+    dn1 = datanodes.get(0);
+    dn2 = datanodes.get(1);
+
+    container1 = containers.get(0);
+    container2 = containers.get(1);
+  }
+
+ @Test
+ public void testRecordPendingAllocation() {
+ // Allocate first 100 containers across first 100 pipelines (1 DN each)
+ for (int i = 0; i < 100; i++) {
+ tracker.recordPendingAllocation(pipelines.get(i), containers.get(i));
+ }
+
+ // Each of the first 100 DNs should have 1 pending container
+ for (int i = 0; i < 100; i++) {
+ assertEquals(1, tracker.getPendingContainerCount(datanodes.get(i)));
+ assertEquals(MAX_CONTAINER_SIZE,
+ tracker.getPendingContainerCount(datanodes.get(i)) * MAX_CONTAINER_SIZE);
+ }
+
+ // DNs beyond index 100 should have 0 pending
+ assertEquals(0, tracker.getPendingContainerCount(datanodes.get(500)));
+ assertEquals(0, tracker.getPendingContainerCount(datanodes.get(999)));
+ }
+
+ @Test
+ public void testRemovePendingAllocation() {
+ // Allocate containers 0-99 to first 100 pipelines
+ for (int i = 0; i < 100; i++) {
+ tracker.recordPendingAllocation(pipelines.get(i), containers.get(i));
+ }
+
+ // Remove from first 50 DNs
+ for (int i = 0; i < 50; i++) {
+ tracker.removePendingAllocation(datanodes.get(i), containers.get(i));
+ }
+
+ // First 50 DNs should have 0 pending
+ for (int i = 0; i < 50; i++) {
+ assertEquals(0, tracker.getPendingContainerCount(datanodes.get(i)));
+ }
+
+ // DNs 50-99 should still have 1 pending
+ for (int i = 50; i < 100; i++) {
+ assertEquals(1, tracker.getPendingContainerCount(datanodes.get(i)));
+ }
+ }
+
+ /**
+ * After one roll interval, pending entries move from currentWindow to previousWindow and remain
+ * visible. After a second roll (2× interval total), the old previousWindow is discarded and the
+ * container ages out if not confirmed.
+ */
+ @Test
+ @Timeout(30)
+ public void testTwoWindowRollAgesOutContainerAfterTwoIntervals() throws InterruptedException {
+ long rollMs = 200L;
+ PendingContainerTracker shortRollTracker =
+ new PendingContainerTracker(MAX_CONTAINER_SIZE, rollMs, null);
+
+ shortRollTracker.recordPendingAllocationForDatanode(dn1, container1);
+ assertEquals(1, shortRollTracker.getPendingContainerCount(dn1));
+ assertTrue(shortRollTracker.containsPendingContainer(dn1, container1));
+
+ // First roll: C1 moves from currentWindow to previousWindow; union still includes C1
+ Thread.sleep(rollMs + 80);
+ shortRollTracker.rollWindowsIfNeeded(dn1);
+ assertEquals(1, shortRollTracker.getPendingContainerCount(dn1));
+ assertTrue(shortRollTracker.containsPendingContainer(dn1, container1));
+
+ // Second roll: prior previousWindow (holding C1) is dropped; C1 is no longer pending
+ Thread.sleep(rollMs + 80);
+ shortRollTracker.rollWindowsIfNeeded(dn1);
+ assertEquals(0, shortRollTracker.getPendingContainerCount(dn1));
+ assertEquals(0L, shortRollTracker.getPendingContainerCount(dn1) * MAX_CONTAINER_SIZE);
+ }
+
+ @Test
+ public void testRemoveNonExistentContainer() {
+ tracker.recordPendingAllocation(pipeline, container1);
+
+ // Remove a container that was never added - should not throw exception
+ tracker.removePendingAllocation(dn1, container2);
+
+ // DN1 should still have container1
+ assertEquals(1, tracker.getPendingContainerCount(dn1));
+ }
+
+ @Test
+ public void testUnknownDatanodeHasZeroPendingCount() {
+ DatanodeDetails unknownDN = MockDatanodeDetails.randomDatanodeDetails();
+ assertEquals(0, tracker.getPendingContainerCount(unknownDN));
+ }
+
+  @Test
+  public void testConcurrentModification() throws InterruptedException {
+    // Thread-safety: multiple threads add/remove containers concurrently.
+    final int numThreads = 10;
+    final int operationsPerThread = 100;
+
+    Thread[] threads = new Thread[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      final int threadId = i;
+      threads[i] = new Thread(() -> {
+        for (int j = 0; j < operationsPerThread; j++) {
+          ContainerID cid = ContainerID.valueOf(threadId * 1000L + j);
+          tracker.recordPendingAllocation(pipeline, cid);
+          if (j % 2 == 0) {
+            tracker.removePendingAllocation(dn1, cid);
+          }
+        }
+      });
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    // Container IDs are distinct per thread; each thread removes its 50
+    // even-j containers from dn1 only, so the final counts are deterministic.
+    assertEquals(numThreads * operationsPerThread / 2, tracker.getPendingContainerCount(dn1));
+    assertEquals(numThreads * operationsPerThread, tracker.getPendingContainerCount(dn2));
+  }
+
+ @Test
+ public void testBucketsRetainedWhenEmpty() {
+ tracker.recordPendingAllocation(pipeline, container1);
+
+ assertEquals(1, tracker.getPendingContainerCount(dn1));
+
+ // Remove the only pending container from DN1
+ tracker.removePendingAllocation(dn1, container1);
+
+ assertEquals(0, tracker.getPendingContainerCount(dn1));
+ assertEquals(1, tracker.getPendingContainerCount(dn2));
+
+ // Empty bucket for DN1 is still usable for new allocations
+ tracker.recordPendingAllocationForDatanode(dn1, container2);
+ assertEquals(1, tracker.getPendingContainerCount(dn1));
+ }
+
+ @Test
+ public void testRemoveFromBothWindows() {
+ // This test verifies that removal works from both current and previous windows
+ // In general, a container could be in previous window after a roll
+
+ // Add containers
+ tracker.recordPendingAllocation(pipeline, container1);
+ tracker.recordPendingAllocation(pipeline, container2);
+
+ assertEquals(2, tracker.getPendingContainerCount(dn1));
+
+ // Remove container1 - should work regardless of which window it's in
+ tracker.removePendingAllocation(dn1, container1);
+
+ assertEquals(1, tracker.getPendingContainerCount(dn1));
+
+ assertFalse(tracker.containsPendingContainer(dn1, container1));
+ assertTrue(tracker.containsPendingContainer(dn1, container2));
+ }
+
+ @Test
+ public void testManyContainersOnSingleDatanode() {
+ // Allocate first 1000 containers to the first datanode
+ DatanodeDetails dn = datanodes.get(0);
+ for (int i = 0; i < 1000; i++) {
+ tracker.recordPendingAllocationForDatanode(dn, containers.get(i));
+ }
+
+ assertEquals(1000, tracker.getPendingContainerCount(dn));
+ assertEquals(1000 * MAX_CONTAINER_SIZE, tracker.getPendingContainerCount(dn) * MAX_CONTAINER_SIZE);
+
+ // Verify specific containers are present
+ assertTrue(tracker.containsPendingContainer(dn, containers.get(0)));
+ assertTrue(tracker.containsPendingContainer(dn, containers.get(500)));
+ assertTrue(tracker.containsPendingContainer(dn, containers.get(999)));
+
+ // Remove half of them
+ for (int i = 0; i < 500; i++) {
+ tracker.removePendingAllocation(dn, containers.get(i));
+ }
+
+ assertEquals(500, tracker.getPendingContainerCount(dn));
+ assertFalse(tracker.containsPendingContainer(dn, containers.get(0)));
+ assertTrue(tracker.containsPendingContainer(dn, containers.get(999)));
+ }
+
+ @Test
+ public void testAllDatanodesWithMultipleContainers() {
+ // Allocate 10 containers to each of the 1000 datanodes
+ for (int dnIdx = 0; dnIdx < NUM_DATANODES; dnIdx++) {
+ DatanodeDetails dn = datanodes.get(dnIdx);
+ for (int cIdx = 0; cIdx < 10; cIdx++) {
+ int containerIdx = dnIdx * 10 + cIdx;
+ tracker.recordPendingAllocationForDatanode(dn, containers.get(containerIdx));
+ }
+ }
+
+ // Each DN should have 10 pending containers
+ for (int i = 0; i < NUM_DATANODES; i++) {
+ assertEquals(10, tracker.getPendingContainerCount(datanodes.get(i)));
+ assertEquals(10 * MAX_CONTAINER_SIZE,
+ tracker.getPendingContainerCount(datanodes.get(i)) * MAX_CONTAINER_SIZE);
+ }
+
+ // Remove all containers from every 10th DN
+ for (int dnIdx = 0; dnIdx < NUM_DATANODES; dnIdx += 10) {
+ DatanodeDetails dn = datanodes.get(dnIdx);
+ for (int cIdx = 0; cIdx < 10; cIdx++) {
+ int containerIdx = dnIdx * 10 + cIdx;
+ tracker.removePendingAllocation(dn, containers.get(containerIdx));
+ }
+ }
+
+ // Every 10th DN should have 0 pending
+ for (int i = 0; i < NUM_DATANODES; i += 10) {
+ assertEquals(0, tracker.getPendingContainerCount(datanodes.get(i)));
+ }
+
+ // Other DNs should still have 10 pending
+ assertEquals(10, tracker.getPendingContainerCount(datanodes.get(1)));
+ assertEquals(10, tracker.getPendingContainerCount(datanodes.get(15)));
+ assertEquals(10, tracker.getPendingContainerCount(datanodes.get(999)));
+ }
+
+ @Test
+ public void testIdempotentRecording() {
+ // Allocate same 100 containers multiple times to first 100 DNs
+ DatanodeDetails dn = datanodes.get(0);
+
+ for (int round = 0; round < 5; round++) {
+ for (int i = 0; i < 100; i++) {
+ tracker.recordPendingAllocationForDatanode(dn, containers.get(i));
+ }
+ }
+
+ // Should still only have 100 containers
+ assertEquals(100, tracker.getPendingContainerCount(dn));
+ }
+
+  @Test
+  public void testMultiVolumeAccumulatedSpaceIsNotEnough() {
+    DatanodeDetails dn = datanodes.get(0);
+    long containerSize = MAX_CONTAINER_SIZE;
+
+    List<StorageReportProto> reports = new ArrayList<>();
+    reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 4, 0));
+    reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 4, 0));
+    reports.add(createStorageReport(dn, 100 * containerSize, containerSize / 2, 0));
+    DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null);
+    dnInfo.updateStorageReports(reports);
+
+    assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo));
+  }
+
+  @Test
+  public void testMultiVolumeWithPendingAllocation() {
+    DatanodeDetails dn = datanodes.get(0);
+    long containerSize = MAX_CONTAINER_SIZE;
+
+    // Remaining space available for 3 containers across all the volumes
+    tracker.recordPendingAllocationForDatanode(dn, containers.get(0));
+    tracker.recordPendingAllocationForDatanode(dn, containers.get(1));
+
+    List<StorageReportProto> reports = new ArrayList<>();
+    reports.add(createStorageReport(dn, 100 * containerSize, containerSize, 0));
+    reports.add(createStorageReport(dn, 50 * containerSize, containerSize, 0));
+    reports.add(createStorageReport(dn, 100 * containerSize, containerSize, 0));
+    DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null);
+    dnInfo.updateStorageReports(reports);
+    // Remaining space available for 1 container across all the volume after 2 container allocation
+
+    assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo));
+
+    tracker.recordPendingAllocationForDatanode(dn, containers.get(2));
+    // Remaining space available for 0 container across all the volume
+    assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo));
+  }
+
+  @Test
+  public void testMultiVolumeWithCommittedBytes() {
+    DatanodeDetails dn = datanodes.get(0);
+    long containerSize = MAX_CONTAINER_SIZE;
+
+    List<StorageReportProto> reports = new ArrayList<>();
+    reports.add(createStorageReport(dn, 100 * containerSize, 6 * containerSize, 5 * containerSize));
+    reports.add(createStorageReport(dn, 50 * containerSize, 3 * containerSize, 3 * containerSize));
+    DatanodeInfo dnInfo = new DatanodeInfo(dn, NodeStatus.inServiceHealthy(), null);
+    dnInfo.updateStorageReports(reports);
+    // Remaining space available for 1 container across all the volume considering committed bytes
+
+    assertTrue(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo));
+    tracker.recordPendingAllocationForDatanode(dn, containers.get(0));
+    // Remaining space available for 0 container across all the volume considering
+    // committed bytes and container allocation
+    assertFalse(tracker.hasEffectiveAllocatableSpaceForNewContainer(dn, dnInfo));
+  }
+
+ private StorageReportProto createStorageReport(DatanodeDetails dn, long capacity, long remaining, long committed) {
+ return HddsTestUtils.createStorageReports(dn.getID(), capacity, remaining, committed).get(0);
+ }
+}