Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve pending api to include current executing class #6744

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -89,10 +89,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startArray(Fields.TASKS);
for (PendingClusterTask pendingClusterTask : this) {
builder.startObject();
builder.field(Fields.INSERT_ORDER, pendingClusterTask.insertOrder());
builder.field(Fields.PRIORITY, pendingClusterTask.priority());
builder.field(Fields.SOURCE, pendingClusterTask.source());
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.timeInQueueInMillis());
builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder());
builder.field(Fields.PRIORITY, pendingClusterTask.getPriority());
builder.field(Fields.SOURCE, pendingClusterTask.getSource());
builder.field(Fields.EXECUTING, pendingClusterTask.isExecuting());
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
builder.endObject();
}
Expand All @@ -103,6 +104,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
static final class Fields {

static final XContentBuilderString TASKS = new XContentBuilderString("tasks");
static final XContentBuilderString EXECUTING = new XContentBuilderString("executing");
static final XContentBuilderString INSERT_ORDER = new XContentBuilderString("insert_order");
static final XContentBuilderString PRIORITY = new XContentBuilderString("priority");
static final XContentBuilderString SOURCE = new XContentBuilderString("source");
Expand Down
Expand Up @@ -279,7 +279,7 @@ public List<PendingClusterTask> pendingTasks() {
timeInQueue = -1;
}

pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue));
pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue, pending.executing));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we maybe fix this ctor to just take Pending as the first arg?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are a 2 fields (the source and time in queue) that are computed on top of pending, so it would be confusing I think to pass those on top of pending

}
return pendingClusterTasks;
}
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.service;

import org.elasticsearch.Version;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -36,59 +37,52 @@ public class PendingClusterTask implements Streamable {
private Priority priority;
private Text source;
private long timeInQueue;
private boolean executing;

public PendingClusterTask() {
}

public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue) {
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing) {
this.insertOrder = insertOrder;
this.priority = priority;
this.source = source;
this.timeInQueue = timeInQueue;
}

public long insertOrder() {
return insertOrder;
this.executing = executing;
}

public long getInsertOrder() {
return insertOrder();
}

public Priority priority() {
return priority;
return insertOrder;
}

public Priority getPriority() {
return priority();
}

public Text source() {
return source;
return priority;
}

public Text getSource() {
return source();
}

public long timeInQueueInMillis() {
return timeInQueue;
return source;
}

public long getTimeInQueueInMillis() {
return timeInQueueInMillis();
return timeInQueue;
}

public TimeValue getTimeInQueue() {
return new TimeValue(getTimeInQueueInMillis());
}

public boolean isExecuting() {
return executing;
}

@Override
public void readFrom(StreamInput in) throws IOException {
insertOrder = in.readVLong();
priority = Priority.readFrom(in);
source = in.readText();
timeInQueue = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
executing = in.readBoolean();
}
}

@Override
Expand All @@ -97,5 +91,8 @@ public void writeTo(StreamOutput out) throws IOException {
Priority.writeTo(priority, out);
out.writeText(source);
out.writeVLong(timeInQueue);
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeBoolean(executing);
}
}
}
Expand Up @@ -18,9 +18,12 @@
*/
package org.elasticsearch.common.util.concurrent;

import com.google.common.collect.Lists;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -34,25 +37,39 @@
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {

private AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue();

PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
}

public Pending[] getPending() {
Object[] objects = getQueue().toArray();
Pending[] infos = new Pending[objects.length];
for (int i = 0; i < objects.length; i++) {
Object obj = objects[i];
if (obj instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) obj;
infos[i] = new Pending(t.runnable, t.priority(), t.insertionOrder);
} else if (obj instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) obj;
infos[i] = new Pending(t.task, t.priority, t.insertionOrder);
List<Pending> pending = Lists.newArrayList();
addPending(Lists.newArrayList(current), pending, true);
addPending(Lists.newArrayList(getQueue()), pending, false);
return pending.toArray(new Pending[pending.size()]);
}

private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
for (Runnable runnable : runnables) {
if (runnable instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable;
pending.add(new Pending(t.runnable, t.priority(), t.insertionOrder, executing));
} else if (runnable instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
pending.add(new Pending(t.task, t.priority, t.insertionOrder, executing));
}
}
return infos;
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
current.add(r);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
current.remove(r);
}

