Skip to content

Commit

Permalink
[BEAM-5974] Fix ByteKeyRangeTracker to handle tryClaim(ByteKey.EMPTY)…
Browse files Browse the repository at this point in the history
… instead of exposing markDone

Add support for tryClaim(ByteKey.EMPTY), fix doneness checking, and return checkpoints
for restrictions that are unstarted or done.
  • Loading branch information
lukecwik committed Nov 5, 2018
1 parent de5db3f commit 7e5f2fb
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 51 deletions.
Expand Up @@ -32,9 +32,17 @@
/**
* A {@link RestrictionTracker} for claiming {@link ByteKey}s in a {@link ByteKeyRange} in a
* monotonically increasing fashion. The range is a semi-open bounded interval [startKey, endKey)
* where the limits are both represented by ByteKey.EMPTY.
* where the limits are both represented by {@link ByteKey#EMPTY}.
*
* <p>Note, one can complete a range by claiming the {@link ByteKey#EMPTY} once one runs out of keys
* to process.
*/
public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKey> {
/* An empty range which contains no keys. */
@VisibleForTesting
static final ByteKeyRange NO_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x00));


private ByteKeyRange range;
@Nullable private ByteKey lastClaimedKey = null;
@Nullable private ByteKey lastAttemptedKey = null;
Expand All @@ -53,9 +61,26 @@ public synchronized ByteKeyRange currentRestriction() {
}

@Override
public synchronized ByteKeyRange checkpoint() {
checkState(lastClaimedKey != null, "Can't checkpoint before any key was successfully claimed");
ByteKey nextKey = next(lastClaimedKey);
public ByteKeyRange checkpoint() {
// If we haven't done any work, we should return the original range we were processing
// as the checkpoint.
if (lastAttemptedKey == null) {
ByteKeyRange rval = ByteKeyRange.of(range.getStartKey(), range.getEndKey());
// We update our current range to an interval that contains no elements.
range = NO_KEYS;
return rval;
}

// Return an empty range if the current range is done.
if (lastAttemptedKey.isEmpty()
|| !(range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
return NO_KEYS;
}

// Otherwise we compute the "remainder" of the range from the last key.
assert lastAttemptedKey.equals(lastClaimedKey)
: "Expect both keys to be equal since the last key attempted was a valid key in the range.";
ByteKey nextKey = next(lastAttemptedKey);
ByteKeyRange res = ByteKeyRange.of(nextKey, range.getEndKey());
this.range = ByteKeyRange.of(range.getStartKey(), nextKey);
return res;
Expand All @@ -64,23 +89,35 @@ public synchronized ByteKeyRange checkpoint() {
/**
* Attempts to claim the given key.
*
* <p>Must be larger than the last successfully claimed key.
* <p>Must be larger than the last attempted key. Note that passing in
* {@link ByteKey#EMPTY} claims all keys to the end of range and can only be claimed once.
*
* @return {@code true} if the key was successfully claimed, {@code false} if it is outside the
* current {@link ByteKeyRange} of this tracker (in that case this operation is a no-op).
*/
@Override
protected synchronized boolean tryClaimImpl(ByteKey key) {
// Handle claiming the end of range EMPTY key
if (key.isEmpty()) {
checkArgument(lastAttemptedKey == null || !lastAttemptedKey.isEmpty(),
"Trying to claim key %s while last attempted key was %s",
key,
lastAttemptedKey);
lastAttemptedKey = key;
return false;
}

checkArgument(
lastAttemptedKey == null || key.compareTo(lastAttemptedKey) > 0,
"Trying to claim key %s while last attempted was %s",
"Trying to claim key %s while last attempted key was %s",
key,
lastAttemptedKey);
checkArgument(
key.compareTo(range.getStartKey()) > -1,
"Trying to claim key %s before start of the range %s",
key,
range);

lastAttemptedKey = key;
// No corresponding checkArgument for key < range.getEndKey() - it's ok to try claiming keys
// at or beyond the end of the range.
if (!range.getEndKey().isEmpty() && key.compareTo(range.getEndKey()) > -1) {
Expand All @@ -90,28 +127,36 @@ protected synchronized boolean tryClaimImpl(ByteKey key) {
return true;
}

/**
* Marks that there are no more keys to be claimed in the range.
*
* <p>E.g., a {@link DoFn} reading a file and claiming the key of each record in the file might
* call this if it hits EOF - even though the last attempted claim was before the end of the
* range, there are no more keys to claim.
*/
public synchronized void markDone() {
lastAttemptedKey = range.getEndKey();
}

@Override
public synchronized void checkDone() throws IllegalStateException {
checkState(lastAttemptedKey != null, "Can't check if done before any key claim was attempted");
ByteKey nextKey = next(lastAttemptedKey);
checkState(
nextKey.compareTo(range.getEndKey()) > -1,
"Last attempted key was %s in range %s, claiming work in [%s, %s) was not attempted",
lastAttemptedKey,
range,
nextKey,
range.getEndKey());
public void checkDone() throws IllegalStateException {
// Handle checking the empty range which is implicitly done.
// This case can occur if the range tracker is checkpointed before any keys have been claimed
// or if the range tracker is checkpointed once the range is done.
if (NO_KEYS.equals(range)) {
return;
}

checkState(lastAttemptedKey != null,
"Key range is non-empty %s and no keys have been attempted.",
range);

// Return if the last attempted key was the empty key representing the end of range for
// all ranges.
if (lastAttemptedKey.isEmpty()) {
return;
}

// If the last attempted key was not at or beyond the end of the range then throw.
if (range.getEndKey().isEmpty()
|| range.getEndKey().compareTo(lastAttemptedKey) > 0) {
ByteKey nextKey = next(lastAttemptedKey);
throw new IllegalStateException(String.format(
"Last attempted key was %s in range %s, claiming work in [%s, %s) was not attempted",
lastAttemptedKey,
range,
nextKey,
range.getEndKey()));
}
}

@Override
Expand Down
Expand Up @@ -52,17 +52,34 @@ public void testTryClaim() throws Exception {
public void testCheckpointUnstarted() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
expected.expect(IllegalStateException.class);
tracker.checkpoint();

ByteKeyRange checkpoint = tracker.checkpoint();
// We expect to get the original range back and that the current restriction
// is effectively made empty.
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), checkpoint);
assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
}

@Test
public void testCheckpointUnstartedForAllKeysRange() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);

ByteKeyRange checkpoint = tracker.checkpoint();
// We expect to get the original range back and that the current restriction
// is effectively made empty.
assertEquals(ByteKeyRange.ALL_KEYS, checkpoint);
assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
}

@Test
public void testCheckpointOnlyFailedClaim() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
expected.expect(IllegalStateException.class);
tracker.checkpoint();
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
Expand All @@ -89,20 +106,33 @@ public void testCheckpointRegular() throws Exception {
}

@Test
public void testCheckpointClaimedLast() throws Exception {
public void testCheckpointAtLast() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertTrue(tracker.tryClaim(ByteKey.of(0xbf)));
assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xbf, 0x00)), tracker.currentRestriction());
assertEquals(ByteKeyRange.of(ByteKey.of(0xbf, 0x00), ByteKey.of(0xc0)), checkpoint);
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
public void testCheckpointAfterFailedClaim() throws Exception {
public void testCheckpointAtLastUsingAllKeysAndEmptyKey() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertFalse(tracker.tryClaim(ByteKey.EMPTY));
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(
ByteKeyRange.ALL_KEYS, tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
public void testCheckpointAfterLast() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
Expand All @@ -111,27 +141,41 @@ public void testCheckpointAfterFailedClaim() throws Exception {
assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xa0, 0x00)), tracker.currentRestriction());
assertEquals(ByteKeyRange.of(ByteKey.of(0xa0, 0x00), ByteKey.of(0xc0)), checkpoint);
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
public void testCheckpointAfterLastUsingEmptyKey() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
assertFalse(tracker.tryClaim(ByteKey.EMPTY));
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
public void testNonMonotonicClaim() throws Exception {
expected.expectMessage("Trying to claim key [70] while last attempted was [90]");
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
expected.expectMessage("Trying to claim key [70] while last attempted key was [90]");
tracker.tryClaim(ByteKey.of(0x70));
}

@Test
public void testClaimBeforeStartOfRange() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
expected.expectMessage(
"Trying to claim key [05] before start of the range "
+ "ByteKeyRange{startKey=[10], endKey=[c0]}");
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
tracker.tryClaim(ByteKey.of(0x05));
}

