Skip to content

Commit

Permalink
HBASE-15721 Optimization in cloning cells into MSLAB.
Browse files Browse the repository at this point in the history
  • Loading branch information
anoopsjohn committed Oct 6, 2016
1 parent 58e843d commit 912ed17
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 86 deletions.
Expand Up @@ -543,6 +543,15 @@ public int getSerializedSize(boolean withTags) {
} }
return len; return len;
} }

@Override
public void write(byte[] buf, int offset) {
  // Serialize the wrapped cell first, explicitly excluding its own tags,
  // then append this wrapper's tags (length-prefixed) right after it.
  int pos = KeyValueUtil.appendToByteArray(this.cell, buf, offset, false);
  int tagsLen = this.tags.length;
  assert tagsLen > 0;
  pos = Bytes.putAsShort(buf, pos, tagsLen);
  System.arraycopy(this.tags, 0, buf, pos, tagsLen);
}
} }


/** /**
Expand Down
Expand Up @@ -59,4 +59,11 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
*/ */
// TODO remove the boolean param once HBASE-16706 is done. // TODO remove the boolean param once HBASE-16706 is done.
int getSerializedSize(boolean withTags); int getSerializedSize(boolean withTags);

/**
 * Write this Cell into the given buf's offset in a KeyValue serialized format.
 * @param buf The buffer where to write the Cell.
 * @param offset The offset within buffer, to write the Cell.
 */
void write(byte[] buf, int offset);
} }
Expand Up @@ -2492,6 +2492,11 @@ public int getSerializedSize(boolean withTags) {
return this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; return this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
} }


@Override
public void write(byte[] buf, int offset) {
  // KeyValue is already a single contiguous serialized byte[]; a straight
  // copy of the backing range reproduces the full KeyValue format (tags included).
  System.arraycopy(this.bytes, this.offset, buf, offset, this.length);
}

/** /**
* Comparator that compares row component only of a KeyValue. * Comparator that compares row component only of a KeyValue.
*/ */
Expand Down
Expand Up @@ -136,7 +136,7 @@ public static KeyValue toNewKeyCell(final Cell cell) {
public static byte[] copyToNewByteArray(final Cell cell) { public static byte[] copyToNewByteArray(final Cell cell) {
int v1Length = length(cell); int v1Length = length(cell);
byte[] backingBytes = new byte[v1Length]; byte[] backingBytes = new byte[v1Length];
appendToByteArray(cell, backingBytes, 0); appendToByteArray(cell, backingBytes, 0, true);
return backingBytes; return backingBytes;
} }


Expand All @@ -156,15 +156,13 @@ public static int appendKeyTo(final Cell cell, final byte[] output,


/**************** copy key and value *********************/ /**************** copy key and value *********************/


public static int appendToByteArray(final Cell cell, final byte[] output, final int offset) { public static int appendToByteArray(Cell cell, byte[] output, int offset, boolean withTags) {
// TODO when cell instance of KV we can bypass all steps and just do backing single array
// copy(?)
int pos = offset; int pos = offset;
pos = Bytes.putInt(output, pos, keyLength(cell)); pos = Bytes.putInt(output, pos, keyLength(cell));
pos = Bytes.putInt(output, pos, cell.getValueLength()); pos = Bytes.putInt(output, pos, cell.getValueLength());
pos = appendKeyTo(cell, output, pos); pos = appendKeyTo(cell, output, pos);
pos = CellUtil.copyValueTo(cell, output, pos); pos = CellUtil.copyValueTo(cell, output, pos);
if ((cell.getTagsLength() > 0)) { if (withTags && (cell.getTagsLength() > 0)) {
pos = Bytes.putAsShort(output, pos, cell.getTagsLength()); pos = Bytes.putAsShort(output, pos, cell.getTagsLength());
pos = CellUtil.copyTagTo(cell, output, pos); pos = CellUtil.copyTagTo(cell, output, pos);
} }
Expand All @@ -178,7 +176,7 @@ public static int appendToByteArray(final Cell cell, final byte[] output, final
*/ */
public static ByteBuffer copyToNewByteBuffer(final Cell cell) { public static ByteBuffer copyToNewByteBuffer(final Cell cell) {
byte[] bytes = new byte[length(cell)]; byte[] bytes = new byte[length(cell)];
appendToByteArray(cell, bytes, 0); appendToByteArray(cell, bytes, 0, true);
ByteBuffer buffer = ByteBuffer.wrap(bytes); ByteBuffer buffer = ByteBuffer.wrap(bytes);
return buffer; return buffer;
} }
Expand Down Expand Up @@ -658,4 +656,31 @@ public static void oswrite(final Cell cell, final OutputStream out, final boolea
} }
} }
} }

