Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
Expand Down Expand Up @@ -79,8 +77,6 @@ public class ContainerManagerImpl implements ContainerManager {
@SuppressWarnings("java:S2245") // no need for secure random
private final Random random = new Random();

private final long maxContainerSize;

/**
*
*/
Expand All @@ -106,9 +102,6 @@ public ContainerManagerImpl(
.setContainerReplicaPendingOps(containerReplicaPendingOps)
.build();

maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);

this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
}

Expand Down Expand Up @@ -245,9 +238,8 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner)
private ContainerInfo allocateContainer(final Pipeline pipeline,
final String owner)
throws IOException {
if (!pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) {
LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.",
pipeline, maxContainerSize);
if (!pipelineManager.hasEnoughSpace(pipeline)) {
LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.node.PendingContainerTracker.TwoWindowBucket;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,6 +51,11 @@ public class DatanodeInfo extends DatanodeDetails {
private long lastStatsUpdatedTime;
private int failedVolumeCount;

/**
* Two-window tumbling bucket for tracking pending container allocations on this datanode.
*/
private final TwoWindowBucket pendingContainerAllocations;

private List<StorageReportProto> storageReports;
private List<MetadataStorageReportProto> metadataStorageReports;
private LayoutVersionProto lastKnownLayoutVersion;
Expand All @@ -64,7 +70,7 @@ public class DatanodeInfo extends DatanodeDetails {
* @param layoutInfo Details about the LayoutVersionProto
*/
public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus,
LayoutVersionProto layoutInfo) {
LayoutVersionProto layoutInfo, long containerRollIntervalMs) {
super(datanodeDetails);
this.lock = new ReentrantReadWriteLock();
this.lastHeartbeatTime = Time.monotonicNow();
Expand All @@ -75,6 +81,7 @@ public DatanodeInfo(DatanodeDetails datanodeDetails, NodeStatus nodeStatus,
this.nodeStatus = nodeStatus;
this.metadataStorageReports = Collections.emptyList();
this.commandCounts = new HashMap<>();
this.pendingContainerAllocations = new TwoWindowBucket(this.getID(), containerRollIntervalMs);
}

/**
Expand Down Expand Up @@ -353,6 +360,14 @@ public int getCommandCount(SCMCommandProto.Type cmd) {
}
}

/**
 * Returns the {@link TwoWindowBucket} tracking pending container allocations
 * for this datanode.
 *
 * <p>Note: this accessor has a side effect — it first advances the bucket's
 * tumbling windows via {@code rollIfNeeded()} so callers always observe an
 * up-to-date count.
 */
public TwoWindowBucket getPendingContainerAllocations() {
  final TwoWindowBucket bucket = pendingContainerAllocations;
  bucket.rollIfNeeded();
  return bucket;
}

@Override
public int hashCode() {
return super.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,19 @@ default int getAllNodeCount() {
@Nullable
DatanodeInfo getDatanodeInfo(DatanodeDetails dn);

/**
 * Returns true if the datanode identified by {@code datanodeID} can accept
 * another container allocation.
 *
 * @param datanodeID the ID of the datanode to check
 * @return true if a new container can be allocated on the datanode
 */
boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID);

/**
 * Records a pending container allocation for a single DataNode identified by
 * its ID, so subsequent space checks can account for not-yet-reported
 * containers.
 *
 * @param datanodeID the ID of the DataNode receiving the allocation
 * @param containerID the container being allocated
 */
void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID);

/**
* Return the node stat of the specified datanode.
* @param datanodeDetails DatanodeDetails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ public class NodeStateManager implements Runnable, Closeable {
*/
private final long deadNodeIntervalMs;

private final long containerRollIntervalMs = 5 * 60 * 1000; //TODO

/**
* The future is used to pause/unpause the scheduled checks.
*/
Expand Down Expand Up @@ -310,7 +312,7 @@ public void addNode(DatanodeDetails datanodeDetails,

private DatanodeInfo newDatanodeInfo(DatanodeDetails datanode, LayoutVersionProto layout) {
final NodeStatus status = newNodeStatus(datanode, layout);
return new DatanodeInfo(datanode, status, layout);
return new DatanodeInfo(datanode, status, layout, containerRollIntervalMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@

package org.apache.hadoop.hdds.scm.node;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.container.common.volume.VolumeUsage;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,8 +66,6 @@ public class PendingContainerTracker {

private static final Logger LOG = LoggerFactory.getLogger(PendingContainerTracker.class);

private final DatanodeBuckets datanodeBuckets;

/**
* Maximum container size in bytes.
*/
Expand All @@ -86,13 +80,15 @@ public class PendingContainerTracker {
* Two-window bucket for a single DataNode.
* Contains current and previous window sets, plus last roll timestamp.
*/
private static class TwoWindowBucket {
public static class TwoWindowBucket {
private Set<ContainerID> currentWindow = new HashSet<>();
private Set<ContainerID> previousWindow = new HashSet<>();
private long lastRollTime = Time.monotonicNow();
private final long rollIntervalMs;
private final DatanodeID datanodeID;

TwoWindowBucket(long rollIntervalMs) {
TwoWindowBucket(DatanodeID id, long rollIntervalMs) {
this.datanodeID = id;
this.rollIntervalMs = rollIntervalMs;
}

Expand Down Expand Up @@ -127,22 +123,22 @@ synchronized boolean contains(ContainerID containerID) {
/**
* Add container to current window.
*/
synchronized boolean add(ContainerID containerID, DatanodeID dnID) {
synchronized boolean add(ContainerID containerID) {
boolean added = currentWindow.add(containerID);
LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}",
containerID, dnID, added, getCount());
containerID, datanodeID, added, getCount());
return added;
}

/**
* Remove container from both windows.
*/
synchronized boolean remove(ContainerID containerID, DatanodeID dnID) {
synchronized boolean remove(ContainerID containerID) {
boolean removedFromCurrent = currentWindow.remove(containerID);
boolean removedFromPrevious = previousWindow.remove(containerID);
boolean removed = removedFromCurrent || removedFromPrevious;
LOG.debug("Removed pending container {} from DataNode {}. Removed={}, Remaining={}",
containerID, dnID, removed, getCount());
containerID, datanodeID, removed, getCount());
return removed;
}

Expand All @@ -154,63 +150,25 @@ synchronized int getCount() {
}
}

/**
* Per-datanode two-window buckets.
*/
private static class DatanodeBuckets {
private final ConcurrentHashMap<DatanodeID, TwoWindowBucket> map = new ConcurrentHashMap<>();
private final long rollIntervalMs;

DatanodeBuckets(long rollIntervalMs) {
this.rollIntervalMs = rollIntervalMs;
}

TwoWindowBucket get(DatanodeID id) {
final TwoWindowBucket bucket = map.compute(id, (k, b) -> b != null ? b : new TwoWindowBucket(rollIntervalMs));
bucket.rollIfNeeded();
return bucket;
}

TwoWindowBucket get(DatanodeDetails dn) {
Objects.requireNonNull(dn, "dn == null");
return get(dn.getID());
}
}

public PendingContainerTracker(long maxContainerSize, long rollIntervalMs,
SCMNodeMetrics metrics) {
this.datanodeBuckets = new DatanodeBuckets(rollIntervalMs);
public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, SCMNodeMetrics metrics) {
this.maxContainerSize = maxContainerSize;
this.metrics = metrics;
LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms",
maxContainerSize, rollIntervalMs);
}

/**
* Advances the two-window tumbling bucket for this datanode when the roll interval has elapsed.
* Call on periodic paths (node report) so windows age even when there are no new
* allocations or container reports touching this tracker.
*/
public void rollWindowsIfNeeded(DatanodeDetails node) {
Objects.requireNonNull(node, "node == null");
datanodeBuckets.get(node.getID());
}

/**
* Whether the datanode can fit another container of {@link #maxContainerSize} after accounting for
* SCM pending allocations for {@code node} (this tracker) and usable space across volumes on
* {@code datanodeInfo}. Pending bytes are {@link #getPendingContainerCount} × {@code maxContainerSize};
* {@code datanodeInfo}. Pending bytes are count × {@code maxContainerSize};
* effective allocatable space sums full-container slots per storage report.
*
* @param node identity used to look up pending allocations (same DN as {@code datanodeInfo})
* @param datanodeInfo storage reports for the datanode
*/
public boolean hasEffectiveAllocatableSpaceForNewContainer(
DatanodeDetails node, DatanodeInfo datanodeInfo) {
Objects.requireNonNull(node, "node == null");
public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanodeInfo) {
Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");

long pendingAllocationSize = getPendingContainerCount(node) * maxContainerSize;
long pendingAllocationSize = datanodeInfo.getPendingContainerAllocations().getCount() * maxContainerSize;
List<StorageReportProto> storageReports = datanodeInfo.getStorageReports();
Objects.requireNonNull(storageReports, "storageReports == null");
if (storageReports.isEmpty()) {
Expand All @@ -231,93 +189,37 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer(
return false;
}

/**
* Record a pending container allocation for all DataNodes in the pipeline.
* Container is added to the current window.
*
* @param pipeline The pipeline where container is allocated
* @param containerID The container being allocated
*/
public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) {
Objects.requireNonNull(pipeline, "pipeline == null");
Objects.requireNonNull(containerID, "containerID == null");

for (DatanodeDetails node : pipeline.getNodes()) {
recordPendingAllocationForDatanode(node, containerID);
}
}

/**
* Record a pending container allocation for a single DataNode.
* Container is added to the current window.
*
* @param node The DataNode where container is being allocated/replicated
* @param containerID The container being allocated/replicated
*/
public void recordPendingAllocationForDatanode(DatanodeDetails node, ContainerID containerID) {
Objects.requireNonNull(node, "node == null");
public void recordPendingAllocationForDatanode(DatanodeInfo datanodeInfo, ContainerID containerID) {
Objects.requireNonNull(containerID, "containerID == null");

DatanodeID dnID = node.getID();
boolean added = addContainerToBucket(containerID, dnID);

if (datanodeInfo == null) {
return;
}
final boolean added = datanodeInfo.getPendingContainerAllocations().add(containerID);
if (added && metrics != null) {
metrics.incNumPendingContainersAdded();
}
}

private boolean addContainerToBucket(ContainerID containerID, DatanodeID dnID) {
return datanodeBuckets.get(dnID).add(containerID, dnID);
}

/**
* Remove a pending container allocation from a specific DataNode.
* Removes from both current and previous windows.
* Called when container is confirmed.
*
* @param node The DataNode
* @param containerID The container to remove from pending
*/
public void removePendingAllocation(DatanodeDetails node, ContainerID containerID) {
Objects.requireNonNull(node, "node == null");
public void removePendingAllocation(TwoWindowBucket bucket, ContainerID containerID) {
Objects.requireNonNull(containerID, "containerID == null");

DatanodeID dnID = node.getID();
boolean removed = removeContainerFromBucket(containerID, dnID);
boolean removed = bucket.remove(containerID);

if (removed && metrics != null) {
metrics.incNumPendingContainersRemoved();
}
}

private boolean removeContainerFromBucket(ContainerID containerID, DatanodeID dnID) {
return datanodeBuckets.get(dnID).remove(containerID, dnID);
}

/**
* Number of pending container allocations for this datanode (union of current and previous
* windows). This call may advance the internal tumbling window if the roll interval has elapsed.
*
* @param node The DataNode
* @return Pending container count
*/
public long getPendingContainerCount(DatanodeDetails node) {
Objects.requireNonNull(node, "node == null");
return datanodeBuckets.get(node).getCount();
}

/**
* Whether container is in the current or previous window for this datanode.
*/
@VisibleForTesting
public boolean containsPendingContainer(DatanodeDetails node, ContainerID containerID) {
Objects.requireNonNull(node, "node == null");
Objects.requireNonNull(containerID, "containerID == null");
return datanodeBuckets.get(node).contains(containerID);
}

/**
 * Exposes the metrics sink backing this tracker; intended for test
 * assertions only.
 */
@VisibleForTesting
public SCMNodeMetrics getMetrics() {
  return this.metrics;
}
}
Loading