Skip to content

Commit

Permalink
HBASE-16802 Procedure v2 - group procedure cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
Matteo Bertozzi committed Oct 11, 2016
1 parent eb52e26 commit 662a1b2
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 8 deletions.
Expand Up @@ -138,6 +138,9 @@ private static class CompletedProcedureCleaner<TEnvironment>
private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min

private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size";
private static final int DEFAULT_BATCH_SIZE = 32;

private final Map<Long, ProcedureInfo> completed;
private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
private final ProcedureStore store;
Expand Down Expand Up @@ -165,6 +168,10 @@ protected void periodicExecute(final TEnvironment env) {

final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);

final long[] batchIds = new long[batchSize];
int batchCount = 0;

final long now = EnvironmentEdgeManager.currentTime();
final Iterator<Map.Entry<Long, ProcedureInfo>> it = completed.entrySet().iterator();
Expand All @@ -179,15 +186,22 @@ protected void periodicExecute(final TEnvironment env) {
if (isDebugEnabled) {
LOG.debug("Evict completed procedure: " + procInfo);
}
store.delete(entry.getKey());
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
store.delete(batchIds, 0, batchCount);
batchCount = 0;
}
it.remove();

NonceKey nonceKey = procInfo.getNonceKey();
final NonceKey nonceKey = procInfo.getNonceKey();
if (nonceKey != null) {
nonceKeysToProcIdsMap.remove(nonceKey);
}
}
}
if (batchCount > 0) {
store.delete(batchIds, 0, batchCount);
}
}
}

Expand Down
Expand Up @@ -75,4 +75,9 @@ public void delete(long procId) {
public void delete(Procedure proc, long[] subprocs) {
// no-op
}

@Override
public void delete(long[] procIds, int offset, int count) {
// no-op
}
}
Expand Up @@ -196,4 +196,14 @@ public interface ProcedureLoader {
* @param subProcIds the IDs of the sub-procedure to remove.
*/
void delete(Procedure parentProc, long[] subProcIds);

/**
* The specified procIds were removed from the executor,
* due to completion, abort or failure.
* The store implementor should remove all the information about the specified procIds.
* @param procIds the IDs of the procedures to remove.
* @param offset the array offset from where to start to delete
* @param count the number of IDs to delete
*/
void delete(long[] procIds, int offset, int count);
}
Expand Up @@ -465,6 +465,8 @@ public void delete(final long procId) {

@Override
public void delete(final Procedure proc, final long[] subProcIds) {
assert proc != null : "expected a non-null procedure";
assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
if (LOG.isTraceEnabled()) {
LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
}
Expand All @@ -486,6 +488,42 @@ public void delete(final Procedure proc, final long[] subProcIds) {
}
}

@Override
public void delete(final long[] procIds, final int offset, final int count) {
if (count == 0) return;
if (offset == 0 && count == procIds.length) {
delete(procIds);
} else if (count == 1) {
delete(procIds[offset]);
} else {
delete(Arrays.copyOfRange(procIds, offset, offset + count));
}
}

private void delete(final long[] procIds) {
if (LOG.isTraceEnabled()) {
LOG.trace("Delete " + Arrays.toString(procIds));
}

final ByteSlot slot = acquireSlot();
try {
// Serialize the delete
for (int i = 0; i < procIds.length; ++i) {
ProcedureWALFormat.writeDelete(slot, procIds[i]);
}

// Push the transaction data and wait until it is persisted
pushData(PushType.DELETE, slot, -1, procIds);
} catch (IOException e) {
// We are not able to serialize the procedure.
// this is a code error, and we are not able to go on.
LOG.fatal("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
throw new RuntimeException(e);
} finally {
releaseSlot(slot);
}
}

private ByteSlot acquireSlot() {
ByteSlot slot = slotsCache.poll();
return slot != null ? slot : new ByteSlot();
Expand Down
Expand Up @@ -97,14 +97,15 @@ public static void storeRestart(ProcedureStore procStore, ProcedureStore.Procedu
procStore.load(loader);
}

public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
long runnableCount, int completedCount, int corruptedCount) throws Exception {
final LoadCounter loader = new LoadCounter();
storeRestart(procStore, loader);
assertEquals(maxProcId, loader.getMaxProcId());
assertEquals(runnableCount, loader.getRunnableCount());
assertEquals(completedCount, loader.getCompletedCount());
assertEquals(corruptedCount, loader.getCorruptedCount());
return loader;
}

public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
Expand Down Expand Up @@ -366,6 +367,15 @@ public int getCorruptedCount() {
return corrupted.size();
}

public boolean isRunnable(final long procId) {
for (Procedure proc: runnable) {
if (proc.getProcId() == procId) {
return true;
}
}
return false;
}

@Override
public void setMaxProcId(long maxProcId) {
this.maxProcId = maxProcId;
Expand Down
Expand Up @@ -404,13 +404,13 @@ public void testCorruptedTrailersRebuild() throws Exception {
procStore.insert(procs[1], null);
procStore.insert(procs[2], null);
procStore.insert(procs[3], null);
procStore.delete(procs[0], null);
procStore.delete(procs[0].getProcId());
procStore.rollWriterForTesting();
procStore.delete(procs[2], null);
procStore.delete(procs[2].getProcId());
procStore.update(procs[3]);
procStore.insert(procs[4], null);
procStore.rollWriterForTesting();
procStore.delete(procs[4], null);
procStore.delete(procs[4].getProcId());
procStore.insert(procs[5], null);

// Stop the store
Expand Down Expand Up @@ -737,9 +737,56 @@ public void testLoadChildren() throws Exception {
restartAndAssert(3, 0, 1, 0);
}

private void restartAndAssert(long maxProcId, long runnableCount,
@Test
public void testBatchDelete() throws Exception {
for (int i = 1; i < 10; ++i) {
procStore.insert(new TestProcedure(i), null);
}

// delete nothing
long[] toDelete = new long[] { 1, 2, 3, 4 };
procStore.delete(toDelete, 2, 0);
LoadCounter loader = restartAndAssert(9, 9, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(true, loader.isRunnable(i));
}

// delete the full "toDelete" array (2, 4, 6, 8)
toDelete = new long[] { 2, 4, 6, 8 };
procStore.delete(toDelete, 0, toDelete.length);
loader = restartAndAssert(9, 5, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(i % 2 != 0, loader.isRunnable(i));
}

// delete a slice of "toDelete" (1, 3)
toDelete = new long[] { 5, 7, 1, 3, 9 };
procStore.delete(toDelete, 2, 2);
loader = restartAndAssert(9, 3, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i));
}

// delete a single item (5)
toDelete = new long[] { 5 };
procStore.delete(toDelete, 0, 1);
loader = restartAndAssert(9, 2, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i));
}

// delete remaining using a slice of "toDelete" (7, 9)
toDelete = new long[] { 0, 7, 9 };
procStore.delete(toDelete, 1, 2);
loader = restartAndAssert(0, 0, 0, 0);
for (int i = 1; i < 10; ++i) {
assertEquals(false, loader.isRunnable(i));
}
}

private LoadCounter restartAndAssert(long maxProcId, long runnableCount,
int completedCount, int corruptedCount) throws Exception {
ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId,
runnableCount, completedCount, corruptedCount);
}

Expand Down

0 comments on commit 662a1b2

Please sign in to comment.