Skip to content

Commit

Permalink
Internal: Introduce TimedPrioritizedRunnable base class to all comman…
Browse files Browse the repository at this point in the history
…ds that go into InternalClusterService.updateTasksExecutor

 At the moment we sometime submit generic runnables, which make life slightly harder when generated pending task list which have to account for them. This commit adds an abstract TimedPrioritizedRunnable class which should always be used. This class also automatically measures time in queue, which is needed for the pending task reporting.

  Relates to elastic#8077

  Closes elastic#9354
  Closes elastic#9671
  • Loading branch information
bleskes committed Feb 12, 2015
1 parent a7e4fce commit ad15872
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
Expand Up @@ -223,7 +223,7 @@ public void add(final TimeValue timeout, final TimeoutClusterStateListener liste
}
// call the post added notification on the same event thread
try {
updateTasksExecutor.execute(new PrioritizedRunnable(Priority.HIGH) {
updateTasksExecutor.execute(new TimedPrioritizedRunnable(Priority.HIGH, "_add_listener_") {
@Override
public void run() {
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
Expand Down Expand Up @@ -260,7 +260,7 @@ public void run() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
timeoutUpdateTask.onFailure(task.source, new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source));
timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
}
});
}
Expand All @@ -279,35 +279,54 @@ public void run() {

@Override
public List<PendingClusterTask> pendingTasks() {
long now = System.currentTimeMillis();
PrioritizedEsThreadPoolExecutor.Pending[] pendings = updateTasksExecutor.getPending();
List<PendingClusterTask> pendingClusterTasks = new ArrayList<>(pendings.length);
for (PrioritizedEsThreadPoolExecutor.Pending pending : pendings) {
final String source;
final long timeInQueue;
if (pending.task instanceof UpdateTask) {
UpdateTask updateTask = (UpdateTask) pending.task;
source = updateTask.source;
timeInQueue = now - updateTask.addedAt;
if (pending.task instanceof TimedPrioritizedRunnable) {
TimedPrioritizedRunnable runnable = (TimedPrioritizedRunnable) pending.task;
source = runnable.source();
timeInQueue = runnable.timeSinceCreatedInMillis();
} else {
assert false : "expected TimedPrioritizedRunnable got " + pending.task.getClass();
source = "unknown";
timeInQueue = -1;
timeInQueue = 0;
}

pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue, pending.executing));
}
return pendingClusterTasks;
}

class UpdateTask extends PrioritizedRunnable {
static abstract class TimedPrioritizedRunnable extends PrioritizedRunnable {
private final long creationTime;
protected final String source;

protected TimedPrioritizedRunnable(Priority priority, String source) {
super(priority);
this.source = source;
this.creationTime = System.currentTimeMillis();
}

public long timeSinceCreatedInMillis() {
// max with 0 to make sure we always return a non negative number
// even if time shifts.
return Math.max(0, System.currentTimeMillis() - creationTime);
}

public String source() {
return source;
}
}

class UpdateTask extends TimedPrioritizedRunnable {

public final String source;
public final ClusterStateUpdateTask updateTask;
public final long addedAt = System.currentTimeMillis();


UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
super(priority);
this.source = source;
super(priority, source);
this.updateTask = updateTask;
}

Expand Down
Expand Up @@ -43,6 +43,8 @@ public PendingClusterTask() {
}

public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing) {
assert timeInQueue >= 0 : "got a negative timeInQueue [" + timeInQueue + "]";
assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]";
this.insertOrder = insertOrder;
this.priority = priority;
this.source = source;
Expand Down Expand Up @@ -95,11 +97,10 @@ public void writeTo(StreamOutput out) throws IOException {
Priority.writeTo(priority, out);
out.writeText(source);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
// timeInQueue is set to -1 when unknown and can be negative if time goes backwards
out.writeLong(timeInQueue);
} else {
out.writeVLong(Math.max(0, timeInQueue));
}
out.writeVLong(timeInQueue);
}
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeBoolean(executing);
}
Expand Down

0 comments on commit ad15872

Please sign in to comment.