Skip to content

Commit

Permalink
Simplify MultiSnapshot#SeqNoset (#27547)
Browse files Browse the repository at this point in the history
Today, we maintain two sets in a SeqNoSet: ongoing sets and completed
sets. We can remove the completed sets and use only the ongoing sets by
releasing the internal bitset of a CountedBitSet when all its bits are
set. This behaves like two sets but simpler. This commit also makes
CountedBitSet as a drop-in replacement for BitSet.

Relates #27268
  • Loading branch information
dnhatn committed Dec 3, 2017
1 parent 736703a commit 89c2ed8
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 75 deletions.
106 changes: 106 additions & 0 deletions core/src/main/java/org/elasticsearch/index/translog/CountedBitSet.java
@@ -0,0 +1,106 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.translog;

import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.FixedBitSet;

/**
* A {@link CountedBitSet} wraps a {@link FixedBitSet} but automatically releases the internal bitset
* when all bits are set to reduce memory usage. This structure can work well for sequence numbers
* from translog as these numbers are likely to form contiguous ranges (eg. filling all bits).
*/
final class CountedBitSet extends BitSet {
private short onBits; // Number of bits are set.
private FixedBitSet bitset;

CountedBitSet(short numBits) {
assert numBits > 0;
this.onBits = 0;
this.bitset = new FixedBitSet(numBits);
}

@Override
public boolean get(int index) {
assert 0 <= index && index < this.length();
assert bitset == null || onBits < bitset.length() : "Bitset should be released when all bits are set";

return bitset == null ? true : bitset.get(index);
}

@Override
public void set(int index) {
assert 0 <= index && index < this.length();
assert bitset == null || onBits < bitset.length() : "Bitset should be released when all bits are set";

// Ignore set when bitset is full.
if (bitset != null) {
boolean wasOn = bitset.getAndSet(index);
if (wasOn == false) {
onBits++;
// Once all bits are set, we can simply just return YES for all indexes.
// This allows us to clear the internal bitset and use null check as the guard.
if (onBits == bitset.length()) {
bitset = null;
}
}
}
}

@Override
public void clear(int startIndex, int endIndex) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public void clear(int index) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public int cardinality() {
return onBits;
}

@Override
public int length() {
return bitset == null ? onBits : bitset.length();
}

@Override
public int prevSetBit(int index) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public int nextSetBit(int index) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public long ramBytesUsed() {
throw new UnsupportedOperationException("Not implemented yet");
}

// Exposed for testing
boolean isInternalBitsetReleased() {
return bitset == null;
}
}
Expand Up @@ -19,10 +19,8 @@

package org.elasticsearch.index.translog;

import com.carrotsearch.hppc.LongHashSet;
import com.carrotsearch.hppc.LongObjectHashMap;
import com.carrotsearch.hppc.LongSet;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.Closeable;
Expand Down Expand Up @@ -84,75 +82,25 @@ public void close() throws IOException {
onClose.close();
}

/**
* A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1).
*/
private static final class CountedBitSet {
private short onBits;
private final FixedBitSet bitset;

CountedBitSet(short numBits) {
assert numBits > 0;
this.onBits = 0;
this.bitset = new FixedBitSet(numBits);
}

boolean getAndSet(int index) {
assert index >= 0;
boolean wasOn = bitset.getAndSet(index);
if (wasOn == false) {
onBits++;
}
return wasOn;
}

boolean hasAllBitsOn() {
return onBits == bitset.length();
}
}

/**
* Sequence numbers from translog are likely to form contiguous ranges,
* thus collapsing a completed bitset into a single entry will reduce memory usage.
*/
static final class SeqNoSet {
static final short BIT_SET_SIZE = 1024;
private final LongSet completedSets = new LongHashSet();
private final LongObjectHashMap<CountedBitSet> ongoingSets = new LongObjectHashMap<>();
private final LongObjectHashMap<BitSet> bitSets = new LongObjectHashMap<>();

/**
* Marks this sequence number and returns <tt>true</tt> if it is seen before.
*/
boolean getAndSet(long value) {
assert value >= 0;
final long key = value / BIT_SET_SIZE;

if (completedSets.contains(key)) {
return true;
}

CountedBitSet bitset = ongoingSets.get(key);
BitSet bitset = bitSets.get(key);
if (bitset == null) {
bitset = new CountedBitSet(BIT_SET_SIZE);
ongoingSets.put(key, bitset);
}

final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE));
if (bitset.hasAllBitsOn()) {
ongoingSets.remove(key);
completedSets.add(key);
bitSets.put(key, bitset);
}
final int index = Math.toIntExact(value % BIT_SET_SIZE);
final boolean wasOn = bitset.get(index);
bitset.set(index);
return wasOn;
}

