Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove sync allocate implementation #89683

Draft
wants to merge 1 commit into
base: feature/desired-balance-allocator
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
Expand Down Expand Up @@ -117,7 +118,6 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

@Override
public void allocate(RoutingAllocation allocation) {
assert allocation.ignoreDisable() == false;

Expand All @@ -131,6 +131,12 @@ public void allocate(RoutingAllocation allocation) {
balancer.balance();
}

@Override
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
allocate(allocation);
listener.onResponse(null);
}

@Override
public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) {
Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -217,7 +218,7 @@ public DesiredBalance compute(
}

logger.trace("running delegate allocator");
delegateAllocator.allocate(routingAllocation);
delegateAllocator.allocate(routingAllocation, ActionListener.noop());
assert routingNodes.unassigned().size() == 0; // any unassigned shards should now be ignored

hasChanges = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,6 @@ public String toString() {
this.queue = new PendingListenersQueue(threadPool);
}

@Override
public void allocate(RoutingAllocation allocation) {
throw new UnsupportedOperationException();
}

@Override
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
assert MasterService.isMasterUpdateThread() || Thread.currentThread().getName().startsWith("TEST-")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,24 @@ public interface ShardsAllocator {
* - relocate shards to find a good shard balance in the cluster
*
* @param allocation current node allocation
* @param listener listener to be executed once async allocation is completed
*/
void allocate(RoutingAllocation allocation);
void allocate(RoutingAllocation allocation, ActionListener<Void> listener);

/**
* Allocates shards to nodes in the cluster. An implementation of this method should:
* - assign unassigned shards
* - relocate shards that cannot stay on a node anymore
* - relocate shards to find a good shard balance in the cluster
* Returns the decision for where a shard should reside in the cluster. If the shard is unassigned,
* then the {@link AllocateUnassignedDecision} will be non-null. If the shard is not in the unassigned
* state, then the {@link MoveDecision} will be non-null.
*
* @param allocation current node allocation
* @param listener listener to be executed once async allocation is completed
* This method is primarily used by the cluster allocation explain API to provide detailed explanations
* for the allocation of a single shard. Implementations of the {@link #allocate(RoutingAllocation, ActionListener<Void>)} method
* may use the results of this method implementation to decide on allocating shards in the routing table
* to the cluster.
*
* If an implementation of this interface does not support explaining decisions for a single shard through
* the cluster explain API, then this method should throw a {@code UnsupportedOperationException}.
*/
default void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
allocate(allocation);
listener.onResponse(null);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only thing removed. Diff is caused by moving default execute method to the bottom.

ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation);

/**
* Execute allocation commands
Expand All @@ -70,19 +72,4 @@ default RoutingExplanations execute(RoutingAllocation allocation, AllocationComm
allocation.setDebugMode(originalDebugMode);
}
}

/**
* Returns the decision for where a shard should reside in the cluster. If the shard is unassigned,
* then the {@link AllocateUnassignedDecision} will be non-null. If the shard is not in the unassigned
* state, then the {@link MoveDecision} will be non-null.
*
* This method is primarily used by the cluster allocation explain API to provide detailed explanations
* for the allocation of a single shard. Implementations of the {@link #allocate(RoutingAllocation)} method
* may use the results of this method implementation to decide on allocating shards in the routing table
* to the cluster.
*
* If an implementation of this interface does not support explaining decisions for a single shard through
* the cluster explain API, then this method should throw a {@code UnsupportedOperationException}.
*/
ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.admin.cluster.allocation;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -65,8 +66,9 @@ public void testInitializingOrRelocatingShardExplanation() throws Exception {
true,
new AllocationService(null, new TestGatewayAllocator(), new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
// no-op
listener.onResponse(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.cluster;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
Expand Down Expand Up @@ -99,8 +100,9 @@ protected FakeAllocationDecider() {}

static class FakeShardsAllocator implements ShardsAllocator {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
// noop
listener.onResponse(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ public void testAssignsPrimariesInPriorityOrderThenReplicas() {
),
new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
// all primaries are handled by existing shards allocators in these tests; even the invalid allocator prevents shards
// from falling through to here
assertThat(allocation.routingNodes().unassigned().getNumPrimaries(), equalTo(0));
listener.onResponse(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void testNoRebalanceOnPrimaryOverload() {
--------[test][3], node[3], [P], s[STARTED]
---- unassigned
*/
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
ShardRouting[] drain = unassigned.drain();
ArrayUtil.timSort(drain, (a, b) -> { return a.primary() ? -1 : 1; }); // we have to allocate primaries first
Expand Down Expand Up @@ -392,6 +392,7 @@ public void allocate(RoutingAllocation allocation) {
}

}
listener.onResponse(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -247,7 +248,7 @@ public void testSimulatesAchievingDesiredBalanceBeforeDelegating() {
var allocateCalled = new AtomicBoolean();
var desiredBalanceComputer = new DesiredBalanceComputer(new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
assertTrue(allocateCalled.compareAndSet(false, true));
// whatever the allocation in the current cluster state, the desired balance service should start by moving all the
// known shards to their desired locations before delegating to the inner allocator
Expand All @@ -261,6 +262,7 @@ public void allocate(RoutingAllocation allocation) {
assertTrue(shardRouting.toString(), shardRouting.started());
}
}
listener.onResponse(null);
}

@Override
Expand Down Expand Up @@ -595,7 +597,7 @@ private static DiscoveryNode createDiscoveryNode(String id, Set<DiscoveryNodeRol
private static DesiredBalanceComputer createDesiredBalanceComputer() {
return new DesiredBalanceComputer(new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
final var shardRouting = unassignedIterator.next();
Expand All @@ -607,6 +609,7 @@ public void allocate(RoutingAllocation allocation) {
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
}
}
listener.onResponse(null);
}

private static boolean isCorrespondingPrimaryStarted(ShardRouting shardRouting, RoutingAllocation allocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -971,8 +971,9 @@ private static AllocationService createTestAllocationService(
) {
final var allocationService = new AllocationService(new AllocationDeciders(List.of(allocationDeciders)), new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
allocationConsumer.accept(allocation);
listener.onResponse(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,14 @@ private static void testAllocate(GatewayAllocatorBehaviour gatewayAllocatorBehav
final var threadPool = deterministicTaskQueue.getThreadPool();
final var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
final var dataNodeId = allocation.nodes().getDataNodes().values().iterator().next().getId();
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
unassignedIterator.next();
unassignedIterator.initialize(dataNodeId, null, 0L, allocation.changes());
}
listener.onResponse(null);
}

@Override
Expand Down Expand Up @@ -247,13 +248,14 @@ public void testCallListenersOnlyAfterProducingFreshInput() {
var clusterService = ClusterServiceUtils.createClusterService(createInitialClusterState(), threadPool);
var allocator = new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
final var dataNodeId = allocation.nodes().getDataNodes().values().iterator().next().getId();
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
while (unassignedIterator.hasNext()) {
var indexName = unassignedIterator.next().getIndexName();
unassignedIterator.initialize(dataNodeId, null, 0L, allocation.changes());
}
listener.onResponse(null);

try {
assertTrue("Should have submitted the second input in time", secondInputSubmitted.await(10, TimeUnit.SECONDS));
Expand Down Expand Up @@ -368,7 +370,7 @@ public void testFailListenersOnNoLongerMasterException() throws Exception {
var rerouteServiceSupplier = new SetOnce<RerouteService>();
var allocator = new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
final var dataNodeId = allocation.nodes().getDataNodes().values().iterator().next().getId();
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
var madeProgress = false;
Expand All @@ -381,6 +383,7 @@ public void allocate(RoutingAllocation allocation) {
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
}
}
listener.onResponse(null);
}

@Override
Expand Down Expand Up @@ -436,7 +439,7 @@ public void testConcurrency() throws Exception {
var clusterService = ClusterServiceUtils.createClusterService(createInitialClusterState(), threadPool);
var allocator = new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
final var dataNodeId = allocation.nodes().getDataNodes().values().iterator().next().getId();
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
var madeProgress = false;
Expand All @@ -449,6 +452,7 @@ public void allocate(RoutingAllocation allocation) {
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
}
}
listener.onResponse(null);
}

@Override
Expand Down