Skip to content

Commit

Permalink
HBASE-17407: Correct update of maxFlushedSeqId in HRegion
Browse files Browse the repository at this point in the history
Signed-off-by: zhangduo <zhangduo@apache.org>
  • Loading branch information
eshcarh authored and Apache9 committed Jan 23, 2017
1 parent 3abd13d commit f254e27
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 23 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
Expand Down Expand Up @@ -124,13 +125,20 @@ public MemstoreSize size() {
}

/**
* This method is called when it is clear that the flush to disk is completed.
* The store may do any post-flush actions at this point.
* One example is to update the WAL with sequence number that is known only at the store level.
* This method is called before the flush is executed.
* @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
* is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
*/
@Override
public void finalizeFlush() {
updateLowestUnflushedSequenceIdInWAL(false);
public long preFlushSeqIDEstimation() {
  // With a composite snapshot the entire memstore is drained by the flush,
  // so no lower bound on remaining unflushed edits exists.
  if (compositeSnapshot) {
    return HConstants.NO_SEQNUM;
  }
  final Segment last = getLastSegment();
  return last == null ? HConstants.NO_SEQNUM : last.getMinSequenceId();
}

@Override
Expand Down Expand Up @@ -364,6 +372,12 @@ void flushInMemory() throws IOException {
}
}

/**
 * Returns the most recently created segment: the pipeline tail when one
 * exists, otherwise the active (mutable) segment.
 */
private Segment getLastSegment() {
  final Segment active = getActive();
  final Segment pipelineTail = pipeline.getTail();
  if (pipelineTail != null) {
    return pipelineTail;
  }
  return active;
}

private byte[] getFamilyNameInBytes() {
return store.getFamily().getName();
}
Expand Down
Expand Up @@ -267,6 +267,14 @@ private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment
if(segment != null) pipeline.addLast(segment);
}

/**
 * Returns the last segment of the pipeline, or {@code null} when the
 * pipeline is empty. Works on a local copy of the segment list so the
 * pipeline may change concurrently without affecting the result.
 */
public Segment getTail() {
  final List<? extends Segment> segments = getSegments();
  return segments.isEmpty() ? null : segments.get(segments.size() - 1);
}

