Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-5974] Fix ByteKeyRangeTracker to handle tryClaim(ByteKey.EMPTY) instead of exposing markDone #6949

Merged
merged 2 commits into from Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,14 +27,20 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.transforms.DoFn;

/**
* A {@link RestrictionTracker} for claiming {@link ByteKey}s in a {@link ByteKeyRange} in a
* monotonically increasing fashion. The range is a semi-open bounded interval [startKey, endKey)
* where the limits are both represented by ByteKey.EMPTY.
* where the limits are both represented by {@link ByteKey#EMPTY}.
*
* <p>Note, one can complete a range by claiming the {@link ByteKey#EMPTY} once one runs out of keys
* to process.
*/
public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKey> {
/* An empty range which contains no keys. */
@VisibleForTesting
static final ByteKeyRange NO_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x00));

private ByteKeyRange range;
@Nullable private ByteKey lastClaimedKey = null;
@Nullable private ByteKey lastAttemptedKey = null;
Expand All @@ -54,8 +60,25 @@ public synchronized ByteKeyRange currentRestriction() {

@Override
public synchronized ByteKeyRange checkpoint() {
checkState(lastClaimedKey != null, "Can't checkpoint before any key was successfully claimed");
ByteKey nextKey = next(lastClaimedKey);
// If we haven't done any work, we should return the original range we were processing
// as the checkpoint.
if (lastAttemptedKey == null) {
ByteKeyRange rval = ByteKeyRange.of(range.getStartKey(), range.getEndKey());
// We update our current range to an interval that contains no elements.
range = NO_KEYS;
return rval;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newbie question: would it be incorrect to return the original range instead of creating a new one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for some reason I goofed and forgot how to do a swap using a temp variable. I'll submit that as a follow up change since it is a minor optimization over the current implementation and the tests passed.

}

// Return an empty range if the current range is done.
if (lastAttemptedKey.isEmpty()
|| !(range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
return NO_KEYS;
}

// Otherwise we compute the "remainder" of the range from the last key.
assert lastAttemptedKey.equals(lastClaimedKey)
: "Expect both keys to be equal since the last key attempted was a valid key in the range.";
ByteKey nextKey = next(lastAttemptedKey);
ByteKeyRange res = ByteKeyRange.of(nextKey, range.getEndKey());
this.range = ByteKeyRange.of(range.getStartKey(), nextKey);
return res;
Expand All @@ -64,23 +87,36 @@ public synchronized ByteKeyRange checkpoint() {
/**
* Attempts to claim the given key.
*
* <p>Must be larger than the last successfully claimed key.
* <p>Must be larger than the last attempted key. Note that passing in {@link ByteKey#EMPTY}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we care about whether the key is greater than last attempted rather than last claimed?

Would you mind adding a short javadoc defining what lastAttemptedKey represents? I believe it represents the last attempted claim, regardless of whether it succeeded or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will add the javadoc in the follow-up PR since tests passed.

* claims all keys to the end of range and can only be claimed once.
*
* @return {@code true} if the key was successfully claimed, {@code false} if it is outside the
* current {@link ByteKeyRange} of this tracker (in that case this operation is a no-op).
*/
@Override
protected synchronized boolean tryClaimImpl(ByteKey key) {
// Handle claiming the end of range EMPTY key
if (key.isEmpty()) {
checkArgument(
lastAttemptedKey == null || !lastAttemptedKey.isEmpty(),
"Trying to claim key %s while last attempted key was %s",
key,
lastAttemptedKey);
lastAttemptedKey = key;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The javadoc @return comment states that returning false indicates a no-op; does that mean we shouldn't mutate lastAttemptedKey here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The range is always a semi-bounded open interval and hence "" being the end key can never be claimed.
So updating lastAttemptedKey and returning false is the correct thing to do.

return false;
}

checkArgument(
lastAttemptedKey == null || key.compareTo(lastAttemptedKey) > 0,
"Trying to claim key %s while last attempted was %s",
"Trying to claim key %s while last attempted key was %s",
key,
lastAttemptedKey);
checkArgument(
key.compareTo(range.getStartKey()) > -1,
"Trying to claim key %s before start of the range %s",
key,
range);

lastAttemptedKey = key;
// No respective checkArgument for i < range.to() - it's ok to try claiming keys beyond
if (!range.getEndKey().isEmpty() && key.compareTo(range.getEndKey()) > -1) {
Expand All @@ -90,32 +126,38 @@ protected synchronized boolean tryClaimImpl(ByteKey key) {
return true;
}

/**
* Marks that there are no more keys to be claimed in the range.
*
* <p>E.g., a {@link DoFn} reading a file and claiming the key of each record in the file might
* call this if it hits EOF - even though the last attempted claim was before the end of the
* range, there are no more keys to claim.
*/
public synchronized void markDone() {
lastAttemptedKey = range.getEndKey();
}

@Override
public synchronized void checkDone() throws IllegalStateException {
checkState(lastAttemptedKey != null, "Can't check if done before any key claim was attempted");
ByteKey nextKey = next(lastAttemptedKey);
// Handle checking the empty range which is implicitly done.
// This case can occur if the range tracker is checkpointed before any keys have been claimed
// or if the range tracker is checkpointed once the range is done.
if (NO_KEYS.equals(range)) {
return;
}

checkState(
nextKey.compareTo(range.getEndKey()) > -1,
"Last attempted key was %s in range %s, claiming work in [%s, %s) was not attempted",
lastAttemptedKey,
range,
nextKey,
range.getEndKey());
lastAttemptedKey != null,
"Key range is non-empty %s and no keys have been attempted.",
range);

// Return if the last attempted key was the empty key representing the end of range for
// all ranges.
if (lastAttemptedKey.isEmpty()) {
return;
}

// If the last attempted key was not at or beyond the end of the range then throw.
if (range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0) {
ByteKey nextKey = next(lastAttemptedKey);
throw new IllegalStateException(
String.format(
"Last attempted key was %s in range %s, claiming work in [%s, %s) was not attempted",
lastAttemptedKey, range, nextKey, range.getEndKey()));
}
}

@Override
public String toString() {
public synchronized String toString() {
return MoreObjects.toStringHelper(this)
.add("range", range)
.add("lastClaimedKey", lastClaimedKey)
Expand Down
Expand Up @@ -52,17 +52,33 @@ public void testTryClaim() throws Exception {
public void testCheckpointUnstarted() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
expected.expect(IllegalStateException.class);
tracker.checkpoint();

ByteKeyRange checkpoint = tracker.checkpoint();
// We expect to get the original range back and that the current restriction
// is effectively made empty.
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), checkpoint);
assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
}

@Test
public void testCheckpointUnstartedForAllKeysRange() throws Exception {
ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);

ByteKeyRange checkpoint = tracker.checkpoint();
// We expect to get the original range back and that the current restriction
// is effectively made empty.
assertEquals(ByteKeyRange.ALL_KEYS, checkpoint);
assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
}

@Test
public void testCheckpointOnlyFailedClaim() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
expected.expect(IllegalStateException.class);
tracker.checkpoint();
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
Expand All @@ -89,49 +105,71 @@ public void testCheckpointRegular() throws Exception {
}

@Test
public void testCheckpointClaimedLast() throws Exception {
public void testCheckpointAtLast() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertTrue(tracker.tryClaim(ByteKey.of(0xbf)));
assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xbf, 0x00)), tracker.currentRestriction());
assertEquals(ByteKeyRange.of(ByteKey.of(0xbf, 0x00), ByteKey.of(0xc0)), checkpoint);
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
public void testCheckpointAfterFailedClaim() throws Exception {
public void testCheckpointAtLastUsingAllKeysAndEmptyKey() throws Exception {
ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertFalse(tracker.tryClaim(ByteKey.EMPTY));
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(ByteKeyRange.ALL_KEYS, tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
public void testCheckpointAfterLast() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xa0, 0x00)), tracker.currentRestriction());
assertEquals(ByteKeyRange.of(ByteKey.of(0xa0, 0x00), ByteKey.of(0xc0)), checkpoint);
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
public void testCheckpointAfterLastUsingEmptyKey() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
assertFalse(tracker.tryClaim(ByteKey.EMPTY));
ByteKeyRange checkpoint = tracker.checkpoint();
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
}

@Test
public void testNonMonotonicClaim() throws Exception {
expected.expectMessage("Trying to claim key [70] while last attempted was [90]");
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
expected.expectMessage("Trying to claim key [70] while last attempted key was [90]");
tracker.tryClaim(ByteKey.of(0x70));
}

@Test
public void testClaimBeforeStartOfRange() throws Exception {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
expected.expectMessage(
"Trying to claim key [05] before start of the range "
+ "ByteKeyRange{startKey=[10], endKey=[c0]}");
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
tracker.tryClaim(ByteKey.of(0x05));
}

Expand All @@ -155,6 +193,16 @@ public void testCheckDoneAfterTryClaimAtEndOfRange() {
tracker.checkDone();
}

@Test
public void testCheckDoneWhenClaimingEndOfRangeForEmptyKey() {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.EMPTY));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertFalse(tracker.tryClaim(ByteKey.EMPTY));
tracker.checkDone();
}

@Test
public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
ByteKeyRangeTracker tracker =
Expand All @@ -169,24 +217,20 @@ public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
}

@Test
public void testCheckDoneWhenNotDone() {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
expected.expectMessage(
"Last attempted key was [90] in range ByteKeyRange{startKey=[10], endKey=[c0]}, "
+ "claiming work in [[9000], [c0]) was not attempted");
public void testCheckDoneForEmptyRange() {
ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
tracker.checkDone();
}

@Test
public void testCheckDoneWhenExplicitlyMarkedDone() {
public void testCheckDoneWhenNotDone() {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
tracker.markDone();
expected.expectMessage(
"Last attempted key was [90] in range ByteKeyRange{startKey=[10], endKey=[c0]}, "
+ "claiming work in [[9000], [c0]) was not attempted");
tracker.checkDone();
}

Expand Down
Expand Up @@ -70,7 +70,7 @@ public void processElement(ProcessContext c, ByteKeyRangeTracker tracker) throws
}
c.output(result);
}
tracker.markDone();
tracker.tryClaim(ByteKey.EMPTY);
}
}

Expand Down