diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/AbstractHeapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/AbstractHeapChunk.java new file mode 100644 index 000000000000..be3643e50860 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/AbstractHeapChunk.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public abstract class AbstractHeapChunk implements HeapChunk { + + private final long chunkID; + private final int capacity; + private final boolean isPooled; + + private final AtomicInteger alignOccupancy = new AtomicInteger(0); + + protected final AtomicInteger nextFreeOffset = new AtomicInteger(0); + protected ByteBuffer chunk; + + protected AbstractHeapChunk(long chunkID, int capacity, boolean isPooled) { + this.chunkID = chunkID; + this.capacity = capacity; + this.isPooled = isPooled; + } + + @Override + public long getChunkID() { + return chunkID; + } + + @Override + public int getPosition() { + return nextFreeOffset.get(); + } + + @Override + public int allocate(int len) { + int oldLen = len; + //TODO reuse the removed node's space. + //TODO add config for support unalign + //align + len = align(len); + + while (true) { + int oldOffset = nextFreeOffset.get(); + if (oldOffset + len > getLimit()) { + return -1; // alloc doesn't fit + } + // Try to atomically claim this chunk + if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + len)) { + // we got the alloc + alignOccupancy.addAndGet(oldLen - len); + return oldOffset; + } + } + } + + private int align(int len) { + return (len % 8 != 0) ? 
((len / 8 + 1) * 8) : len; + } + + @Override + public int getLimit() { + return capacity; + } + + @Override + public ByteBuffer getByteBuffer() { + return chunk; + } + + @Override + public boolean isPooledChunk() { + return isPooled; + } + + @Override + public ByteBuffer asSubByteBuffer(int offset, int len) { + ByteBuffer duplicate = chunk.duplicate(); + duplicate.limit(offset + len); + duplicate.position(offset); + return duplicate.slice(); + } + + public abstract HeapMode getHeapMode(); + + @Override + public int occupancy() { + return getLimit() - getPosition(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof AbstractHeapChunk)) { + return false; + } + AbstractHeapChunk that = (AbstractHeapChunk) obj; + return getChunkID() == that.getChunkID(); + } + + @Override + public int hashCode() { + return (int) (getChunkID() & CCSMapUtils.FOUR_BYTES_MARK); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/CCSMapUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/CCSMapUtils.java new file mode 100644 index 000000000000..34a402f77847 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/CCSMapUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public final class CCSMapUtils { + + private CCSMapUtils() {} + + static final long FOUR_BYTES_MARK = 0xFFFFFFFF; + + static final String CHUNK_CAPACITY_KEY = "hbase.regionserver.memstore.ccsmap.capacity"; + + static final String CHUNK_SIZE_KEY = "hbase.regionserver.memstore.ccsmap.chunksize"; + + static final String INITIAL_CHUNK_COUNT_KEY = "hbase.regionserver.memstore.ccsmap.chunk.initial"; + + static final String USE_OFFHEAP = "hbase.regionserver.memstore.ccsmap.useoffheap"; + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/ChunkAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/ChunkAllocator.java new file mode 100644 index 000000000000..c994a10a29ae --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/ChunkAllocator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.util.concurrent.atomic.AtomicLong; + +@InterfaceAudience.Private +public class ChunkAllocator { + + private final HeapMode heapMode; + // ID starts from maxChunkCount, so the unpooled chunks ID range is [maxChunkCount, +) + private final AtomicLong unpooledChunkIDGenerator; + // ID starts from 0, so the pooled chunks ID range is [0, maxChunkCount) + private final AtomicLong pooledChunkIDGenerator = new AtomicLong(-1); + + public ChunkAllocator(HeapMode heapMode, long maxChunkCount) { + this.heapMode = heapMode; + unpooledChunkIDGenerator = new AtomicLong(maxChunkCount); + } + + /** + * Allocate a pooled chunk with specified size. + * @param size size of a chunk + * @return a chunk + */ + AbstractHeapChunk allocatePooledChunk(int size) { + return heapMode == HeapMode.ON_HEAP ? + new OnHeapChunk(pooledChunkIDGenerator.incrementAndGet(), size) : + new OffHeapChunk(pooledChunkIDGenerator.incrementAndGet(), size); + } + + /** + * Allocate a unpooled chunk with specified size. + * @param size size of a chunk + * @return a chunk + */ + AbstractHeapChunk allocateUnpooledChunk(int size) { + return new OnHeapChunk(unpooledChunkIDGenerator.getAndIncrement(), size, false); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/ChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/ChunkPool.java new file mode 100644 index 000000000000..9caa172fb921 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/ChunkPool.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * It mainly focus on managing reusable chunks, to make GC friendy. + * Also, if situation needs, this class also keep tracking non-resuable chunks. + */ +@InterfaceAudience.Private +public class ChunkPool { + + private static final Log LOG = LogFactory.getLog(ChunkPool.class); + private static final Object initLock = new Object(); + private static volatile ChunkPool globalInstance; + + private final HeapMode heapMode; + private final int maxChunkCount; + private final int chunkSize; + private final ChunkAllocator chunkAllocator; + private final AbstractHeapChunk[] chunkArray; + private final Queue chunkQueue; + // Map for tracking unpooled chunk's + private final Map unpooledChunks = new ConcurrentHashMap<>(); + + private final AtomicLong currentChunkCounter = new AtomicLong(0); + private final AtomicLong unpooledChunkUsed = new AtomicLong(0); + + public ChunkPool(ChunkPoolParameters parameters) { + heapMode = parameters.heapMode; + chunkSize = parameters.chunkSize; + + long capacity = parameters.capacity; + maxChunkCount = (int) (capacity / this.chunkSize); + int initialCount = parameters.initialCount; + if (initialCount > maxChunkCount) { + initialCount = maxChunkCount; + parameters.initialCount = initialCount; + } + chunkAllocator = new ChunkAllocator(heapMode, maxChunkCount); + + chunkArray = new AbstractHeapChunk[maxChunkCount]; + chunkQueue = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < initialCount; i++) { + if (!this.chunkQueue.offer(allocatePooledChunk())) { + // Should not happen, since the queue has no size limit. + throw new IllegalStateException("Initial count=" + initialCount + + ", but failed to allocate the " + i + "th chunk."); + } + } + } + + /** + * Try to allocate a chunk with specified size. + * @param size size of a chunk + * @return a chunk + */ + public AbstractHeapChunk allocate(int size) { + AbstractHeapChunk chunk; + if (size > chunkSize) { + LOG.warn("Allocating a big chunk which size is " + size + + " larger than specified size: " + chunkSize); + return allocateUnpooledChunk(size); + } + chunk = chunkQueue.poll(); + if (chunk == null) { + synchronized (chunkAllocator) { + if (currentChunkCounter.get() >= maxChunkCount) { + LOG.debug("No more available reusable chunk in this pool, " + + "will use unpooled chunk on heap before pooled chunks reclaimed."); + return allocateUnpooledChunk(chunkSize); + } + chunk = allocatePooledChunk(); + } + } + return chunk; + } + + private AbstractHeapChunk allocateUnpooledChunk(int len) { + // An unpooled chunk must be on heap. 
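+    // Note (reviewer comment, not in the original patch): unpooled chunk IDs start at
+    // maxChunkCount (see ChunkAllocator), so they can never collide with pooled IDs in
+    // [0, maxChunkCount); the chunk is tracked in unpooledChunks until it is reclaimed.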
+ AbstractHeapChunk chunk = chunkAllocator.allocateUnpooledChunk(len); + unpooledChunkUsed.addAndGet(len); + unpooledChunks.put(chunk.getChunkID(), chunk); + return chunk; + } + + /** + * Reclaim a chunk if it is reusable, remove it otherwise. + * @param chunk a chunk not in use + */ + public void reclaimChunk(AbstractHeapChunk chunk) { + //not support putback duplicate. + if (chunk.getHeapMode() == this.heapMode && chunk.isPooledChunk()) { + chunk.getByteBuffer().clear(); + if (!chunkQueue.offer(chunk)) { + // Should not happen, since the queue has no size limit. + throw new IllegalStateException("Can't reclaim chunk"); + } + } else { + unpooledChunks.remove(chunk.getChunkID()); + unpooledChunkUsed.addAndGet(-chunk.getLimit()); + } + } + + private AbstractHeapChunk allocatePooledChunk() { + AbstractHeapChunk chunk = chunkAllocator.allocatePooledChunk(chunkSize); + chunkArray[(int) chunk.getChunkID()] = chunk; + currentChunkCounter.incrementAndGet(); + return chunk; + } + + public AbstractHeapChunk getChunkByID(long chunkID) { + return chunkID < maxChunkCount ? chunkArray[(int) chunkID] : unpooledChunks.get(chunkID); + } + + @VisibleForTesting + long getCurrentChunkCounter() { + return currentChunkCounter.get(); + } + + @VisibleForTesting + long getUnpooledChunkUsed() { + return unpooledChunkUsed.get(); + } + + @VisibleForTesting + Queue getChunkQueue() { + return chunkQueue; + } + + @VisibleForTesting + AbstractHeapChunk[] getChunkArray() { + return chunkArray; + } + + @VisibleForTesting + Map getUnpooledChunksMap() { + return unpooledChunks; + } + + @VisibleForTesting + int getMaxChunkCount() { + return maxChunkCount; + } + + public static ChunkPool initialize(Configuration conf) { + if (globalInstance != null) { + return globalInstance; + } + synchronized (initLock) { + if (globalInstance == null) { + ChunkPoolParameters parameters = new ChunkPoolParameters(conf); + globalInstance = new ChunkPool(parameters); + LOG.info("CCSMapMemstore's chunkPool initialized with " + parameters); + } + } + return globalInstance; + } + + static class ChunkPoolParameters { + long capacity; + int chunkSize; + int initialCount; + HeapMode heapMode; + + ChunkPoolParameters(Configuration conf) { + capacity = conf.getLong(CCSMapUtils.CHUNK_CAPACITY_KEY, Long.MIN_VALUE); + chunkSize = conf.getInt(CCSMapUtils.CHUNK_SIZE_KEY, Integer.MIN_VALUE); + initialCount = conf.getInt(CCSMapUtils.INITIAL_CHUNK_COUNT_KEY, Integer.MIN_VALUE); + heapMode = conf.getBoolean(CCSMapUtils.USE_OFFHEAP, false) ? + HeapMode.OFF_HEAP : HeapMode.ON_HEAP; + } + + @Override + public String toString() { + return "ChunkPoolParameters{" + "capacity=" + capacity + ", chunkSize=" + chunkSize + + ", initialCount=" + initialCount + ", heapMode=" + heapMode + '}'; + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/HeapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/HeapChunk.java new file mode 100644 index 000000000000..2527e84e7a20 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/HeapChunk.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A chunk of heap memory out of which allocations are sliced. + */ +@InterfaceAudience.Private +public interface HeapChunk { + + /** + * @return ID of a Chunk + */ + long getChunkID(); + + /** + * @return offset of this Chunk + */ + int getPosition(); + + /** + * Try to allocate len bytes from the chunk. Note, alignment will happen. + * @return the start offset of the successful allocation, or -1 to indicate not-enough-space + */ + int allocate(int len); + + /** + * @return the total len of this Chunk. + */ + int getLimit(); + + /** + * @return if this Chunk is a pooled + */ + boolean isPooledChunk(); + + /** + * @return This chunk's backing ByteBuffer. + */ + ByteBuffer getByteBuffer(); + + /** + * Creates a new byte buffer that shares this buffer's content. + * @param offset start offset + * @param len share length + * @return a ByteBuffer + */ + ByteBuffer asSubByteBuffer(int offset, int len); + + /** + * @return number of free space in bytes, after alignment + */ + int occupancy(); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/HeapMode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/HeapMode.java new file mode 100644 index 000000000000..61d8a302373d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/HeapMode.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public enum HeapMode { + ON_HEAP, OFF_HEAP +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/OffHeapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/OffHeapChunk.java new file mode 100644 index 000000000000..a99a0bd3a436 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/OffHeapChunk.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.nio.ByteBuffer; + +@InterfaceAudience.Private +public class OffHeapChunk extends AbstractHeapChunk { + + public OffHeapChunk(long chunkID, int capacity) { + this(chunkID, capacity, true); + } + + public OffHeapChunk(long chunkID, int capacity, boolean isPooled) { + super(chunkID, capacity, isPooled); + chunk = ByteBuffer.allocateDirect(capacity); + } + + @Override + public HeapMode getHeapMode() { + return HeapMode.OFF_HEAP; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/OnHeapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/OnHeapChunk.java new file mode 100644 index 000000000000..9d42c0761236 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ccsmap/OnHeapChunk.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.nio.ByteBuffer; + +@InterfaceAudience.Private +public class OnHeapChunk extends AbstractHeapChunk { + + public OnHeapChunk(long chunkID, int capacity) { + this(chunkID, capacity, true); + } + + public OnHeapChunk(long chunkID, int capacity, boolean isPooled) { + super(chunkID, capacity, isPooled); + chunk = ByteBuffer.allocate(capacity); + } + + @Override + public HeapMode getHeapMode() { + return HeapMode.ON_HEAP; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ccsmap/TestChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ccsmap/TestChunkPool.java new file mode 100644 index 000000000000..feac0b54164e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ccsmap/TestChunkPool.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ccsmap; + +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ccsmap.ChunkPool.ChunkPoolParameters; +import org.apache.hadoop.hbase.testclassification.SmallTests; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestChunkPool { + + private Configuration conf = HBaseConfiguration.create(); + + @Test + public void testNormal() { + conf.setLong(CCSMapUtils.CHUNK_CAPACITY_KEY, 8 * 1024 * 1024); + conf.setInt(CCSMapUtils.CHUNK_SIZE_KEY, 4 * 1024); + conf.setInt(CCSMapUtils.INITIAL_CHUNK_COUNT_KEY, Integer.MAX_VALUE); + conf.setBoolean(CCSMapUtils.USE_OFFHEAP, true); + ChunkPoolParameters parameters = new ChunkPoolParameters(conf); + ChunkPool chunkPool = new ChunkPool(parameters); + + int numberOfChunk = chunkPool.getChunkQueue().size(); + Assert.assertEquals(2 * 1024, numberOfChunk); + Assert.assertEquals(numberOfChunk, chunkPool.getChunkArray().length); + Assert.assertEquals(2 * 1024, chunkPool.getCurrentChunkCounter()); + Assert.assertEquals(2 * 1024, chunkPool.getMaxChunkCount()); + + AbstractHeapChunk chunk = chunkPool.allocate(4 * 1024 - 1); + Assert.assertTrue(chunk.isPooledChunk()); + Assert.assertEquals(HeapMode.OFF_HEAP, chunk.getHeapMode()); + Assert.assertEquals(4 * 1024, chunk.getLimit()); + Assert.assertEquals(0, chunk.getChunkID()); + Assert.assertEquals(0, chunk.getPosition()); + Assert.assertEquals(2 * 1024 - 1, chunkPool.getChunkQueue().size()); + + for (int i = 0; i < chunkPool.getChunkArray().length; i++) { + Assert.assertEquals(i, chunkPool.getChunkArray()[i].getChunkID()); + } + + Assert.assertEquals(2 * 1024, chunkPool.getCurrentChunkCounter()); + Assert.assertEquals(0, chunkPool.getUnpooledChunkUsed()); + + int unpooledSize = 4 * 1024 + 1; + AbstractHeapChunk unpooledChunk = chunkPool.allocate(unpooledSize); + Assert.assertTrue(unpooledChunk instanceof OnHeapChunk); + Assert.assertFalse(unpooledChunk.isPooledChunk()); + Assert.assertEquals(HeapMode.ON_HEAP, unpooledChunk.getHeapMode()); + Assert.assertEquals(4 * 1024 + 1, unpooledChunk.getLimit()); + long maxChunkCount = chunkPool.getMaxChunkCount(); + Assert.assertEquals(maxChunkCount, unpooledChunk.getChunkID()); + // Nothing changed in chunks pool + Assert.assertEquals(2 * 1024 - 1, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2 * 1024, chunkPool.getChunkArray().length); + Assert.assertEquals(2 * 1024, chunkPool.getCurrentChunkCounter()); + Assert.assertEquals(unpooledSize, chunkPool.getUnpooledChunkUsed()); + Map unpooledChunks = chunkPool.getUnpooledChunksMap(); + Assert.assertEquals(1, unpooledChunks.size()); + Assert.assertEquals(unpooledChunk, unpooledChunks.get(unpooledChunk.getChunkID())); + + System.out.println("chunk position=" + chunk.getPosition()); + System.out.println("chunk limit=" + chunk.getLimit()); + System.out.println("chunk BB position=" + chunk.getByteBuffer().position()); + System.out.println("chunk BB limit=" + chunk.getByteBuffer().limit()); + System.out.println("chunk BB capacity=" + chunk.getByteBuffer().capacity()); + + 
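+    // Note (reviewer comment, not in the original patch): reclaiming a pooled chunk clears its
+    // ByteBuffer and offers it back to the queue, so the pool returns to its full 2 * 1024 size.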
chunkPool.reclaimChunk(chunk); + Assert.assertEquals(2 * 1024, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2 * 1024, chunkPool.getChunkArray().length); + Assert.assertEquals(2 * 1024, chunkPool.getCurrentChunkCounter()); + Assert.assertEquals(0, chunk.getPosition()); + Assert.assertEquals(0, chunk.getByteBuffer().position()); + Assert.assertEquals(4 * 1024, chunk.getByteBuffer().limit()); + Assert.assertEquals(4 * 1024, chunk.getLimit()); + Assert.assertEquals(0, chunk.getChunkID()); + System.out.println("chunk position=" + chunk.getPosition()); + System.out.println("chunk limit=" + chunk.getLimit()); + System.out.println("chunk BB position=" + chunk.getByteBuffer().position()); + System.out.println("chunk BB limit=" + chunk.getByteBuffer().limit()); + System.out.println("chunk BB capacity=" + chunk.getByteBuffer().capacity()); + + chunkPool.reclaimChunk(unpooledChunk); + Assert.assertEquals(2 * 1024, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2 * 1024, chunkPool.getChunkArray().length); + Assert.assertEquals(2 * 1024, chunkPool.getCurrentChunkCounter()); + Assert.assertEquals(0, unpooledChunks.size()); + Assert.assertEquals(0, chunkPool.getUnpooledChunkUsed()); + } + + @Test + public void testExaustedNormalChunk() { + conf.setLong(CCSMapUtils.CHUNK_CAPACITY_KEY, 8 * 1024); + conf.setInt(CCSMapUtils.CHUNK_SIZE_KEY, 4 * 1024); + conf.setInt(CCSMapUtils.INITIAL_CHUNK_COUNT_KEY, Integer.MAX_VALUE); + conf.setBoolean(CCSMapUtils.USE_OFFHEAP, true); + ChunkPoolParameters parameters = new ChunkPoolParameters(conf); + ChunkPool chunkPool = new ChunkPool(parameters); + Assert.assertEquals(2, chunkPool.getChunkQueue().size()); + Assert.assertEquals(chunkPool.getChunkArray().length, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2, chunkPool.getCurrentChunkCounter()); + Assert.assertEquals(2, chunkPool.getMaxChunkCount()); + Assert.assertEquals(0, chunkPool.getUnpooledChunksMap().size()); + + AbstractHeapChunk chunk1 = chunkPool.allocate(4 * 1024 - 1); + Assert.assertTrue(chunk1.isPooledChunk()); + Assert.assertEquals(HeapMode.OFF_HEAP, chunk1.getHeapMode()); + Assert.assertEquals(4 * 1024, chunk1.getLimit()); + Assert.assertEquals(0, chunk1.getChunkID()); + Assert.assertEquals(0, chunk1.getPosition()); + Assert.assertEquals(1, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2, chunkPool.getChunkArray().length); + + AbstractHeapChunk chunk2 = chunkPool.allocate(4 * 1024 - 2); + Assert.assertTrue(chunk2.isPooledChunk()); + Assert.assertEquals(HeapMode.OFF_HEAP, chunk2.getHeapMode()); + Assert.assertEquals(4 * 1024, chunk2.getLimit()); + Assert.assertEquals(1, chunk2.getChunkID()); + Assert.assertEquals(0, chunk2.getPosition()); + Assert.assertEquals(0, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2, chunkPool.getChunkArray().length); + + // Exhausted + AbstractHeapChunk chunk3 = chunkPool.allocate(4 * 1024 - 3); + Assert.assertEquals(HeapMode.ON_HEAP, chunk3.getHeapMode()); + Assert.assertFalse(chunk3.isPooledChunk()); + Assert.assertEquals(4 * 1024, chunk3.getLimit()); + Assert.assertEquals(2, chunk3.getChunkID()); + Assert.assertEquals(1, chunkPool.getUnpooledChunksMap().size()); + Assert.assertEquals(0, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2, chunkPool.getChunkArray().length); + + AbstractHeapChunk chunk4 = chunkPool.allocate(4 * 1024 - 4); + Assert.assertEquals(HeapMode.ON_HEAP, chunk3.getHeapMode()); + Assert.assertFalse(chunk4.isPooledChunk()); + Assert.assertEquals(4 * 1024, chunk4.getLimit()); + 
Assert.assertEquals(3, chunk4.getChunkID()); + Assert.assertEquals(2, chunkPool.getUnpooledChunksMap().size()); + Assert.assertEquals(0, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2, chunkPool.getChunkArray().length); + + chunkPool.reclaimChunk(chunk4); + Assert.assertEquals(1, chunkPool.getUnpooledChunksMap().size()); + Assert.assertEquals(0, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2, chunkPool.getChunkArray().length); + + chunk4 = chunkPool.allocate(4 * 1024 - 4); + Assert.assertEquals(HeapMode.ON_HEAP, chunk3.getHeapMode()); + Assert.assertFalse(chunk4.isPooledChunk()); + Assert.assertEquals(4 * 1024, chunk4.getLimit()); + Assert.assertEquals(4, chunk4.getChunkID()); + Assert.assertEquals(2, chunkPool.getUnpooledChunksMap().size()); + Assert.assertEquals(0, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2, chunkPool.getChunkArray().length); + + // A chunk larger than specified + AbstractHeapChunk chunk5 = chunkPool.allocate(4 * 1024 + 1); + Assert.assertEquals(HeapMode.ON_HEAP, chunk5.getHeapMode()); + Assert.assertFalse(chunk5.isPooledChunk()); + Assert.assertEquals(4 * 1024 + 1, chunk5.getLimit()); + Assert.assertEquals(5, chunk5.getChunkID()); + Assert.assertEquals(3, chunkPool.getUnpooledChunksMap().size()); + Assert.assertEquals(0, chunkPool.getChunkQueue().size()); + Assert.assertEquals(2, chunkPool.getChunkArray().length); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ccsmap/TestHeapChunk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ccsmap/TestHeapChunk.java new file mode 100644 index 000000000000..0dd4a5bd5d9e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ccsmap/TestHeapChunk.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ccsmap; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestHeapChunk { + + @Test + public void testOnHeapNormal() { + int len = 4 * 1024 * 1024; // 4MB + long chunkID = 1234; + OnHeapChunk chunk1 = new OnHeapChunk(chunkID, len); + Assert.assertEquals(1234, chunk1.getChunkID()); + Assert.assertEquals(0, chunk1.getPosition()); + Assert.assertEquals(len, chunk1.getLimit()); + Assert.assertTrue(chunk1.isPooledChunk()); + Assert.assertEquals(len, chunk1.occupancy()); + Assert.assertEquals(HeapMode.ON_HEAP, chunk1.getHeapMode()); + Assert.assertEquals(chunkID, chunk1.hashCode()); + Assert.assertNotNull(chunk1.getByteBuffer()); + + int bytes1 = 1023; + int startPosition1 = chunk1.allocate(bytes1); + Assert.assertEquals(0, startPosition1); + // Since alignment happened, it should start from 1024 + Assert.assertEquals(1024, chunk1.getPosition()); + Assert.assertEquals(len - 1024, chunk1.occupancy()); + + int bytes2 = 1025; + int startPosistion2 = chunk1.allocate(bytes2); + Assert.assertEquals(1024, startPosistion2); + // Since alignment happened, it should start from 1024 + (1024 + 8) + Assert.assertEquals(1024 + 1032, chunk1.getPosition()); + Assert.assertEquals(len - 1024 - 1032, chunk1.occupancy()); + + ByteBuffer bb = chunk1.getByteBuffer(); + Assert.assertEquals(len, bb.limit()); + Assert.assertEquals(len, bb.capacity()); + Assert.assertEquals(0, bb.position()); + + int len2 = 4096; + ByteBuffer bb2 = chunk1.asSubByteBuffer(100, len2); + Assert.assertEquals(len2, bb2.limit()); + Assert.assertEquals(len2, bb2.capacity()); + Assert.assertEquals(0, bb2.position()); + + OnHeapChunk chunk2 = new OnHeapChunk(1234, len); + //As long as chunkID is same, Chunk is the same + Assert.assertEquals(chunk1, chunk2); + + OnHeapChunk chunk3 = new OnHeapChunk(1235, len, false); + Assert.assertFalse(chunk3.isPooledChunk()); + } + + @Test + public void testOffHeapNormal() { + int len = 4 * 1024 * 1024; // 4MB + long chunkID = 1234; + OffHeapChunk chunk1 = new OffHeapChunk(chunkID, len); + Assert.assertEquals(1234, chunk1.getChunkID()); + Assert.assertEquals(0, chunk1.getPosition()); + Assert.assertEquals(len, chunk1.getLimit()); + Assert.assertTrue(chunk1.isPooledChunk()); + Assert.assertEquals(len, chunk1.occupancy()); + Assert.assertEquals(HeapMode.OFF_HEAP, chunk1.getHeapMode()); + Assert.assertEquals(chunkID, chunk1.hashCode()); + Assert.assertNotNull(chunk1.getByteBuffer()); + + int bytes1 = 1023; + int startPosition1 = chunk1.allocate(bytes1); + Assert.assertEquals(0, startPosition1); + // Since alignment happened, it should start from 1024 + Assert.assertEquals(1024, chunk1.getPosition()); + Assert.assertEquals(len - 1024, chunk1.occupancy()); + + int bytes2 = 1025; + int startPosistion2 = chunk1.allocate(bytes2); + Assert.assertEquals(1024, startPosistion2); + // Since alignment happened, it should start from 1024 + (1024 + 8) + Assert.assertEquals(1024 + 1032, chunk1.getPosition()); + 
Assert.assertEquals(len - 1024 - 1032, chunk1.occupancy()); + + ByteBuffer bb = chunk1.getByteBuffer(); + Assert.assertEquals(len, bb.limit()); + Assert.assertEquals(len, bb.capacity()); + Assert.assertEquals(0, bb.position()); + + int len2 = 4096; + ByteBuffer bb2 = chunk1.asSubByteBuffer(100, len2); + Assert.assertEquals(len2, bb2.limit()); + Assert.assertEquals(len2, bb2.capacity()); + Assert.assertEquals(0, bb2.position()); + + OffHeapChunk chunk2 = new OffHeapChunk(1234, len); + //As long as chunkID is same, Chunk is the same + Assert.assertEquals(chunk1, chunk2); + + OffHeapChunk chunk3 = new OffHeapChunk(1235, len, false); + Assert.assertFalse(chunk3.isPooledChunk()); + } + + @Test + public void testConcurrentWriteOnHeap() throws Exception { + int len = 4 * 1024 * 1024; + OnHeapChunk chunk = new OnHeapChunk(1234, len); + + int concurrent = 50; + final ByteBuffer[] bbArray = new ByteBuffer[concurrent]; + + for (int i = 0; i < concurrent; i++) { + bbArray[i] = chunk.asSubByteBuffer(i * 2049, 1023); + } + + final AtomicBoolean hasError = new AtomicBoolean(false); + Thread[] ths = new Thread[concurrent]; + + for (int i = 0; i < concurrent; i++) { + final int thid = i; + ths[i] = new Thread(new Runnable() { + @Override + public void run() { + ByteBuffer bb = ByteBuffer.allocate(13); + bb.put((byte) thid); + bb.putInt(thid); + bb.putLong(thid); + bb.flip(); + try { + Assert.assertEquals(0, bbArray[thid].position()); + Thread.sleep(100); + bbArray[thid].put((byte) thid); + Assert.assertEquals(1, bbArray[thid].position()); + Thread.sleep(100); + bbArray[thid].putInt(thid); + Assert.assertEquals(1 + 4, bbArray[thid].position()); + Thread.sleep(100); + bbArray[thid].putLong(thid); + Assert.assertEquals(1 + 4 + 8, bbArray[thid].position()); + Thread.sleep(100); + bbArray[thid].put(bb); + Assert.assertEquals(1 + 4 + 8 + 13, bbArray[thid].position()); + } catch (Throwable e) { + e.printStackTrace(); + hasError.set(true); + } + } + }); + } + + for (int j = 0; j < concurrent; j++) { + ths[j].start(); + } + + for (int j = 0; j < concurrent; j++) { + ths[j].join(); + } + + Assert.assertFalse(hasError.get()); + + for (int j = 0; j < concurrent; j++) { + bbArray[j].rewind(); + Assert.assertEquals(0, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].get()); + Assert.assertEquals(1, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].getInt()); + Assert.assertEquals(1 + 4, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].getLong()); + Assert.assertEquals(1 + 4 + 8, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].get()); + Assert.assertEquals(1 + 4 + 8 + 1, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].getInt()); + Assert.assertEquals(1 + 4 + 8 + 1 + 4, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].getLong()); + Assert.assertEquals(1 + 4 + 8 + 1 + 4 + 8, bbArray[j].position()); + } + + ByteBuffer bb = chunk.getByteBuffer(); + bb.rewind(); + for (int j = 0; j < concurrent; j++) { + bb.position(j * 2049); + Assert.assertEquals(j, bb.get()); + Assert.assertEquals(j, bb.getInt()); + Assert.assertEquals(j, bb.getLong()); + Assert.assertEquals(j, bb.get()); + Assert.assertEquals(j, bb.getInt()); + Assert.assertEquals(j, bb.getLong()); + } + } + + @Test + public void testConcurrentWriteOffHeap() throws Exception { + int len = 4 * 1024 * 1024; // 4MB + OffHeapChunk chunk = new OffHeapChunk(1234, len); + + int concurrent = 50; + final ByteBuffer[] bbArray = new ByteBuffer[concurrent]; + + for (int i = 0; i < concurrent; i++) { + bbArray[i] = 
chunk.asSubByteBuffer(i * 2049, 1023); + } + + final AtomicBoolean hasError = new AtomicBoolean(false); + Thread[] ths = new Thread[concurrent]; + + for (int i = 0; i < concurrent; i++) { + final int thid = i; + ths[i] = new Thread(new Runnable() { + @Override + public void run() { + ByteBuffer bb = ByteBuffer.allocate(13); + bb.put((byte) thid); + bb.putInt(thid); + bb.putLong(thid); + bb.flip(); + try { + Assert.assertEquals(0, bbArray[thid].position()); + Thread.sleep(1000); + bbArray[thid].put((byte) thid); + Assert.assertEquals(1, bbArray[thid].position()); + Thread.sleep(1000); + bbArray[thid].putInt(thid); + Assert.assertEquals(1 + 4, bbArray[thid].position()); + Thread.sleep(1000); + bbArray[thid].putLong(thid); + Assert.assertEquals(1 + 4 + 8, bbArray[thid].position()); + Thread.sleep(1000); + bbArray[thid].put(bb); + Assert.assertEquals(1 + 4 + 8 + 13, bbArray[thid].position()); + } catch (Throwable e) { + e.printStackTrace(); + hasError.set(true); + } + } + }); + } + + for (int j = 0; j < concurrent; j++) { + ths[j].start(); + } + + for (int j = 0; j < concurrent; j++) { + ths[j].join(); + } + + Assert.assertFalse(hasError.get()); + + for (int j = 0; j < concurrent; j++) { + bbArray[j].rewind(); + Assert.assertEquals(0, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].get()); + Assert.assertEquals(1, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].getInt()); + Assert.assertEquals(1 + 4, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].getLong()); + Assert.assertEquals(1 + 4 + 8, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].get()); + Assert.assertEquals(1 + 4 + 8 + 1, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].getInt()); + Assert.assertEquals(1 + 4 + 8 + 1 + 4, bbArray[j].position()); + Assert.assertEquals(j, bbArray[j].getLong()); + Assert.assertEquals(1 + 4 + 8 + 1 + 4 + 8, bbArray[j].position()); + } + + ByteBuffer bb = chunk.getByteBuffer(); + bb.rewind(); + for (int j = 0; j < concurrent; j++) { + bb.position(j * 2049); + Assert.assertEquals(j, bb.get()); + Assert.assertEquals(j, bb.getInt()); + Assert.assertEquals(j, bb.getLong()); + Assert.assertEquals(j, bb.get()); + Assert.assertEquals(j, bb.getInt()); + Assert.assertEquals(j, bb.getLong()); + } + } + +}
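
Reviewer note, for anyone who wants to see the allocation scheme in isolation: the sketch below restates the core of AbstractHeapChunk.allocate as a standalone class. It is not part of the patch; the class name BumpPointerChunkSketch and the main() driver are made up for illustration, while the 8-byte align() rounding and the compareAndSet bump-pointer loop mirror what the patch does. Freeing happens only at whole-chunk granularity (ChunkPool.reclaimChunk clears the buffer), which is why no per-allocation bookkeeping is needed.

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of the chunk allocation scheme: requests are rounded up to a
// multiple of 8 bytes and claimed by atomically bumping a free-space offset.
public class BumpPointerChunkSketch {

  private final ByteBuffer buffer;
  private final AtomicInteger nextFreeOffset = new AtomicInteger(0);

  public BumpPointerChunkSketch(int capacity) {
    this.buffer = ByteBuffer.allocate(capacity);
  }

  /** Round len up to the next multiple of 8, mirroring align() in the patch. */
  private static int align(int len) {
    return (len % 8 != 0) ? (len / 8 + 1) * 8 : len;
  }

  /** @return start offset of the allocation, or -1 if the chunk has no room left. */
  public int allocate(int len) {
    int aligned = align(len);
    while (true) {
      int oldOffset = nextFreeOffset.get();
      if (oldOffset + aligned > buffer.capacity()) {
        return -1; // allocation does not fit
      }
      // The winner of the CAS owns the range [oldOffset, oldOffset + aligned).
      if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + aligned)) {
        return oldOffset;
      }
      // Lost the race with another thread; retry against the updated offset.
    }
  }

  public static void main(String[] args) {
    BumpPointerChunkSketch chunk = new BumpPointerChunkSketch(4096);
    System.out.println(chunk.allocate(1023)); // 0    (rounded up to 1024)
    System.out.println(chunk.allocate(1025)); // 1024 (rounded up to 1032)
    System.out.println(chunk.allocate(4096)); // -1   (no longer fits)
  }
}

The printed offsets match the expectations in testOnHeapNormal/testOffHeapNormal above: a 1023-byte request consumes 1024 bytes and a 1025-byte request consumes 1032, which is exactly what the occupancy assertions check.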