rename maintenance mode to decommission #7154

Merged: 15 commits, Mar 9, 2019
8 changes: 4 additions & 4 deletions docs/content/configuration/index.md
@@ -783,8 +783,8 @@ A sample Coordinator dynamic config JSON object is shown below:
"replicationThrottleLimit": 10,
"emitBalancingStats": false,
"killDataSourceWhitelist": ["wikipedia", "testDatasource"],
"historicalNodesInMaintenance": ["localhost:8182", "localhost:8282"],
"nodesInMaintenancePriority": 7
"decommissionNodes": ["localhost:8182", "localhost:8282"],
"decommissionPriority": 7
Member (@leventov, Feb 28, 2019):
Perhaps better to call it "decommissionVelocity", as @drcrallen suggested here: #6349 (comment), because "priority" is a super overloaded term.

Member Author:
Makes sense to me, though I think the properties should be consistently named, so maybe I'll either rename the other to decommissioningNodes or call this one decommissionVelocity to match. Anyone have a preference?

Member Author:
Renamed to decommissioningNodes and decommissioningVelocity

}
```

@@ -804,8 +804,8 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
|`killPendingSegmentsSkipList`|List of dataSources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated dataSources or a JSON array.|none|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
|`historicalNodesInMaintenance`| List of Historical nodes in maintenance mode. Coordinator doesn't assign new segments on those nodes and moves segments from the nodes according to a specified priority.|none|
|`nodesInMaintenancePriority`| Priority of segments from servers in maintenance. Coordinator takes ceil(maxSegmentsToMove * (priority / 10)) from servers in maitenance during balancing phase, i.e.:<br>0 - no segments from servers in maintenance will be processed during balancing<br>5 - 50% segments from servers in maintenance<br>10 - 100% segments from servers in maintenance<br>By leveraging the priority an operator can prevent general nodes from overload or decrease maitenance time instead.|7|
|`decommissionNodes`| List of 'decommissioned' historical nodes. The Coordinator doesn't assign new segments to these nodes and moves segments from the nodes at the rate specified by `decommissionPriority`.|none|
|`decommissionPriority`| Priority of how many 'move' operations will be spent towards 'decommissioning' servers by moving segments from them to non-decommissioned servers, instead of 'balancing' segments between servers. Coordinator takes ceil(maxSegmentsToMove * (priority / 10)) from servers in maintenance during balancing phase, i.e.:<br>0 - no segments from decommissioned servers will be processed during balancing<br>5 - 50% segments from decommissioned servers<br>10 - 100% segments from decommissioned servers<br>By leveraging the priority an operator can prevent general nodes from overload or decrease decommissioning time instead.|7|
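
As a rough, non-authoritative illustration of the arithmetic in the `decommissionPriority` row above (this snippet is not part of the PR, and the value of `maxSegmentsToMove` is hypothetical):

```java
public class DecommissionPrioritySketch
{
  public static void main(String[] args)
  {
    // ceil(maxSegmentsToMove * (priority / 10)) segments are taken from
    // decommissioning servers during each balancing phase.
    int maxSegmentsToMove = 5;
    for (int priority : new int[]{0, 5, 7, 10}) {
      int fromDecommissioning = (int) Math.ceil(maxSegmentsToMove * priority / 10.0);
      // priority=0 -> 0, priority=5 -> 3, priority=7 (the default) -> 4, priority=10 -> 5
      System.out.printf("priority=%d -> %d of %d segments%n", priority, fromDecommissioning, maxSegmentsToMove);
    }
  }
}
```

In the balancer change later in this diff, whatever part of the budget is not consumed by moves off decommissioning servers is then spent on ordinary balancing between the remaining servers.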

To view the audit history of Coordinator dynamic config issue a GET request to the URL -

@@ -56,8 +56,8 @@ public class CoordinatorDynamicConfig
private final boolean emitBalancingStats;
private final boolean killAllDataSources;
private final Set<String> killableDataSources;
private final Set<String> historicalNodesInMaintenance;
private final int nodesInMaintenancePriority;
private final Set<String> decommissionNodes;
private final int decommissionPriority;

// The pending segments of the dataSources in this list are not killed.
private final Set<String> protectedPendingSegmentDatasources;
@@ -88,8 +88,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("killAllDataSources") boolean killAllDataSources,
@JsonProperty("killPendingSegmentsSkipList") Object protectedPendingSegmentDatasources,
@JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
@JsonProperty("historicalNodesInMaintenance") Object historicalNodesInMaintenance,
@JsonProperty("nodesInMaintenancePriority") int nodesInMaintenancePriority
@JsonProperty("decommissionNodes") Object decommissionNodes,
@JsonProperty("decommissionPriority") int decommissionPriority
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@@ -104,12 +104,12 @@ public CoordinatorDynamicConfig(
this.killableDataSources = parseJsonStringOrArray(killableDataSources);
this.protectedPendingSegmentDatasources = parseJsonStringOrArray(protectedPendingSegmentDatasources);
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.historicalNodesInMaintenance = parseJsonStringOrArray(historicalNodesInMaintenance);
this.decommissionNodes = parseJsonStringOrArray(decommissionNodes);
Preconditions.checkArgument(
nodesInMaintenancePriority >= 0 && nodesInMaintenancePriority <= 10,
"nodesInMaintenancePriority should be in range [0, 10]"
decommissionPriority >= 0 && decommissionPriority <= 10,
"decommissionPriority should be in range [0, 10]"
);
this.nodesInMaintenancePriority = nodesInMaintenancePriority;
this.decommissionPriority = decommissionPriority;

if (this.killAllDataSources && !this.killableDataSources.isEmpty()) {
throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
@@ -231,32 +231,33 @@ public int getMaxSegmentsInNodeLoadingQueue()
}

/**
* Historical nodes list in maintenance mode. Coordinator doesn't assign new segments on those nodes and moves
* List of historical nodes to 'decommission'. Coordinator doesn't assign new segments on those nodes and moves
* segments from those nodes according to a specified priority.
*
* @return list of host:port entries
*/
@JsonProperty
public Set<String> getHistoricalNodesInMaintenance()
public Set<String> getDecommissionNodes()
{
return historicalNodesInMaintenance;
return decommissionNodes;
}

/**
* Priority of segments from servers in maintenance. Coordinator takes ceil(maxSegmentsToMove * (priority / 10))
* from servers in maitenance during balancing phase, i.e.:
* 0 - no segments from servers in maintenance will be processed during balancing
* 5 - 50% segments from servers in maintenance
* 10 - 100% segments from servers in maintenance
* By leveraging the priority an operator can prevent general nodes from overload or decrease maitenance time
* Priority of how many 'move' operations will be spent towards 'decommissioning' servers by moving segments from
* them to non-decommissioned servers, instead of 'balancing' segments between servers.
* Coordinator takes ceil(maxSegmentsToMove * (priority / 10)) from servers in maintenance during balancing phase:
* 0 - no segments from decommissioned servers will be processed during balancing
* 5 - 50% segments from decommissioned servers
* 10 - 100% segments from decommissioned servers
* By leveraging the priority an operator can prevent general nodes from overload or decrease 'decommissioning' time
* instead.
*
* @return number in range [0, 10]
*/
@JsonProperty
public int getNodesInMaintenancePriority()
public int getDecommissionPriority()
{
return nodesInMaintenancePriority;
return decommissionPriority;
}

@Override
@@ -275,8 +276,8 @@ public String toString()
", killDataSourceWhitelist=" + killableDataSources +
", protectedPendingSegmentDatasources=" + protectedPendingSegmentDatasources +
", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue +
", historicalNodesInMaintenance=" + historicalNodesInMaintenance +
", nodesInMaintenancePriority=" + nodesInMaintenancePriority +
", decommissionNodes=" + decommissionNodes +
", decommissionPriority=" + decommissionPriority +
'}';
}

@@ -328,10 +329,10 @@ public boolean equals(Object o)
if (!Objects.equals(protectedPendingSegmentDatasources, that.protectedPendingSegmentDatasources)) {
return false;
}
if (!Objects.equals(historicalNodesInMaintenance, that.historicalNodesInMaintenance)) {
if (!Objects.equals(decommissionNodes, that.decommissionNodes)) {
return false;
}
return nodesInMaintenancePriority == that.nodesInMaintenancePriority;
return decommissionPriority == that.decommissionPriority;
}

@Override
@@ -350,8 +351,8 @@ public int hashCode()
maxSegmentsInNodeLoadingQueue,
killableDataSources,
protectedPendingSegmentDatasources,
historicalNodesInMaintenance,
nodesInMaintenancePriority
decommissionNodes,
decommissionPriority
);
}

@@ -372,7 +373,7 @@ public static class Builder
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
private static final boolean DEFAULT_KILL_ALL_DATA_SOURCES = false;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
private static final int DEFAULT_MAINTENANCE_MODE_SEGMENTS_PRIORITY = 7;
private static final int DEFAULT_DECOMMISSIONING_PRIORITY = 7;

private Long millisToWaitBeforeDeleting;
private Long mergeBytesLimit;
@@ -386,8 +387,8 @@ public static class Builder
private Boolean killAllDataSources;
private Object killPendingSegmentsSkipList;
private Integer maxSegmentsInNodeLoadingQueue;
private Object maintenanceList;
private Integer maintenanceModeSegmentsPriority;
private Object decommissionNodes;
private Integer decommissionPriority;

public Builder()
{
@@ -407,8 +408,8 @@ public Builder(
@JsonProperty("killAllDataSources") @Nullable Boolean killAllDataSources,
@JsonProperty("killPendingSegmentsSkipList") @Nullable Object killPendingSegmentsSkipList,
@JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
@JsonProperty("historicalNodesInMaintenance") @Nullable Object maintenanceList,
@JsonProperty("nodesInMaintenancePriority") @Nullable Integer maintenanceModeSegmentsPriority
@JsonProperty("decommissionNodes") @Nullable Object decommissionNodes,
@JsonProperty("decommissionPriority") @Nullable Integer decommissionPriority
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@@ -423,8 +424,8 @@ public Builder(
this.killableDataSources = killableDataSources;
this.killPendingSegmentsSkipList = killPendingSegmentsSkipList;
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.maintenanceList = maintenanceList;
this.maintenanceModeSegmentsPriority = maintenanceModeSegmentsPriority;
this.decommissionNodes = decommissionNodes;
this.decommissionPriority = decommissionPriority;
}

public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@@ -493,15 +494,15 @@ public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQue
return this;
}

public Builder withMaintenanceList(Set<String> list)
public Builder withDecommissionNodes(Set<String> decommissioned)
{
this.maintenanceList = list;
this.decommissionNodes = decommissioned;
return this;
}

public Builder withMaintenanceModeSegmentsPriority(Integer priority)
public Builder withDecommissionPriority(Integer priority)
{
this.maintenanceModeSegmentsPriority = priority;
this.decommissionPriority = priority;
return this;
}

@@ -522,10 +523,10 @@ public CoordinatorDynamicConfig build()
maxSegmentsInNodeLoadingQueue == null
? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
: maxSegmentsInNodeLoadingQueue,
maintenanceList,
maintenanceModeSegmentsPriority == null
? DEFAULT_MAINTENANCE_MODE_SEGMENTS_PRIORITY
: maintenanceModeSegmentsPriority
decommissionNodes,
decommissionPriority == null
? DEFAULT_DECOMMISSIONING_PRIORITY
: decommissionPriority
);
}

@@ -548,10 +549,10 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
maxSegmentsInNodeLoadingQueue == null
? defaults.getMaxSegmentsInNodeLoadingQueue()
: maxSegmentsInNodeLoadingQueue,
maintenanceList == null ? defaults.getHistoricalNodesInMaintenance() : maintenanceList,
maintenanceModeSegmentsPriority == null
? defaults.getNodesInMaintenancePriority()
: maintenanceModeSegmentsPriority
decommissionNodes == null ? defaults.getDecommissionNodes() : decommissionNodes,
decommissionPriority == null
? defaults.getDecommissionPriority()
: decommissionPriority
);
}
}
@@ -694,7 +694,7 @@ public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
}

// Find all historical servers, group them by subType and sort by ascending usage
Set<String> nodesInMaintenance = params.getCoordinatorDynamicConfig().getHistoricalNodesInMaintenance();
Set<String> decommissioned = params.getCoordinatorDynamicConfig().getDecommissionNodes();
Member:
Suggested "decommissioningNodes"

final DruidCluster cluster = new DruidCluster();
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
@@ -709,7 +709,7 @@ public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
new ServerHolder(
server,
loadManagementPeons.get(server.getName()),
nodesInMaintenance.contains(server.getHost())
decommissioned.contains(server.getHost())
)
);
}
@@ -32,18 +32,18 @@ public class ServerHolder implements Comparable<ServerHolder>
private static final Logger log = new Logger(ServerHolder.class);
private final ImmutableDruidServer server;
private final LoadQueuePeon peon;
private final boolean inMaintenance;
private final boolean isDecommissioned;
Member:
Suggested "isDecommissioning" or "beingDecommissioned"


public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon)
{
this(server, peon, false);
}

public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean inMaintenance)
public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean isDecommissioned)
{
this.server = server;
this.peon = peon;
this.inMaintenance = inMaintenance;
this.isDecommissioned = isDecommissioned;
}

public ImmutableDruidServer getServer()
@@ -82,14 +82,14 @@ public double getPercentUsed()
}

/**
* Historical nodes can be placed in maintenance mode, which instructs Coordinator to move segments from them
* according to a specified priority. The mechanism allows to drain segments from nodes which are planned for
* Historical nodes can be 'decommissioned', which instructs Coordinator to move segments from them
* according to a specified priority. The mechanism allows draining segments from nodes which are planned for
* replacement.
* @return true if the node is in maitenance mode
* @return true if the node is being decommissioned
*/
public boolean isInMaintenance()
public boolean isDecommissioned()
{
return inMaintenance;
return isDecommissioned;
}

public long getAvailableSize()
@@ -107,25 +107,25 @@ private void balanceTier(
}

/*
Take as much segments from maintenance servers as priority allows and find the best location for them on
Take as many segments from decommissioned servers as priority allows and find the best location for them on
available servers. After that, balance segments within available servers pool.
*/
Map<Boolean, List<ServerHolder>> partitions =
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isInMaintenance));
final List<ServerHolder> maintenanceServers = partitions.get(true);
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioned));
final List<ServerHolder> decommssionedServers = partitions.get(true);
final List<ServerHolder> availableServers = partitions.get(false);
log.info(
"Found %d servers in maintenance, %d available servers servers",
maintenanceServers.size(),
"Found %d decomissioned servers, %d available servers servers",
Member:
"%d decommissioning servers, % active servers"

  • word "servers" is duplicated
  • perhaps "active" is a better term in the context of this method, because servers to be decommissioned are available (in distributed systems terms) too.

Member Author:
Changed to 'active'

decommssionedServers.size(),
availableServers.size()
);

if (maintenanceServers.isEmpty()) {
if (decommssionedServers.isEmpty()) {
if (availableServers.size() <= 1) {
log.info("[%s]: %d available servers servers found. Cannot balance.", tier, availableServers.size());
Member:
"servers" duplicated

}
} else if (availableServers.isEmpty()) {
log.info("[%s]: no available servers servers found during maintenance. Cannot balance.", tier);
log.info("[%s]: no available servers servers found during decommissioning. Cannot balance.", tier);
Member:
Suggested changing the message to something like "no active servers found, segments can't be moved off %d decommissioning servers"

  • Maybe it should be warn(), not info()?
  • It seems that this logging statement and the statement at line 125 duplicate the statement at line 117. Maybe this part of the code should be restructured.

Member:
"no active servers found, segments can't be moved off %d decommissioning servers" leads me to the question: should we really give up decommission in this case? Maybe if segments are available on historical nodes from other tiers (or from the same tier, on other decommissioning servers or on full active servers -- see below), or actually always, we should continue decommission by just dropping segments from the decommissioning servers and not moving them anywhere?

Also this condition doesn't cover the situation when there are active historical nodes in the tier, but they are either full, or their load queues have maxSegmentsInNodeLoadingQueue elements (loading segments). (Note that in the latter case, the balancing step won't be skipped entirely in the `if (!currentlyMovingSegments.get(tier).isEmpty()) {` branch above in the code, because historicals' segment loading queues may be filled up by LoadRule, which uses an independent mechanism, ReplicationThrottler, to implement the "make a balancing burst and then let all segments load before the next balancing burst" pattern.)

Member (@leventov, Feb 28, 2019):
I elaborated on the problems related to the independence of DruidCoordinatorBalancer and LoadRule in #7159.

Member:
"no active servers found, segments can't be moved off %d decommissioning servers" leads me to the question: should we really give up decommission in this case? Maybe if segments are available on historical nodes from other tiers (or from the same tier, on other decommissioning servers or on full active servers -- see below), or actually always, we should continue decommission by just dropping segments from the decommissioning servers and not moving them anywhere?

@egor-ryashin, what's your take on this?

Contributor:
@leventov the initial idea was to keep the replication at the same level.

Member Author:
Consolidated the 'cannot balance' checks here and changed the message to just log.warn that there is an insufficient number of active servers to do anything, and added a return.
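
For readers following along, a minimal sketch of what such a consolidated guard could look like (illustrative only; the final code is not shown in this thread, and the names `activeServers` and `decommissioningServers` are placeholders):

```java
import java.util.List;

final class CannotBalanceGuardSketch
{
  /**
   * Returns true if balancing can proceed. When it returns false, the caller
   * (balanceTier in the PR) would log a warning and return early.
   */
  static boolean canBalance(String tier, List<String> activeServers, List<String> decommissioningServers)
  {
    boolean nothingToDo = activeServers.isEmpty()
                          || (activeServers.size() <= 1 && decommissioningServers.isEmpty());
    if (nothingToDo) {
      // In the PR this would be a log.warn(...) followed by a return from balanceTier().
      System.err.printf(
          "[%s]: insufficient active servers (active=%d, decommissioning=%d). Cannot balance.%n",
          tier,
          activeServers.size(),
          decommissioningServers.size()
      );
      return false;
    }
    return true;
  }
}
```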

Member:
> the initial idea was to keep the replication at the same level.

This should be reflected in the docs, as should other corner cases related to decommissioning nodes.

Member Author:
I'll try to add docs noting that decommissioning can become stalled if there are no active servers available to move segments to.

}
egor-ryashin marked this conversation as resolved.

int numSegments = 0;
@@ -139,18 +139,18 @@ private void balanceTier(
}

final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments);
int priority = params.getCoordinatorDynamicConfig().getNodesInMaintenancePriority();
int maxMaintenanceSegmentsToMove = (int) Math.ceil(maxSegmentsToMove * priority / 10.0);
log.info("Processing %d segments from servers in maintenance mode", maxMaintenanceSegmentsToMove);
Pair<Integer, Integer> maintenanceResult =
balanceServers(params, maintenanceServers, availableServers, maxMaintenanceSegmentsToMove);
int maxGeneralSegmentsToMove = maxSegmentsToMove - maintenanceResult.lhs;
log.info("Processing %d segments from servers in general mode", maxGeneralSegmentsToMove);
int priority = params.getCoordinatorDynamicConfig().getDecommissionPriority();
int maxDecommissionedSegmentsToMove = (int) Math.ceil(maxSegmentsToMove * priority / 10.0);
Member:
Suggested "maxSegmentsToMoveOffDecommissioningNodes"

Contributor:
I'd vote for something ending in "segments".

log.info("Processing %d segments from decommissioned servers", maxDecommissionedSegmentsToMove);
Pair<Integer, Integer> decommissionedResult =
Member:
Suggested "decommissioningResult"

balanceServers(params, decommssionedServers, availableServers, maxDecommissionedSegmentsToMove);
int maxGeneralSegmentsToMove = maxSegmentsToMove - decommissionedResult.lhs;
log.info("Processing %d segments for balancing", maxGeneralSegmentsToMove);
Pair<Integer, Integer> generalResult =
balanceServers(params, availableServers, availableServers, maxGeneralSegmentsToMove);

int moved = generalResult.lhs + maintenanceResult.lhs;
int unmoved = generalResult.rhs + maintenanceResult.rhs;
int moved = generalResult.lhs + decommissionedResult.lhs;
int unmoved = generalResult.rhs + decommissionedResult.rhs;
if (unmoved == maxSegmentsToMove) {
// Cluster should be alive and constantly adjusting
log.info("No good moves found in tier [%s]", tier);
@@ -46,7 +46,7 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim
} else {
params.getDruidCluster().getAllServers().forEach(
eachHolder -> {
if (!eachHolder.isInMaintenance()
if (!eachHolder.isDecommissioned()
&& colocatedDataSources.stream()
.anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) {
loadServerHolders.add(eachHolder);