Skip to content

Commit

Permalink
HBASE-26384 Segment already flushed to hfile may still be remained in…
Browse files Browse the repository at this point in the history
… CompactingMemStore (#3777)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
comnetwork committed Nov 1, 2021
1 parent ff11f11 commit 558ab92
Show file tree
Hide file tree
Showing 3 changed files with 568 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,12 @@ private void pushPipelineToSnapshot() {
boolean done = false;
while (!done) {
iterationsCnt++;
VersionedSegmentsList segments = pipeline.getVersionedList();
VersionedSegmentsList segments = getImmutableSegments();
pushToSnapshot(segments.getStoreSegments());
// swap can return false in case the pipeline was updated by ongoing compaction
// and the version increase, the chance of it happenning is very low
// In Swap: don't close segments (they are in snapshot now) and don't update the region size
done = pipeline.swap(segments, null, false, false);
done = swapPipelineWithNull(segments);
if (iterationsCnt>2) {
// practically it is impossible that this loop iterates more than two times
// (because the compaction is stopped and none restarts it while in snapshot request),
Expand All @@ -585,6 +585,10 @@ private void pushPipelineToSnapshot() {
}
}

protected boolean swapPipelineWithNull(VersionedSegmentsList segments) {
return pipeline.swap(segments, null, false, false);
}

private void pushToSnapshot(List<ImmutableSegment> segments) {
if(segments.isEmpty()) return;
if(segments.size() == 1 && !segments.get(0).isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
Expand Down Expand Up @@ -64,7 +65,16 @@ public class CompactionPipeline {
private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
// The list is volatile to avoid reading a new allocated reference before the c'tor is executed
private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
// Version is volatile to ensure it is atomically read when not using a lock
/**
* <pre>
* Version is volatile to ensure it is atomically read when not using a lock.
* To indicate whether the suffix of pipleline changes:
* 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
* added at Head, {@link #version} not change.
* 2.for {@link CompactionPipeline#swap},{@link #version} increase.
* 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
* </pre>
*/
private volatile long version = 0;

public CompactionPipeline(RegionServicesForStores region) {
Expand Down Expand Up @@ -290,10 +300,15 @@ public MemStoreSize getPipelineSize() {
return memStoreSizing.getMemStoreSize();
}

/**
* Must be called under the {@link CompactionPipeline#pipeline} Lock.
*/
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
pipeline.removeAll(suffix);
if(segment != null) pipeline.addLast(segment);
matchAndRemoveSuffixFromPipeline(suffix);
if (segment != null) {
pipeline.addLast(segment);
}
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
Expand All @@ -307,6 +322,41 @@ private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment
}
}

/**
* Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in
* {@link CompactionPipeline#pipeline} one by one from the last element to the first element of
* suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/>
* Must be called under the {@link CompactionPipeline#pipeline} Lock.
*/
private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
if (suffix.isEmpty()) {
return;
}
if (pipeline.size() < suffix.size()) {
throw new IllegalStateException(
"CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
+ "],pipeline size must greater than or equals suffix size");
}

ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
int count = 0;
while (suffixIterator.hasPrevious()) {
Segment suffixSegment = suffixIterator.previous();
Segment pipelineSegment = pipelineIterator.previous();
if (suffixSegment != pipelineSegment) {
throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
+ " is not pipleline segment:[" + pipelineSegment + "]");
}
count++;
}

for (int index = 1; index <= count; index++) {
pipeline.pollLast();
}

}

// replacing one segment in the pipeline with a new one exactly at the same index
// need to be called only within synchronized block
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
Expand Down
Loading

0 comments on commit 558ab92

Please sign in to comment.