/**
 * Serialize the given cell into the supplied buffer (KeyValue format, tags included)
 * and wrap the copied bytes in a new KeyValue instance.
 * @param cell the cell to copy
 * @param buf destination buffer; must have room for the serialized cell at offset
 * @param offset where in buf the serialized form begins
 * @return a KeyValue backed by buf, carrying over the cell's sequence id
 */
public static KeyValue copyCellTo(Cell cell, byte[] buf, int offset) {
  int tagsLen = cell.getTagsLength();
  int len = length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
      cell.getValueLength(), tagsLen, true);
  // ExtendedCells know how to serialize themselves; fall back to the generic
  // field-by-field copy for anything else.
  if (cell instanceof ExtendedCell) {
    ((ExtendedCell) cell).write(buf, offset);
  } else {
    appendToByteArray(cell, buf, offset, true);
  }
  // With no tags, use NoTagsKeyValue: it returns tagsLen as 0 directly, so the
  // Memstore's per-Cell getTagsLength() calls skip parsing the length components
  // out of the backing buffer.
  KeyValue newKv = (tagsLen == 0) ? new NoTagsKeyValue(buf, offset, len)
      : new KeyValue(buf, offset, len);
  newKv.setSequenceId(cell.getSequenceId());
  return newKv;
}
} }
Expand Up @@ -253,6 +253,11 @@ public int getSerializedSize(boolean withTags) {
return this.keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE; return this.keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
} }


@Override
public void write(byte[] buf, int offset) {
  // Cell data lives in a ByteBuffer here; copy the serialized range
  // (this.offset/this.length within this.buf) into the destination array.
  ByteBufferUtils.copyFromBufferToArray(buf, this.buf, this.offset, offset, this.length);
}

@Override @Override
public String toString() { public String toString() {
return CellUtil.toString(this, true); return CellUtil.toString(this, true);
Expand Down
Expand Up @@ -451,6 +451,12 @@ public int getSerializedSize(boolean withTags) {
withTags); withTags);
} }


@Override
public void write(byte[] buf, int offset) {
  // This is not used in actual flow. Throwing UnsupportedOperationException
  throw new UnsupportedOperationException();
}

@Override @Override
public void setTimestamp(long ts) throws IOException { public void setTimestamp(long ts) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException // This is not used in actual flow. Throwing UnsupportedOperationException
Expand Down Expand Up @@ -695,6 +701,12 @@ public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException // This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

@Override
public void write(byte[] buf, int offset) {
  // This is not used in actual flow. Throwing UnsupportedOperationException
  throw new UnsupportedOperationException();
}
} }


protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState> protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
Expand Down
Expand Up @@ -26,11 +26,11 @@


import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk; import org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.PooledChunk;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;


import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -106,37 +106,31 @@ public HeapMemStoreLAB(Configuration conf) {
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
} }


