Skip to content

Commit

Permalink
Shutdown API: allow to shutdown node(s) or the whole, closes #64.
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Mar 16, 2010
1 parent 1dd5997 commit d8ef200
Show file tree
Hide file tree
Showing 23 changed files with 548 additions and 12 deletions.
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.cluster.ClusterState;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class Actions {

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.inject.AbstractModule;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdown;
import org.elasticsearch.action.admin.cluster.ping.broadcast.TransportBroadcastPingAction;
import org.elasticsearch.action.admin.cluster.ping.replication.TransportIndexReplicationPingAction;
import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicationPingAction;
Expand Down Expand Up @@ -52,13 +53,14 @@
import org.elasticsearch.action.terms.TransportTermsAction;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportActionModule extends AbstractModule {

@Override protected void configure() {

bind(TransportNodesInfo.class).asEagerSingleton();
bind(TransportNodesShutdown.class).asEagerSingleton();
bind(TransportClusterStateAction.class).asEagerSingleton();
bind(TransportClusterHealthAction.class).asEagerSingleton();

Expand Down
Expand Up @@ -68,6 +68,7 @@ public static class Cluster {

public static class Node {
public static final String INFO = "/cluster/nodes/info";
public static final String SHUTDOWN = "/cluster/nodes/shutdown";
}

public static class Ping {
Expand Down
Expand Up @@ -22,13 +22,19 @@
import org.elasticsearch.action.support.nodes.NodesOperationRequest;

/**
* @author kimchy (Shay Banon)
* A request to get node (cluster) level information.
*
* @author kimchy (shay.banon)
*/
public class NodesInfoRequest extends NodesOperationRequest {

protected NodesInfoRequest() {
}

/**
* Get information from nodes based on the nodes ids specified. If none are passed, information
* for all nodes will be returned.
*/
public NodesInfoRequest(String... nodesIds) {
super(nodesIds);
}
Expand Down
@@ -0,0 +1,72 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.node.shutdown;

import org.elasticsearch.action.support.nodes.NodesOperationRequest;
import org.elasticsearch.util.TimeValue;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import static org.elasticsearch.util.TimeValue.*;

/**
* A request to shutdown one ore more nodes (or the whole cluster).
*
* @author kimchy (shay.banon)
*/
public class NodesShutdownRequest extends NodesOperationRequest {

TimeValue delay = TimeValue.timeValueSeconds(1);

protected NodesShutdownRequest() {
}

/**
* Shuts down nodes based on the nodes ids specified. If none are passed, <b>all</b>
* nodes will be shutdown.
*/
public NodesShutdownRequest(String... nodesIds) {
super(nodesIds);
}

/**
* The delay for the shutdown to occur. Defaults to <tt>1s</tt>.
*/
public NodesShutdownRequest delay(TimeValue delay) {
this.delay = delay;
return this;
}

public TimeValue delay() {
return this.delay;
}

@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
super.readFrom(in);
delay = readTimeValue(in);
}

@Override public void writeTo(DataOutput out) throws IOException {
super.writeTo(out);
delay.writeTo(out);
}
}
@@ -0,0 +1,74 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.node.shutdown;

import org.elasticsearch.action.support.nodes.NodeOperationResponse;
import org.elasticsearch.action.support.nodes.NodesOperationResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.Node;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* @author kimchy (Shay Banon)
*/
public class NodesShutdownResponse extends NodesOperationResponse<NodesShutdownResponse.NodeShutdownResponse> {

NodesShutdownResponse() {
}

public NodesShutdownResponse(ClusterName clusterName, NodeShutdownResponse[] nodes) {
super(clusterName, nodes);
}

@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
super.readFrom(in);
nodes = new NodeShutdownResponse[in.readInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeShutdownResponse.readNodeShutdownResponse(in);
}
}

@Override public void writeTo(DataOutput out) throws IOException {
super.writeTo(out);
out.writeInt(nodes.length);
for (NodeShutdownResponse node : nodes) {
node.writeTo(out);
}
}

public static class NodeShutdownResponse extends NodeOperationResponse {

NodeShutdownResponse() {
}

public NodeShutdownResponse(Node node) {
super(node);
}

public static NodeShutdownResponse readNodeShutdownResponse(DataInput in) throws ClassNotFoundException, IOException {
NodeShutdownResponse res = new NodeShutdownResponse();
res.readFrom(in);
return res;
}
}
}
@@ -0,0 +1,137 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.node.shutdown;

import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.nodes.NodeOperationRequest;
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.server.Server;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.settings.Settings;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;

import static com.google.common.collect.Lists.*;
import static org.elasticsearch.util.TimeValue.*;

/**
* @author kimchy (shay.banon)
*/
public class TransportNodesShutdown extends TransportNodesOperationAction<NodesShutdownRequest, NodesShutdownResponse, TransportNodesShutdown.NodeShutdownRequest, NodesShutdownResponse.NodeShutdownResponse> {

private final Server server;

private final boolean disabled;

@Inject public TransportNodesShutdown(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
Server server) {
super(settings, clusterName, threadPool, clusterService, transportService);
this.server = server;
disabled = componentSettings.getAsBoolean("disabled", false);
}

@Override protected String transportAction() {
return TransportActions.Admin.Cluster.Node.SHUTDOWN;
}

@Override protected String transportNodeAction() {
return "/cluster/nodes/shutdown/node";
}

@Override protected NodesShutdownResponse newResponse(NodesShutdownRequest nodesShutdownRequest, AtomicReferenceArray responses) {
final List<NodesShutdownResponse.NodeShutdownResponse> nodeShutdownResponses = newArrayList();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodesShutdownResponse.NodeShutdownResponse) {
nodeShutdownResponses.add((NodesShutdownResponse.NodeShutdownResponse) resp);
}
}
return new NodesShutdownResponse(clusterName, nodeShutdownResponses.toArray(new NodesShutdownResponse.NodeShutdownResponse[nodeShutdownResponses.size()]));
}

@Override protected NodesShutdownRequest newRequest() {
return new NodesShutdownRequest();
}

@Override protected NodeShutdownRequest newNodeRequest() {
return new NodeShutdownRequest();
}

@Override protected NodeShutdownRequest newNodeRequest(String nodeId, NodesShutdownRequest request) {
return new NodeShutdownRequest(nodeId, request.delay);
}

@Override protected NodesShutdownResponse.NodeShutdownResponse newNodeResponse() {
return new NodesShutdownResponse.NodeShutdownResponse();
}

@Override protected NodesShutdownResponse.NodeShutdownResponse nodeOperation(NodeShutdownRequest request) throws ElasticSearchException {
if (disabled) {
throw new ElasticSearchIllegalStateException("Shutdown is disabled");
}
logger.info("Shutting down in [{}]", request.delay);
threadPool.schedule(new Runnable() {
@Override public void run() {
server.close();
}
}, request.delay.millis(), TimeUnit.MILLISECONDS);
return new NodesShutdownResponse.NodeShutdownResponse(clusterService.state().nodes().localNode());
}

@Override protected boolean accumulateExceptions() {
return false;
}

protected static class NodeShutdownRequest extends NodeOperationRequest {

TimeValue delay;

private NodeShutdownRequest() {
}

private NodeShutdownRequest(String nodeId, TimeValue delay) {
super(nodeId);
this.delay = delay;
}

@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
super.readFrom(in);
delay = readTimeValue(in);
}

@Override public void writeTo(DataOutput out) throws IOException {
super.writeTo(out);
delay.writeTo(out);
}
}
}
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingResponse;
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest;
Expand Down Expand Up @@ -96,6 +98,24 @@ public interface ClusterAdminClient {
*/
void nodesInfo(NodesInfoRequest request, ActionListener<NodesInfoResponse> listener);

/**
* Shutdown nodes in the cluster.
*
* @param request The nodes shutdown request
* @return The result future
* @see org.elasticsearch.client.Requests#nodesShutdown(String...)
*/
ActionFuture<NodesShutdownResponse> nodesShutdown(NodesShutdownRequest request);

/**
* Shutdown nodes in the cluster.
*
* @param request The nodes shutdown request
* @param listener A listener to be notified with a result
* @see org.elasticsearch.client.Requests#nodesShutdown(String...)
*/
void nodesShutdown(NodesShutdownRequest request, ActionListener<NodesShutdownResponse> listener);

ActionFuture<SinglePingResponse> ping(SinglePingRequest request);

void ping(SinglePingRequest request, ActionListener<SinglePingResponse> listener);
Expand Down

0 comments on commit d8ef200

Please sign in to comment.