Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove TransportService#registerRequestHandler leniency #20469

Merged
merged 7 commits into from
Sep 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static boolean isIngestNode(Settings settings) {
* @param version the version of the node
*/
public DiscoveryNode(final String id, TransportAddress address, Version version) {
this(id, address, Collections.emptyMap(), Collections.emptySet(), version);
this(id, address, Collections.emptyMap(), EnumSet.allOf(Role.class), version);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ protected void doStop() {
protected void doClose() {
masterFD.close();
nodesFD.close();
publishClusterState.close();
membership.close();
pingService.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public void close() {
super.close();
stop("closing");
this.listeners.clear();
transportService.removeHandler(MASTER_PING_ACTION_NAME);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ public NodesFaultDetection stop() {
public void close() {
super.close();
stop();
transportService.removeHandler(PING_ACTION_NAME);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ public MembershipAction(Settings settings, TransportService transportService, Di
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}

public void close() {
transportService.removeHandler(DISCOVERY_JOIN_ACTION_NAME);
transportService.removeHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME);
transportService.removeHandler(DISCOVERY_LEAVE_ACTION_NAME);
}

public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode), EmptyTransportResponseHandler.INSTANCE_SAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ protected void doStop() {

@Override
protected void doClose() {
transportService.removeHandler(ACTION_NAME);
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
try {
IOUtils.close(receivedResponses.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ public PublishClusterStateAction(
transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler());
}

public void close() {
transportService.removeHandler(SEND_ACTION_NAME);
transportService.removeHandler(COMMIT_ACTION_NAME);
}

public PendingClusterStatesQueue pendingStatesQueue() {
return pendingStatesQueue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ public VerifyNodeRepositoryAction(Settings settings, TransportService transportS
transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SAME, new VerifyNodeRepositoryRequestHandler());
}

public void close() {
transportService.removeHandler(ACTION_NAME);
}

public void verify(String repository, String verificationToken, final ActionListener<VerifyResponse> listener) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
final DiscoveryNode localNode = discoNodes.getLocalNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,19 +620,12 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
registerRequestHandler(reg);
}

protected <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
private <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
synchronized (requestHandlerMutex) {
RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction());
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
if (replaced != null) {
logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced);
if (requestHandlers.containsKey(reg.getAction())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yay

throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
}
}
}

public void removeHandler(String action) {
synchronized (requestHandlerMutex) {
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).remove(action).immutableMap();
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public static <R extends ReplicationRequest> R resolveRequest(TransportRequest r
private TransportService transportService;
private CapturingTransport transport;
private Action action;
private ShardStateAction shardStateAction;

/* *
* TransportReplicationAction needs an instance of IndexShard to count operations.
* indexShards is reset to null before each test and will be initialized upon request in the tests.
Expand All @@ -150,7 +152,8 @@ public void setUp() throws Exception {
transportService = new TransportService(clusterService.getSettings(), transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool);
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool);
}

@After
Expand Down Expand Up @@ -707,7 +710,8 @@ public void testReplicasCounter() throws Exception {
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
boolean throwException = randomBoolean();
final ReplicationTask task = maybeTask();
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) {
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request) {
assertIndexShardCounter(1);
Expand Down Expand Up @@ -826,7 +830,8 @@ public void testRetryOnReplica() throws Exception {
setState(clusterService, state);
AtomicBoolean throwException = new AtomicBoolean(true);
final ReplicationTask task = maybeTask();
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) {
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request) {
assertPhase(task, "replica");
Expand Down Expand Up @@ -940,9 +945,10 @@ class Action extends TransportReplicationAction<Request, Request, Response> {

Action(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService,
ShardStateAction shardStateAction,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
Request::new, Request::new, ThreadPool.Names.SAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.elasticsearch.discovery.zen;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -35,18 +37,18 @@
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.MockNode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -55,14 +57,11 @@
import static java.util.Collections.emptySet;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.createMockNode;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

/**
*/
public class ZenDiscoveryUnitTests extends ESTestCase {

public void testShouldIgnoreNewClusterState() {
Expand Down Expand Up @@ -154,59 +153,76 @@ public void testNodesUpdatedAfterClusterStatePublished() throws Exception {
Settings settings = Settings.builder()
.put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build();

Map<String, MockNode> nodes = new HashMap<>();
ZenDiscovery zenDiscovery = null;
ClusterService clusterService = null;
ArrayList<Closeable> toClose = new ArrayList<>();
try {
Set<DiscoveryNode> expectedFDNodes = null;
// create master node and its mocked up services
MockNode master = createMockNode("master", settings, null, threadPool, logger, nodes).setAsMaster();
ClusterState state = master.clusterState; // initial cluster state

final MockTransportService masterTransport = MockTransportService.local(settings, Version.CURRENT, threadPool);
masterTransport.start();
DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT);
toClose.add(masterTransport);
masterTransport.setLocalNode(masterNode);
ClusterState state = ClusterStateCreationUtils.state(masterNode, masterNode, masterNode);
// build the zen discovery and cluster service
clusterService = createClusterService(threadPool, master.discoveryNode);
setState(clusterService, state);
zenDiscovery = buildZenDiscovery(settings, master, clusterService, threadPool);
ClusterService masterClusterService = createClusterService(threadPool, masterNode);
toClose.add(masterClusterService);
// TODO: clustername shouldn't be stored twice in cluster service, but for now, work around it
state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build();
setState(masterClusterService, state);
ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool);
toClose.add(masterZen);
masterTransport.acceptIncomingRequests();

final MockTransportService otherTransport = MockTransportService.local(settings, Version.CURRENT, threadPool);
otherTransport.start();
toClose.add(otherTransport);
DiscoveryNode otherNode = new DiscoveryNode("other", otherTransport.boundAddress().publishAddress(), Version.CURRENT);
otherTransport.setLocalNode(otherNode);
final ClusterState otherState = ClusterState.builder(masterClusterService.getClusterName())
.nodes(DiscoveryNodes.builder().add(otherNode).localNodeId(otherNode.getId())).build();
ClusterService otherClusterService = createClusterService(threadPool, masterNode);
toClose.add(otherClusterService);
setState(otherClusterService, otherState);
ZenDiscovery otherZen = buildZenDiscovery(settings, otherTransport, otherClusterService, threadPool);
toClose.add(otherZen);
otherTransport.acceptIncomingRequests();


masterTransport.connectToNode(otherNode);
otherTransport.connectToNode(masterNode);

// a new cluster state with a new discovery node (we will test if the cluster state
// was updated by the presence of this node in NodesFaultDetection)
MockNode newNode = createMockNode("new_node", settings, null, threadPool, logger, nodes);
ClusterState newState = ClusterState.builder(state).incrementVersion().nodes(
DiscoveryNodes.builder(state.nodes()).add(newNode.discoveryNode).masterNodeId(master.discoveryNode.getId())
ClusterState newState = ClusterState.builder(masterClusterService.state()).incrementVersion().nodes(
DiscoveryNodes.builder(state.nodes()).add(otherNode).masterNodeId(masterNode.getId())
).build();

try {
// publishing a new cluster state
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state);
AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1);
expectedFDNodes = zenDiscovery.getFaultDetectionNodes();
zenDiscovery.publish(clusterChangedEvent, listener);
expectedFDNodes = masterZen.getFaultDetectionNodes();
masterZen.publish(clusterChangedEvent, listener);
listener.await(1, TimeUnit.HOURS);
// publish was a success, update expected FD nodes based on new cluster state
expectedFDNodes = fdNodesForState(newState, master.discoveryNode);
expectedFDNodes = fdNodesForState(newState, masterNode);
} catch (Discovery.FailedToCommitClusterStateException e) {
// not successful, so expectedFDNodes above should remain what it was originally assigned
assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail
}

assertEquals(expectedFDNodes, zenDiscovery.getFaultDetectionNodes());
assertEquals(expectedFDNodes, masterZen.getFaultDetectionNodes());
} finally {
// clean close of transport service and publish action for each node
zenDiscovery.close();
clusterService.close();
for (MockNode curNode : nodes.values()) {
curNode.action.close();
curNode.service.close();
}
IOUtils.close(toClose);
terminate(threadPool);
}
}

private ZenDiscovery buildZenDiscovery(Settings settings, MockNode master, ClusterService clusterService, ThreadPool threadPool) {
private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, ClusterService clusterService, ThreadPool threadPool) {
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet());
ElectMasterService electMasterService = new ElectMasterService(settings);
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, master.service, clusterService,
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService,
clusterSettings, zenPingService, electMasterService);
zenDiscovery.start();
return zenDiscovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
super.tearDown();
for (MockNode curNode : nodes.values()) {
curNode.action.close();
curNode.service.close();
}
terminate(threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,4 @@ protected void traceRequestSent(DiscoveryNode node, long requestId, String actio
}
}
}


}
Loading