private boolean addFirst(ImmutableSegment segment) {
pipeline.addFirst(segment);
return true;
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -169,7 +170,8 @@ protected void checkActiveSize() {
}

@Override
public void finalizeFlush() {
// Per the MemStore contract, NO_SEQNUM signals that the memstore will be
// cleared by the flush; the default memstore presumably snapshots everything
// it holds, so no unflushed-sequence-id estimate applies — TODO confirm.
public long preFlushSeqIDEstimation() {
return HConstants.NO_SEQNUM;
}

@Override public boolean isSloppy() {
Expand Down
Expand Up @@ -2412,9 +2412,10 @@ protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long
status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();

Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
for (Store store: storesToFlush) {
flushedFamilyNames.add(store.getFamily().getName());
flushedFamilyNamesToSeq.put(store.getFamily().getName(),
((HStore) store).preFlushSeqIDEstimation());
}

TreeMap<byte[], StoreFlushContext> storeFlushCtxs
Expand All @@ -2434,7 +2435,7 @@ protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long
try {
if (wal != null) {
Long earliestUnflushedSequenceIdForTheRegion =
wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
wal.startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq);
if (earliestUnflushedSequenceIdForTheRegion == null) {
// This should never happen. This is how startCacheFlush signals flush cannot proceed.
String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
Expand Down Expand Up @@ -2677,9 +2678,6 @@ protected FlushResult internalFlushCacheAndCommit(
}

// If we get to here, the HStores have been written.
for(Store storeToFlush :storesToFlush) {
((HStore) storeToFlush).finalizeFlush();
}
if (wal != null) {
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
}
Expand Down
Expand Up @@ -2509,8 +2509,8 @@ private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
}
}

public void finalizeFlush() {
memstore.finalizeFlush();
/**
 * Delegates to the memstore for an estimation (lower bound) of the lowest
 * unflushed sequence id that will remain after the upcoming flush;
 * {@code HConstants.NO_SEQNUM} when the memstore will be cleared.
 * Returns primitive {@code long} to match {@code MemStore#preFlushSeqIDEstimation()}
 * and avoid needless boxing; callers relying on autoboxing are unaffected.
 */
public long preFlushSeqIDEstimation() {
  return memstore.preFlushSeqIDEstimation();
}

@Override
Expand Down
Expand Up @@ -119,12 +119,12 @@ public interface MemStore {
MemstoreSize size();

/**
* This method is called when it is clear that the flush to disk is completed.
* The store may do any post-flush actions at this point.
* One example is to update the wal with sequence number that is known only at the store level.
* This method is called before the flush is executed.
* @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
* is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
*/
void finalizeFlush();
long preFlushSeqIDEstimation();

/* Return true if the memstore may use some extra memory space */
boolean isSloppy();
}
Expand Up @@ -428,6 +428,15 @@ public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
}

/**
 * Begins a cache flush for the given region, recording the supplied
 * per-family sequence-id estimates; returns {@code null} when the server
 * is closing and the flush must not start.
 */
@Override
public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
  if (closeBarrier.beginOp()) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }
  LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
  return null;
}

@Override
public void completeCacheFlush(byte[] encodedRegionName) {
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
Expand Down
Expand Up @@ -264,6 +264,14 @@ private <T extends Map<?, Long>> Map<byte[], Long> flattenToLowestSequenceId(Map
* oldest/lowest outstanding edit.
*/
Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
  // Adapt the Set-based API to the Map-based one. NO_SEQNUM means "no
  // estimate supplied" for the family, i.e. remove its lowest-unflushed
  // entry entirely when the flush starts.
  Map<byte[], Long> familyToSeq = new HashMap<>(families.size());
  for (byte[] familyName : families) {
    familyToSeq.put(familyName, HConstants.NO_SEQNUM);
  }
  return startCacheFlush(encodedRegionName, familyToSeq);
}

Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> familyToSeq) {
Map<ImmutableByteArray, Long> oldSequenceIds = null;
Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
synchronized (tieLock) {
Expand All @@ -273,9 +281,14 @@ Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families)
// circumstance because another concurrent thread now may add sequenceids for this family
// (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it
// is fine because updates are blocked when this method is called. Make sure!!!
for (byte[] familyName : families) {
ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);
Long seqId = m.remove(familyNameWrapper);
for (Map.Entry<byte[], Long> entry : familyToSeq.entrySet()) {
ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap((byte[]) entry.getKey());
Long seqId = null;
if(entry.getValue() == HConstants.NO_SEQNUM) {
seqId = m.remove(familyNameWrapper);
} else {
seqId = m.replace(familyNameWrapper, entry.getValue());
}
if (seqId != null) {
if (oldSequenceIds == null) {
oldSequenceIds = new HashMap<>();
Expand Down Expand Up @@ -344,7 +357,7 @@ void abortCacheFlush(final byte[] encodedRegionName) {
if (flushing != null) {
for (Map.Entry<ImmutableByteArray, Long> e : flushing.entrySet()) {
Long currentId = tmpMap.get(e.getKey());
if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
String errorStr = Bytes.toString(encodedRegionName) + " family "
+ e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
+ currentId + ", previous oldest unflushed id=" + e.getValue();
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -195,6 +196,11 @@ public void sync(long txid) {
sync();
}

/**
 * Per-family sequence-id estimates are not tracked by this WAL; delegate to
 * the Set-based variant, discarding the supplied estimates.
 */
@Override
public Long startCacheFlush(final byte[] encodedRegionName,
    Map<byte[], Long> flushedFamilyNamesToSeq) {
  return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
}

@Override
public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
if (closed.get()) return null;
Expand Down
Expand Up @@ -161,6 +161,8 @@ void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
*/
Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);

Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long> familyToSeq);

/**
* Complete the cache flush.
* @param encodedRegionName Encoded region name.
Expand Down

0 comments on commit f254e27

Please sign in to comment.