From 7e5f2fb8c918ad61158c54f0ece773b05c40140f Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 5 Nov 2018 11:27:15 -0800 Subject: [PATCH] [BEAM-5974] Fix ByteKeyRangeTracker to handle tryClaim(ByteKey.EMPTY) instead of exposing markDone Add support for tryClaim(ByteKey.EMPTY) and fix doneness checking and also returning checkpoints for restrictions that are unstarted or done. --- .../splittabledofn/ByteKeyRangeTracker.java | 99 ++++++++++++++----- .../ByteKeyRangeTrackerTest.java | 97 +++++++++++++----- .../sdk/io/hbase/HBaseReadSplittableDoFn.java | 2 +- 3 files changed, 147 insertions(+), 51 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java index 0a553f7d93113..fde7cf00c2e8f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java @@ -32,9 +32,17 @@ /** * A {@link RestrictionTracker} for claiming {@link ByteKey}s in a {@link ByteKeyRange} in a * monotonically increasing fashion. The range is a semi-open bounded interval [startKey, endKey) - * where the limits are both represented by ByteKey.EMPTY. + * where the limits are both represented by {@link ByteKey#EMPTY}. + * + *

<p>Note, one can complete a range by claiming the {@link ByteKey#EMPTY} once one runs out of keys + * to process. */ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKey> { + /* An empty range which contains no keys. */ + @VisibleForTesting + static final ByteKeyRange NO_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x00)); + + private ByteKeyRange range; @Nullable private ByteKey lastClaimedKey = null; @Nullable private ByteKey lastAttemptedKey = null; @@ -53,9 +61,26 @@ public synchronized ByteKeyRange currentRestriction() { } @Override - public synchronized ByteKeyRange checkpoint() { - checkState(lastClaimedKey != null, "Can't checkpoint before any key was successfully claimed"); - ByteKey nextKey = next(lastClaimedKey); + public ByteKeyRange checkpoint() { + // If we haven't done any work, we should return the original range we were processing + // as the checkpoint. + if (lastAttemptedKey == null) { + ByteKeyRange rval = ByteKeyRange.of(range.getStartKey(), range.getEndKey()); + // We update our current range to an interval that contains no elements. + range = NO_KEYS; + return rval; + } + + // Return an empty range if the current range is done. + if (lastAttemptedKey.isEmpty() + || !(range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0)) { + return NO_KEYS; + } + + // Otherwise we compute the "remainder" of the range from the last key. + assert lastAttemptedKey.equals(lastClaimedKey) + : "Expect both keys to be equal since the last key attempted was a valid key in the range."; + ByteKey nextKey = next(lastAttemptedKey); ByteKeyRange res = ByteKeyRange.of(nextKey, range.getEndKey()); this.range = ByteKeyRange.of(range.getStartKey(), nextKey); return res; @@ -64,16 +89,27 @@ public synchronized ByteKeyRange checkpoint() { /** * Attempts to claim the given key. * - *

<p>Must be larger than the last successfully claimed key. + *

<p>Must be larger than the last attempted key. Note that passing in + * {@link ByteKey#EMPTY} claims all keys to the end of range and can only be claimed once. * * @return {@code true} if the key was successfully claimed, {@code false} if it is outside the * current {@link ByteKeyRange} of this tracker (in that case this operation is a no-op). */ @Override protected synchronized boolean tryClaimImpl(ByteKey key) { + // Handle claiming the end of range EMPTY key + if (key.isEmpty()) { + checkArgument(lastAttemptedKey == null || !lastAttemptedKey.isEmpty(), + "Trying to claim key %s while last attempted key was %s", + key, + lastAttemptedKey); + lastAttemptedKey = key; + return false; + } + checkArgument( lastAttemptedKey == null || key.compareTo(lastAttemptedKey) > 0, - "Trying to claim key %s while last attempted was %s", + "Trying to claim key %s while last attempted key was %s", key, lastAttemptedKey); checkArgument( @@ -81,6 +117,7 @@ protected synchronized boolean tryClaimImpl(ByteKey key) { "Trying to claim key %s before start of the range %s", key, range); + lastAttemptedKey = key; // No respective checkArgument for i < range.to() - it's ok to try claiming keys beyond if (!range.getEndKey().isEmpty() && key.compareTo(range.getEndKey()) > -1) { @@ -90,28 +127,36 @@ protected synchronized boolean tryClaimImpl(ByteKey key) { return true; } - /** - * Marks that there are no more keys to be claimed in the range. - * - *

<p>E.g., a {@link DoFn} reading a file and claiming the key of each record in the file might - * call this if it hits EOF - even though the last attempted claim was before the end of the - * range, there are no more keys to claim. - */ - public synchronized void markDone() { - lastAttemptedKey = range.getEndKey(); - } - @Override - public synchronized void checkDone() throws IllegalStateException { - checkState(lastAttemptedKey != null, "Can't check if done before any key claim was attempted"); - ByteKey nextKey = next(lastAttemptedKey); - checkState( - nextKey.compareTo(range.getEndKey()) > -1, - "Last attempted key was %s in range %s, claiming work in [%s, %s) was not attempted", - lastAttemptedKey, - range, - nextKey, - range.getEndKey()); + public void checkDone() throws IllegalStateException { + // Handle checking the empty range which is implicitly done. + // This case can occur if the range tracker is checkpointed before any keys have been claimed + // or if the range tracker is checkpointed once the range is done. + if (NO_KEYS.equals(range)) { + return; + } + + checkState(lastAttemptedKey != null, + "Key range is non-empty %s and no keys have been attempted.", + range); + + // Return if the last attempted key was the empty key representing the end of range for + // all ranges. + if (lastAttemptedKey.isEmpty()) { + return; + } + + // If the last attempted key was not at or beyond the end of the range then throw. 
+ if (range.getEndKey().isEmpty() + || range.getEndKey().compareTo(lastAttemptedKey) > 0) { + ByteKey nextKey = next(lastAttemptedKey); + throw new IllegalStateException(String.format( + "Last attempted key was %s in range %s, claiming work in [%s, %s) was not attempted", + lastAttemptedKey, + range, + nextKey, + range.getEndKey())); + } } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java index a285ec82ef62d..1d81070344e37 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java @@ -52,8 +52,24 @@ public void testTryClaim() throws Exception { public void testCheckpointUnstarted() throws Exception { ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); - expected.expect(IllegalStateException.class); - tracker.checkpoint(); + + ByteKeyRange checkpoint = tracker.checkpoint(); + // We expect to get the original range back and that the current restriction + // is effectively made empty. + assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), checkpoint); + assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction()); + } + + @Test + public void testCheckpointUnstartedForAllKeysRange() throws Exception { + ByteKeyRangeTracker tracker = + ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS); + + ByteKeyRange checkpoint = tracker.checkpoint(); + // We expect to get the original range back and that the current restriction + // is effectively made empty. 
+ assertEquals(ByteKeyRange.ALL_KEYS, checkpoint); + assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction()); } @Test @@ -61,8 +77,9 @@ public void testCheckpointOnlyFailedClaim() throws Exception { ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); assertFalse(tracker.tryClaim(ByteKey.of(0xd0))); - expected.expect(IllegalStateException.class); - tracker.checkpoint(); + ByteKeyRange checkpoint = tracker.checkpoint(); + assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction()); + assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint); } @Test @@ -89,20 +106,33 @@ public void testCheckpointRegular() throws Exception { } @Test - public void testCheckpointClaimedLast() throws Exception { + public void testCheckpointAtLast() throws Exception { ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); assertTrue(tracker.tryClaim(ByteKey.of(0x50))); assertTrue(tracker.tryClaim(ByteKey.of(0x90))); - assertTrue(tracker.tryClaim(ByteKey.of(0xbf))); + assertFalse(tracker.tryClaim(ByteKey.of(0xc0))); ByteKeyRange checkpoint = tracker.checkpoint(); assertEquals( - ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xbf, 0x00)), tracker.currentRestriction()); - assertEquals(ByteKeyRange.of(ByteKey.of(0xbf, 0x00), ByteKey.of(0xc0)), checkpoint); + ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction()); + assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint); } @Test - public void testCheckpointAfterFailedClaim() throws Exception { + public void testCheckpointAtLastUsingAllKeysAndEmptyKey() throws Exception { + ByteKeyRangeTracker tracker = + ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS); + assertTrue(tracker.tryClaim(ByteKey.of(0x50))); + assertTrue(tracker.tryClaim(ByteKey.of(0x90))); + assertFalse(tracker.tryClaim(ByteKey.EMPTY)); + ByteKeyRange checkpoint = tracker.checkpoint(); + assertEquals( + 
ByteKeyRange.ALL_KEYS, tracker.currentRestriction()); + assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint); + } + + @Test + public void testCheckpointAfterLast() throws Exception { ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); assertTrue(tracker.tryClaim(ByteKey.of(0x50))); @@ -111,27 +141,41 @@ public void testCheckpointAfterFailedClaim() throws Exception { assertFalse(tracker.tryClaim(ByteKey.of(0xd0))); ByteKeyRange checkpoint = tracker.checkpoint(); assertEquals( - ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xa0, 0x00)), tracker.currentRestriction()); - assertEquals(ByteKeyRange.of(ByteKey.of(0xa0, 0x00), ByteKey.of(0xc0)), checkpoint); + ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction()); + assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint); + } + + @Test + public void testCheckpointAfterLastUsingEmptyKey() throws Exception { + ByteKeyRangeTracker tracker = + ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); + assertTrue(tracker.tryClaim(ByteKey.of(0x50))); + assertTrue(tracker.tryClaim(ByteKey.of(0x90))); + assertTrue(tracker.tryClaim(ByteKey.of(0xa0))); + assertFalse(tracker.tryClaim(ByteKey.EMPTY)); + ByteKeyRange checkpoint = tracker.checkpoint(); + assertEquals( + ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction()); + assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint); } @Test public void testNonMonotonicClaim() throws Exception { - expected.expectMessage("Trying to claim key [70] while last attempted was [90]"); ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); assertTrue(tracker.tryClaim(ByteKey.of(0x50))); assertTrue(tracker.tryClaim(ByteKey.of(0x90))); + expected.expectMessage("Trying to claim key [70] while last attempted key was [90]"); tracker.tryClaim(ByteKey.of(0x70)); } @Test public void testClaimBeforeStartOfRange() throws 
Exception { + ByteKeyRangeTracker tracker = + ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); expected.expectMessage( "Trying to claim key [05] before start of the range " + "ByteKeyRange{startKey=[10], endKey=[c0]}"); - ByteKeyRangeTracker tracker = - ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); tracker.tryClaim(ByteKey.of(0x05)); } @@ -155,6 +199,16 @@ public void testCheckDoneAfterTryClaimAtEndOfRange() { tracker.checkDone(); } + @Test + public void testCheckDoneWhenClaimingEndOfRangeForEmptyKey() { + ByteKeyRangeTracker tracker = + ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.EMPTY)); + assertTrue(tracker.tryClaim(ByteKey.of(0x50))); + assertTrue(tracker.tryClaim(ByteKey.of(0x90))); + assertFalse(tracker.tryClaim(ByteKey.EMPTY)); + tracker.checkDone(); + } + @Test public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() { ByteKeyRangeTracker tracker = @@ -169,24 +223,21 @@ public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() { } @Test - public void testCheckDoneWhenNotDone() { + public void testCheckDoneForEmptyRange() { ByteKeyRangeTracker tracker = - ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); - assertTrue(tracker.tryClaim(ByteKey.of(0x50))); - assertTrue(tracker.tryClaim(ByteKey.of(0x90))); - expected.expectMessage( - "Last attempted key was [90] in range ByteKeyRange{startKey=[10], endKey=[c0]}, " - + "claiming work in [[9000], [c0]) was not attempted"); + ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS); tracker.checkDone(); } @Test - public void testCheckDoneWhenExplicitlyMarkedDone() { + public void testCheckDoneWhenNotDone() { ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0))); assertTrue(tracker.tryClaim(ByteKey.of(0x50))); assertTrue(tracker.tryClaim(ByteKey.of(0x90))); - tracker.markDone(); + expected.expectMessage( + "Last attempted key was [90] in range 
ByteKeyRange{startKey=[10], endKey=[c0]}, " + + "claiming work in [[9000], [c0]) was not attempted"); tracker.checkDone(); } diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java index 5df8c037ea038..a3c17e6ca1b35 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java @@ -70,7 +70,7 @@ public void processElement(ProcessContext c, ByteKeyRangeTracker tracker) throws } c.output(result); } - tracker.markDone(); + tracker.tryClaim(ByteKey.EMPTY); } }