Expand All @@ -155,6 +199,16 @@ public void testCheckDoneAfterTryClaimAtEndOfRange() {
tracker.checkDone();
}

@Test
public void testCheckDoneWhenClaimingEndOfRangeForEmptyKey() {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.EMPTY));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertFalse(tracker.tryClaim(ByteKey.EMPTY));
tracker.checkDone();
}

@Test
public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
ByteKeyRangeTracker tracker =
Expand All @@ -169,24 +223,21 @@ public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
}

@Test
public void testCheckDoneWhenNotDone() {
public void testCheckDoneForEmptyRange() {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
expected.expectMessage(
"Last attempted key was [90] in range ByteKeyRange{startKey=[10], endKey=[c0]}, "
+ "claiming work in [[9000], [c0]) was not attempted");
ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
tracker.checkDone();
}

@Test
public void testCheckDoneWhenExplicitlyMarkedDone() {
public void testCheckDoneWhenNotDone() {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
tracker.markDone();
expected.expectMessage(
"Last attempted key was [90] in range ByteKeyRange{startKey=[10], endKey=[c0]}, "
+ "claiming work in [[9000], [c0]) was not attempted");
tracker.checkDone();
}

Expand Down
Expand Up @@ -70,7 +70,7 @@ public void processElement(ProcessContext c, ByteKeyRangeTracker tracker) throws
}
c.output(result);
}
tracker.markDone();
tracker.tryClaim(ByteKey.EMPTY);
}
}

Expand Down

0 comments on commit 7e5f2fb

Please sign in to comment.