Skip to content

Commit

Permalink
HBASE-16438 Create a cell type so that chunk id is embedded in it (Ram)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramkrishna committed Apr 19, 2017
1 parent 6e962d6 commit 972e8c8
Show file tree
Hide file tree
Showing 44 changed files with 1,055 additions and 519 deletions.
24 changes: 0 additions & 24 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
Expand Up @@ -3135,28 +3135,4 @@ public byte getTypeByte() {
return Type.DeleteFamily.getCode(); return Type.DeleteFamily.getCode();
} }
} }

/**
* Clone the passed cell by copying its data into the passed buf.
*/
public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) {
int tagsLen = cell.getTagsLength();
if (cell instanceof ExtendedCell) {
((ExtendedCell) cell).write(buf, offset);
} else {
// Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
// other case also. The data fragments within Cell is copied into buf as in KeyValue
// serialization format only.
KeyValueUtil.appendTo(cell, buf, offset, true);
}
if (tagsLen == 0) {
// When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
// which directly return tagsLen as 0. So we avoid parsing many length components in
// reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
// call getTagsLength().
return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
} else {
return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
}
}
} }
Expand Up @@ -34,6 +34,7 @@
public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize, public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize,
Cloneable { Cloneable {


public static int CELL_NOT_BASED_ON_CHUNK = -1;
/** /**
* Write this cell to an OutputStream in a {@link KeyValue} format. * Write this cell to an OutputStream in a {@link KeyValue} format.
* <br> KeyValue format <br> * <br> KeyValue format <br>
Expand Down Expand Up @@ -73,4 +74,13 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
* @return The deep cloned cell * @return The deep cloned cell
*/ */
Cell deepClone(); Cell deepClone();

/**
* Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized
* chunks as in case of MemstoreLAB
* @return the chunk id if the cell is backed by fixed sized Chunks, else return -1
*/
default int getChunkId() {
return CELL_NOT_BASED_ON_CHUNK;
}
} }
Expand Up @@ -748,6 +748,8 @@ private void finishActiveMasterInitialization(MonitoredTask status)


this.masterActiveTime = System.currentTimeMillis(); this.masterActiveTime = System.currentTimeMillis();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
// Initialize the chunkCreator
initializeMemStoreChunkCreator();
this.fileSystemManager = new MasterFileSystem(this); this.fileSystemManager = new MasterFileSystem(this);
this.walManager = new MasterWalManager(this); this.walManager = new MasterWalManager(this);


Expand Down
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;

/**
* ByteBuffer based cell which has the chunkid at the 0th offset
* @see MemStoreLAB
*/
//TODO : When moving this cell to CellChunkMap we will have the following things
// to be serialized
// chunkId (Integer) + offset (Integer) + length (Integer) + seqId (Long) = 20 bytes
@InterfaceAudience.Private
public class ByteBufferChunkCell extends ByteBufferKeyValue {
public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
super(buf, offset, length);
}

public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
super(buf, offset, length, seqId);
}

@Override
public int getChunkId() {
// The chunkId is embedded at the 0th offset of the bytebuffer
return ByteBufferUtils.toInt(buf, 0);
}
}
Expand Up @@ -21,8 +21,10 @@
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;


import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;


/** /**
* A chunk of memory out of which allocations are sliced. * A chunk of memory out of which allocations are sliced.
Expand All @@ -46,21 +48,66 @@ public abstract class Chunk {
/** Size of chunk in bytes */ /** Size of chunk in bytes */
protected final int size; protected final int size;


// The unique id associated with the chunk.
private final int id;

// indicates if the chunk is formed by ChunkCreator#MemstorePool
private final boolean fromPool;

/**
* Create an uninitialized chunk. Note that memory is not allocated yet, so
* this is cheap.
* @param size in bytes
* @param id the chunk id
*/
public Chunk(int size, int id) {
this(size, id, false);
}

/** /**
* Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. * Create an uninitialized chunk. Note that memory is not allocated yet, so
* * this is cheap.
* @param size in bytes * @param size in bytes
* @param id the chunk id
* @param fromPool if the chunk is formed by pool
*/ */
Chunk(int size) { public Chunk(int size, int id, boolean fromPool) {
this.size = size; this.size = size;
this.id = id;
this.fromPool = fromPool;
}

int getId() {
return this.id;
}

boolean isFromPool() {
return this.fromPool;
} }


/** /**
* Actually claim the memory for this chunk. This should only be called from the thread that * Actually claim the memory for this chunk. This should only be called from the thread that
* constructed the chunk. It is thread-safe against other threads calling alloc(), who will block * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
* until the allocation is complete. * until the allocation is complete.
*/ */
public abstract void init(); public void init() {
assert nextFreeOffset.get() == UNINITIALIZED;
try {
allocateDataBuffer();
} catch (OutOfMemoryError e) {
boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
assert failInit; // should be true.
throw e;
}
// Mark that it's ready for use
// Move 8 bytes since the first 8 bytes are having the chunkid in it
boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG);
// We should always succeed the above CAS since only one thread
// calls init()!
Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
}

abstract void allocateDataBuffer();


/** /**
* Reset the offset to UNINITIALIZED before before reusing an old chunk * Reset the offset to UNINITIALIZED before before reusing an old chunk
Expand All @@ -74,7 +121,8 @@ void reset() {


/** /**
* Try to allocate <code>size</code> bytes from the chunk. * Try to allocate <code>size</code> bytes from the chunk.
* * If a chunk is tried to get allocated before init() call, the thread doing the allocation
* will be in busy-wait state as it will keep looping till the nextFreeOffset is set.
* @return the offset of the successful allocation, or -1 to indicate not-enough-space * @return the offset of the successful allocation, or -1 to indicate not-enough-space
*/ */
public int alloc(int size) { public int alloc(int size) {
Expand All @@ -96,7 +144,7 @@ public int alloc(int size) {
if (oldOffset + size > data.capacity()) { if (oldOffset + size > data.capacity()) {
return -1; // alloc doesn't fit return -1; // alloc doesn't fit
} }

// TODO : If seqID is to be written add 8 bytes here for nextFreeOFfset
// Try to atomically claim this chunk // Try to atomically claim this chunk
if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
// we got the alloc // we got the alloc
Expand Down

0 comments on commit 972e8c8

Please sign in to comment.