/**
* Allocate a slice of the given length.
*
* If the size is larger than the maximum size specified for this
* allocator, returns null.
*/
@Override @Override
public ByteRange allocateBytes(int size) { public Cell copyCellInto(Cell cell) {
int size = KeyValueUtil.length(cell);
Preconditions.checkArgument(size >= 0, "negative size"); Preconditions.checkArgument(size >= 0, "negative size");

// Callers should satisfy large allocations directly from JVM since they // Callers should satisfy large allocations directly from JVM since they
// don't cause fragmentation as badly. // don't cause fragmentation as badly.
if (size > maxAlloc) { if (size > maxAlloc) {
return null; return null;
} }

Chunk c = null;
int allocOffset = 0;
while (true) { while (true) {
Chunk c = getOrMakeChunk(); c = getOrMakeChunk();

// Try to allocate from this chunk // Try to allocate from this chunk
int allocOffset = c.alloc(size); allocOffset = c.alloc(size);
if (allocOffset != -1) { if (allocOffset != -1) {
// We succeeded - this is the common case - small alloc // We succeeded - this is the common case - small alloc
// from a big buffer // from a big buffer
return new SimpleMutableByteRange(c.getData(), allocOffset, size); break;
} }

// not enough space! // not enough space!
// try to retire this chunk // try to retire this chunk
tryRetireChunk(c); tryRetireChunk(c);
} }
return KeyValueUtil.copyCellTo(cell, c.getData(), allocOffset);
} }


/** /**
Expand Down
Expand Up @@ -17,34 +17,38 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;


import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteRange;


/** /**
* A memstore-local allocation buffer. * A memstore-local allocation buffer.
* <p> * <p>
* The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks from
* and then doles it out to threads that request slices into the array. * and then doles it out to threads that request slices into the array. These chunks can get pooled
* as well. See {@link MemStoreChunkPool}.
* <p> * <p>
* The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all * The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all
* KeyValues in a given memstore refer only to large chunks of contiguous memory, we ensure that * Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
* large blocks get freed up when the memstore is flushed. * large blocks get freed up when the memstore is flushed.
* <p> * <p>
* Without the MSLAB, the byte array allocated during insertion end up interleaved throughout the * Without the MSLAB, the byte array allocated during insertion end up interleaved throughout the
* heap, and the old generation gets progressively more fragmented until a stop-the-world compacting * heap, and the old generation gets progressively more fragmented until a stop-the-world compacting
* collection occurs. * collection occurs.
* <p> * <p>
* This manages the large sized chunks. When Cells are to be added to Memstore, MemStoreLAB's
* {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this
* cell's data and copies into this area and then recreate a Cell over this copied data.
* <p>
* @see MemStoreChunkPool
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface MemStoreLAB { public interface MemStoreLAB {


/** /**
* Allocate a slice of the given length. If the size is larger than the maximum size specified for * Allocates slice in this LAB and copy the passed Cell into this area. Returns new Cell instance
* this allocator, returns null. * over the copied the data. When this MemStoreLAB can not copy this Cell, it returns null.
* @param size
* @return {@link ByteRange}
*/ */
ByteRange allocateBytes(int size); Cell copyCellInto(Cell cell);


