Skip to content

Commit

Permalink
Lazy initialize checkpoint tracker bit sets
Browse files Browse the repository at this point in the history
This local checkpoint tracker uses collections of bit sets to track
which sequence numbers are complete, eventually removing these bit sets
when the local checkpoint advances. However, these bit sets were eagerly
allocated so that if a sequence number far ahead of the checkpoint was
marked as completed, all bit sets between the "last" bit set and the bit
set needed to track the marked sequence number were allocated. If this
sequence number was too far ahead, the memory requirements could be
excessive. This commit opts for a different strategy for holding on to
these bit sets and enables them to be lazily allocated.

Relates #27179
  • Loading branch information
jasontedor committed Nov 2, 2017
1 parent 6714cdb commit 8d3ffa2
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 53 deletions.
Expand Up @@ -19,12 +19,9 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexSettings;

import java.util.LinkedList;

/**
* This class generates sequences numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all
Expand All @@ -33,21 +30,16 @@
public class LocalCheckpointTracker {

/**
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
* demand and cleaning up while completed. This constant controls the size of the arrays.
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple sets allocating them on
* demand and cleaning up while completed. This constant controls the size of the sets.
*/
static final int BIT_ARRAYS_SIZE = 1024;
static final int BIT_SET_SIZE = 1024;

/**
* An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which
* marks the sequence number the fist bit in the first array corresponds to.
* A collection of bit sets representing pending sequence numbers. Each sequence number is mapped to a bit set by dividing by the
* bit set size.
*/
final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>();

/**
* The sequence number that the first bit in the first array corresponds to.
*/
long firstProcessedSeqNo;
final LongObjectHashMap<FixedBitSet> processedSeqNo = new LongObjectHashMap<>();

/**
* The current local checkpoint, i.e., all sequence numbers no more than this number have been completed.
Expand Down Expand Up @@ -77,7 +69,6 @@ public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) {
throw new IllegalArgumentException(
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
}
firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
checkpoint = localCheckpoint;
}
Expand Down Expand Up @@ -122,7 +113,6 @@ synchronized void resetCheckpoint(final long checkpoint) {
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
firstProcessedSeqNo = checkpoint + 1;
this.checkpoint = checkpoint;
}

Expand Down Expand Up @@ -175,24 +165,28 @@ synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedExcep
@SuppressForbidden(reason = "Object#notifyAll")
private void updateCheckpoint() {
assert Thread.holdsLock(this);
assert checkpoint < firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1 :
"checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() :
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
"updateCheckpoint is called but the bit following the checkpoint is not set";
try {
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
FixedBitSet current = processedSeqNo.getFirst();
long bitSetKey = getBitSetKey(checkpoint);
FixedBitSet current = processedSeqNo.get(bitSetKey);
if (current == null) {
// the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set
assert checkpoint % BIT_SET_SIZE == BIT_SET_SIZE - 1;
current = processedSeqNo.get(++bitSetKey);
}
do {
checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) {
processedSeqNo.removeFirst();
firstProcessedSeqNo += BIT_ARRAYS_SIZE;
assert checkpoint - firstProcessedSeqNo < BIT_ARRAYS_SIZE;
current = processedSeqNo.peekFirst();
/*
* The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the
* current bit set, we can clean it.
*/
if (checkpoint == lastSeqNoInBitSet(bitSetKey)) {
assert current != null;
final FixedBitSet removed = processedSeqNo.remove(bitSetKey);
assert removed == current;
current = processedSeqNo.get(++bitSetKey);
}
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
} finally {
Expand All @@ -201,37 +195,45 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
}
}

private long lastSeqNoInBitSet(final long bitSetKey) {
return (1 + bitSetKey) * BIT_SET_SIZE - 1;
}

