Skip to content

Commit

Permalink
HBASE-19074 Miscellaneous Observer cleanups
Browse files Browse the repository at this point in the history
Breaks MemStoreSize into MemStoreSize (read-only) and MemStoreSizing
(read/write). MemStoreSize is what we expose to Coprocessors; MemStoreSizing
is what we use internally for MemStore accounting.
  • Loading branch information
saintstack committed Oct 24, 2017
1 parent 9716f62 commit cb506fd
Show file tree
Hide file tree
Showing 23 changed files with 309 additions and 223 deletions.
Expand Up @@ -880,15 +880,21 @@ default void postReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironme
* Called before a {@link WALEdit} * Called before a {@link WALEdit}
* replayed for this region. * replayed for this region.
* @param ctx the environment provided by the region server * @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/ */
@Deprecated
default void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, default void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}


/** /**
* Called after a {@link WALEdit} * Called after a {@link WALEdit}
* replayed for this region. * replayed for this region.
* @param ctx the environment provided by the region server * @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/ */
@Deprecated
default void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, default void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}


Expand Down
Expand Up @@ -72,8 +72,11 @@ public interface WALObserver {
* is writen to WAL. * is writen to WAL.
* *
* @return true if default behavior should be bypassed, false otherwise * @return true if default behavior should be bypassed, false otherwise
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
*/ */
// TODO: return value is not used // TODO: return value is not used
@Deprecated
default boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, default boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
return false; return false;
Expand All @@ -82,7 +85,10 @@ default boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment>
/** /**
* Called after a {@link WALEdit} * Called after a {@link WALEdit}
* is writen to WAL. * is writen to WAL.
* @deprecated Since hbase-2.0.0. To be replaced with an alternative that does not expose
* InterfaceAudience classes such as WALKey and WALEdit. Will be removed in hbase-3.0.0.
*/ */
@Deprecated
default void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx, default void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}


Expand Down
@@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -96,14 +96,14 @@ protected void resetActive() {
public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent); public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);


@Override @Override
public void add(Iterable<Cell> cells, MemStoreSize memstoreSize) { public void add(Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
for (Cell cell : cells) { for (Cell cell : cells) {
add(cell, memstoreSize); add(cell, memstoreSizing);
} }
} }


@Override @Override
public void add(Cell cell, MemStoreSize memstoreSize) { public void add(Cell cell, MemStoreSizing memstoreSizing) {
Cell toAdd = maybeCloneWithAllocator(cell); Cell toAdd = maybeCloneWithAllocator(cell);
boolean mslabUsed = (toAdd != cell); boolean mslabUsed = (toAdd != cell);
// This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
Expand All @@ -118,7 +118,7 @@ public void add(Cell cell, MemStoreSize memstoreSize) {
if (!mslabUsed) { if (!mslabUsed) {
toAdd = deepCopyIfNeeded(toAdd); toAdd = deepCopyIfNeeded(toAdd);
} }
internalAdd(toAdd, mslabUsed, memstoreSize); internalAdd(toAdd, mslabUsed, memstoreSizing);
} }


private static Cell deepCopyIfNeeded(Cell cell) { private static Cell deepCopyIfNeeded(Cell cell) {
Expand All @@ -129,9 +129,9 @@ private static Cell deepCopyIfNeeded(Cell cell) {
} }


@Override @Override
public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize) { public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing memstoreSizing) {
for (Cell cell : cells) { for (Cell cell : cells) {
upsert(cell, readpoint, memstoreSize); upsert(cell, readpoint, memstoreSizing);
} }
} }