/** /**
* Close instance since it won't be used any more, try to put the chunks back to pool * Close instance since it won't be used any more, try to put the chunks back to pool
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;


Expand Down Expand Up @@ -147,18 +146,8 @@ public Cell maybeCloneWithAllocator(Cell cell) {
return cell; return cell;
} }


int len = getCellLength(cell); Cell cellFromMslab = this.memStoreLAB.copyCellInto(cell);
ByteRange alloc = this.memStoreLAB.allocateBytes(len); return (cellFromMslab != null) ? cellFromMslab : cell;
if (alloc == null) {
// The allocation was too large, allocator decided
// not to do anything with it.
return cell;
}
assert alloc.getBytes() != null;
KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
newKv.setSequenceId(cell.getSequenceId());
return newKv;
} }


/** /**
Expand Down
Expand Up @@ -20,10 +20,10 @@


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
Expand Down Expand Up @@ -73,18 +73,22 @@ public void testReusingChunks() {
MemStoreLAB mslab = new HeapMemStoreLAB(conf); MemStoreLAB mslab = new HeapMemStoreLAB(conf);
int expectedOff = 0; int expectedOff = 0;
byte[] lastBuffer = null; byte[] lastBuffer = null;
final byte[] rk = Bytes.toBytes("r1");
final byte[] cf = Bytes.toBytes("f");
final byte[] q = Bytes.toBytes("q");
// Randomly allocate some bytes // Randomly allocate some bytes
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
int size = rand.nextInt(1000); int valSize = rand.nextInt(1000);
ByteRange alloc = mslab.allocateBytes(size); KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);

int size = KeyValueUtil.length(kv);
if (alloc.getBytes() != lastBuffer) { KeyValue newKv = (KeyValue) mslab.copyCellInto(kv);
if (newKv.getBuffer() != lastBuffer) {
expectedOff = 0; expectedOff = 0;
lastBuffer = alloc.getBytes(); lastBuffer = newKv.getBuffer();
} }
assertEquals(expectedOff, alloc.getOffset()); assertEquals(expectedOff, newKv.getOffset());
assertTrue("Allocation overruns buffer", alloc.getOffset() assertTrue("Allocation overruns buffer",
+ size <= alloc.getBytes().length); newKv.getOffset() + size <= newKv.getBuffer().length);
expectedOff += size; expectedOff += size;
} }
// chunks will be put back to pool after close // chunks will be put back to pool after close
Expand All @@ -94,7 +98,8 @@ public void testReusingChunks() {
// reconstruct mslab // reconstruct mslab
mslab = new HeapMemStoreLAB(conf); mslab = new HeapMemStoreLAB(conf);
// chunk should be got from the pool, so we can reuse it. // chunk should be got from the pool, so we can reuse it.
mslab.allocateBytes(1000); KeyValue kv = new KeyValue(rk, cf, q, new byte[10]);
mslab.copyCellInto(kv);
assertEquals(chunkCount - 1, chunkPool.getPoolSize()); assertEquals(chunkCount - 1, chunkPool.getPoolSize());
} }


Expand Down Expand Up @@ -202,21 +207,24 @@ public void testPutbackChunksMultiThreaded() throws Exception {
MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE; MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE;
final int maxCount = 10; final int maxCount = 10;
final int initialCount = 5; final int initialCount = 5;
final int chunkSize = 10; final int chunkSize = 30;
final int valSize = 7;
MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, 1); MemStoreChunkPool pool = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount, 1);
assertEquals(initialCount, pool.getPoolSize()); assertEquals(initialCount, pool.getPoolSize());
assertEquals(maxCount, pool.getMaxCount()); assertEquals(maxCount, pool.getMaxCount());
MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created. MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.
// Used it for the testing. Later in finally we put // Used it for the testing. Later in finally we put
// back the original // back the original
final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
new byte[valSize]);
try { try {
Runnable r = new Runnable() { Runnable r = new Runnable() {
@Override @Override
public void run() { public void run() {
MemStoreLAB memStoreLAB = new HeapMemStoreLAB(conf); MemStoreLAB memStoreLAB = new HeapMemStoreLAB(conf);
for (int i = 0; i < maxCount; i++) { for (int i = 0; i < maxCount; i++) {
memStoreLAB.allocateBytes(chunkSize);// Try allocate size = chunkSize. Means every memStoreLAB.copyCellInto(kv);// Try allocate size = chunkSize. Means every
// allocate call will result in a new chunk // allocate call will result in a new chunk
} }
// Close MemStoreLAB so that all chunks will be tried to be put back to pool // Close MemStoreLAB so that all chunks will be tried to be put back to pool
memStoreLAB.close(); memStoreLAB.close();
Expand Down

0 comments on commit 912ed17

Please sign in to comment.