Skip to content

Commit

Permalink
[Discovery] accumulated improvements to ZenDiscovery
Browse files Browse the repository at this point in the history
Merging the accumulated work from the feature/improve_zen branch. Here are the highlights of the changes:

__Testing infra__
- Networking:
    - all symmetric partitioning
    - dropping packets
    - hard disconnects
    - Jepsen Tests
- Single node service disruptions:
    - Long GC / Halt
    - Slow cluster state updates
- Discovery settings
    - Easy to set up unicast with partial host list

__Zen Discovery__
- Pinging after master loss (no local elects)
- Fixes the split brain issue: #2488
- Batching join requests
- More resilient joining process (wait on a publish from master)

Closes #7493
  • Loading branch information
bleskes committed Sep 1, 2014
2 parents 889db1c + 34f4ca7 commit 598854d
Show file tree
Hide file tree
Showing 63 changed files with 3,891 additions and 803 deletions.
468 changes: 239 additions & 229 deletions pom.xml

Large diffs are not rendered by default.

Expand Up @@ -137,6 +137,12 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build());
}

// Invoked when the follow-up task is rejected because this node is no longer master.
// The listener is still answered, using the ack flag and the settings updates collected so far.
@Override
public void onNoLongerMaster(String source) {
    logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
    final ClusterUpdateSettingsResponse response =
            new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build());
    listener.onResponse(response);
}