Expand Down Expand Up @@ -167,7 +167,11 @@ public void clearSnapshot(long id) throws UnexpectedStateException {


@Override @Override
public MemStoreSize getSnapshotSize() { public MemStoreSize getSnapshotSize() {
return new MemStoreSize(this.snapshot.keySize(), this.snapshot.heapSize()); return getSnapshotSizing();
}

MemStoreSizing getSnapshotSizing() {
return new MemStoreSizing(this.snapshot.keySize(), this.snapshot.heapSize());
} }


@Override @Override
Expand Down Expand Up @@ -210,7 +214,7 @@ protected void dump(Log log) {
* @param readpoint readpoint below which we can safely remove duplicate KVs * @param readpoint readpoint below which we can safely remove duplicate KVs
* @param memstoreSize * @param memstoreSize
*/ */
private void upsert(Cell cell, long readpoint, MemStoreSize memstoreSize) { private void upsert(Cell cell, long readpoint, MemStoreSizing memstoreSizing) {
// Add the Cell to the MemStore // Add the Cell to the MemStore
// Use the internalAdd method here since we (a) already have a lock // Use the internalAdd method here since we (a) already have a lock
// and (b) cannot safely use the MSLAB here without potentially // and (b) cannot safely use the MSLAB here without potentially
Expand All @@ -221,7 +225,7 @@ private void upsert(Cell cell, long readpoint, MemStoreSize memstoreSize) {
// must do below deep copy. Or else we will keep referring to the bigger chunk of memory and // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
// prevent it from getting GCed. // prevent it from getting GCed.
cell = deepCopyIfNeeded(cell); cell = deepCopyIfNeeded(cell);
this.active.upsert(cell, readpoint, memstoreSize); this.active.upsert(cell, readpoint, memstoreSizing);
setOldestEditTimeToNow(); setOldestEditTimeToNow();
checkActiveSize(); checkActiveSize();
} }
Expand Down Expand Up @@ -277,8 +281,8 @@ private Cell maybeCloneWithAllocator(Cell cell) {
* @param mslabUsed whether using MSLAB * @param mslabUsed whether using MSLAB
* @param memstoreSize * @param memstoreSize
*/ */
private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSize memstoreSize) { private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemStoreSizing memstoreSizing) {
active.add(toAdd, mslabUsed, memstoreSize); active.add(toAdd, mslabUsed, memstoreSizing);
setOldestEditTimeToNow(); setOldestEditTimeToNow();
checkActiveSize(); checkActiveSize();
} }
Expand Down
Expand Up @@ -20,7 +20,6 @@


import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;


Expand Down Expand Up @@ -55,7 +54,7 @@ protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsI
* of CSLMImmutableSegment * of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSize memstoreSize) { protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSizing memstoreSizing) {
super(segment); // initiailize the upper class super(segment); // initiailize the upper class
incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM); incSize(0, DEEP_OVERHEAD_CAM - CSLMImmutableSegment.DEEP_OVERHEAD_CSLM);
int numOfCells = segment.getCellsCount(); int numOfCells = segment.getCellsCount();
Expand All @@ -65,7 +64,7 @@ protected CellArrayImmutableSegment(CSLMImmutableSegment segment, MemStoreSize m
// add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes) // add sizes of CellArrayMap entry (reinitializeCellSet doesn't take the care for the sizes)
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
incSize(0, newSegmentSizeDelta); incSize(0, newSegmentSizeDelta);
memstoreSize.incMemStoreSize(0, newSegmentSizeDelta); memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta);
} }


@Override @Override
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.apache.hadoop.hbase.ByteBufferKeyValue; import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -61,7 +60,8 @@ protected CellChunkImmutableSegment(CellComparator comparator, MemStoreSegmentsI
* of CSLMImmutableSegment * of CSLMImmutableSegment
* The given iterator returns the Cells that "survived" the compaction. * The given iterator returns the Cells that "survived" the compaction.
*/ */
protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemStoreSize memstoreSize) { protected CellChunkImmutableSegment(CSLMImmutableSegment segment,
MemStoreSizing memstoreSizing) {
super(segment); // initiailize the upper class super(segment); // initiailize the upper class
incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM); incSize(0,-CSLMImmutableSegment.DEEP_OVERHEAD_CSLM + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
int numOfCells = segment.getCellsCount(); int numOfCells = segment.getCellsCount();
Expand All @@ -73,7 +73,7 @@ protected CellChunkImmutableSegment(CSLMImmutableSegment segment, MemStoreSize m
long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY); long newSegmentSizeDelta = numOfCells*(indexEntrySize()-ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);


incSize(0, newSegmentSizeDelta); incSize(0, newSegmentSizeDelta);
memstoreSize.incMemStoreSize(0, newSegmentSizeDelta); memstoreSizing.incMemStoreSize(0, newSegmentSizeDelta);
} }


