Skip to content

Commit

Permalink
Added cluster pending tasks api.
Browse files Browse the repository at this point in the history
Closes #3368
  • Loading branch information
martijnvg committed Jul 23, 2013
1 parent 6aa1bcc commit 3cc7dde
Show file tree
Hide file tree
Showing 14 changed files with 664 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/main/java/org/elasticsearch/action/ActionModule.java
Expand Up @@ -40,6 +40,8 @@
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksAction;
import org.elasticsearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistAction;
Expand Down Expand Up @@ -180,6 +182,7 @@ protected void configure() {
registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);

registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
registerAction(IndicesStatusAction.INSTANCE, TransportIndicesStatusAction.class);
Expand Down
@@ -0,0 +1,45 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.tasks;

import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.client.ClusterAdminClient;

/**
*/
public class PendingClusterTasksAction extends ClusterAction<PendingClusterTasksRequest, PendingClusterTasksResponse, PendingClusterTasksRequestBuilder> {

public static final PendingClusterTasksAction INSTANCE = new PendingClusterTasksAction();
public static final String NAME = "cluster/task";

private PendingClusterTasksAction() {
super(NAME);
}

@Override
public PendingClusterTasksResponse newResponse() {
return new PendingClusterTasksResponse();
}

@Override
public PendingClusterTasksRequestBuilder newRequestBuilder(ClusterAdminClient client) {
return new PendingClusterTasksRequestBuilder(client);
}
}
@@ -0,0 +1,33 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.tasks;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;

/**
*/
public class PendingClusterTasksRequest extends MasterNodeOperationRequest<PendingClusterTasksRequest> {

@Override
public ActionRequestValidationException validate() {
return null;
}
}
@@ -0,0 +1,39 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.tasks;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.internal.InternalClusterAdminClient;

/**
*/
public class PendingClusterTasksRequestBuilder extends MasterNodeOperationRequestBuilder<PendingClusterTasksRequest, PendingClusterTasksResponse, PendingClusterTasksRequestBuilder> {

public PendingClusterTasksRequestBuilder(ClusterAdminClient client) {
super((InternalClusterAdminClient) client, new PendingClusterTasksRequest());
}

@Override
protected void doExecute(ActionListener<PendingClusterTasksResponse> listener) {
((InternalClusterAdminClient) client).pendingClusterTasks(request, listener);
}
}
@@ -0,0 +1,82 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.tasks;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
*/
public class PendingClusterTasksResponse extends ActionResponse implements Iterable<PendingClusterTask> {

private List<PendingClusterTask> pendingTasks;

PendingClusterTasksResponse() {
}

PendingClusterTasksResponse(List<PendingClusterTask> pendingTasks) {
this.pendingTasks = pendingTasks;
}

public List<PendingClusterTask> pendingTasks() {
return pendingTasks;
}

/**
* The pending cluster tasks
*/
public List<PendingClusterTask> getPendingTasks() {
return pendingTasks();
}

@Override
public Iterator<PendingClusterTask> iterator() {
return pendingTasks.iterator();
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
pendingTasks = new ArrayList<PendingClusterTask>(size);
for (int i = 0; i < size; i++) {
PendingClusterTask task = new PendingClusterTask();
task.readFrom(in);
pendingTasks.add(task);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(pendingTasks.size());
for (PendingClusterTask task : pendingTasks) {
task.writeTo(out);
}
}

}
@@ -0,0 +1,69 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.tasks;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

/**
*/
public class TransportPendingClusterTasksAction extends TransportMasterNodeOperationAction<PendingClusterTasksRequest, PendingClusterTasksResponse> {

private final ClusterService clusterService;

@Inject
public TransportPendingClusterTasksAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings, transportService, clusterService, threadPool);
this.clusterService = clusterService;
}

@Override
protected String transportAction() {
return PendingClusterTasksAction.NAME;
}

@Override
protected String executor() {
// very lightweight operation in memory, no need to fork to a thread
return ThreadPool.Names.SAME;
}

@Override
protected PendingClusterTasksRequest newRequest() {
return new PendingClusterTasksRequest();
}

@Override
protected PendingClusterTasksResponse newResponse() {
return new PendingClusterTasksResponse();
}

@Override
protected void masterOperation(PendingClusterTasksRequest request, ClusterState state, ActionListener<PendingClusterTasksResponse> listener) throws ElasticSearchException {
listener.onResponse(new PendingClusterTasksResponse(clusterService.pendingTasks()));
}
}
21 changes: 21 additions & 0 deletions src/main/java/org/elasticsearch/client/ClusterAdminClient.java
Expand Up @@ -51,6 +51,9 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;

/**
* Administrative actions/operations against indices.
Expand Down Expand Up @@ -259,4 +262,22 @@ public interface ClusterAdminClient {
*/
ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices);

/**
* Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
* that update the cluster state (for example, a create index operation)
*/
void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener<PendingClusterTasksResponse> listener);

/**
* Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
* that update the cluster state (for example, a create index operation)
*/
ActionFuture<PendingClusterTasksResponse> pendingClusterTasks(PendingClusterTasksRequest request);

/**
* Returns a list of the pending cluster tasks, that are scheduled to be executed. This includes operations
* that update the cluster state (for example, a create index operation)
*/
PendingClusterTasksRequestBuilder preparePendingClusterTasks();

}
Expand Up @@ -61,6 +61,10 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksAction;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.client.internal.InternalClusterAdminClient;

/**
Expand Down Expand Up @@ -228,5 +232,18 @@ public ClusterSearchShardsRequestBuilder prepareSearchShards(String... indices)
return new ClusterSearchShardsRequestBuilder(this).setIndices(indices);
}

@Override
public PendingClusterTasksRequestBuilder preparePendingClusterTasks() {
return new PendingClusterTasksRequestBuilder(this);
}

@Override
public ActionFuture<PendingClusterTasksResponse> pendingClusterTasks(PendingClusterTasksRequest request) {
return execute(PendingClusterTasksAction.INSTANCE, request);
}

@Override
public void pendingClusterTasks(PendingClusterTasksRequest request, ActionListener<PendingClusterTasksResponse> listener) {
execute(PendingClusterTasksAction.INSTANCE, request, listener);
}
}
8 changes: 8 additions & 0 deletions src/main/java/org/elasticsearch/cluster/ClusterService.java
Expand Up @@ -23,10 +23,13 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;

import java.util.List;

/**
* The cluster service allowing to both register for cluster state events ({@link ClusterStateListener})
* and submit state update tasks ({@link ClusterStateUpdateTask}.
Expand Down Expand Up @@ -97,4 +100,9 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
* Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}).
*/
void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask);

/**
* Returns the tasks that are pending.
*/
List<PendingClusterTask> pendingTasks();
}

0 comments on commit 3cc7dde

Please sign in to comment.