
KAFKA-15202: Fix MM2 offset translation when syncs are variably spaced #14156

Merged
@@ -189,7 +189,7 @@ private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
// Make a copy of the array before mutating it, so that readers do not see inconsistent data
// TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
updateSyncArray(mutableSyncs, offsetSync);
updateSyncArray(mutableSyncs, syncs, offsetSync);
if (log.isTraceEnabled()) {
log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
}
@@ -227,7 +227,7 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
}
}

private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, OffsetSync offsetSync) {
Contributor:
Nit: feels a little strange that we're passing in a to-be-mutated sync array. Any reason not to alter updateSyncArray to only take in the original sync array and the new sync, and construct and return the new sync array?

Contributor (author):

That is already the signature of the caller, so this would amount to inlining this function into updateExistingSyncs to combine the two.

I think that this function is already large enough, and doesn't also need to be concerned with copying the array or logging the result. I also think it makes batching updates simpler: The caller can manage the lifetimes of the arrays and their mutability, while the inner function can be concerned with just applying the updates.

Also, this code was already doing that, both here and in the clearSyncArray function.
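For illustration only, the alternative shape the reviewer suggests (the helper performs the copy itself and returns the new array) could look like the sketch below. The class name and the stubbed representation of OffsetSync as a bare upstream offset are hypothetical, and the real update logic is elided; only the signature idea comes from the discussion above.

```java
import java.util.Arrays;

public class UpdateSyncArraySketch {
    static final int SYNCS_PER_PARTITION = 64;

    // Hypothetical variant: copy inside the helper, return the updated copy.
    static long[] updateSyncArray(long[] original, long offsetSync) {
        // Copy before mutating, so that readers never observe inconsistent data
        long[] mutableSyncs = Arrays.copyOf(original, SYNCS_PER_PARTITION);
        mutableSyncs[0] = offsetSync; // Invariant A: slot 0 always holds the latest sync
        return mutableSyncs;
    }

    public static void main(String[] args) {
        long[] original = new long[SYNCS_PER_PARTITION];
        long[] updated = updateSyncArray(original, 42L);
        System.out.println(updated[0] + " " + original[0]); // prints "42 0"
    }
}
```

Either way, as the author notes, keeping the copy and logging in the caller lets it manage array lifetimes and mutability when batching updates.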

long upstreamOffset = offsetSync.upstreamOffset();
// While reading to the end of the topic, ensure that our earliest sync is later than
// any earlier sync that could have been used for translation, to preserve monotonicity
@@ -237,29 +237,49 @@ private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
return;
}
OffsetSync replacement = offsetSync;
// The most-recently-discarded offset sync
// We track this since it may still be eligible for use in the syncs array at a later index
OffsetSync oldValue = syncs[0];
// Index into the original syncs array to the replacement we'll consider next.
int replacementIndex = 0;
// Invariant A is always violated once a new sync appears.
// Repair Invariant A: the latest sync must always be updated
syncs[0] = replacement;
for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
int previous = current - 1;

// We can potentially use oldValue instead of replacement, allowing us to keep more distinct values stored
// If oldValue is not recent, it should be expired from the store
boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
// Ensure that this value is sufficiently separated from the previous value
// We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
// Ensure that this value is sufficiently separated from the next value
// We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
int next = current + 1;
boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
// If this condition is false, oldValue will be expired from the store and lost forever.
if (isRecent && separatedFromPrevious && separatedFromNext) {
replacement = oldValue;
}
// Try to choose a value from the old array as the replacement
// This allows us to keep more distinct values stored overall, improving translation.
boolean skipOldValue;
do {
OffsetSync oldValue = original[replacementIndex];
// If oldValue is not recent enough, then it is not valid to use at the current index.
// It may still be valid when used in a later index where values are allowed to be older.
boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
// Ensure that this value is sufficiently separated from the previous value
// We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
// If this value is too close to the previous value, it will never be valid in a later position.
boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
// Ensure that this value is sufficiently separated from the next value
// We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
int next = current + 1;
boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
// If the next value in the old array is a duplicate of the current one, then they are equivalent
// This value will not need to be considered again
int nextReplacement = replacementIndex + 1;
boolean duplicate = nextReplacement < SYNCS_PER_PARTITION && oldValue == original[nextReplacement];

// Promoting the oldValue to the replacement only happens if it satisfies all invariants.
boolean promoteOldValueToReplacement = isRecent && separatedFromPrevious && separatedFromNext;
if (promoteOldValueToReplacement) {
replacement = oldValue;
}
// The index should be skipped without promoting if we know that it will not be used at a later index
// based only on the observed part of the array so far.
skipOldValue = duplicate || !separatedFromPrevious;
if (promoteOldValueToReplacement || skipOldValue) {
replacementIndex++;
}
// We may need to skip past multiple indices, so keep looping until we're done skipping forward.
// The index must not get ahead of the current index, as we only promote from low index to high index.
} while (replacementIndex < current && skipOldValue);

// The replacement variable always contains a value which satisfies the invariants for this index.
// This replacement may or may not be used, since the invariants could already be satisfied,
@@ -273,8 +293,7 @@ private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
break;
} else {
// Invariant B violated for syncs[current]: sync is now too old and must be updated
// Repair Invariant B: swap in replacement, and save the old value for the next iteration
oldValue = syncs[current];
// Repair Invariant B: swap in replacement
syncs[current] = replacement;

assert invariantB(syncs[previous], syncs[current], previous, current);
@@ -22,6 +22,9 @@
import org.junit.jupiter.api.Test;

import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Random;
import java.util.stream.LongStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -153,31 +156,85 @@ public void testPastOffsetTranslation() {
}

@Test
public void testKeepMostDistinctSyncs() {
public void testConsistentlySpacedSyncs() {
// Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs
// which provide the best translation accuracy (expires as few syncs as possible)
// Each new sync should be added to the cache and expire at most one other sync from the cache
long iterations = 10000;
long iterations = 100;
long maxStep = Long.MAX_VALUE / iterations;
// Test a variety of steps (corresponding to the offset.lag.max configuration)
for (long step = 1; step < maxStep; step = (step * 2) + 1) {
for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
int lastCount = 1;
store.start();
for (long offset = firstOffset; offset <= iterations; offset += step) {
store.sync(tp, offset, offset);
// Invariant A: the latest sync is present
assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
// Invariant D: the earliest sync is present
assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset());
int count = countDistinctStoredSyncs(store, tp);
int diff = count - lastCount;
assertTrue(diff >= 0,
"Store expired too many syncs: " + diff + " after receiving offset " + offset);
lastCount = count;
}
}
long finalStep = step;
// Generate a stream of consistently spaced syncs
// Each new sync should be added to the cache and expire at most one other sync from the cache
assertSyncSpacingHasBoundedExpirations(firstOffset, LongStream.generate(() -> finalStep).limit(iterations), 1);
}
}
}
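As an aside on the loop above: the schedule step = (step * 2) + 1, starting from 1, produces 1, 3, 7, 15, … = 2^k − 1, so the test tries exactly one step value per bit-length below maxStep. A small sketch (iterations = 100 is taken from the test; the class name is mine):

```java
public class StepScheduleSketch {
    // Count how many step values the schedule 1, 3, 7, ... = 2^k - 1 visits below maxStep.
    static int stepsTried(long maxStep) {
        int tried = 0;
        for (long step = 1; step < maxStep; step = (step * 2) + 1) {
            tried++;
        }
        return tried;
    }

    public static void main(String[] args) {
        long maxStep = Long.MAX_VALUE / 100; // iterations = 100, as in the test above
        System.out.println(stepsTried(maxStep)); // prints 56: steps 2^1 - 1 through 2^56 - 1
    }
}
```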

@Test
public void testRandomlySpacedSyncs() {
Random random = new Random(0L); // arbitrary but deterministic seed
int iterationBits = 10;
long iterations = 1 << iterationBits;
for (int n = 1; n < Long.SIZE - iterationBits; n++) {
// A stream with at most n bits of difference between the largest and smallest steps
// will expire n + 2 syncs at once in the worst case, because the sync store is laid out exponentially.
long maximumDifference = 1L << n;
int maximumExpirations = n + 2;
assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, 0L, maximumDifference), maximumExpirations);
// This holds true even if there is a larger minimum step size, such as caused by offsetLagMax
long offsetLagMax = 1L << 16;
assertSyncSpacingHasBoundedExpirations(0, random.longs(iterations, offsetLagMax, offsetLagMax + maximumDifference), maximumExpirations);
}
}
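The "laid out exponentially" remark can be made concrete. My reading of the store (an assumption of this sketch, not stated in the diff) is that slot 0 holds the latest sync and slot i may hold a sync up to roughly 2^i offsets behind it, so 63 doubling slots plus slot 0 cover the entire non-negative long range. That matches the indices syncFor(tp, 0) through syncFor(tp, 63) used in these tests:

```java
public class ExponentialLayoutSketch {
    // Count how many doubling bounds 2^0, 2^1, ... fit in a non-negative long.
    static int doublingSlots() {
        int slots = 0;
        for (long bound = 1; bound > 0; bound <<= 1) {
            slots++;
        }
        return slots; // 63: the shift to 2^63 overflows to a negative value
    }

    public static void main(String[] args) {
        // 63 doubling slots, plus slot 0 for the latest sync, give 64 slots per partition.
        System.out.println(doublingSlots()); // prints 63
    }
}
```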

@Test
public void testDroppedSyncsSpacing() {
Random random = new Random(0L); // arbitrary but deterministic seed
long iterations = 10000;
long offsetLagMax = 100;
// Half of the gaps will be offsetLagMax, and half will be double that, as if one intervening sync was dropped.
LongStream stream = random.doubles()
.mapToLong(d -> (d < 0.5 ? 2 : 1) * offsetLagMax)
.limit(iterations);
// This will cause up to 2 syncs to be discarded, because a sequence of two adjacent syncs followed by a
// dropped sync will set up the following situation
// before [d....d,c,b,a....]
// after [e......e,d,a....]
// and syncs b and c are discarded to make room for e and the demoted sync d.
assertSyncSpacingHasBoundedExpirations(0, stream, 2);
}
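The before/after diagram in the comment can be checked with a toy distinct-count calculation, using the same expiredSyncs formula as the assertion helper in this test class. The arrays are shortened and letters stand in for syncs; this is an illustration, not the real store:

```java
import java.util.HashSet;

public class DroppedSyncSketch {
    // Count distinct entries in a toy sync array (mirrors countDistinctStoredSyncs).
    static int distinct(char[] syncs) {
        HashSet<Character> set = new HashSet<>();
        for (char c : syncs) {
            set.add(c);
        }
        return set.size();
    }

    public static void main(String[] args) {
        char[] before = "ddddddcba".toCharArray(); // [d....d,c,b,a....]
        char[] after  = "eeeeeeeda".toCharArray(); // [e......e,d,a....]
        int lastCount = distinct(before); // {d, c, b, a} -> 4
        int count = distinct(after);      // {e, d, a}    -> 3
        // One sync (e) was added, so the number of expired syncs is:
        int expiredSyncs = lastCount - count + 1;
        System.out.println(expiredSyncs); // prints 2: syncs b and c were discarded
    }
}
```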

/**
* Simulate an OffsetSyncStore receiving a sequence of offset syncs as defined by their start offset and gaps.
* After processing each simulated sync, assert that the store has not expired more unique syncs than the bound.
* @param firstOffset First offset to give to the sync store after starting
* @param steps A finite stream of gaps between syncs with some known distribution
* @param maximumExpirations The maximum number of distinct syncs allowed to be expired after a single update.
*/
private void assertSyncSpacingHasBoundedExpirations(long firstOffset, LongStream steps, int maximumExpirations) {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();
store.sync(tp, firstOffset, firstOffset);
PrimitiveIterator.OfLong iterator = steps.iterator();
long offset = firstOffset;
int lastCount = 1;
while (iterator.hasNext()) {
offset += iterator.nextLong();
assertTrue(offset >= 0, "Test is invalid, offset overflowed");
store.sync(tp, offset, offset);
// Invariant A: the latest sync is present
assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
// Invariant D: the earliest sync is present
assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset());
int count = countDistinctStoredSyncs(store, tp);
// We are adding one sync, so if the count didn't change, then exactly one sync expired.
int expiredSyncs = lastCount - count + 1;
assertTrue(expiredSyncs <= maximumExpirations,
"Store expired too many syncs: " + expiredSyncs + " > " + maximumExpirations
+ " after receiving offset " + offset);
lastCount = count;
}
}
}