/**
* Return the bit array for the provided sequence number, possibly allocating a new array if needed.
* Return the bit set for the provided sequence number, possibly allocating a new set if needed.
*
* @param seqNo the sequence number to obtain the bit array for
* @return the bit array corresponding to the provided sequence number
* @param seqNo the sequence number to obtain the bit set for
* @return the bit set corresponding to the provided sequence number
*/
private long getBitSetKey(final long seqNo) {
assert Thread.holdsLock(this);
return seqNo / BIT_SET_SIZE;
}

private FixedBitSet getBitSetForSeqNo(final long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
final long bitSetOffset = (seqNo - firstProcessedSeqNo) / BIT_ARRAYS_SIZE;
if (bitSetOffset > Integer.MAX_VALUE) {
throw new IndexOutOfBoundsException(
"sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]");
}
while (bitSetOffset >= processedSeqNo.size()) {
processedSeqNo.add(new FixedBitSet(BIT_ARRAYS_SIZE));
final long bitSetKey = getBitSetKey(seqNo);
final int index = processedSeqNo.indexOf(bitSetKey);
final FixedBitSet bitSet;
if (processedSeqNo.indexExists(index)) {
bitSet = processedSeqNo.indexGet(index);
} else {
bitSet = new FixedBitSet(BIT_SET_SIZE);
processedSeqNo.indexInsert(index, bitSetKey, bitSet);
}
return processedSeqNo.get((int) bitSetOffset);
return bitSet;
}

/**
* Obtain the position in the bit array corresponding to the provided sequence number. The bit array corresponding to the sequence
* number can be obtained via {@link #getBitSetForSeqNo(long)}.
* Obtain the position in the bit set corresponding to the provided sequence number. The bit set corresponding to the sequence number
* can be obtained via {@link #getBitSetForSeqNo(long)}.
*
* @param seqNo the sequence number to obtain the position for
* @return the position in the bit array corresponding to the provided sequence number
* @return the position in the bit set corresponding to the provided sequence number
*/
private int seqNoToBitSetOffset(final long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo;
return ((int) (seqNo - firstProcessedSeqNo)) % BIT_ARRAYS_SIZE;
return Math.toIntExact(seqNo % BIT_SET_SIZE);
}

}
Expand Up @@ -19,10 +19,14 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Before;

import java.util.ArrayList;
Expand All @@ -36,8 +40,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_ARRAYS_SIZE;
import static org.hamcrest.Matchers.empty;
import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_SET_SIZE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;

Expand Down Expand Up @@ -83,10 +86,19 @@ public void testSimpleReplica() {
assertThat(tracker.getCheckpoint(), equalTo(2L));
}

public void testLazyInitialization() {
/*
* Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
*/
tracker.markSeqNoAsCompleted(randomNonNegativeLong());
assertThat(tracker.processedSeqNo.size(), equalTo(1));
}

public void testSimpleOverFlow() {
List<Integer> seqNoList = new ArrayList<>();
final boolean aligned = randomBoolean();
final int maxOps = BIT_ARRAYS_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_ARRAYS_SIZE - 1));
final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1));

for (int i = 0; i < maxOps; i++) {
seqNoList.add(i);
Expand All @@ -97,7 +109,9 @@ public void testSimpleOverFlow() {
}
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
if (aligned == false) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
}

public void testConcurrentPrimary() throws InterruptedException {
Expand Down Expand Up @@ -138,7 +152,9 @@ protected void doRun() throws Exception {
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
}

public void testConcurrentReplica() throws InterruptedException {
Expand Down Expand Up @@ -186,7 +202,10 @@ protected void doRun() throws Exception {
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
}

public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {
Expand Down Expand Up @@ -241,7 +260,17 @@ public void testResetCheckpoint() {
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, empty());
assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<FixedBitSet>>() {
@Override
public boolean matches(Object item) {
return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());
}

@Override
public void describeTo(Description description) {
description.appendText("empty");
}
});
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}
}

0 comments on commit 8d3ffa2

Please sign in to comment.