@Override
public void onFailure(String source, Throwable t) {
//if the reroute fails we only log
Expand Down
Expand Up @@ -173,12 +173,12 @@ protected GroupShardsIterator shards(ClusterState state, RecoveryRequest request

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RecoveryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
}

static class ShardRecoveryRequest extends BroadcastShardOperationRequest {
Expand Down
42 changes: 23 additions & 19 deletions src/main/java/org/elasticsearch/action/bench/BenchmarkService.java
Expand Up @@ -66,11 +66,11 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
/**
* Constructs a service component for running benchmarks
*
* @param settings Settings
* @param clusterService Cluster service
* @param threadPool Thread pool
* @param client Client
* @param transportService Transport service
* @param settings Settings
* @param clusterService Cluster service
* @param threadPool Thread pool
* @param client Client
* @param transportService Transport service
*/
@Inject
public BenchmarkService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
Expand All @@ -86,19 +86,22 @@ public BenchmarkService(Settings settings, ClusterService clusterService, Thread
}

@Override
protected void doStart() throws ElasticsearchException { }
protected void doStart() throws ElasticsearchException {
}

@Override
protected void doStop() throws ElasticsearchException { }
protected void doStop() throws ElasticsearchException {
}

@Override
protected void doClose() throws ElasticsearchException { }
protected void doClose() throws ElasticsearchException {
}

/**
* Lists actively running benchmarks on the cluster
*
* @param request Status request
* @param listener Response listener
* @param request Status request
* @param listener Response listener
*/
public void listBenchmarks(final BenchmarkStatusRequest request, final ActionListener<BenchmarkStatusResponse> listener) {

Expand Down Expand Up @@ -171,8 +174,8 @@ public void onFailure(Throwable t) {
/**
* Executes benchmarks on the cluster
*
* @param request Benchmark request
* @param listener Response listener
* @param request Benchmark request
* @param listener Response listener
*/
public void startBenchmark(final BenchmarkRequest request, final ActionListener<BenchmarkResponse> listener) {

Expand Down Expand Up @@ -228,7 +231,7 @@ public void onFailure(Throwable t) {
listener.onFailure(t);
}
}, (benchmarkResponse.state() != BenchmarkResponse.State.ABORTED) &&
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
(benchmarkResponse.state() != BenchmarkResponse.State.FAILED)));
}

private final boolean isBenchmarkNode(DiscoveryNode node) {
Expand Down Expand Up @@ -403,6 +406,7 @@ protected CountDownAsyncHandler(int size) {
}

public abstract T newInstance();

protected abstract void sendResponse();

@Override
Expand Down Expand Up @@ -593,7 +597,7 @@ public ClusterState execute(ClusterState currentState) {

if (bmd != null) {
for (BenchmarkMetaData.Entry entry : bmd.entries()) {
if (request.benchmarkName().equals(entry.benchmarkId())){
if (request.benchmarkName().equals(entry.benchmarkId())) {
if (entry.state() != BenchmarkMetaData.State.SUCCESS && entry.state() != BenchmarkMetaData.State.FAILED) {
throw new ElasticsearchException("A benchmark with ID [" + request.benchmarkName() + "] is already running in state [" + entry.state() + "]");
}
Expand Down Expand Up @@ -648,7 +652,7 @@ public FinishBenchmarkTask(String reason, String benchmarkId, BenchmarkStateList
@Override
protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
BenchmarkMetaData.State state = entry.state();
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
assert state == BenchmarkMetaData.State.STARTED || state == BenchmarkMetaData.State.ABORTED : "Expected state: STARTED or ABORTED but was: " + entry.state();
if (success) {
return new BenchmarkMetaData.Entry(entry, BenchmarkMetaData.State.SUCCESS);
} else {
Expand All @@ -661,7 +665,7 @@ public final class AbortBenchmarkTask extends UpdateBenchmarkStateTask {
private final String[] patterns;

public AbortBenchmarkTask(String[] patterns, BenchmarkStateListener listener) {
super("abort_benchmark", null , listener);
super("abort_benchmark", null, listener);
this.patterns = patterns;
}

Expand All @@ -675,7 +679,7 @@ protected BenchmarkMetaData.Entry process(BenchmarkMetaData.Entry entry) {
}
}

public abstract class UpdateBenchmarkStateTask implements ProcessedClusterStateUpdateTask {
public abstract class UpdateBenchmarkStateTask extends ProcessedClusterStateUpdateTask {

private final String reason;
protected final String benchmarkId;
Expand All @@ -702,7 +706,7 @@ public ClusterState execute(ClusterState currentState) {
ImmutableList.Builder<BenchmarkMetaData.Entry> builder = new ImmutableList.Builder<BenchmarkMetaData.Entry>();
for (BenchmarkMetaData.Entry e : bmd.entries()) {
if (benchmarkId == null || match(e)) {
e = process(e) ;
e = process(e);
instances.add(e);
}
// Don't keep finished benchmarks around in cluster state
Expand Down Expand Up @@ -741,7 +745,7 @@ public String reason() {
}
}

public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> implements TimeoutClusterStateUpdateTask {
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> extends TimeoutClusterStateUpdateTask {
protected final R request;

public BenchmarkStateChangeAction(R request) {
Expand Down
Expand Up @@ -28,7 +28,7 @@
* An extension of {@link ClusterStateUpdateTask} that allows callers to be notified when
* all the nodes have acknowledged a cluster state update request
*/
public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutClusterStateUpdateTask {
public abstract class AckedClusterStateUpdateTask<Response> extends TimeoutClusterStateUpdateTask {

private final ActionListener<Response> listener;
private final AckedRequest request;
Expand All @@ -40,6 +40,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Respo

/**
* Called to determine which nodes the acknowledgement is expected from
*
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
*/
Expand All @@ -50,6 +51,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
/**
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
*
* @param t optional error that might have been thrown
*/
public void onAllNodesAcked(@Nullable Throwable t) {
Expand Down
Expand Up @@ -110,4 +110,5 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* Returns the tasks that are pending.
*/
List<PendingClusterTask> pendingTasks();

}
2 changes: 2 additions & 0 deletions src/main/java/org/elasticsearch/cluster/ClusterState.java
Expand Up @@ -115,6 +115,8 @@ public static <T extends Custom> Custom.Factory<T> lookupFactorySafe(String type
}


public static final long UNKNOWN_VERSION = -1;

private final long version;

private final RoutingTable routingTable;
Expand Down
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster;

/**
* Base class for cluster state update tasks that should be executed
* even if the current node is not a master.
* <p>
* This is an abstract class, not a marker interface: it lifts the master-only
* restriction by overriding {@link #runOnlyOnMaster()} to return {@code false}.
*/
public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {

/**
* @return always {@code false}, so the task is not limited to the master node
*/
@Override
public boolean runOnlyOnMaster() {
return false;
}
}
Expand Up @@ -19,19 +19,37 @@

package org.elasticsearch.cluster;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

/**
* A task that can update the cluster state.
*/
public interface ClusterStateUpdateTask {
abstract public class ClusterStateUpdateTask {

/**
* Update the cluster state based on the current state. Return the *same instance* if no state
* should be changed.
*/
ClusterState execute(ClusterState currentState) throws Exception;
abstract public ClusterState execute(ClusterState currentState) throws Exception;

/**
* A callback called when execute fails.
*/
void onFailure(String source, Throwable t);
abstract public void onFailure(String source, @Nullable Throwable t);


/**
* Indicates whether this task should only run if the current node is master.
*
* @return {@code true} in this default implementation; subclasses may override it
*/
public boolean runOnlyOnMaster() {
return true;
}

/**
 * Called when the task was rejected because the local node is no longer master.
 * <p>
 * The default implementation reports the rejection through
 * {@link #onFailure(String, Throwable)}, wrapped in an {@link EsRejectedExecutionException}.
 *
 * @param source the source description of the rejected task
 */
public void onNoLongerMaster(String source) {
    final String reason = "no longer master. source: [" + source + "]";
    onFailure(source, new EsRejectedExecutionException(reason));
}
}
@@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;

/**
 * A {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} that, like
 * {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask}, is not restricted to the master node.
 * Combining the two in one base class allows easy creation of anonymous classes.
 */
public abstract class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask {

    /**
     * @return always {@code false}, so the task is not limited to the master node
     */
    @Override
    public boolean runOnlyOnMaster() {
        return false;
    }
}
Expand Up @@ -23,11 +23,11 @@
* An extension of {@link ClusterStateUpdateTask} that allows callers to be notified when
* the cluster state update has been processed.
*/
public interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {

/**
* Called when the result of the {@link #execute(ClusterState)} have been processed
* properly by all listeners.
*/
void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
public abstract void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
}
Expand Up @@ -25,11 +25,11 @@
* An extension of {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows a timeout
* to be associated with the task.
*/
public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {

/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onFailure(String, Throwable)}
*/
TimeValue timeout();
abstract public TimeValue timeout();
}
13 changes: 13 additions & 0 deletions src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
Expand Up @@ -108,6 +108,19 @@ public boolean hasGlobalBlock(ClusterBlock block) {
return global.contains(block);
}

/**
 * Checks whether any global block carries the given id.
 *
 * @param blockId the block id to look for
 * @return {@code true} if a global block with that id exists, {@code false} otherwise
 */
public boolean hasGlobalBlock(int blockId) {
    boolean found = false;
    for (ClusterBlock candidate : global) {
        if (candidate.id() == blockId) {
            found = true;
            break;
        }
    }
    return found;
}

/**
 * Checks whether at least one global block exists for the given level.
 *
 * @param level the block level to check
 * @return {@code true} if {@code global(level)} is non-empty, {@code false} otherwise
 */
public boolean hasGlobalBlock(ClusterBlockLevel level) {
    if (global(level).size() > 0) {
        return true;
    }
    return false;
}

/**
* Is there a global block with the provided status?
*/
Expand Down
Expand Up @@ -149,10 +149,15 @@ public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).routingResult(routingResult).build();
}

@Override
public void onNoLongerMaster(String source) {
// Deliberately a no-op: losing mastership while this task is pending is not treated as an error.
// NOTE(review): presumably this override keeps the rejection from being routed to
// onFailure(...), which logs at error level - confirm against the base class default.
}

@Override
public void onFailure(String source, Throwable t) {
ClusterState state = clusterService.state();
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
ClusterState state = clusterService.state();
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
}
});
routingTableDirty = false;
Expand Down

0 comments on commit 598854d

Please sign in to comment.