public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
Expand Down Expand Up @@ -106,11 +123,13 @@ public static class Pending {
public final Object task;
public final Priority priority;
public final long insertionOrder;
public final boolean executing;

public Pending(Object task, Priority priority, long insertionOrder) {
public Pending(Object task, Priority priority, long insertionOrder, boolean executing) {
this.task = task;
this.priority = priority;
this.insertionOrder = insertionOrder;
this.executing = executing;
}
}

Expand Down
28 changes: 16 additions & 12 deletions src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
Expand Up @@ -450,19 +450,23 @@ public void onFailure(String source, Throwable t) {
}

// The tasks can be re-ordered, so we need to check out-of-order
Set<String> controlSources = new HashSet<>(Arrays.asList("2", "3", "4", "5", "6", "7", "8", "9", "10"));
Set<String> controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
List<PendingClusterTask> pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks.size(), equalTo(9));
assertThat(pendingClusterTasks.size(), equalTo(10));
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
for (PendingClusterTask task : pendingClusterTasks) {
assertTrue(controlSources.remove(task.source().string()));
assertTrue(controlSources.remove(task.getSource().string()));
}
assertTrue(controlSources.isEmpty());

controlSources = new HashSet<>(Arrays.asList("2", "3", "4", "5", "6", "7", "8", "9", "10"));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
PendingClusterTasksResponse response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(9));
assertThat(response.pendingTasks().size(), equalTo(10));
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
for (PendingClusterTask task : response) {
assertTrue(controlSources.remove(task.source().string()));
assertTrue(controlSources.remove(task.getSource().string()));
}
assertTrue(controlSources.isEmpty());
block1.countDown();
Expand Down Expand Up @@ -511,18 +515,18 @@ public void onFailure(String source, Throwable t) {
Thread.sleep(100);

pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks.size(), equalTo(4));
controlSources = new HashSet<>(Arrays.asList("2", "3", "4", "5"));
assertThat(pendingClusterTasks.size(), equalTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : pendingClusterTasks) {
assertTrue(controlSources.remove(task.source().string()));
assertTrue(controlSources.remove(task.getSource().string()));
}
assertTrue(controlSources.isEmpty());

response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(4));
controlSources = new HashSet<>(Arrays.asList("2", "3", "4", "5"));
assertThat(response.pendingTasks().size(), equalTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : response) {
assertTrue(controlSources.remove(task.source().string()));
assertTrue(controlSources.remove(task.getSource().string()));
assertThat(task.getTimeInQueueInMillis(), greaterThan(0l));
}
assertTrue(controlSources.isEmpty());
Expand Down
Expand Up @@ -178,11 +178,13 @@ public void testSubmitPrioritizedExecutorWithMixed() throws Exception {
public void testTimeout() throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
final CountDownLatch invoked = new CountDownLatch(1);
final CountDownLatch block = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
try {
invoked.countDown();
block.await();
} catch (InterruptedException e) {
fail();
Expand All @@ -194,6 +196,11 @@ public String toString() {
return "the blocking";
}
});
invoked.await();
PrioritizedEsThreadPoolExecutor.Pending[] pending = executor.getPending();
assertThat(pending.length, equalTo(1));
assertThat(pending[0].task.toString(), equalTo("the blocking"));
assertThat(pending[0].executing, equalTo(true));

final AtomicBoolean executeCalled = new AtomicBoolean();
final CountDownLatch timedOut = new CountDownLatch(1);
Expand All @@ -215,9 +222,12 @@ public void run() {
}
);

PrioritizedEsThreadPoolExecutor.Pending[] pending = executor.getPending();
assertThat(pending.length, equalTo(1));
assertThat(pending[0].task.toString(), equalTo("the waiting"));
pending = executor.getPending();
assertThat(pending.length, equalTo(2));
assertThat(pending[0].task.toString(), equalTo("the blocking"));
assertThat(pending[0].executing, equalTo(true));
assertThat(pending[1].task.toString(), equalTo("the waiting"));
assertThat(pending[1].executing, equalTo(false));

assertThat(timedOut.await(2, TimeUnit.SECONDS), equalTo(true));
block.countDown();
Expand Down