[Subtask of HBASE-20717] CCSMap: Chunk and ChunkPool structure #2165

Open · wants to merge 2 commits into base: ccsmap
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.ccsmap;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

@InterfaceAudience.Private
public abstract class AbstractHeapChunk implements HeapChunk {

private final long chunkID;
private final int capacity;
private final boolean isPooled;

private final AtomicInteger alignOccupancy = new AtomicInteger(0);

protected final AtomicInteger nextFreeOffset = new AtomicInteger(0);
protected ByteBuffer chunk;

protected AbstractHeapChunk(long chunkID, int capacity, boolean isPooled) {
this.chunkID = chunkID;
this.capacity = capacity;
this.isPooled = isPooled;
}

@Override
public long getChunkID() {
return chunkID;
}

@Override
public int getPosition() {
return nextFreeOffset.get();
}

/**
 * Reserve {@code len} bytes in this chunk. The requested length is rounded up to an
 * 8-byte boundary and the start offset is then claimed with a CAS retry loop on
 * {@code nextFreeOffset}.
 * @param len number of bytes requested
 * @return the start offset of the reserved range, or -1 if the chunk has no room left
 */
@Override
public int allocate(int len) {
int oldLen = len;
// TODO: reuse the space of removed nodes.
// TODO: add a config option to support unaligned allocation.
// Align the requested length to an 8-byte boundary.
len = align(len);

while (true) {
int oldOffset = nextFreeOffset.get();
if (oldOffset + len > getLimit()) {
return -1; // the allocation doesn't fit in this chunk
}
// Try to atomically claim this range of the chunk.
if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + len)) {
// We got the allocation; alignOccupancy tracks the bytes lost to alignment padding
// (recorded as a negative delta).
alignOccupancy.addAndGet(oldLen - len);
return oldOffset;
}
}
}

/**
 * Round {@code len} up to the next multiple of 8 so allocations stay 8-byte aligned;
 * for example align(13) returns 16 and align(16) returns 16.
 */
private int align(int len) {
return (len % 8 != 0) ? ((len / 8 + 1) * 8) : len;
}

@Override
public int getLimit() {
return capacity;
}

@Override
public ByteBuffer getByteBuffer() {
return chunk;
}

@Override
public boolean isPooledChunk() {
return isPooled;
}

/**
 * Expose the range [offset, offset + len) of this chunk as a ByteBuffer slice that
 * shares its content with the backing buffer.
 */
@Override
public ByteBuffer asSubByteBuffer(int offset, int len) {
ByteBuffer duplicate = chunk.duplicate();
duplicate.limit(offset + len);
duplicate.position(offset);
return duplicate.slice();
}

public abstract HeapMode getHeapMode();

/**
 * @return the number of free bytes remaining in this chunk
 */
@Override
public int occupancy() {
return getLimit() - getPosition();
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof AbstractHeapChunk)) {
return false;
}
AbstractHeapChunk that = (AbstractHeapChunk) obj;
return getChunkID() == that.getChunkID();
}

@Override
public int hashCode() {
return (int) (getChunkID() & CCSMapUtils.FOUR_BYTES_MARK);
}

}
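The concrete chunk implementations (OnHeapChunk, OffHeapChunk) and the HeapMode enum are part of this patch but not included in this excerpt. Below is a minimal sketch of what an on-heap subclass and the enum might look like, assuming the constructor signatures used by ChunkAllocator further down; the shapes are illustrative, not the patch's actual code.

// HeapMode.java (assumed shape, same package org.apache.hadoop.hbase.ccsmap)
public enum HeapMode {
  ON_HEAP,
  OFF_HEAP
}

// OnHeapChunk.java (illustrative only: backs the chunk with a heap ByteBuffer)
@InterfaceAudience.Private
public class OnHeapChunk extends AbstractHeapChunk {

  public OnHeapChunk(long chunkID, int capacity) {
    this(chunkID, capacity, true);
  }

  public OnHeapChunk(long chunkID, int capacity, boolean isPooled) {
    super(chunkID, capacity, isPooled);
    // Allocate the backing buffer eagerly at the requested capacity.
    this.chunk = java.nio.ByteBuffer.allocate(capacity);
  }

  @Override
  public HeapMode getHeapMode() {
    return HeapMode.ON_HEAP;
  }
}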
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.ccsmap;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

@InterfaceAudience.Private
public final class CCSMapUtils {

private CCSMapUtils() {}

static final long FOUR_BYTES_MARK = 0xFFFFFFFFL; // low 32-bit mask; the L suffix keeps the int literal from sign-extending to -1L

static final String CHUNK_CAPACITY_KEY = "hbase.regionserver.memstore.ccsmap.capacity";

static final String CHUNK_SIZE_KEY = "hbase.regionserver.memstore.ccsmap.chunksize";

static final String INITIAL_CHUNK_COUNT_KEY = "hbase.regionserver.memstore.ccsmap.chunk.initial";

static final String USE_OFFHEAP = "hbase.regionserver.memstore.ccsmap.useoffheap";

}
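These keys are package-private, so they would be consumed by the chunk pool setup code inside org.apache.hadoop.hbase.ccsmap. A hedged sketch of how that might look; the helper class name and the default values are placeholders, not taken from the patch:

package org.apache.hadoop.hbase.ccsmap;

import org.apache.hadoop.conf.Configuration;

// Hypothetical helper, not part of the patch; defaults below are placeholders.
final class CCSMapConfigExample {
  private CCSMapConfigExample() {}

  static HeapMode heapModeFrom(Configuration conf) {
    // Off-heap chunks would be used only when explicitly enabled.
    return conf.getBoolean(CCSMapUtils.USE_OFFHEAP, false)
        ? HeapMode.OFF_HEAP : HeapMode.ON_HEAP;
  }

  static int chunkSizeFrom(Configuration conf) {
    return conf.getInt(CCSMapUtils.CHUNK_SIZE_KEY, 4 * 1024 * 1024); // placeholder: 4 MB
  }

  static long chunkPoolCapacityFrom(Configuration conf) {
    return conf.getLong(CCSMapUtils.CHUNK_CAPACITY_KEY, 256L * 1024 * 1024); // placeholder: 256 MB
  }

  static int initialChunkCountFrom(Configuration conf) {
    return conf.getInt(CCSMapUtils.INITIAL_CHUNK_COUNT_KEY, 0); // placeholder: no pre-allocation
  }
}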
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.ccsmap;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

@InterfaceAudience.Private
public class ChunkAllocator {

private final HeapMode heapMode;
// IDs start from maxChunkCount, so unpooled chunk IDs fall in the range [maxChunkCount, +∞)
private final AtomicLong unpooledChunkIDGenerator;
// IDs start from 0, so pooled chunk IDs fall in the range [0, maxChunkCount)
private final AtomicLong pooledChunkIDGenerator = new AtomicLong(-1);

public ChunkAllocator(HeapMode heapMode, long maxChunkCount) {
this.heapMode = heapMode;
unpooledChunkIDGenerator = new AtomicLong(maxChunkCount);
}

/**
* Allocate a pooled chunk of the specified size.
* @param size size of the chunk in bytes
* @return a newly created pooled chunk
*/
AbstractHeapChunk allocatePooledChunk(int size) {
return heapMode == HeapMode.ON_HEAP ?
new OnHeapChunk(pooledChunkIDGenerator.incrementAndGet(), size) :
new OffHeapChunk(pooledChunkIDGenerator.incrementAndGet(), size);
}

/**
* Allocate an unpooled chunk of the specified size. Unpooled chunks are always
* allocated on heap, regardless of this allocator's heap mode.
* @param size size of the chunk in bytes
* @return a newly created unpooled chunk
*/
AbstractHeapChunk allocateUnpooledChunk(int size) {
return new OnHeapChunk(unpooledChunkIDGenerator.getAndIncrement(), size, false);
}

}
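A sketch of how the allocator might be exercised, assuming it is driven from the same package (the allocate methods are package-private) and that OnHeapChunk behaves as in the earlier sketch; the numbers are illustrative:

package org.apache.hadoop.hbase.ccsmap;

// Illustrative usage only, not part of the patch.
public class ChunkAllocatorExample {
  public static void main(String[] args) {
    // A budget of 1024 pooled chunk IDs; unpooled IDs start at 1024.
    ChunkAllocator allocator = new ChunkAllocator(HeapMode.ON_HEAP, 1024);

    AbstractHeapChunk pooled = allocator.allocatePooledChunk(4 * 1024 * 1024);
    AbstractHeapChunk unpooled = allocator.allocateUnpooledChunk(4 * 1024 * 1024);

    System.out.println(pooled.getChunkID());      // 0
    System.out.println(unpooled.getChunkID());    // 1024
    System.out.println(pooled.isPooledChunk());   // true
    System.out.println(unpooled.isPooledChunk()); // false

    // Carve an 8-byte-aligned slice out of the pooled chunk: 13 bytes are rounded
    // up to 16 internally, and the returned offset is where the caller may write.
    int offset = pooled.allocate(13);
    System.out.println(offset);                                          // 0
    System.out.println(pooled.asSubByteBuffer(offset, 13).remaining());  // 13
  }
}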