HBASE-15077 Support OffheapKV write in compaction without copying data on heap.
anoopsjohn committed Jan 12, 2016
1 parent ec47a81 commit da932ee
Showing 10 changed files with 245 additions and 55 deletions.
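The heart of the change: a cell whose bytes live in an off-heap (direct) ByteBuffer previously had to be drained to an OutputStream through a temporary on-heap array. A minimal sketch of that pre-change copy path, with invented names (out, buf, offset, and length stand in for a cell's target stream and backing-buffer coordinates):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

final class PreChangeCopy {
  // Illustrative only: draining a direct ByteBuffer to a plain OutputStream
  // forces a temporary on-heap copy of the payload.
  static void drain(OutputStream out, ByteBuffer buf, int offset, int length)
      throws IOException {
    byte[] tmp = new byte[length];
    ByteBuffer dup = buf.duplicate(); // leave the source buffer's position alone
    dup.position(offset);
    dup.get(tmp, 0, length);
    out.write(tmp, 0, length);
  }
}

The commit removes that copy by teaching the streams themselves to accept a ByteBuffer, via the new ByteBufferSupportOutputStream interface introduced below.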
@@ -254,7 +254,7 @@ public int write(OutputStream out, boolean withTags) throws IOException {
length = keyLen + this.getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
}
ByteBufferUtils.putInt(out, length);
-ByteBufferUtils.writeByteBuffer(out, this.buf, this.offset, length);
+ByteBufferUtils.copyBufferToStream(out, this.buf, this.offset, length);
return length + Bytes.SIZEOF_INT;
}

@@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;

/**
* Our own implementation of ByteArrayOutputStream in which no method is synchronized and
* which supports writing a ByteBuffer directly to it.
*/
@InterfaceAudience.Private
public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream {

// Borrowed from openJDK:
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

private byte[] buf;
private int pos = 0;

public ByteArrayOutputStream() {
this(32);
}

public ByteArrayOutputStream(int capacity) {
this.buf = new byte[capacity];
}

@Override
public void write(ByteBuffer b, int off, int len) throws IOException {
checkSizeAndGrow(len);
ByteBufferUtils.copyFromBufferToArray(this.buf, b, off, this.pos, len);
this.pos += len;
}

@Override
public void writeInt(int i) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_INT);
Bytes.putInt(this.buf, this.pos, i);
this.pos += Bytes.SIZEOF_INT;
}

@Override
public void write(int b) throws IOException {
checkSizeAndGrow(Bytes.SIZEOF_BYTE);
buf[this.pos] = (byte) b;
this.pos++;
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
checkSizeAndGrow(len);
System.arraycopy(b, off, this.buf, this.pos, len);
this.pos += len;
}

private void checkSizeAndGrow(int extra) {
long capacityNeeded = this.pos + (long) extra;
if (capacityNeeded > this.buf.length) {
// guarantee it's possible to fit
if (capacityNeeded > MAX_ARRAY_SIZE) {
throw new BufferOverflowException();
}
// double until we hit the cap
long nextCapacity = Math.min(this.buf.length << 1, MAX_ARRAY_SIZE);
// but make sure there is enough if twice the existing capacity is still too small
nextCapacity = Math.max(nextCapacity, capacityNeeded);
if (nextCapacity > MAX_ARRAY_SIZE) {
throw new BufferOverflowException();
}
byte[] newBuf = new byte[(int) nextCapacity];
System.arraycopy(buf, 0, newBuf, 0, buf.length);
buf = newBuf;
}
}

/**
* Resets the <code>pos</code> field of this byte array output stream to zero. The output stream
* can be used again.
*/
public void reset() {
this.pos = 0;
}

/**
* Copies the content of this stream into a new byte array.
* @return the contents of this output stream, as a new byte array.
*/
public byte[] toByteArray() {
return Arrays.copyOf(buf, pos);
}

/**
* @return the underlying array where the data gets accumulated
*/
public byte[] getBuffer() {
return this.buf;
}

/**
* @return the number of valid bytes written so far (the current value of <code>pos</code>).
*/
public int size() {
return this.pos;
}
}
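A hypothetical usage sketch of the new class (names and sizes invented for illustration): an int length prefix followed by the payload of a direct buffer, accumulated with a single bulk copy into the stream's own array.

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;

final class ByteArrayOutputStreamDemo {
  public static void main(String[] args) throws IOException {
    ByteBuffer payload = ByteBuffer.allocateDirect(8);
    payload.putLong(0, 42L);                // absolute put; position stays 0
    ByteArrayOutputStream baos = new ByteArrayOutputStream(16);
    baos.writeInt(8);                       // four bytes, high byte first
    baos.write(payload, 0, 8);              // bulk copy via copyFromBufferToArray
    byte[] serialized = baos.toByteArray(); // 12 bytes: prefix + payload
    System.out.println(serialized.length);  // prints 12
  }
}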
@@ -37,7 +37,8 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class ByteBufferOutputStream extends OutputStream {
+public class ByteBufferOutputStream extends OutputStream
+    implements ByteBufferSupportOutputStream {

// Borrowed from openJDK:
// http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;

/**
* Our extension of DataOutputStream which implements ByteBufferSupportOutputStream
*/
@InterfaceAudience.Private
public class ByteBufferSupportDataOutputStream extends DataOutputStream
implements ByteBufferSupportOutputStream {

public ByteBufferSupportDataOutputStream(OutputStream out) {
super(out);
}

@Override
public void write(ByteBuffer b, int off, int len) throws IOException {
ByteBufferUtils.copyBufferToStream(out, b, off, len);
written += len;
}
}
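A sketch of how the wrapper might be used (setup assumed, not from the commit). Because write(ByteBuffer, int, int) delegates to ByteBufferUtils.copyBufferToStream on the wrapped stream, the ByteBuffer-aware fast path is taken automatically whenever the underlying stream supports it, and the inherited written counter keeps size() accurate:

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;

final class DataOutputStreamDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ByteBufferSupportDataOutputStream out = new ByteBufferSupportDataOutputStream(baos);
    ByteBuffer cellData = ByteBuffer.allocateDirect(16);
    out.writeInt(16);               // DataOutputStream's own writeInt
    out.write(cellData, 0, 16);     // new ByteBuffer overload; no temp array
    System.out.println(out.size()); // prints 20
  }
}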
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

/**
* An interface that adds support for writing a {@link ByteBuffer} into an OutputStream.
*/
@InterfaceAudience.Private
public interface ByteBufferSupportOutputStream {

/**
* Writes <code>len</code> bytes from the specified ByteBuffer starting at offset <code>off</code>
* to this output stream.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @exception IOException
* if an I/O error occurs. In particular, an <code>IOException</code> is thrown if
* the output stream is closed.
*/
void write(ByteBuffer b, int off, int len) throws IOException;

/**
* Writes an <code>int</code> to the underlying output stream as four
* bytes, high byte first.
* @param i the <code>int</code> to write
* @throws IOException if an I/O error occurs.
*/
void writeInt(int i) throws IOException;
}
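For a sense of the contract, here is a hypothetical third implementer (not part of this commit): a stream over a java.nio WritableByteChannel can satisfy both methods by handing a duplicate of the source buffer to the channel, so even direct buffers bypass the byte-by-byte fallback.

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;

// Hypothetical implementer, for illustration only.
class ChannelBackedOutputStream extends OutputStream
    implements ByteBufferSupportOutputStream {
  private final WritableByteChannel channel;

  ChannelBackedOutputStream(WritableByteChannel channel) {
    this.channel = channel;
  }

  @Override
  public void write(int b) throws IOException {
    write(ByteBuffer.wrap(new byte[] { (byte) b }), 0, 1);
  }

  @Override
  public void write(ByteBuffer b, int off, int len) throws IOException {
    ByteBuffer dup = b.duplicate(); // leave the source's position/limit untouched
    dup.position(off);
    dup.limit(off + len);
    while (dup.hasRemaining()) {
      channel.write(dup);
    }
  }

  @Override
  public void writeInt(int i) throws IOException {
    ByteBuffer four = ByteBuffer.allocate(4);
    four.putInt(i); // big-endian by default: high byte first
    four.flip();
    while (four.hasRemaining()) {
      channel.write(four);
    }
  }
}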
@@ -673,14 +673,14 @@ public int write(OutputStream out, boolean withTags) throws IOException {
// Write key
out.write(keyBuffer.array());
// Write value
-ByteBufferUtils.writeByteBuffer(out, this.valueBuffer, this.valueOffset, this.valueLength);
+ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
if (withTags) {
// 2 bytes tags length followed by tags bytes
// tags length is serialized with 2 bytes only (as a short) even if the type is int.
// As these are non-negative numbers, we save the sign bit. See HBASE-11437
out.write((byte) (0xff & (this.tagsLength >> 8)));
out.write((byte) (0xff & this.tagsLength));
-ByteBufferUtils.writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
+ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite + Bytes.SIZEOF_INT;
}
@@ -27,7 +27,7 @@

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
@@ -141,8 +141,8 @@ public static void putInt(OutputStream out, final int value)
// We have writeInt in ByteBufferOutputStream so that it can directly write
// int to underlying
// ByteBuffer in one step.
-if (out instanceof ByteBufferOutputStream) {
-((ByteBufferOutputStream) out).writeInt(value);
+if (out instanceof ByteBufferSupportOutputStream) {
+((ByteBufferSupportOutputStream) out).writeInt(value);
} else {
StreamUtils.writeInt(out, value);
}
@@ -179,9 +179,10 @@ public static void moveBufferToStream(OutputStream out, ByteBuffer in,
*/
public static void copyBufferToStream(OutputStream out, ByteBuffer in,
int offset, int length) throws IOException {
-if (in.hasArray()) {
-out.write(in.array(), in.arrayOffset() + offset,
-length);
+if (out instanceof ByteBufferSupportOutputStream) {
+((ByteBufferSupportOutputStream) out).write(in, offset, length);
+} else if (in.hasArray()) {
+out.write(in.array(), in.arrayOffset() + offset, length);
} else {
for (int i = 0; i < length; ++i) {
out.write(toByte(in, offset + i));
@@ -904,19 +905,6 @@ public static void copyFromBufferToArray(byte[] out, ByteBuffer in, int sourceOf
}
}

-public static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
-throws IOException {
-// We have write which takes ByteBuffer in ByteBufferOutputStream so that it
-// can directly write
-// bytes from the src ByteBuffer to the destination ByteBuffer. This avoid
-// need for temp array
-// creation and copy
-if (out instanceof ByteBufferOutputStream) {
-((ByteBufferOutputStream) out).write(b, offset, length);
-} else {
-ByteBufferUtils.copyBufferToStream(out, b, offset, length);
-}
-}
// For testing purpose
public static String toStringBinary(final ByteBuffer b, int off, int len) {
StringBuilder result = new StringBuilder();
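With the dispatch above, a single call site now covers all three cases. A small illustration (fully qualified names keep the two ByteArrayOutputStream classes apart; the scenario is invented):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.ByteBufferUtils;

final class CopyDispatchDemo {
  public static void main(String[] args) throws IOException {
    ByteBuffer direct = ByteBuffer.allocateDirect(64);

    // 1) ByteBuffer-aware stream: one bulk copy, no temp array.
    org.apache.hadoop.hbase.io.ByteArrayOutputStream hbaseBaos =
        new org.apache.hadoop.hbase.io.ByteArrayOutputStream();
    ByteBufferUtils.copyBufferToStream(hbaseBaos, direct, 0, 64);

    // 2) Heap-backed buffer to a plain stream: one write from the backing array.
    java.io.ByteArrayOutputStream jdkBaos = new java.io.ByteArrayOutputStream();
    ByteBufferUtils.copyBufferToStream(jdkBaos, ByteBuffer.wrap(new byte[64]), 0, 64);

    // 3) Direct buffer to a plain stream: byte-by-byte fallback.
    ByteBufferUtils.copyBufferToStream(jdkBaos, direct, 0, 64);
  }
}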
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;

-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -35,7 +34,9 @@
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
@@ -915,7 +916,7 @@ public DataOutputStream startWriting(BlockType newBlockType)
state = State.WRITING;

// We will compress it later in finishBlock()
-userDataStream = new DataOutputStream(baosInMemory);
+userDataStream = new ByteBufferSupportDataOutputStream(baosInMemory);
if (newBlockType == BlockType.DATA) {
this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
}
@@ -969,11 +970,8 @@ void ensureBlockReady() throws IOException {
*/
private void finishBlock() throws IOException {
if (blockType == BlockType.DATA) {
-BufferGrabbingByteArrayOutputStream baosInMemoryCopy =
-new BufferGrabbingByteArrayOutputStream();
-baosInMemory.writeTo(baosInMemoryCopy);
this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
-baosInMemoryCopy.buf, blockType);
+baosInMemory.getBuffer(), blockType);
blockType = dataBlockEncodingCtx.getBlockType();
}
userDataStream.flush();
@@ -1011,19 +1009,6 @@ private void finishBlock() throws IOException {
onDiskChecksum, 0, fileContext.getChecksumType(), fileContext.getBytesPerChecksum());
}

-public static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
-private byte[] buf;
-
-@Override
-public void write(byte[] b, int off, int len) {
-this.buf = b;
-}
-
-public byte[] getBuffer() {
-return this.buf;
-}
-}

/**
* Put the header into the given byte array at the given offset.
* @param onDiskSize size of the block on disk header + data + checksum
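Net effect in the block writer: the BufferGrabbingByteArrayOutputStream indirection is gone, and finishBlock() reads the accumulated block bytes in place. A sketch of the new access pattern, assuming baosInMemory is the hbase ByteArrayOutputStream added above:

import org.apache.hadoop.hbase.io.ByteArrayOutputStream;

final class GetBufferSketch {
  // Sketch only: how the encoder now reaches the accumulated block bytes.
  static byte[] accumulated(ByteArrayOutputStream baosInMemory) {
    byte[] blockBytes = baosInMemory.getBuffer(); // underlying array, no copy
    int validLen = baosInMemory.size();           // only [0, validLen) is live
    assert validLen <= blockBytes.length;
    return blockBytes;
  }
}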
