Skip to content

Commit

Permalink
HBASE-16642 Use DelayQueue instead of TimeoutBlockingQueue
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
  • Loading branch information
ikedahi authored and Matteo Bertozzi committed Oct 14, 2016
1 parent 91a7bbd commit 9a94dc9
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 419 deletions.
Expand Up @@ -29,12 +29,15 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
Expand All @@ -48,8 +51,6 @@
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -96,17 +97,58 @@ public interface ProcedureExecutorListener {
} }


/** /**
* Used by the TimeoutBlockingQueue to get the timeout interval of the procedure * Used by the DelayQueue to get the timeout interval of the procedure
*/ */
private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> { private static class DelayedContainer implements Delayed {
static final DelayedContainer POISON = new DelayedContainer();

/** null if poison */
final Procedure proc;
final long timeoutTime;

DelayedContainer(Procedure proc) {
assert proc != null;
this.proc = proc;
this.timeoutTime = proc.getLastUpdate() + proc.getTimeout();
}

DelayedContainer() {
this.proc = null;
this.timeoutTime = Long.MIN_VALUE;
}

@Override
public long getDelay(TimeUnit unit) {
long currentTime = EnvironmentEdgeManager.currentTime();
if (currentTime >= timeoutTime) {
return 0;
}
return unit.convert(timeoutTime - currentTime, TimeUnit.MICROSECONDS);
}

/**
* @throws NullPointerException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(timeoutTime, ((DelayedContainer)o).timeoutTime);
}

@Override @Override
public long getTimeout(Procedure proc) { public boolean equals(Object obj) {
return proc.getTimeRemaining(); if (obj == this) {
return true;
}
if (! (obj instanceof DelayedContainer)) {
return false;
}
return Objects.equals(proc, ((DelayedContainer)obj).proc);
} }


@Override @Override
public TimeUnit getTimeUnit(Procedure proc) { public int hashCode() {
return TimeUnit.MILLISECONDS; return proc != null ? proc.hashCode() : 0;
} }
} }


Expand Down Expand Up @@ -239,8 +281,8 @@ protected void periodicExecute(final TEnvironment env) {
* Timeout Queue that contains Procedures in a WAITING_TIMEOUT state * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
* or periodic procedures. * or periodic procedures.
*/ */
private final TimeoutBlockingQueue<Procedure> waitingTimeout = private final DelayQueue<DelayedContainer> waitingTimeout =
new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever()); new DelayQueue<DelayedContainer>();


/** /**
* Scheduler/Queue that contains runnable procedures. * Scheduler/Queue that contains runnable procedures.
Expand Down Expand Up @@ -544,7 +586,7 @@ public void stop() {


LOG.info("Stopping the procedure executor"); LOG.info("Stopping the procedure executor");
scheduler.stop(); scheduler.stop();
waitingTimeout.signalAll(); waitingTimeout.add(DelayedContainer.POISON);
} }


public void join() { public void join() {
Expand Down Expand Up @@ -628,7 +670,7 @@ public List<ProcedureInfo> listProcedures() {
*/ */
public void addChore(final ProcedureInMemoryChore chore) { public void addChore(final ProcedureInMemoryChore chore) {
chore.setState(ProcedureState.RUNNABLE); chore.setState(ProcedureState.RUNNABLE);
waitingTimeout.add(chore); waitingTimeout.add(new DelayedContainer(chore));
} }


/** /**
Expand All @@ -638,7 +680,7 @@ public void addChore(final ProcedureInMemoryChore chore) {
*/ */
public boolean removeChore(final ProcedureInMemoryChore chore) { public boolean removeChore(final ProcedureInMemoryChore chore) {
chore.setState(ProcedureState.FINISHED); chore.setState(ProcedureState.FINISHED);
return waitingTimeout.remove(chore); return waitingTimeout.remove(new DelayedContainer(chore));
} }


/** /**
Expand Down Expand Up @@ -927,15 +969,16 @@ private void releaseLock(final Procedure proc, final boolean force) {


private void timeoutLoop() { private void timeoutLoop() {
while (isRunning()) { while (isRunning()) {
Procedure proc = waitingTimeout.poll(); Procedure proc;
if (proc == null) continue; try {

proc = waitingTimeout.take().proc;
if (proc.getTimeRemaining() > 100) { } catch (InterruptedException e) {
// got an early wake, maybe a stop? // Just consume the interruption.
// re-enqueue the task in case was not a stop or just a signal
waitingTimeout.add(proc);
continue; continue;
} }
if (proc == null) { // POISON to stop
break;
}


// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// TODO-MAYBE: Should we provide a notification to the store with the // TODO-MAYBE: Should we provide a notification to the store with the
Expand All @@ -955,8 +998,8 @@ private void timeoutLoop() {
} catch (Throwable e) { } catch (Throwable e) {
LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e); LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
} }
proc.setStartTime(EnvironmentEdgeManager.currentTime()); proc.updateTimestamp();
if (proc.isRunnable()) waitingTimeout.add(proc); if (proc.isRunnable()) waitingTimeout.add(new DelayedContainer(proc));
} }
continue; continue;
} }
Expand All @@ -970,8 +1013,6 @@ private void timeoutLoop() {
store.update(proc); store.update(proc);
scheduler.addFront(proc); scheduler.addFront(proc);
continue; continue;
} else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
waitingTimeout.add(proc);
} }
} }
} }
Expand Down Expand Up @@ -1171,15 +1212,15 @@ private void execProcedure(final RootProcedureState procStack, final Procedure p
procedure.setState(ProcedureState.WAITING); procedure.setState(ProcedureState.WAITING);
break; break;
case WAITING_TIMEOUT: case WAITING_TIMEOUT:
waitingTimeout.add(procedure); waitingTimeout.add(new DelayedContainer(procedure));
break; break;
default: default:
break; break;
} }
} }
} }
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
waitingTimeout.add(procedure); waitingTimeout.add(new DelayedContainer(procedure));
} else if (!isSuspended) { } else if (!isSuspended) {
// No subtask, so we are done // No subtask, so we are done
procedure.setState(ProcedureState.FINISHED); procedure.setState(ProcedureState.FINISHED);
Expand Down

0 comments on commit 9a94dc9

Please sign in to comment.