// For testing
long completeSetsSize() {
return completedSets.size();
}

// For testing
long ongoingSetsSize() {
return ongoingSets.size();
}
}
}
@@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.translog;

import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.test.ESTestCase;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.equalTo;

public class CountedBitSetTests extends ESTestCase {

public void testCompareToFixedBitset() {
int numBits = (short) randomIntBetween(8, 4096);
final FixedBitSet fixedBitSet = new FixedBitSet(numBits);
final CountedBitSet countedBitSet = new CountedBitSet((short) numBits);

for (int i = 0; i < numBits; i++) {
if (randomBoolean()) {
fixedBitSet.set(i);
countedBitSet.set(i);
}
assertThat(countedBitSet.cardinality(), equalTo(fixedBitSet.cardinality()));
assertThat(countedBitSet.length(), equalTo(fixedBitSet.length()));
}

for (int i = 0; i < numBits; i++) {
assertThat(countedBitSet.get(i), equalTo(fixedBitSet.get(i)));
}
}

public void testReleaseInternalBitSet() {
int numBits = (short) randomIntBetween(8, 4096);
final CountedBitSet countedBitSet = new CountedBitSet((short) numBits);
final List<Integer> values = IntStream.range(0, numBits).boxed().collect(Collectors.toList());

for (int i = 1; i < numBits; i++) {
final int value = values.get(i);
assertThat(countedBitSet.get(value), equalTo(false));
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(false));

countedBitSet.set(value);

assertThat(countedBitSet.get(value), equalTo(true));
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(false));
assertThat(countedBitSet.length(), equalTo(numBits));
assertThat(countedBitSet.cardinality(), equalTo(i));
}

// The missing piece to fill all bits.
{
final int value = values.get(0);
assertThat(countedBitSet.get(value), equalTo(false));
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(false));

countedBitSet.set(value);

assertThat(countedBitSet.get(value), equalTo(true));
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(true));
assertThat(countedBitSet.length(), equalTo(numBits));
assertThat(countedBitSet.cardinality(), equalTo(numBits));
}

// Tests with released internal bitset.
final int iterations = iterations(1000, 10000);
for (int i = 0; i < iterations; i++) {
final int value = randomInt(numBits - 1);
assertThat(countedBitSet.get(value), equalTo(true));
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(true));
assertThat(countedBitSet.length(), equalTo(numBits));
assertThat(countedBitSet.cardinality(), equalTo(numBits));
if (frequently()) {
assertThat(countedBitSet.get(value), equalTo(true));
}
}
}
}
Expand Up @@ -30,7 +30,6 @@
import java.util.stream.LongStream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class MultiSnapshotTests extends ESTestCase {

Expand All @@ -40,14 +39,8 @@ public void testTrackSeqNoSimpleRange() throws Exception {
Randomness.shuffle(values);
for (int i = 0; i < 1023; i++) {
assertThat(bitSet.getAndSet(values.get(i)), equalTo(false));
assertThat(bitSet.ongoingSetsSize(), equalTo(1L));
assertThat(bitSet.completeSetsSize(), equalTo(0L));
}

assertThat(bitSet.getAndSet(values.get(1023)), equalTo(false));
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
assertThat(bitSet.completeSetsSize(), equalTo(1L));

assertThat(bitSet.getAndSet(between(0, 1023)), equalTo(true));
assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false));
}
Expand All @@ -59,7 +52,6 @@ public void testTrackSeqNoDenseRanges() throws Exception {
long seq = between(0, 5000);
boolean existed = normalSet.add(seq) == false;
assertThat("SeqNoSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed));
assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L));
});
}

Expand All @@ -78,12 +70,8 @@ public void testTrackSeqNoMimicTranslogRanges() throws Exception {
final LongSet normalSet = new LongHashSet();
long currentSeq = between(10_000_000, 1_000_000_000);
final int iterations = scaledRandomIntBetween(100, 2000);
assertThat(bitSet.completeSetsSize(), equalTo(0L));
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
long totalDocs = 0;
for (long i = 0; i < iterations; i++) {
int batchSize = between(1, 1500);
totalDocs += batchSize;
currentSeq -= batchSize;
List<Long> batch = LongStream.range(currentSeq, currentSeq + batchSize)
.boxed()
Expand All @@ -92,11 +80,7 @@ public void testTrackSeqNoMimicTranslogRanges() throws Exception {
batch.forEach(seq -> {
boolean existed = normalSet.add(seq) == false;
assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed));
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L));
});
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
}
assertThat(bitSet.completeSetsSize(), lessThanOrEqualTo(totalDocs / 1024));
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
}
}

0 comments on commit 89c2ed8

Please sign in to comment.