Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,8 @@
* The replication factor to be used while writing key into ozone.
*/
public enum ReplicationFactor {
ONE(1),
THREE(3);

/**
* Integer representation of replication.
*/
private int value;

/**
* Initializes ReplicationFactor with value.
* @param value replication value
*/
ReplicationFactor(int value) {
this.value = value;
}
ONE,
THREE;

/**
* Returns enum value corresponding to the int value.
Expand Down Expand Up @@ -85,16 +72,19 @@ public HddsProtos.ReplicationFactor toProto() {
case THREE:
return HddsProtos.ReplicationFactor.THREE;
default:
throw new IllegalArgumentException(
"Unsupported ProtoBuf replication factor: " + this);
throw new IllegalStateException("Unexpected enum value: " + this);
}
}

/**
* Returns integer representation of ReplicationFactor.
* @return replication value
*/
/** @return the number of replication(s). */
public int getValue() {
return value;
switch (this) {
case ONE:
return 1;
case THREE:
return 3;
default:
throw new IllegalStateException("Unexpected enum value: " + this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public List<DatanodeDetails> filterNodesWithSpace(List<DatanodeDetails> nodes,
int nodesRequired, long metadataSizeRequired, long dataSizeRequired)
throws SCMException {
List<DatanodeDetails> nodesWithSpace = nodes.stream().filter(d ->
hasEnoughSpace(d, metadataSizeRequired, dataSizeRequired, conf))
hasEnoughSpace(d, metadataSizeRequired, dataSizeRequired))
.collect(Collectors.toList());

if (nodesWithSpace.size() < nodesRequired) {
Expand All @@ -298,8 +298,7 @@ public List<DatanodeDetails> filterNodesWithSpace(List<DatanodeDetails> nodes,
*/
public static boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
long metadataSizeRequired,
long dataSizeRequired,
ConfigurationSource conf) {
long dataSizeRequired) {
Preconditions.checkArgument(datanodeDetails instanceof DatanodeInfo);

boolean enoughForData = false;
Expand Down Expand Up @@ -523,9 +522,7 @@ public boolean isValidNode(DatanodeDetails datanodeDetails,
return false;
}
NodeStatus nodeStatus = datanodeInfo.getNodeStatus();
if (nodeStatus.isNodeWritable() &&
(hasEnoughSpace(datanodeInfo, metadataSizeRequired,
dataSizeRequired, conf))) {
if (nodeStatus.isNodeWritable() && (hasEnoughSpace(datanodeInfo, metadataSizeRequired, dataSizeRequired))) {
LOG.debug("Datanode {} is chosen. Required metadata size is {} and " +
"required data size is {} and NodeStatus is {}",
datanodeDetails, metadataSizeRequired, dataSizeRequired, nodeStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ public class SCMNodeManager implements NodeManager {
private final SCMNodeMetrics metrics;
// Node manager MXBean
private ObjectName nmInfoBean;
private final OzoneConfiguration conf;
private final SCMStorageConfig scmStorageConfig;
private final NetworkTopology clusterMap;
private final Function<String, String> nodeResolver;
Expand All @@ -140,6 +139,7 @@ public class SCMNodeManager implements NodeManager {
private final SCMContext scmContext;
private final Map<SCMCommandProto.Type,
BiConsumer<DatanodeDetails, SCMCommand<?>>> sendCommandNotifyMap;
private final NonWritableNodeFilter nonWritableNodeFilter;

/**
* Lock used to synchronize some operation in Node manager to ensure a
Expand Down Expand Up @@ -176,7 +176,6 @@ public SCMNodeManager(
SCMContext scmContext,
HDDSLayoutVersionManager layoutVersionManager,
Function<String, String> nodeResolver) {
this.conf = conf;
this.scmNodeEventPublisher = eventPublisher;
this.nodeStateManager = new NodeStateManager(conf, eventPublisher,
layoutVersionManager, scmContext);
Expand All @@ -199,6 +198,7 @@ public SCMNodeManager(
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
this.scmContext = scmContext;
this.sendCommandNotifyMap = new HashMap<>();
this.nonWritableNodeFilter = new NonWritableNodeFilter(conf);
}

@Override
Expand Down Expand Up @@ -1394,7 +1394,7 @@ private void nodeSpaceStatistics(Map<String, String> nodeStatics) {

private void nodeNonWritableStatistics(Map<String, String> nodeStatics) {
int nonWritableNodesCount = (int) getAllNodes().parallelStream()
.filter(new NonWritableNodeFilter(conf))
.filter(nonWritableNodeFilter)
.count();

nodeStatics.put("NonWritableNodes", String.valueOf(nonWritableNodesCount));
Expand All @@ -1405,7 +1405,6 @@ static class NonWritableNodeFilter implements Predicate<DatanodeInfo> {
private final long blockSize;
private final long minRatisVolumeSizeBytes;
private final long containerSize;
Comment on lines 1405 to 1407
Copy link
Contributor

@ptlrs ptlrs Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR we are now using a final filter instead of instantiating a new one.
Will the Ozone configurations mapping to these properties ever be modified dynamically?
If so, the instantiated filter won't get the updated values for these configurations.

Copy link
Contributor Author

@szetszwo szetszwo Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ptlrs , thanks for reviewing this!

There are final fields such as numPipelinesPerMetadataVolume in SCMNodeManager. It is consistent to make NonWritableNodeFilter final.

For reconf, SCM currently only supports reconf admins; see below. So, the filter cannot be modified dynamically.

reconfigurationHandler =
new ReconfigurationHandler("SCM", conf, this::checkAdminAccess)
.register(OZONE_ADMINISTRATORS, this::reconfOzoneAdmins)
.register(OZONE_READONLY_ADMINISTRATORS,
this::reconfOzoneReadOnlyAdmins);
reconfigurationHandler.setReconfigurationCompleteCallback(reconfigurationHandler.defaultLoggingCallback());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation @szetszwo.

private final ConfigurationSource conf;

NonWritableNodeFilter(ConfigurationSource conf) {
blockSize = (long) conf.getStorageSize(
Expand All @@ -1420,13 +1419,12 @@ static class NonWritableNodeFilter implements Predicate<DatanodeInfo> {
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
this.conf = conf;
}

@Override
public boolean test(DatanodeInfo dn) {
return !dn.getNodeStatus().isNodeWritable()
|| (!hasEnoughSpace(dn, minRatisVolumeSizeBytes, containerSize, conf)
|| (!hasEnoughSpace(dn, minRatisVolumeSizeBytes, containerSize)
&& !hasEnoughCommittedVolumeSpace(dn));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,4 @@ private Pipeline createPipelineInternal(ECReplicationConfig repConfig,
protected void close(Pipeline pipeline) throws IOException {
}

@Override
protected void shutdown() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ public void close(ReplicationType type, Pipeline pipeline)
providers.get(type).close(pipeline);
}

public void shutdown() {
providers.values().forEach(provider -> provider.shutdown());
}

@VisibleForTesting
public Map<ReplicationType, PipelineProvider> getProviders() {
return providers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) {
if (!(node instanceof DatanodeInfo)) {
node = nodeManager.getDatanodeInfo(node);
}
if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize, null)) {
if (!SCMCommonPlacementPolicy.hasEnoughSpace(node, 0, containerSize)) {
return false;
}
}
Expand Down Expand Up @@ -842,8 +842,6 @@ public void close() throws IOException {

SCMPipelineMetrics.unRegister();

// shutdown pipeline provider.
pipelineFactory.shutdown();
try {
stateManager.close();
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
Expand Down Expand Up @@ -81,18 +80,14 @@ protected abstract Pipeline createForRead(

protected abstract void close(Pipeline pipeline) throws IOException;

protected abstract void shutdown();

List<DatanodeDetails> pickNodesNotUsed(REPLICATION_CONFIG replicationConfig,
long metadataSizeRequired,
long dataSizeRequired,
ConfigurationSource conf)
long dataSizeRequired)
throws SCMException {
int nodesRequired = replicationConfig.getRequiredNodes();
List<DatanodeDetails> healthyDNs = pickAllNodesNotUsed(replicationConfig);
List<DatanodeDetails> healthyDNsWithSpace = healthyDNs.stream()
.filter(dn -> SCMCommonPlacementPolicy
.hasEnoughSpace(dn, metadataSizeRequired, dataSizeRequired, conf))
.filter(dn -> SCMCommonPlacementPolicy.hasEnoughSpace(dn, metadataSizeRequired, dataSizeRequired))
.limit(nodesRequired)
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ public class RatisPipelineProvider
private static final Logger LOG =
LoggerFactory.getLogger(RatisPipelineProvider.class);

private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
private final PlacementPolicy placementPolicy;
private int pipelineNumberLimit;
private int maxPipelinePerDatanode;
private final int pipelineNumberLimit;
private final int maxPipelinePerDatanode;
private final LeaderChoosePolicy leaderChoosePolicy;
private final SCMContext scmContext;
private final long containerSizeBytes;
Expand All @@ -74,7 +73,6 @@ public RatisPipelineProvider(NodeManager nodeManager,
EventPublisher eventPublisher,
SCMContext scmContext) {
super(nodeManager, stateManager);
this.conf = conf;
this.eventPublisher = eventPublisher;
this.scmContext = scmContext;
this.placementPolicy = PipelinePlacementPolicyFactory
Expand All @@ -85,11 +83,11 @@ public RatisPipelineProvider(NodeManager nodeManager,
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.maxPipelinePerDatanode = dnLimit == null ? 0 :
Integer.parseInt(dnLimit);
this.containerSizeBytes = (long) this.conf.getStorageSize(
this.containerSizeBytes = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
StorageUnit.BYTES);
this.minRatisVolumeSizeBytes = (long) this.conf.getStorageSize(
this.minRatisVolumeSizeBytes = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN_DEFAULT,
StorageUnit.BYTES);
Expand Down Expand Up @@ -158,14 +156,12 @@ public synchronized Pipeline create(RatisReplicationConfig replicationConfig,
);
}

List<DatanodeDetails> dns;

final List<DatanodeDetails> dns;
final ReplicationFactor factor =
replicationConfig.getReplicationFactor();
switch (factor) {
case ONE:
dns = pickNodesNotUsed(replicationConfig, minRatisVolumeSizeBytes,
containerSizeBytes, conf);
dns = pickNodesNotUsed(replicationConfig, minRatisVolumeSizeBytes, containerSizeBytes);
break;
case THREE:
List<DatanodeDetails> excludeDueToEngagement = filterPipelineEngagement();
Expand Down Expand Up @@ -251,10 +247,6 @@ private List<DatanodeDetails> filterPipelineEngagement() {
return excluded;
}

@Override
public void shutdown() {
}

/**
* Removes pipeline from SCM. Sends command to destroy pipeline on all
* the datanodes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,4 @@ public void close(Pipeline pipeline) throws IOException {

}

@Override
public void shutdown() {
// Do nothing.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,6 @@ public static void markPipelineHealthy(Pipeline pipeline)
pipeline.setLeaderId(pipeline.getFirstNode().getID());
}

@Override
public void shutdown() {
// Do nothing.
}

@Override
public Pipeline create(RatisReplicationConfig replicationConfig,
List<DatanodeDetails> nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,5 @@ public void close(Pipeline pipeline) {
// Do nothing in Recon.
}

@Override
public void shutdown() {
// Do nothing
}
}
}