@Override @Override
Expand Down
Expand Up @@ -146,12 +146,12 @@ private void initInmemoryFlushSize(Configuration conf) {
*/ */
@Override @Override
public MemStoreSize size() { public MemStoreSize size() {
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSizing memstoreSizing = new MemStoreSizing();
memstoreSize.incMemStoreSize(this.active.keySize(), this.active.heapSize()); memstoreSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize());
for (Segment item : pipeline.getSegments()) { for (Segment item : pipeline.getSegments()) {
memstoreSize.incMemStoreSize(item.keySize(), item.heapSize()); memstoreSizing.incMemStoreSize(item.keySize(), item.heapSize());
} }
return memstoreSize; return memstoreSizing;
} }


/** /**
Expand Down Expand Up @@ -215,17 +215,17 @@ public MemStoreSnapshot snapshot() {
*/ */
@Override @Override
public MemStoreSize getFlushableSize() { public MemStoreSize getFlushableSize() {
MemStoreSize snapshotSize = getSnapshotSize(); MemStoreSizing snapshotSizing = getSnapshotSizing();
if (snapshotSize.getDataSize() == 0) { if (snapshotSizing.getDataSize() == 0) {
// if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed // if snapshot is empty the tail of the pipeline (or everything in the memstore) is flushed
if (compositeSnapshot) { if (compositeSnapshot) {
snapshotSize = pipeline.getPipelineSize(); snapshotSizing = pipeline.getPipelineSizing();
snapshotSize.incMemStoreSize(this.active.keySize(), this.active.heapSize()); snapshotSizing.incMemStoreSize(this.active.keySize(), this.active.heapSize());
} else { } else {
snapshotSize = pipeline.getTailSize(); snapshotSizing = pipeline.getTailSizing();
} }
} }
return snapshotSize.getDataSize() > 0 ? snapshotSize return snapshotSizing.getDataSize() > 0 ? snapshotSizing
: new MemStoreSize(this.active.keySize(), this.active.heapSize()); : new MemStoreSize(this.active.keySize(), this.active.heapSize());
} }


