Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy initialize checkpoint tracker bit sets #27179

Merged
merged 8 commits into from Nov 2, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,13 +19,12 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexSettings;

import java.util.LinkedList;

/**
* This class generates sequences numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all
* previous sequence numbers have been processed (inclusive).
Expand All @@ -40,21 +39,16 @@ public class LocalCheckpointTracker {
Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope);

/**
* An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which
* marks the sequence number the fist bit in the first array corresponds to.
* A collection of bit arrays representing pending sequence numbers. Each sequence number is mapped to a bit array by dividing by the
* bit set size.
*/
final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>();
final LongObjectHashMap<FixedBitSet> processedSeqNo = new LongObjectHashMap<>();

/**
* The size of each bit set representing processed sequence numbers.
*/
private final int bitArraysSize;

/**
* The sequence number that the first bit in the first array corresponds to.
*/
long firstProcessedSeqNo;

/**
* The current local checkpoint, i.e., all sequence numbers no more than this number have been completed.
*/
Expand Down Expand Up @@ -85,7 +79,6 @@ public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxS
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
}
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
checkpoint = localCheckpoint;
}
Expand Down Expand Up @@ -113,9 +106,9 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
// this is possible during recovery where we might replay an operation that was also replicated
return;
}
final FixedBitSet bitSet = getBitSetForSeqNo(seqNo);
final int offset = seqNoToBitSetOffset(seqNo);
bitSet.set(offset);
final FixedBitSet bitArray = getBitArrayForSeqNo(seqNo);
final int offset = seqNoToBitArrayOffset(seqNo);
bitArray.set(offset);
if (seqNo == checkpoint + 1) {
updateCheckpoint();
}
Expand All @@ -130,7 +123,6 @@ synchronized void resetCheckpoint(final long checkpoint) {
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
firstProcessedSeqNo = checkpoint + 1;
this.checkpoint = checkpoint;
}

