From 662a1b241f05f93aee9a5b05d7929a482a5bfcd5 Mon Sep 17 00:00:00 2001 From: Matteo Bertozzi Date: Tue, 11 Oct 2016 16:17:09 -0700 Subject: [PATCH] HBASE-16802 Procedure v2 - group procedure cleaning --- .../hbase/procedure2/ProcedureExecutor.java | 18 +++++- .../procedure2/store/NoopProcedureStore.java | 5 ++ .../procedure2/store/ProcedureStore.java | 10 ++++ .../store/wal/WALProcedureStore.java | 38 +++++++++++++ .../procedure2/ProcedureTestingUtility.java | 12 +++- .../store/wal/TestWALProcedureStore.java | 57 +++++++++++++++++-- 6 files changed, 132 insertions(+), 8 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 0572dcf63d08..2eeef9e81955 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -138,6 +138,9 @@ private static class CompletedProcedureCleaner private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl"; private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min + private static final String BATCH_SIZE_CONF_KEY = "hbase.procedure.cleaner.evict.batch.size"; + private static final int DEFAULT_BATCH_SIZE = 32; + private final Map completed; private final Map nonceKeysToProcIdsMap; private final ProcedureStore store; @@ -165,6 +168,10 @@ protected void periodicExecute(final TEnvironment env) { final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL); final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL); + final int batchSize = conf.getInt(BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); + + final long[] batchIds = new long[batchSize]; + int batchCount = 0; final long now = EnvironmentEdgeManager.currentTime(); final Iterator> it = completed.entrySet().iterator(); @@ -179,15 +186,22 @@ protected void periodicExecute(final TEnvironment env) { if (isDebugEnabled) { LOG.debug("Evict completed procedure: " + procInfo); } - store.delete(entry.getKey()); + batchIds[batchCount++] = entry.getKey(); + if (batchCount == batchIds.length) { + store.delete(batchIds, 0, batchCount); + batchCount = 0; + } it.remove(); - NonceKey nonceKey = procInfo.getNonceKey(); + final NonceKey nonceKey = procInfo.getNonceKey(); if (nonceKey != null) { nonceKeysToProcIdsMap.remove(nonceKey); } } } + if (batchCount > 0) { + store.delete(batchIds, 0, batchCount); + } } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index c9808a1cce0d..82ef8f0b7028 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -75,4 +75,9 @@ public void delete(long procId) { public void delete(Procedure proc, long[] subprocs) { // no-op } + + @Override + public void delete(long[] procIds, int offset, int count) { + // no-op + } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 11216d80dc33..7df52263101d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -196,4 +196,14 @@ public interface ProcedureLoader { * @param subProcIds the IDs of the sub-procedure to remove. */ void delete(Procedure parentProc, long[] subProcIds); + + /** + * The specified procIds were removed from the executor, + * due to completion, abort or failure. + * The store implementor should remove all the information about the specified procIds. + * @param procIds the IDs of the procedures to remove. + * @param offset the array offset from where to start to delete + * @param count the number of IDs to delete + */ + void delete(long[] procIds, int offset, int count); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 1e604026de57..3a46f8ff9752 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -465,6 +465,8 @@ public void delete(final long procId) { @Override public void delete(final Procedure proc, final long[] subProcIds) { + assert proc != null : "expected a non-null procedure"; + assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; if (LOG.isTraceEnabled()) { LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds)); } @@ -486,6 +488,42 @@ public void delete(final Procedure proc, final long[] subProcIds) { } } + @Override + public void delete(final long[] procIds, final int offset, final int count) { + if (count == 0) return; + if (offset == 0 && count == procIds.length) { + delete(procIds); + } else if (count == 1) { + delete(procIds[offset]); + } else { + delete(Arrays.copyOfRange(procIds, offset, offset + count)); + } + } + + private void delete(final long[] procIds) { + if (LOG.isTraceEnabled()) { + LOG.trace("Delete " + Arrays.toString(procIds)); + } + + final ByteSlot slot = acquireSlot(); + try { + // Serialize the delete + for (int i = 0; i < procIds.length; ++i) { + ProcedureWALFormat.writeDelete(slot, procIds[i]); + } + + // Push the transaction data and wait until it is persisted + pushData(PushType.DELETE, slot, -1, procIds); + } catch (IOException e) { + // We are not able to serialize the procedure. + // this is a code error, and we are not able to go on. + LOG.fatal("Unable to serialize the procedures: " + Arrays.toString(procIds), e); + throw new RuntimeException(e); + } finally { + releaseSlot(slot); + } + } + private ByteSlot acquireSlot() { ByteSlot slot = slotsCache.poll(); return slot != null ? slot : new ByteSlot(); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index d767a0f23d85..0b85ff88b07f 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -97,7 +97,7 @@ public static void storeRestart(ProcedureStore procStore, ProcedureStore.Procedu procStore.load(loader); } - public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcId, + public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId, long runnableCount, int completedCount, int corruptedCount) throws Exception { final LoadCounter loader = new LoadCounter(); storeRestart(procStore, loader); @@ -105,6 +105,7 @@ public static void storeRestartAndAssert(ProcedureStore procStore, long maxProcI assertEquals(runnableCount, loader.getRunnableCount()); assertEquals(completedCount, loader.getCompletedCount()); assertEquals(corruptedCount, loader.getCorruptedCount()); + return loader; } public static void setKillBeforeStoreUpdate(ProcedureExecutor procExecutor, @@ -366,6 +367,15 @@ public int getCorruptedCount() { return corrupted.size(); } + public boolean isRunnable(final long procId) { + for (Procedure proc: runnable) { + if (proc.getProcId() == procId) { + return true; + } + } + return false; + } + @Override public void setMaxProcId(long maxProcId) { this.maxProcId = maxProcId; diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 5353d62187bd..7ecffa13bb02 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -404,13 +404,13 @@ public void testCorruptedTrailersRebuild() throws Exception { procStore.insert(procs[1], null); procStore.insert(procs[2], null); procStore.insert(procs[3], null); - procStore.delete(procs[0], null); + procStore.delete(procs[0].getProcId()); procStore.rollWriterForTesting(); - procStore.delete(procs[2], null); + procStore.delete(procs[2].getProcId()); procStore.update(procs[3]); procStore.insert(procs[4], null); procStore.rollWriterForTesting(); - procStore.delete(procs[4], null); + procStore.delete(procs[4].getProcId()); procStore.insert(procs[5], null); // Stop the store @@ -737,9 +737,56 @@ public void testLoadChildren() throws Exception { restartAndAssert(3, 0, 1, 0); } - private void restartAndAssert(long maxProcId, long runnableCount, + @Test + public void testBatchDelete() throws Exception { + for (int i = 1; i < 10; ++i) { + procStore.insert(new TestProcedure(i), null); + } + + // delete nothing + long[] toDelete = new long[] { 1, 2, 3, 4 }; + procStore.delete(toDelete, 2, 0); + LoadCounter loader = restartAndAssert(9, 9, 0, 0); + for (int i = 1; i < 10; ++i) { + assertEquals(true, loader.isRunnable(i)); + } + + // delete the full "toDelete" array (2, 4, 6, 8) + toDelete = new long[] { 2, 4, 6, 8 }; + procStore.delete(toDelete, 0, toDelete.length); + loader = restartAndAssert(9, 5, 0, 0); + for (int i = 1; i < 10; ++i) { + assertEquals(i % 2 != 0, loader.isRunnable(i)); + } + + // delete a slice of "toDelete" (1, 3) + toDelete = new long[] { 5, 7, 1, 3, 9 }; + procStore.delete(toDelete, 2, 2); + loader = restartAndAssert(9, 3, 0, 0); + for (int i = 1; i < 10; ++i) { + assertEquals(i > 3 && i % 2 != 0, loader.isRunnable(i)); + } + + // delete a single item (5) + toDelete = new long[] { 5 }; + procStore.delete(toDelete, 0, 1); + loader = restartAndAssert(9, 2, 0, 0); + for (int i = 1; i < 10; ++i) { + assertEquals(i > 5 && i % 2 != 0, loader.isRunnable(i)); + } + + // delete remaining using a slice of "toDelete" (7, 9) + toDelete = new long[] { 0, 7, 9 }; + procStore.delete(toDelete, 1, 2); + loader = restartAndAssert(0, 0, 0, 0); + for (int i = 1; i < 10; ++i) { + assertEquals(false, loader.isRunnable(i)); + } + } + + private LoadCounter restartAndAssert(long maxProcId, long runnableCount, int completedCount, int corruptedCount) throws Exception { - ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, + return ProcedureTestingUtility.storeRestartAndAssert(procStore, maxProcId, runnableCount, completedCount, corruptedCount); }