Skip to content

Commit

Permalink
re-introduce: Inline reroute with process of node join/master election (
Browse files Browse the repository at this point in the history
  • Loading branch information
bleskes committed Jul 4, 2016
1 parent 1924b46 commit 86d2e88
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 79 deletions.
Expand Up @@ -72,10 +72,6 @@ protected void doStop() {
protected void doClose() {
}

public AllocationService getAllocationService() {
return this.allocationService;
}

/**
* Initiates a reroute.
*/
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/discovery/Discovery.java
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -44,7 +44,7 @@ public interface Discovery extends LifecycleComponent {
* Another hack to solve dep injection problem..., note, this will be called before
* any start is called.
*/
void setRoutingService(RoutingService routingService);
void setAllocationService(AllocationService allocationService);

/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -61,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implements Discov
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];

private final ClusterService clusterService;
private RoutingService routingService;
private AllocationService allocationService;
private final ClusterName clusterName;

private final DiscoverySettings discoverySettings;
Expand All @@ -81,8 +81,8 @@ public LocalDiscovery(Settings settings, ClusterService clusterService, ClusterS
}

@Override
public void setRoutingService(RoutingService routingService) {
this.routingService = routingService;
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
}

@Override
Expand Down Expand Up @@ -154,21 +154,19 @@ public ClusterState execute(ClusterState currentState) {
nodesBuilder.put(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
RoutingAllocation.Result result = master.allocationService.reroute(currentState, "node_add");
if (result.changed()) {
currentState = ClusterState.builder(currentState).routingResult(result).build();
}
return currentState;
}

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
master.routingService.reroute("post_node_add");
}
});
}
} // else, no master node, the next node that will start will fill things in...
Expand Down Expand Up @@ -224,7 +222,7 @@ public ClusterState execute(ClusterState currentState) {
}
// reroute here, so we eagerly remove dead nodes from the routing
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute(
RoutingAllocation.Result routingResult = master.allocationService.reroute(
ClusterState.builder(updatedState).build(), "elected as master");
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}
Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -58,7 +58,7 @@
public class NodeJoinController extends AbstractComponent {

private final ClusterService clusterService;
private final RoutingService routingService;
private final AllocationService allocationService;
private final ElectMasterService electMaster;
private final DiscoverySettings discoverySettings;
private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
Expand All @@ -68,10 +68,11 @@ public class NodeJoinController extends AbstractComponent {
private ElectionContext electionContext = null;


public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) {
public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster,
DiscoverySettings discoverySettings, Settings settings) {
super(settings);
this.clusterService = clusterService;
this.routingService = routingService;
this.allocationService = allocationService;
this.electMaster = electMaster;
this.discoverySettings = discoverySettings;
}
Expand Down Expand Up @@ -406,21 +407,7 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
newState.blocks(clusterBlocks);
newState.nodes(nodesBuilder);
nodesChanged = true;

// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
// Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster
// state is published.
// TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove.

final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
nodesBuilder = DiscoveryNodes.builder(tmpState.nodes());
}

if (nodesBuilder.isLocalNodeElectedMaster() == false) {
Expand All @@ -439,7 +426,8 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
for (DiscoveryNode existingNode : currentNodes) {
if (node.getAddress().equals(existingNode.getAddress())) {
nodesBuilder.remove(existingNode.getId());
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
logger.warn("received join request from node [{}], but found existing node {} with same address, " +
"removing existing node", node, existingNode);
}
}
}
Expand All @@ -448,6 +436,12 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov

if (nodesChanged) {
newState.nodes(nodesBuilder);
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = allocationService.reroute(tmpState, "node_join");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
}

// we must return a new cluster state instance to force publishing. This is important
Expand All @@ -463,13 +457,6 @@ public boolean runOnlyOnMaster() {

@Override
public void clusterStatePublished(ClusterChangedEvent event) {
if (event.nodesDelta().hasChanges()) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
routingService.reroute("post_node_add");
}

NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
}
}
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -113,7 +113,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover

private final TransportService transportService;
private final ClusterService clusterService;
private RoutingService routingService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoverySettings discoverySettings;
private final ZenPingService pingService;
Expand Down Expand Up @@ -146,7 +146,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
/** counts the time this node has joined the cluster or have elected it self as master */
private final AtomicLong clusterJoinsCounter = new AtomicLong();

// must initialized in doStart(), when we have the routingService set
// must initialized in doStart(), when we have the allocationService set
private volatile NodeJoinController nodeJoinController;

@Inject
Expand Down Expand Up @@ -206,16 +206,16 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool,
}

@Override
public void setRoutingService(RoutingService routingService) {
this.routingService = routingService;
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
}

@Override
protected void doStart() {
nodesFD.setLocalNode(clusterService.localNode());
joinThreadControl.start();
pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, routingService, electMaster, discoverySettings, settings);
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
}

@Override
Expand Down Expand Up @@ -516,8 +516,7 @@ public ClusterState execute(ClusterState currentState) {
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(
ClusterState.builder(currentState).build(),
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(),
"[" + node + "] left");
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
Expand Down Expand Up @@ -561,7 +560,7 @@ public ClusterState execute(ClusterState currentState) {
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(
RoutingAllocation.Result routingResult = allocationService.reroute(
ClusterState.builder(currentState).build(),
"[" + node + "] failed");
return ClusterState.builder(currentState).routingResult(routingResult).build();
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/elasticsearch/node/Node.java
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.Lifecycle;
Expand Down Expand Up @@ -350,7 +351,7 @@ public Node start() {
ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("starting ...");
// hack around dependency injection problem (for now...)
injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class));
injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {
injector.getInstance(plugin).start();
}
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
Expand Down Expand Up @@ -99,10 +98,9 @@ public void setUp() throws Exception {
// make sure we have a master
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(
DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId())));
nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY),
new ElectMasterService(Settings.EMPTY),
new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
Settings.EMPTY);
nodeJoinController = new NodeJoinController(clusterService, new NoopAllocationService(Settings.EMPTY),
new ElectMasterService(Settings.EMPTY), new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), Settings.EMPTY);
}

@After
Expand Down Expand Up @@ -298,7 +296,8 @@ public void onFailure(Throwable t) {
}

logger.debug("--> asserting master election didn't finish yet");
assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false));
assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]",
electionFuture.isDone(), equalTo(false));

final int finalJoins = requiredJoins - initialJoins + randomInt(5);
nodesToJoin.clear();
Expand Down Expand Up @@ -374,7 +373,8 @@ public void testMasterElectionTimeout() throws InterruptedException {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true));
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(),
equalTo(true));
latch.countDown();
}

Expand Down Expand Up @@ -492,7 +492,8 @@ protected void doRun() throws Exception {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true));
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(),
equalTo(true));
latch.countDown();
}

Expand All @@ -516,31 +517,21 @@ public void onFailure(Throwable t) {
}


static class NoopRoutingService extends RoutingService {

public NoopRoutingService(Settings settings) {
super(settings, null, new NoopAllocationService(settings));
}

@Override
protected void performReroute(String reason) {

}
}

static class NoopAllocationService extends AllocationService {

public NoopAllocationService(Settings settings) {
super(settings, null, null, null, null);
}

@Override
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards, boolean withReroute) {
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards,
boolean withReroute) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}

@Override
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState,
List<FailedRerouteAllocation.FailedShard> failedShards) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}

Expand Down

0 comments on commit 86d2e88

Please sign in to comment.