Expand Down Expand Up @@ -183,26 +175,30 @@ synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedExcep
@SuppressForbidden(reason = "Object#notifyAll")
private void updateCheckpoint() {
assert Thread.holdsLock(this);
assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 :
"checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() :
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
assert getBitArrayForSeqNo(checkpoint + 1).get(seqNoToBitArrayOffset(checkpoint + 1)) :
"updateCheckpoint is called but the bit following the checkpoint is not set";
try {
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
FixedBitSet current = processedSeqNo.getFirst();
long bitArrayKey = getBitArrayKey(checkpoint);
FixedBitSet current = processedSeqNo.get(bitArrayKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is a FixedBitSet I wonder why renamed everything to use bitArray? I don't mind. Just curious.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I did not like the inconsistency that in some places it's referred to as a "bit array", and in some it's referred to as a "bit set". I wanted to make them all "bit set" but the problem is the setting index.seq_no.checkpoint.bit_arrays_size (currently never released to the world) refers to "bit array". If, and only if, you would be okay with changing the name of this setting to index.seq_no.checkpoint.bit_sets_size (in 6.0) then I would be okay with normalizing everything to "bit set".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with renaming the setting. I think I added it just be able to control things from tests and as an extra escape hatch if the default size proved wrong. We might want to just remove it. We can solve the test part differently and I'm less concerned now about the default size (I'm also thinking that 1024 is maybe too small)..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #27191.

if (current == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when can this happen? it seems we only clean bit sets here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we clean the set (e.g., the checkpoint is equal to size - 1) but the checkpoint can not advance to size. In this case, when size is marked as completed, checkpoint will still be equal to size - 1 but we have already cleaned the set.

// the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set
assert checkpoint % bitArraysSize == bitArraysSize - 1;
current = processedSeqNo.get(++bitArrayKey);
}
do {
checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
processedSeqNo.removeFirst();
firstProcessedSeqNo += bitArraysSize;
assert checkpoint - firstProcessedSeqNo < bitArraysSize;
current = processedSeqNo.peekFirst();
/*
* The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the
* current bit set, we can clean it.
*/
if (checkpoint == (1 + bitArrayKey) * bitArraysSize - 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we maybe cache (1 + bitArrayKey) * bitArraysSize - 1 to something like lastSeqNoInArray ? I think it will be easier to read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 91fbab3.

assert current != null;
final FixedBitSet removed = processedSeqNo.remove(bitArrayKey);
assert removed == current;
current = processedSeqNo.get(++bitArrayKey);
}
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
} while (current != null && current.get(seqNoToBitArrayOffset(checkpoint + 1)));
} finally {
// notifies waiters in waitForOpsToComplete
this.notifyAll();
Expand All @@ -215,31 +211,30 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
* @param seqNo the sequence number to obtain the bit array for
* @return the bit array corresponding to the provided sequence number
*/
private FixedBitSet getBitSetForSeqNo(final long seqNo) {
private long getBitArrayKey(final long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize;
if (bitSetOffset > Integer.MAX_VALUE) {
throw new IndexOutOfBoundsException(
"sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]");
}
while (bitSetOffset >= processedSeqNo.size()) {
processedSeqNo.add(new FixedBitSet(bitArraysSize));
return seqNo / bitArraysSize;
}

private FixedBitSet getBitArrayForSeqNo(final long seqNo) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use indexOf(...) and then indexInsert(...) and indexGet(...) respectively to avoid determining what the slot is for a key several times?

final int slot = processedSeqNo.indexOf(bitArrayKey);
if (processedSeqNo.indexExists(slot) == false) {
   processedSeqNo.indexInsert(slot, bitArrayKey, new FixedBitSet(bitArraysSize));
}
return processedSeqNo.indexGet(slot);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @martijnvg, I pushed cabda87.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And 68c4eab. 😐

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the indexGet at the end is not correct because the returned index is negative if the slot is not actually used. Yet, ~slot also does not work because the table could have been resized after the insert. I pushed 97bb3ca.

Copy link
Member

@martijnvg martijnvg Oct 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I didn't realize that. Maybe we should do this to make optimal use of the slot?

final int slot = processedSeqNo.indexOf(bitArrayKey);
if (processedSeqNo.indexExists(slot) == false) {
   FixedBitSet bitSet = new FixedBitSet(bitArraysSize));
   processedSeqNo.indexInsert(slot, bitArrayKey, bitSet);
   return bitSet;
} else {
   return processedSeqNo.indexGet(slot);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you see a chance to avoid == false, you seize it. 😉

I pushed f4f0dae.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

assert Thread.holdsLock(this);
final long bitArrayKey = getBitArrayKey(seqNo);
if (processedSeqNo.containsKey(bitArrayKey) == false) {
processedSeqNo.put(bitArrayKey, new FixedBitSet(bitArraysSize));
}
return processedSeqNo.get((int) bitSetOffset);
return processedSeqNo.get(bitArrayKey);
}

/**
* Obtain the position in the bit array corresponding to the provided sequence number. The bit array corresponding to the sequence
* number can be obtained via {@link #getBitSetForSeqNo(long)}.
* number can be obtained via {@link #getBitArrayForSeqNo(long)}.
*
* @param seqNo the sequence number to obtain the position for
* @return the position in the bit array corresponding to the provided sequence number
*/
private int seqNoToBitSetOffset(final long seqNo) {
private int seqNoToBitArrayOffset(final long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo;
return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize;
return Math.toIntExact(seqNo % bitArraysSize);
}

}
Expand Up @@ -19,12 +19,16 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Before;

import java.util.ArrayList;
Expand All @@ -38,7 +42,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;

Expand Down Expand Up @@ -95,6 +98,14 @@ public void testSimpleReplica() {
assertThat(tracker.getCheckpoint(), equalTo(2L));
}

public void testLazyInitialization() {
/*
* Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
*/
tracker.markSeqNoAsCompleted(randomNonNegativeLong());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol. can we assert we allocated just one array?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b2cbfc5.

}

public void testSimpleOverFlow() {
List<Integer> seqNoList = new ArrayList<>();
final boolean aligned = randomBoolean();
Expand All @@ -109,7 +120,9 @@ public void testSimpleOverFlow() {
}
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
if (aligned == false) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / SMALL_CHUNK_SIZE));
}
}

public void testConcurrentPrimary() throws InterruptedException {
Expand Down Expand Up @@ -150,7 +163,9 @@ protected void doRun() throws Exception {
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / SMALL_CHUNK_SIZE));
}
}

public void testConcurrentReplica() throws InterruptedException {
Expand Down Expand Up @@ -198,7 +213,10 @@ protected void doRun() throws Exception {
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / SMALL_CHUNK_SIZE));
}
}

public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {
Expand Down Expand Up @@ -253,7 +271,17 @@ public void testResetCheckpoint() {
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, empty());
assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<FixedBitSet>>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I despise assertTrue. Note that this gives a nicer error message if the assertion fails:

java.lang.AssertionError: 
Expected: empty
     but: was <[512=>org.apache.lucene.util.FixedBitSet@98761236]>
Expected :empty
     
Actual   :<[512=>org.apache.lucene.util.FixedBitSet@98761236]>

as opposed to

java.lang.AssertionError
	at __randomizedtesting.SeedInfo.seed([F9E3DE31F5D979AE:C4C54170CE328934]:0)
	at org.junit.Assert.fail(Assert.java:86)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertTrue(Assert.java:52)
	at org.elasticsearch.index.seqno.LocalCheckpointTrackerTests.testResetCheckpoint(LocalCheckpointTrackerTests.java:276)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)

@Override
public boolean matches(Object item) {
return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());
}

@Override
public void describeTo(Description description) {
description.appendText("empty");
}
});
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}
}