Expand Down
@@ -1,4 +1,4 @@
/** /*
* *
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -149,7 +149,7 @@ public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segmen
long newHeapSize = 0; long newHeapSize = 0;
if(segment != null) newHeapSize = segment.heapSize(); if(segment != null) newHeapSize = segment.heapSize();
long heapSizeDelta = suffixHeapSize - newHeapSize; long heapSizeDelta = suffixHeapSize - newHeapSize;
region.addMemStoreSize(new MemStoreSize(-dataSizeDelta, -heapSizeDelta)); region.addMemStoreSize(new MemStoreSizing(-dataSizeDelta, -heapSizeDelta));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: " LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: "
+ newDataSize + ". Suffix heap size: " + suffixHeapSize + newDataSize + ". Suffix heap size: " + suffixHeapSize
Expand Down Expand Up @@ -199,14 +199,14 @@ public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.Index
int i = 0; int i = 0;
for (ImmutableSegment s : pipeline) { for (ImmutableSegment s : pipeline) {
if ( s.canBeFlattened() ) { if ( s.canBeFlattened() ) {
MemStoreSize newMemstoreSize = new MemStoreSize(); // the size to be updated MemStoreSizing newMemstoreAccounting = new MemStoreSizing(); // the size to be updated
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening( ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment)s,idxType,newMemstoreSize); (CSLMImmutableSegment)s,idxType,newMemstoreAccounting);
replaceAtIndex(i,newS); replaceAtIndex(i,newS);
if(region != null) { if(region != null) {
// update the global memstore size counter // update the global memstore size counter
// upon flattening there is no change in the data size // upon flattening there is no change in the data size
region.addMemStoreSize(new MemStoreSize(0, newMemstoreSize.getHeapSize())); region.addMemStoreSize(new MemStoreSize(0, newMemstoreAccounting.getHeapSize()));
} }
LOG.debug("Compaction pipeline segment " + s + " was flattened"); LOG.debug("Compaction pipeline segment " + s + " was flattened");
return true; return true;
Expand Down Expand Up @@ -241,22 +241,22 @@ public long getMinSequenceId() {
return minSequenceId; return minSequenceId;
} }


public MemStoreSize getTailSize() { public MemStoreSizing getTailSizing() {
LinkedList<? extends Segment> localCopy = readOnlyCopy; LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (localCopy.isEmpty()) return new MemStoreSize(true); if (localCopy.isEmpty()) return new MemStoreSizing();
return new MemStoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize()); return new MemStoreSizing(localCopy.peekLast().keySize(), localCopy.peekLast().heapSize());
} }


public MemStoreSize getPipelineSize() { public MemStoreSizing getPipelineSizing() {
long keySize = 0; long keySize = 0;
long heapSize = 0; long heapSize = 0;
LinkedList<? extends Segment> localCopy = readOnlyCopy; LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (localCopy.isEmpty()) return new MemStoreSize(true); if (localCopy.isEmpty()) return new MemStoreSizing();
for (Segment segment : localCopy) { for (Segment segment : localCopy) {
keySize += segment.keySize(); keySize += segment.keySize();
heapSize += segment.heapSize(); heapSize += segment.heapSize();
} }
return new MemStoreSize(keySize, heapSize); return new MemStoreSizing(keySize, heapSize);
} }


private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
Expand Down
Expand Up @@ -257,13 +257,13 @@ protected CellSet getCellSet() {
} }


@Override @Override
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSize memstoreSize) { protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner"); throw new IllegalStateException("Not supported by CompositeImmutableScanner");
} }


@Override @Override
protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed, protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
MemStoreSize memstoreSize) { MemStoreSizing memstoreSizing) {
throw new IllegalStateException("Not supported by CompositeImmutableScanner"); throw new IllegalStateException("Not supported by CompositeImmutableScanner");
} }


Expand Down
Expand Up @@ -195,26 +195,26 @@ public static void main(String [] args) {
byte [] fam = Bytes.toBytes("col"); byte [] fam = Bytes.toBytes("col");
byte [] qf = Bytes.toBytes("umn"); byte [] qf = Bytes.toBytes("umn");
byte [] empty = new byte[0]; byte [] empty = new byte[0];
MemStoreSize memstoreSize = new MemStoreSize(); MemStoreSizing memstoreSizing = new MemStoreSizing();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
// Give each its own ts // Give each its own ts
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize); memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
} }
LOG.info("memstore1 estimated size=" LOG.info("memstore1 estimated size="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize); memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSizing);
} }
LOG.info("memstore1 estimated size (2nd loading of same data)=" LOG.info("memstore1 estimated size (2nd loading of same data)="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
// Make a variably sized memstore. // Make a variably sized memstore.
DefaultMemStore memstore2 = new DefaultMemStore(); DefaultMemStore memstore2 = new DefaultMemStore();
memstoreSize = new MemStoreSize(); memstoreSizing = new MemStoreSizing();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSize); memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSizing);
} }
LOG.info("memstore2 estimated size=" LOG.info("memstore2 estimated size="
+ (memstoreSize.getDataSize() + memstoreSize.getHeapSize())); + (memstoreSizing.getDataSize() + memstoreSizing.getHeapSize()));
final int seconds = 30; final int seconds = 30;
LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
LOG.info("Exiting."); LOG.info("Exiting.");
Expand Down

0 comments on commit cb506fd

Please sign in to comment.