From c03c4d281d01b1cd1ca7c2c5bd3f6a32522022ba Mon Sep 17 00:00:00 2001 From: "Wang, Gang(Gary)" Date: Mon, 21 Mar 2016 16:14:15 -0700 Subject: [PATCH] added Mnemonic infra. as an alternative backed allocation mechanism, note that move allocator services to the service-dist folder as the properties indicated in pom.xml. --- .gitignore | 1 + java/.gitignore | 2 + java/memory/pom.xml | 4 + .../MnemonicUnpooledByteBufAllocator.java | 72 + .../buffer/MnemonicUnpooledDirectByteBuf.java | 620 +++++++ .../MnemonicUnpooledUnsafeDirectByteBuf.java | 542 ++++++ .../netty/buffer/PooledByteBufAllocatorL.java | 18 +- .../TestMnemonicBackedBaseAllocator.java | 688 +++++++ java/pom.xml | 75 +- java/vector/pom.xml | 4 + .../vector/TestMnemonicBackedValueVector.java | 1616 +++++++++++++++++ 11 files changed, 3637 insertions(+), 5 deletions(-) create mode 100644 java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledByteBufAllocator.java create mode 100644 java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledDirectByteBuf.java create mode 100644 java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledUnsafeDirectByteBuf.java create mode 100644 java/memory/src/test/java/org/apache/arrow/memory/TestMnemonicBackedBaseAllocator.java create mode 100644 java/vector/src/test/java/org/apache/arrow/vector/TestMnemonicBackedValueVector.java diff --git a/.gitignore b/.gitignore index e6dfe19bb9807..935f5fb488efe 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ cpp/.idea/ python/.eggs/ .vscode .idea/ + diff --git a/java/.gitignore b/java/.gitignore index 03f5bf76e60d2..e596e627597fa 100644 --- a/java/.gitignore +++ b/java/.gitignore @@ -20,4 +20,6 @@ CMakeFiles Makefile cmake_install.cmake install_manifest.txt +*.dat ?/ + diff --git a/java/memory/pom.xml b/java/memory/pom.xml index 7efc8e6aa470c..c3a0347ba0589 100644 --- a/java/memory/pom.xml +++ b/java/memory/pom.xml @@ -40,6 +40,10 @@ org.slf4j slf4j-api + + org.apache.mnemonic + mnemonic-core + diff --git a/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledByteBufAllocator.java b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledByteBufAllocator.java new file mode 100644 index 0000000000000..8d8295499a5cb --- /dev/null +++ b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledByteBufAllocator.java @@ -0,0 +1,72 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import io.netty.util.internal.PlatformDependent; + +import org.apache.mnemonic.CommonAllocator; + +/** + * Simplistic {@link ByteBufAllocator} implementation that does not pool anything. + */ +public final class MnemonicUnpooledByteBufAllocator> extends AbstractByteBufAllocator { + + private A mcalloc; + + /** + * Default instance + */ + // public static final UnpooledByteBufAllocator DEFAULT = + // new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + + /** + * Create a new instance + * + * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than + * a heap buffer + */ + public MnemonicUnpooledByteBufAllocator(boolean preferDirect, A mcallocator) { + super(preferDirect); + this.mcalloc = mcallocator; + } + + public A getAllocator() { + return this.mcalloc; + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + assert null != this.mcalloc; + ByteBuf buf; + if (PlatformDependent.hasUnsafe()) { + buf = new MnemonicUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); + } else { + buf = new MnemonicUnpooledDirectByteBuf(this, initialCapacity, maxCapacity); + } + + return toLeakAwareBuffer(buf); + } + + @Override + public boolean isDirectBufferPooled() { + return false; + } +} diff --git a/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledDirectByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledDirectByteBuf.java new file mode 100644 index 0000000000000..33569117a7e7d --- /dev/null +++ b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledDirectByteBuf.java @@ -0,0 +1,620 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import io.netty.util.internal.PlatformDependent; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import org.apache.arrow.memory.OutOfMemoryException; + +import org.apache.mnemonic.CommonAllocator; +import org.apache.mnemonic.MemBufferHolder; + +/** + * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)} + * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the + * constructor explicitly. + */ +public class MnemonicUnpooledDirectByteBuf> extends AbstractReferenceCountedByteBuf { + + private final MnemonicUnpooledByteBufAllocator alloc; + + private MemBufferHolder bufholder; + private ByteBuffer buffer; + private ByteBuffer tmpNioBuf; + private int capacity; + private boolean doNotFree; + + /** + * Creates a new direct buffer. + * + * @param initialCapacity the initial capacity of the underlying direct buffer + * @param maxCapacity the maximum capacity of the underlying direct buffer + */ + protected MnemonicUnpooledDirectByteBuf(MnemonicUnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { + super(maxCapacity); + if (alloc == null) { + throw new NullPointerException("alloc"); + } + if (initialCapacity < 0) { + throw new IllegalArgumentException("initialCapacity: " + initialCapacity); + } + if (maxCapacity < 0) { + throw new IllegalArgumentException("maxCapacity: " + maxCapacity); + } + if (initialCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); + } + + this.alloc = alloc; + setByteBuffer(allocateDirect(initialCapacity)); + } + + /** + * Creates a new direct buffer by wrapping the specified initial buffer. + * + * @param maxCapacity the maximum capacity of the underlying direct buffer + */ + protected MnemonicUnpooledDirectByteBuf(MnemonicUnpooledByteBufAllocator alloc, MemBufferHolder initialBufHolder, int maxCapacity) { + super(maxCapacity); + if (alloc == null) { + throw new NullPointerException("alloc"); + } + if (initialBufHolder == null || initialBufHolder.get() == null) { + throw new NullPointerException("initialBufHolder"); + } + if (!initialBufHolder.get().isDirect()) { + throw new IllegalArgumentException("initialBufHolder is not a direct buffer."); + } + if (initialBufHolder.get().isReadOnly()) { + throw new IllegalArgumentException("initialBufHolder is a read-only buffer."); + } + + int initialCapacity = initialBufHolder.get().remaining(); + if (initialCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); + } + + this.alloc = alloc; + doNotFree = true; + setByteBuffer(initialBufHolder); + writerIndex(initialCapacity); + } + + /** + * Allocate a new direct {@link ByteBuffer} with the given initialCapacity. + */ + protected MemBufferHolder allocateDirect(int initialCapacity) { + MemBufferHolder mbholder = this.alloc.getAllocator().createBuffer(initialCapacity); + if (null == mbholder) { + throw new OutOfMemoryException("No more memory resource for this MnemonicUnpooledDirectByteBuf instance"); + } + return mbholder; + } + + /** + * Free a direct {@link ByteBuffer} + */ + protected void freeDirect(MemBufferHolder bufholder) { + if (this.bufholder != null && this.bufholder.get() != null) { + this.bufholder.destroy(); + } + this.buffer = null; + this.bufholder = null; + } + + private void setByteBuffer(MemBufferHolder bufholder) { + MemBufferHolder oldBufholder = this.bufholder; + if (oldBufholder != null) { + if (doNotFree) { + doNotFree = false; + } else { + freeDirect(oldBufholder); + } + } + + this.bufholder = bufholder; + this.buffer = this.bufholder.get(); + tmpNioBuf = null; + capacity = this.buffer.remaining(); + } + + @Override + public boolean isDirect() { + return true; + } + + @Override + public int capacity() { + return capacity; + } + + @Override + public ByteBuf capacity(int newCapacity) { + ensureAccessible(); + if (newCapacity < 0 || newCapacity > maxCapacity()) { + throw new IllegalArgumentException("newCapacity: " + newCapacity); + } + + int readerIndex = readerIndex(); + int writerIndex = writerIndex(); + + int oldCapacity = capacity; + if (newCapacity > oldCapacity) { + ByteBuffer oldBuffer = buffer; + MemBufferHolder newBufholder = allocateDirect(newCapacity); + ByteBuffer newBuffer = newBufholder.get(); + oldBuffer.position(0).limit(oldBuffer.capacity()); + newBuffer.position(0).limit(oldBuffer.capacity()); + newBuffer.put(oldBuffer); + newBuffer.clear(); + setByteBuffer(newBufholder); + } else if (newCapacity < oldCapacity) { + ByteBuffer oldBuffer = buffer; + MemBufferHolder newBufholder = allocateDirect(newCapacity); + ByteBuffer newBuffer = newBufholder.get(); + if (readerIndex < newCapacity) { + if (writerIndex > newCapacity) { + writerIndex(writerIndex = newCapacity); + } + oldBuffer.position(readerIndex).limit(writerIndex); + newBuffer.position(readerIndex).limit(writerIndex); + newBuffer.put(oldBuffer); + newBuffer.clear(); + } else { + setIndex(newCapacity, newCapacity); + } + setByteBuffer(newBufholder); + } + return this; + } + + @Override + public ByteBufAllocator alloc() { + return alloc; + } + + @Override + public ByteOrder order() { + return ByteOrder.BIG_ENDIAN; + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException("direct buffer"); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException("direct buffer"); + } + + @Override + public boolean hasMemoryAddress() { + return false; + } + + @Override + public long memoryAddress() { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int index) { + ensureAccessible(); + return _getByte(index); + } + + @Override + protected byte _getByte(int index) { + return buffer.get(index); + } + + @Override + public short getShort(int index) { + ensureAccessible(); + return _getShort(index); + } + + @Override + protected short _getShort(int index) { + return buffer.getShort(index); + } + + @Override + public int getUnsignedMedium(int index) { + ensureAccessible(); + return _getUnsignedMedium(index); + } + + @Override + protected int _getUnsignedMedium(int index) { + return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | getByte(index + 2) & 0xff; + } + + @Override + public int getInt(int index) { + ensureAccessible(); + return _getInt(index); + } + + @Override + protected int _getInt(int index) { + return buffer.getInt(index); + } + + @Override + public long getLong(int index) { + ensureAccessible(); + return _getLong(index); + } + + @Override + protected long _getLong(int index) { + return buffer.getLong(index); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + checkDstIndex(index, length, dstIndex, dst.capacity()); + if (dst.hasArray()) { + getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length); + } else if (dst.nioBufferCount() > 0) { + for (ByteBuffer bb: dst.nioBuffers(dstIndex, length)) { + int bbLen = bb.remaining(); + getBytes(index, bb); + index += bbLen; + } + } else { + dst.setBytes(dstIndex, this, index, length); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + getBytes(index, dst, dstIndex, length, false); + return this; + } + + private void getBytes(int index, byte[] dst, int dstIndex, int length, boolean internal) { + checkDstIndex(index, length, dstIndex, dst.length); + + if (dstIndex < 0 || dstIndex > dst.length - length) { + throw new IndexOutOfBoundsException(String.format( + "dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.length)); + } + + ByteBuffer tmpBuf; + if (internal) { + tmpBuf = internalNioBuffer(); + } else { + tmpBuf = buffer.duplicate(); + } + tmpBuf.clear().position(index).limit(index + length); + tmpBuf.get(dst, dstIndex, length); + } + + @Override + public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { + checkReadableBytes(length); + getBytes(readerIndex, dst, dstIndex, length, true); + readerIndex += length; + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + getBytes(index, dst, false); + return this; + } + + private void getBytes(int index, ByteBuffer dst, boolean internal) { + checkIndex(index); + if (dst == null) { + throw new NullPointerException("dst"); + } + + int bytesToCopy = Math.min(capacity() - index, dst.remaining()); + ByteBuffer tmpBuf; + if (internal) { + tmpBuf = internalNioBuffer(); + } else { + tmpBuf = buffer.duplicate(); + } + tmpBuf.clear().position(index).limit(index + bytesToCopy); + dst.put(tmpBuf); + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + int length = dst.remaining(); + checkReadableBytes(length); + getBytes(readerIndex, dst, true); + readerIndex += length; + return this; + } + + @Override + public ByteBuf setByte(int index, int value) { + ensureAccessible(); + _setByte(index, value); + return this; + } + + @Override + protected void _setByte(int index, int value) { + buffer.put(index, (byte) value); + } + + @Override + public ByteBuf setShort(int index, int value) { + ensureAccessible(); + _setShort(index, value); + return this; + } + + @Override + protected void _setShort(int index, int value) { + buffer.putShort(index, (short) value); + } + + @Override + public ByteBuf setMedium(int index, int value) { + ensureAccessible(); + _setMedium(index, value); + return this; + } + + @Override + protected void _setMedium(int index, int value) { + setByte(index, (byte) (value >>> 16)); + setByte(index + 1, (byte) (value >>> 8)); + setByte(index + 2, (byte) value); + } + + @Override + public ByteBuf setInt(int index, int value) { + ensureAccessible(); + _setInt(index, value); + return this; + } + + @Override + protected void _setInt(int index, int value) { + buffer.putInt(index, value); + } + + @Override + public ByteBuf setLong(int index, long value) { + ensureAccessible(); + _setLong(index, value); + return this; + } + + @Override + protected void _setLong(int index, long value) { + buffer.putLong(index, value); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + checkSrcIndex(index, length, srcIndex, src.capacity()); + if (src.nioBufferCount() > 0) { + for (ByteBuffer bb: src.nioBuffers(srcIndex, length)) { + int bbLen = bb.remaining(); + setBytes(index, bb); + index += bbLen; + } + } else { + src.getBytes(srcIndex, this, index, length); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + checkSrcIndex(index, length, srcIndex, src.length); + ByteBuffer tmpBuf = internalNioBuffer(); + tmpBuf.clear().position(index).limit(index + length); + tmpBuf.put(src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + ensureAccessible(); + ByteBuffer tmpBuf = internalNioBuffer(); + if (src == tmpBuf) { + src = src.duplicate(); + } + + tmpBuf.clear().position(index).limit(index + src.remaining()); + tmpBuf.put(src); + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + getBytes(index, out, length, false); + return this; + } + + private void getBytes(int index, OutputStream out, int length, boolean internal) throws IOException { + ensureAccessible(); + if (length == 0) { + return; + } + + if (buffer.hasArray()) { + out.write(buffer.array(), index + buffer.arrayOffset(), length); + } else { + byte[] tmp = new byte[length]; + ByteBuffer tmpBuf; + if (internal) { + tmpBuf = internalNioBuffer(); + } else { + tmpBuf = buffer.duplicate(); + } + tmpBuf.clear().position(index); + tmpBuf.get(tmp); + out.write(tmp); + } + } + + @Override + public ByteBuf readBytes(OutputStream out, int length) throws IOException { + checkReadableBytes(length); + getBytes(readerIndex, out, length, true); + readerIndex += length; + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + return getBytes(index, out, length, false); + } + + private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { + ensureAccessible(); + if (length == 0) { + return 0; + } + + ByteBuffer tmpBuf; + if (internal) { + tmpBuf = internalNioBuffer(); + } else { + tmpBuf = buffer.duplicate(); + } + tmpBuf.clear().position(index).limit(index + length); + return out.write(tmpBuf); + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + checkReadableBytes(length); + int readBytes = getBytes(readerIndex, out, length, true); + readerIndex += readBytes; + return readBytes; + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + ensureAccessible(); + if (buffer.hasArray()) { + return in.read(buffer.array(), buffer.arrayOffset() + index, length); + } else { + byte[] tmp = new byte[length]; + int readBytes = in.read(tmp); + if (readBytes <= 0) { + return readBytes; + } + ByteBuffer tmpBuf = internalNioBuffer(); + tmpBuf.clear().position(index); + tmpBuf.put(tmp, 0, readBytes); + return readBytes; + } + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + ensureAccessible(); + ByteBuffer tmpBuf = internalNioBuffer(); + tmpBuf.clear().position(index).limit(index + length); + try { + return in.read(tmpNioBuf); + } catch (ClosedChannelException ignored) { + return -1; + } + } + + @Override + public int nioBufferCount() { + return 1; + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return new ByteBuffer[] { nioBuffer(index, length) }; + } + + @Override + public ByteBuf copy(int index, int length) { + ensureAccessible(); + ByteBuffer src; + try { + src = (ByteBuffer) buffer.duplicate().clear().position(index).limit(index + length); + } catch (IllegalArgumentException ignored) { + throw new IndexOutOfBoundsException("Too many bytes to read - Need " + (index + length)); + } + + return alloc().directBuffer(length, maxCapacity()).writeBytes(src); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + checkIndex(index, length); + return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length); + } + + private ByteBuffer internalNioBuffer() { + ByteBuffer tmpNioBuf = this.tmpNioBuf; + if (tmpNioBuf == null) { + this.tmpNioBuf = tmpNioBuf = buffer.duplicate(); + } + return tmpNioBuf; + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + checkIndex(index, length); + return ((ByteBuffer) buffer.duplicate().position(index).limit(index + length)).slice(); + } + + @Override + protected void deallocate() { + ByteBuffer buffer = this.buffer; + if (buffer == null) { + return; + } + + this.buffer = null; + + if (!doNotFree) { + freeDirect(this.bufholder); + } + } + + @Override + public ByteBuf unwrap() { + return null; + } +} diff --git a/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledUnsafeDirectByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledUnsafeDirectByteBuf.java new file mode 100644 index 0000000000000..aed9ff0c6aae1 --- /dev/null +++ b/java/memory/src/main/java/io/netty/buffer/MnemonicUnpooledUnsafeDirectByteBuf.java @@ -0,0 +1,542 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import io.netty.util.internal.PlatformDependent; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import org.apache.arrow.memory.OutOfMemoryException; + +import org.apache.mnemonic.CommonAllocator; +import org.apache.mnemonic.MemBufferHolder; + +/** + * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)} + * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the + * constructor explicitly. + */ +public class MnemonicUnpooledUnsafeDirectByteBuf> extends AbstractReferenceCountedByteBuf { + + private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; + + private final MnemonicUnpooledByteBufAllocator alloc; + + private long memoryAddress; + private MemBufferHolder bufholder; + private ByteBuffer buffer; + private ByteBuffer tmpNioBuf; + private int capacity; + private boolean doNotFree; + + /** + * Creates a new direct buffer. + * + * @param initialCapacity the initial capacity of the underlying direct buffer + * @param maxCapacity the maximum capacity of the underlying direct buffer + */ + protected MnemonicUnpooledUnsafeDirectByteBuf(MnemonicUnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { + super(maxCapacity); + if (alloc == null) { + throw new NullPointerException("alloc"); + } + if (initialCapacity < 0) { + throw new IllegalArgumentException("initialCapacity: " + initialCapacity); + } + if (maxCapacity < 0) { + throw new IllegalArgumentException("maxCapacity: " + maxCapacity); + } + if (initialCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); + } + + this.alloc = alloc; + + setByteBuffer(allocateDirect(initialCapacity)); + } + + /** + * Creates a new direct buffer by wrapping the specified initial buffer. + * + * @param maxCapacity the maximum capacity of the underlying direct buffer + */ + protected MnemonicUnpooledUnsafeDirectByteBuf(MnemonicUnpooledByteBufAllocator alloc, MemBufferHolder initialBufHolder, int maxCapacity) { + super(maxCapacity); + if (alloc == null) { + throw new NullPointerException("alloc"); + } + if (initialBufHolder == null || initialBufHolder.get() == null) { + throw new NullPointerException("initialBufHolder"); + } + if (!initialBufHolder.get().isDirect()) { + throw new IllegalArgumentException("initialBufHolder is not a direct buffer."); + } + if (initialBufHolder.get().isReadOnly()) { + throw new IllegalArgumentException("initialBufHolder is a read-only buffer."); + } + + int initialCapacity = initialBufHolder.get().remaining(); + if (initialCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); + } + + this.alloc = alloc; + doNotFree = true; + setByteBuffer(initialBufHolder); + writerIndex(initialCapacity); + } + + /** + * Allocate a new direct {@link ByteBuffer} with the given initialCapacity. + */ + protected MemBufferHolder allocateDirect(int initialCapacity) { + MemBufferHolder mbholder = this.alloc.getAllocator().createBuffer(initialCapacity); + if (null == mbholder) { + throw new OutOfMemoryException("No more memory resource for this MnemonicUnpooledUnsafeDirectByteBuf instance"); + } + return mbholder; + } + + /** + * Free a direct {@link ByteBuffer} + */ + protected void freeDirect(MemBufferHolder bufholder) { + if (this.bufholder != null && this.bufholder.get() != null) { + this.bufholder.destroy(); + } + this.buffer = null; + this.bufholder = null; + } + + private void setByteBuffer(MemBufferHolder bufholder) { + MemBufferHolder oldBufholder = this.bufholder; + if (oldBufholder != null) { + if (doNotFree) { + doNotFree = false; + } else { + freeDirect(oldBufholder); + } + } + + this.bufholder = bufholder; + this.buffer = this.bufholder.get(); + memoryAddress = PlatformDependent.directBufferAddress(this.buffer); + tmpNioBuf = null; + capacity = (int)this.bufholder.getSize(); + } + + @Override + public boolean isDirect() { + return true; + } + + @Override + public int capacity() { + return capacity; + } + + @Override + public ByteBuf capacity(int newCapacity) { + ensureAccessible(); + if (newCapacity < 0 || newCapacity > maxCapacity()) { + throw new IllegalArgumentException("newCapacity: " + newCapacity); + } + + int readerIndex = readerIndex(); + int writerIndex = writerIndex(); + + int oldCapacity = capacity; + if (newCapacity > oldCapacity) { + ByteBuffer oldBuffer = buffer; + MemBufferHolder newBufholder = allocateDirect(newCapacity); + ByteBuffer newBuffer = newBufholder.get(); + oldBuffer.position(0).limit(oldBuffer.capacity()); + newBuffer.position(0).limit(oldBuffer.capacity()); + newBuffer.put(oldBuffer); + newBuffer.clear(); + setByteBuffer(newBufholder); + } else if (newCapacity < oldCapacity) { + ByteBuffer oldBuffer = buffer; + MemBufferHolder newBufholder = allocateDirect(newCapacity); + ByteBuffer newBuffer = newBufholder.get(); + if (readerIndex < newCapacity) { + if (writerIndex > newCapacity) { + writerIndex(writerIndex = newCapacity); + } + oldBuffer.position(readerIndex).limit(writerIndex); + newBuffer.position(readerIndex).limit(writerIndex); + newBuffer.put(oldBuffer); + newBuffer.clear(); + } else { + setIndex(newCapacity, newCapacity); + } + setByteBuffer(newBufholder); + } + return this; + } + + @Override + public ByteBufAllocator alloc() { + return alloc; + } + + @Override + public ByteOrder order() { + return buffer.order(); + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException("direct buffer"); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException("direct buffer"); + } + + @Override + public boolean hasMemoryAddress() { + return true; + } + + @Override + public long memoryAddress() { + ensureAccessible(); + return memoryAddress; + } + + @Override + protected byte _getByte(int index) { + return PlatformDependent.getByte(addr(index)); + } + + @Override + protected short _getShort(int index) { + short v = PlatformDependent.getShort(addr(index)); + return NATIVE_ORDER? v : Short.reverseBytes(v); + } + + @Override + protected int _getUnsignedMedium(int index) { + long addr = addr(index); + return (PlatformDependent.getByte(addr) & 0xff) << 16 | + (PlatformDependent.getByte(addr + 1) & 0xff) << 8 | + PlatformDependent.getByte(addr + 2) & 0xff; + } + + @Override + protected int _getInt(int index) { + int v = PlatformDependent.getInt(addr(index)); + return NATIVE_ORDER? v : Integer.reverseBytes(v); + } + + @Override + protected long _getLong(int index) { + long v = PlatformDependent.getLong(addr(index)); + return NATIVE_ORDER? v : Long.reverseBytes(v); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + checkIndex(index, length); + if (dst == null) { + throw new NullPointerException("dst"); + } + if (dstIndex < 0 || dstIndex > dst.capacity() - length) { + throw new IndexOutOfBoundsException("dstIndex: " + dstIndex); + } + + if (dst.hasMemoryAddress()) { + PlatformDependent.copyMemory(addr(index), dst.memoryAddress() + dstIndex, length); + } else if (dst.hasArray()) { + PlatformDependent.copyMemory(addr(index), dst.array(), dst.arrayOffset() + dstIndex, length); + } else { + dst.setBytes(dstIndex, this, index, length); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + checkIndex(index, length); + if (dst == null) { + throw new NullPointerException("dst"); + } + if (dstIndex < 0 || dstIndex > dst.length - length) { + throw new IndexOutOfBoundsException(String.format( + "dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.length)); + } + + if (length != 0) { + PlatformDependent.copyMemory(addr(index), dst, dstIndex, length); + } + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + getBytes(index, dst, false); + return this; + } + + private void getBytes(int index, ByteBuffer dst, boolean internal) { + checkIndex(index); + if (dst == null) { + throw new NullPointerException("dst"); + } + + int bytesToCopy = Math.min(capacity() - index, dst.remaining()); + ByteBuffer tmpBuf; + if (internal) { + tmpBuf = internalNioBuffer(); + } else { + tmpBuf = buffer.duplicate(); + } + tmpBuf.clear().position(index).limit(index + bytesToCopy); + dst.put(tmpBuf); + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + int length = dst.remaining(); + checkReadableBytes(length); + getBytes(readerIndex, dst, true); + readerIndex += length; + return this; + } + + @Override + protected void _setByte(int index, int value) { + PlatformDependent.putByte(addr(index), (byte) value); + } + + @Override + protected void _setShort(int index, int value) { + PlatformDependent.putShort(addr(index), NATIVE_ORDER ? (short) value : Short.reverseBytes((short) value)); + } + + @Override + protected void _setMedium(int index, int value) { + long addr = addr(index); + PlatformDependent.putByte(addr, (byte) (value >>> 16)); + PlatformDependent.putByte(addr + 1, (byte) (value >>> 8)); + PlatformDependent.putByte(addr + 2, (byte) value); + } + + @Override + protected void _setInt(int index, int value) { + PlatformDependent.putInt(addr(index), NATIVE_ORDER ? value : Integer.reverseBytes(value)); + } + + @Override + protected void _setLong(int index, long value) { + PlatformDependent.putLong(addr(index), NATIVE_ORDER ? value : Long.reverseBytes(value)); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + checkIndex(index, length); + if (src == null) { + throw new NullPointerException("src"); + } + if (srcIndex < 0 || srcIndex > src.capacity() - length) { + throw new IndexOutOfBoundsException("srcIndex: " + srcIndex); + } + + if (length != 0) { + if (src.hasMemoryAddress()) { + PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, addr(index), length); + } else if (src.hasArray()) { + PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcIndex, addr(index), length); + } else { + src.getBytes(srcIndex, this, index, length); + } + } + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + checkIndex(index, length); + if (length != 0) { + PlatformDependent.copyMemory(src, srcIndex, addr(index), length); + } + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + ensureAccessible(); + ByteBuffer tmpBuf = internalNioBuffer(); + if (src == tmpBuf) { + src = src.duplicate(); + } + + tmpBuf.clear().position(index).limit(index + src.remaining()); + tmpBuf.put(src); + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + ensureAccessible(); + if (length != 0) { + byte[] tmp = new byte[length]; + PlatformDependent.copyMemory(addr(index), tmp, 0, length); + out.write(tmp); + } + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + return getBytes(index, out, length, false); + } + + private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { + ensureAccessible(); + if (length == 0) { + return 0; + } + + ByteBuffer tmpBuf; + if (internal) { + tmpBuf = internalNioBuffer(); + } else { + tmpBuf = buffer.duplicate(); + } + tmpBuf.clear().position(index).limit(index + length); + return out.write(tmpBuf); + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + checkReadableBytes(length); + int readBytes = getBytes(readerIndex, out, length, true); + readerIndex += readBytes; + return readBytes; + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + checkIndex(index, length); + byte[] tmp = new byte[length]; + int readBytes = in.read(tmp); + if (readBytes > 0) { + PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes); + } + return readBytes; + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + ensureAccessible(); + ByteBuffer tmpBuf = internalNioBuffer(); + tmpBuf.clear().position(index).limit(index + length); + try { + return in.read(tmpBuf); + } catch (ClosedChannelException ignored) { + return -1; + } + } + + @Override + public int nioBufferCount() { + return 1; + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return new ByteBuffer[] { nioBuffer(index, length) }; + } + + @Override + public ByteBuf copy(int index, int length) { + checkIndex(index, length); + ByteBuf copy = alloc().directBuffer(length, maxCapacity()); + if (length != 0) { + if (copy.hasMemoryAddress()) { + PlatformDependent.copyMemory(addr(index), copy.memoryAddress(), length); + copy.setIndex(0, length); + } else { + copy.writeBytes(this, index, length); + } + } + return copy; + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + checkIndex(index, length); + return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length); + } + + private ByteBuffer internalNioBuffer() { + ByteBuffer tmpNioBuf = this.tmpNioBuf; + if (tmpNioBuf == null) { + this.tmpNioBuf = tmpNioBuf = buffer.duplicate(); + } + return tmpNioBuf; + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + checkIndex(index, length); + return ((ByteBuffer) buffer.duplicate().position(index).limit(index + length)).slice(); + } + + @Override + protected void deallocate() { + ByteBuffer buffer = this.buffer; + if (buffer == null) { + return; + } + + this.buffer = null; + + if (!doNotFree) { + freeDirect(this.bufholder); + } + } + + @Override + public ByteBuf unwrap() { + return null; + } + + long addr(int index) { + return memoryAddress + index; + } + + @Override + protected SwappedByteBuf newSwappedByteBuf() { + return new UnsafeDirectSwappedByteBuf(this); + } +} diff --git a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java index b6de2e3aa2acb..acec32fbc7ce0 100644 --- a/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java +++ b/java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java @@ -39,6 +39,9 @@ public class PooledByteBufAllocatorL { private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60; public final UnsafeDirectLittleEndian empty; + + private static MnemonicUnpooledByteBufAllocator mubballoc = null; + private final AtomicLong hugeBufferSize = new AtomicLong(0); private final AtomicLong hugeBufferCount = new AtomicLong(0); private final AtomicLong normalBufferSize = new AtomicLong(0); @@ -50,6 +53,17 @@ public PooledByteBufAllocatorL() { empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER)); } + public static void setUpMnemonicUnpooledByteBufAllocator(MnemonicUnpooledByteBufAllocator mubballocator) { + if (null == mubballocator) { + throw new RuntimeException("MnemonicUnpooledByteBufAllocator is null for setup"); + } + mubballoc = mubballocator; + } + + public static void clearMnemonicUnpooledByteBufAllocator() { + mubballoc = null; + } + public UnsafeDirectLittleEndian allocate(int size) { try { return allocator.directBuffer(size, Integer.MAX_VALUE); @@ -156,9 +170,9 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa if (directArena != null) { - if (initialCapacity > directArena.chunkSize) { + if (initialCapacity > directArena.chunkSize || null != mubballoc) { // This is beyond chunk size so we'll allocate separately. - ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); + ByteBuf buf = null != mubballoc ? mubballoc.directBuffer(initialCapacity, maxCapacity) : UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity); hugeBufferSize.addAndGet(buf.capacity()); hugeBufferCount.incrementAndGet(); diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestMnemonicBackedBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestMnemonicBackedBaseAllocator.java new file mode 100644 index 0000000000000..be08842ac41be --- /dev/null +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestMnemonicBackedBaseAllocator.java @@ -0,0 +1,688 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.AfterClass; + +import org.apache.mnemonic.VolatileMemAllocator; +import org.apache.mnemonic.Utils; + +import io.netty.buffer.ArrowBuf; +import io.netty.buffer.ArrowBuf.TransferResult; +import io.netty.buffer.PooledByteBufAllocatorL; +import io.netty.buffer.MnemonicUnpooledByteBufAllocator; + +public class TestMnemonicBackedBaseAllocator { + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBaseAllocator.class); + + private final static int MAX_ALLOCATION = 8 * 1024; + + private final static long MNEMONIC_CAPACITY = 1024 * 1024 * 40; + private static VolatileMemAllocator bdmalloc; + + @BeforeClass + public static void setupUpBeforeClass() throws Exception { + bdmalloc = new VolatileMemAllocator( + Utils.getVolatileMemoryAllocatorService("pmalloc"), + MNEMONIC_CAPACITY, "./base_allocator_test.dat"); + MnemonicUnpooledByteBufAllocator mubba = new MnemonicUnpooledByteBufAllocator(true, bdmalloc); + PooledByteBufAllocatorL.setUpMnemonicUnpooledByteBufAllocator(mubba); + } + + @AfterClass + public static void tearDownAfterClass() { + PooledByteBufAllocatorL.clearMnemonicUnpooledByteBufAllocator(); + if (null != bdmalloc) { + bdmalloc.close(); + } + } + +/* + // ---------------------------------------- DEBUG ----------------------------------- + + @After + public void checkBuffers() { + final int bufferCount = UnsafeDirectLittleEndian.getBufferCount(); + if (bufferCount != 0) { + UnsafeDirectLittleEndian.logBuffers(logger); + UnsafeDirectLittleEndian.releaseBuffers(); + } + + assertEquals(0, bufferCount); + } + +// @AfterClass +// public static void dumpBuffers() { +// UnsafeDirectLittleEndian.logBuffers(logger); +// } + + // ---------------------------------------- DEBUG ------------------------------------ +*/ + + + @Test + public void test_privateMax() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2); + assertNotNull("allocation failed", arrowBuf1); + + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("noLimits", 0, MAX_ALLOCATION)) { + final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2); + assertNotNull("allocation failed", arrowBuf2); + arrowBuf2.release(); + } + + arrowBuf1.release(); + } + } + + @Test(expected = IllegalStateException.class) + public void testRootAllocator_closeWithOutstanding() throws Exception { + try { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + final ArrowBuf arrowBuf = rootAllocator.buffer(512); + assertNotNull("allocation failed", arrowBuf); + } + } finally { + /* + * We expect there to be one unreleased underlying buffer because we're closing + * without releasing it. + */ +/* + // ------------------------------- DEBUG --------------------------------- + final int bufferCount = UnsafeDirectLittleEndian.getBufferCount(); + UnsafeDirectLittleEndian.releaseBuffers(); + assertEquals(1, bufferCount); + // ------------------------------- DEBUG --------------------------------- +*/ + } + } + + @Test + public void testRootAllocator_getEmpty() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + final ArrowBuf arrowBuf = rootAllocator.buffer(0); + assertNotNull("allocation failed", arrowBuf); + assertEquals("capacity was non-zero", 0, arrowBuf.capacity()); + arrowBuf.release(); + } + } + + @Ignore // TODO(DRILL-2740) + @Test(expected = IllegalStateException.class) + public void testAllocator_unreleasedEmpty() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + @SuppressWarnings("unused") + final ArrowBuf arrowBuf = rootAllocator.buffer(0); + } + } + + @Test + public void testAllocator_transferOwnership() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = + rootAllocator.newChildAllocator("changeOwnership1", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = + rootAllocator.newChildAllocator("changeOwnership2", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4); + rootAllocator.verify(); + TransferResult transferOwnership = arrowBuf1.transferOwnership(childAllocator2); + assertEquiv(arrowBuf1, transferOwnership.buffer); + final boolean allocationFit = transferOwnership.allocationFit; + rootAllocator.verify(); + assertTrue(allocationFit); + + arrowBuf1.release(); + childAllocator1.close(); + rootAllocator.verify(); + + transferOwnership.buffer.release(); + childAllocator2.close(); + } + } + + @Test + public void testAllocator_shareOwnership() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("shareOwnership1", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("shareOwnership2", 0, MAX_ALLOCATION); + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 4); + rootAllocator.verify(); + + // share ownership of buffer. + final ArrowBuf arrowBuf2 = arrowBuf1.retain(childAllocator2); + rootAllocator.verify(); + assertNotNull(arrowBuf2); + assertNotEquals(arrowBuf2, arrowBuf1); + assertEquiv(arrowBuf1, arrowBuf2); + + // release original buffer (thus transferring ownership to allocator 2. (should leave allocator 1 in empty state) + arrowBuf1.release(); + rootAllocator.verify(); + childAllocator1.close(); + rootAllocator.verify(); + + final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("shareOwnership3", 0, MAX_ALLOCATION); + final ArrowBuf arrowBuf3 = arrowBuf1.retain(childAllocator3); + assertNotNull(arrowBuf3); + assertNotEquals(arrowBuf3, arrowBuf1); + assertNotEquals(arrowBuf3, arrowBuf2); + assertEquiv(arrowBuf1, arrowBuf3); + rootAllocator.verify(); + + arrowBuf2.release(); + rootAllocator.verify(); + childAllocator2.close(); + rootAllocator.verify(); + + arrowBuf3.release(); + rootAllocator.verify(); + childAllocator3.close(); + } + } + + @Test + public void testRootAllocator_createChildAndUse() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createChildAndUse", 0, + MAX_ALLOCATION)) { + final ArrowBuf arrowBuf = childAllocator.buffer(512); + assertNotNull("allocation failed", arrowBuf); + arrowBuf.release(); + } + } + } + + @Test(expected = IllegalStateException.class) + public void testRootAllocator_createChildDontClose() throws Exception { + try { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createChildDontClose", 0, + MAX_ALLOCATION); + final ArrowBuf arrowBuf = childAllocator.buffer(512); + assertNotNull("allocation failed", arrowBuf); + } + } finally { + /* + * We expect one underlying buffer because we closed a child allocator without + * releasing the buffer allocated from it. + */ +/* + // ------------------------------- DEBUG --------------------------------- + final int bufferCount = UnsafeDirectLittleEndian.getBufferCount(); + UnsafeDirectLittleEndian.releaseBuffers(); + assertEquals(1, bufferCount); + // ------------------------------- DEBUG --------------------------------- +*/ + } + } + + private static void allocateAndFree(final BufferAllocator allocator) { + final ArrowBuf arrowBuf = allocator.buffer(512); + assertNotNull("allocation failed", arrowBuf); + arrowBuf.release(); + + final ArrowBuf arrowBuf2 = allocator.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf2); + arrowBuf2.release(); + + final int nBufs = 8; + final ArrowBuf[] arrowBufs = new ArrowBuf[nBufs]; + for (int i = 0; i < arrowBufs.length; ++i) { + ArrowBuf arrowBufi = allocator.buffer(MAX_ALLOCATION / nBufs); + assertNotNull("allocation failed", arrowBufi); + arrowBufs[i] = arrowBufi; + } + for (ArrowBuf arrowBufi : arrowBufs) { + arrowBufi.release(); + } + } + + @Test + public void testAllocator_manyAllocations() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("manyAllocations", 0, MAX_ALLOCATION)) { + allocateAndFree(childAllocator); + } + } + } + + @Test + public void testAllocator_overAllocate() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("overAllocate", 0, MAX_ALLOCATION)) { + allocateAndFree(childAllocator); + + try { + childAllocator.buffer(MAX_ALLOCATION + 1); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + } + } + } + + @Test + public void testAllocator_overAllocateParent() throws Exception { + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator = + rootAllocator.newChildAllocator("overAllocateParent", 0, MAX_ALLOCATION)) { + final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2); + assertNotNull("allocation failed", arrowBuf1); + final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2); + assertNotNull("allocation failed", arrowBuf2); + + try { + childAllocator.buffer(MAX_ALLOCATION / 4); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + + arrowBuf1.release(); + arrowBuf2.release(); + } + } + } + + private static void testAllocator_sliceUpBufferAndRelease( + final RootAllocator rootAllocator, final BufferAllocator bufferAllocator) { + final ArrowBuf arrowBuf1 = bufferAllocator.buffer(MAX_ALLOCATION / 2); + rootAllocator.verify(); + + final ArrowBuf arrowBuf2 = arrowBuf1.slice(16, arrowBuf1.capacity() - 32); + rootAllocator.verify(); + final ArrowBuf arrowBuf3 = arrowBuf2.slice(16, arrowBuf2.capacity() - 32); + rootAllocator.verify(); + @SuppressWarnings("unused") + final ArrowBuf arrowBuf4 = arrowBuf3.slice(16, arrowBuf3.capacity() - 32); + rootAllocator.verify(); + + arrowBuf3.release(); // since they share refcounts, one is enough to release them all + rootAllocator.verify(); + } + + @Test + public void testAllocator_createSlices() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator); + + try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) { + testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator); + } + rootAllocator.verify(); + + testAllocator_sliceUpBufferAndRelease(rootAllocator, rootAllocator); + + try (final BufferAllocator childAllocator = rootAllocator.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator2 = + childAllocator.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) { + final ArrowBuf arrowBuf1 = childAllocator2.buffer(MAX_ALLOCATION / 8); + @SuppressWarnings("unused") + final ArrowBuf arrowBuf2 = arrowBuf1.slice(MAX_ALLOCATION / 16, MAX_ALLOCATION / 16); + testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator); + arrowBuf1.release(); + rootAllocator.verify(); + } + rootAllocator.verify(); + + testAllocator_sliceUpBufferAndRelease(rootAllocator, childAllocator); + } + rootAllocator.verify(); + } + } + + @Test + public void testAllocator_sliceRanges() throws Exception { +// final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges"); + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + // Populate a buffer with byte values corresponding to their indices. + final ArrowBuf arrowBuf = rootAllocator.buffer(256); + assertEquals(256, arrowBuf.capacity()); + assertEquals(0, arrowBuf.readerIndex()); + assertEquals(0, arrowBuf.readableBytes()); + assertEquals(0, arrowBuf.writerIndex()); + assertEquals(256, arrowBuf.writableBytes()); + + final ArrowBuf slice3 = (ArrowBuf) arrowBuf.slice(); + assertEquals(0, slice3.readerIndex()); + assertEquals(0, slice3.readableBytes()); + assertEquals(0, slice3.writerIndex()); +// assertEquals(256, slice3.capacity()); +// assertEquals(256, slice3.writableBytes()); + + for (int i = 0; i < 256; ++i) { + arrowBuf.writeByte(i); + } + assertEquals(0, arrowBuf.readerIndex()); + assertEquals(256, arrowBuf.readableBytes()); + assertEquals(256, arrowBuf.writerIndex()); + assertEquals(0, arrowBuf.writableBytes()); + + final ArrowBuf slice1 = (ArrowBuf) arrowBuf.slice(); + assertEquals(0, slice1.readerIndex()); + assertEquals(256, slice1.readableBytes()); + for (int i = 0; i < 10; ++i) { + assertEquals(i, slice1.readByte()); + } + assertEquals(256 - 10, slice1.readableBytes()); + for (int i = 0; i < 256; ++i) { + assertEquals((byte) i, slice1.getByte(i)); + } + + final ArrowBuf slice2 = arrowBuf.slice(25, 25); + assertEquals(0, slice2.readerIndex()); + assertEquals(25, slice2.readableBytes()); + for (int i = 25; i < 50; ++i) { + assertEquals(i, slice2.readByte()); + } + +/* + for(int i = 256; i > 0; --i) { + slice3.writeByte(i - 1); + } + for(int i = 0; i < 256; ++i) { + assertEquals(255 - i, slice1.getByte(i)); + } +*/ + + arrowBuf.release(); // all the derived buffers share this fate + } + } + + @Test + public void testAllocator_slicesOfSlices() throws Exception { +// final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices"); + try (final RootAllocator rootAllocator = + new RootAllocator(MAX_ALLOCATION)) { + // Populate a buffer with byte values corresponding to their indices. + final ArrowBuf arrowBuf = rootAllocator.buffer(256); + for (int i = 0; i < 256; ++i) { + arrowBuf.writeByte(i); + } + + // Slice it up. + final ArrowBuf slice0 = arrowBuf.slice(0, arrowBuf.capacity()); + for (int i = 0; i < 256; ++i) { + assertEquals((byte) i, arrowBuf.getByte(i)); + } + + final ArrowBuf slice10 = slice0.slice(10, arrowBuf.capacity() - 10); + for (int i = 10; i < 256; ++i) { + assertEquals((byte) i, slice10.getByte(i - 10)); + } + + final ArrowBuf slice20 = slice10.slice(10, arrowBuf.capacity() - 20); + for (int i = 20; i < 256; ++i) { + assertEquals((byte) i, slice20.getByte(i - 20)); + } + + final ArrowBuf slice30 = slice20.slice(10, arrowBuf.capacity() - 30); + for (int i = 30; i < 256; ++i) { + assertEquals((byte) i, slice30.getByte(i - 30)); + } + + arrowBuf.release(); + } + } + + @Test + public void testAllocator_transferSliced() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced1", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced2", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8); + final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8); + + final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2); + final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2); + + rootAllocator.verify(); + + TransferResult result1 = arrowBuf2s.transferOwnership(childAllocator1); + assertEquiv(arrowBuf2s, result1.buffer); + rootAllocator.verify(); + TransferResult result2 = arrowBuf1s.transferOwnership(childAllocator2); + assertEquiv(arrowBuf1s, result2.buffer); + rootAllocator.verify(); + + result1.buffer.release(); + result2.buffer.release(); + + arrowBuf1s.release(); // releases arrowBuf1 + arrowBuf2s.release(); // releases arrowBuf2 + + childAllocator1.close(); + childAllocator2.close(); + } + } + + @Test + public void testAllocator_shareSliced() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferSliced", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8); + final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8); + + final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2); + final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2); + + rootAllocator.verify(); + + final ArrowBuf arrowBuf2s1 = arrowBuf2s.retain(childAllocator1); + assertEquiv(arrowBuf2s, arrowBuf2s1); + final ArrowBuf arrowBuf1s2 = arrowBuf1s.retain(childAllocator2); + assertEquiv(arrowBuf1s, arrowBuf1s2); + rootAllocator.verify(); + + arrowBuf1s.release(); // releases arrowBuf1 + arrowBuf2s.release(); // releases arrowBuf2 + rootAllocator.verify(); + + arrowBuf2s1.release(); // releases the shared arrowBuf2 slice + arrowBuf1s2.release(); // releases the shared arrowBuf1 slice + + childAllocator1.close(); + childAllocator2.close(); + } + } + + @Test + public void testAllocator_transferShared() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("transferShared1", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator2 = rootAllocator.newChildAllocator("transferShared2", 0, MAX_ALLOCATION); + final BufferAllocator childAllocator3 = rootAllocator.newChildAllocator("transferShared3", 0, MAX_ALLOCATION); + + final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8); + + boolean allocationFit; + + ArrowBuf arrowBuf2 = arrowBuf1.retain(childAllocator2); + rootAllocator.verify(); + assertNotNull(arrowBuf2); + assertNotEquals(arrowBuf2, arrowBuf1); + assertEquiv(arrowBuf1, arrowBuf2); + + TransferResult result = arrowBuf1.transferOwnership(childAllocator3); + allocationFit = result.allocationFit; + final ArrowBuf arrowBuf3 = result.buffer; + assertTrue(allocationFit); + assertEquiv(arrowBuf1, arrowBuf3); + rootAllocator.verify(); + + // Since childAllocator3 now has childAllocator1's buffer, 1, can close + arrowBuf1.release(); + childAllocator1.close(); + rootAllocator.verify(); + + arrowBuf2.release(); + childAllocator2.close(); + rootAllocator.verify(); + + final BufferAllocator childAllocator4 = rootAllocator.newChildAllocator("transferShared4", 0, MAX_ALLOCATION); + TransferResult result2 = arrowBuf3.transferOwnership(childAllocator4); + allocationFit = result.allocationFit; + final ArrowBuf arrowBuf4 = result2.buffer; + assertTrue(allocationFit); + assertEquiv(arrowBuf3, arrowBuf4); + rootAllocator.verify(); + + arrowBuf3.release(); + childAllocator3.close(); + rootAllocator.verify(); + + arrowBuf4.release(); + childAllocator4.close(); + rootAllocator.verify(); + } + } + + @Test + public void testAllocator_unclaimedReservation() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator childAllocator1 = + rootAllocator.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) { + try (final AllocationReservation reservation = childAllocator1.newReservation()) { + assertTrue(reservation.add(64)); + } + rootAllocator.verify(); + } + } + } + + @Test + public void testAllocator_claimedReservation() throws Exception { + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + + try (final BufferAllocator childAllocator1 = rootAllocator.newChildAllocator("claimedReservation", 0, + MAX_ALLOCATION)) { + + try (final AllocationReservation reservation = childAllocator1.newReservation()) { + assertTrue(reservation.add(32)); + assertTrue(reservation.add(32)); + + final ArrowBuf arrowBuf = reservation.allocateBuffer(); + assertEquals(64, arrowBuf.capacity()); + rootAllocator.verify(); + + arrowBuf.release(); + rootAllocator.verify(); + } + rootAllocator.verify(); + } + } + } + + @Test + public void multiple() throws Exception { + final String owner = "test"; + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + + final int op = 100000; + + BufferAllocator frag1 = allocator.newChildAllocator(owner, 1500000, Long.MAX_VALUE); + BufferAllocator frag2 = allocator.newChildAllocator(owner, 500000, Long.MAX_VALUE); + + allocator.verify(); + + BufferAllocator allocator11 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE); + ArrowBuf b11 = allocator11.buffer(1000000); + + allocator.verify(); + + BufferAllocator allocator12 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE); + ArrowBuf b12 = allocator12.buffer(500000); + + allocator.verify(); + + BufferAllocator allocator21 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE); + + allocator.verify(); + + BufferAllocator allocator22 = frag2.newChildAllocator(owner, op, Long.MAX_VALUE); + ArrowBuf b22 = allocator22.buffer(2000000); + + allocator.verify(); + + BufferAllocator frag3 = allocator.newChildAllocator(owner, 1000000, Long.MAX_VALUE); + + allocator.verify(); + + BufferAllocator allocator31 = frag3.newChildAllocator(owner, op, Long.MAX_VALUE); + ArrowBuf b31a = allocator31.buffer(200000); + + allocator.verify(); + + // Previously running operator completes + b22.release(); + + allocator.verify(); + + allocator22.close(); + + b31a.release(); + allocator31.close(); + + b12.release(); + allocator12.close(); + + allocator21.close(); + + b11.release(); + allocator11.close(); + + frag1.close(); + frag2.close(); + frag3.close(); + + } + } + + public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) { + assertEquals(origBuf.readerIndex(), newBuf.readerIndex()); + assertEquals(origBuf.writerIndex(), newBuf.writerIndex()); + } +} diff --git a/java/pom.xml b/java/pom.xml index 0a0f2e0ce8f65..4d664deb17933 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -35,9 +35,11 @@ 4.0.49.Final 2.7.9 2.7.1 + 0.9.0-incubating-SNAPSHOT 1.2.0-3f79e055 2 false + ${session.executionRootDirectory}/target/service-dist @@ -80,7 +82,13 @@ - + + + kr.motd.maven + os-maven-plugin + 1.4.0.Final + + org.apache.rat @@ -272,7 +280,6 @@ - org.apache.maven.plugins maven-checkstyle-plugin @@ -333,6 +340,63 @@ + + org.apache.maven.plugins + maven-dependency-plugin + 2.10 + + + copy + package + + copy + + + + + org.apache.mnemonic + mnemonic-nvml-vmem-service + ${dep.mnemonic.version} + ${os.detected.classifier} + jar + + + org.apache.mnemonic + mnemonic-sys-vmem-service + ${dep.mnemonic.version} + ${os.detected.classifier} + jar + + + org.apache.mnemonic + mnemonic-nvml-pmem-service + ${dep.mnemonic.version} + ${os.detected.classifier} + jar + + + org.apache.mnemonic + mnemonic-pmalloc-service + ${dep.mnemonic.version} + ${os.detected.classifier} + jar + + + org.apache.mnemonic + mnemonic-utilities-service + ${dep.mnemonic.version} + ${os.detected.classifier} + jar + + + ${service.dist.dir} + false + false + true + + + + @@ -374,6 +438,7 @@ -Darrow.vector.max_allocation_bytes=1048576 + -Djava.ext.dirs=${service.dist.dir} @@ -524,6 +589,11 @@ slf4j-api ${dep.slf4j.version} + + org.apache.mnemonic + mnemonic-core + ${dep.mnemonic.version} + @@ -588,7 +658,6 @@ 0.9.44 test - diff --git a/java/vector/pom.xml b/java/vector/pom.xml index 46e06aa1e3f97..1cd58853a55cf 100644 --- a/java/vector/pom.xml +++ b/java/vector/pom.xml @@ -82,6 +82,10 @@ org.slf4j slf4j-api + + org.apache.mnemonic + mnemonic-core + diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestMnemonicBackedValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestMnemonicBackedValueVector.java new file mode 100644 index 0000000000000..bedacb19fc6f9 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestMnemonicBackedValueVector.java @@ -0,0 +1,1616 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector; +import org.apache.arrow.vector.holders.VarCharHolder; +import org.apache.arrow.vector.util.OversizedAllocationException; + +import static org.apache.arrow.vector.TestUtils.newNullableVarBinaryVector; +import static org.apache.arrow.vector.TestUtils.newNullableVarCharVector; +import static org.apache.arrow.vector.TestUtils.newVector; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; + +import org.apache.arrow.vector.schema.ArrowRecordBatch; +import org.apache.arrow.vector.schema.TypeLayout; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.TransferPair; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.AfterClass; + +import org.apache.mnemonic.VolatileMemAllocator; +import org.apache.mnemonic.Utils; +import io.netty.buffer.PooledByteBufAllocatorL; +import io.netty.buffer.MnemonicUnpooledByteBufAllocator; +import io.netty.buffer.ArrowBuf; + + +public class TestMnemonicBackedValueVector{ + + private final static String EMPTY_SCHEMA_PATH = ""; + + private BufferAllocator allocator; + + private final static long MNEMONIC_CAPACITY = 1024 * 1024 * 1024 * 1; + private static VolatileMemAllocator bdmalloc; + + @BeforeClass + public static void setupUpBeforeClass() throws Exception { + bdmalloc = new VolatileMemAllocator( + Utils.getVolatileMemoryAllocatorService("pmalloc"), + MNEMONIC_CAPACITY, "./value_vector_test.dat"); + MnemonicUnpooledByteBufAllocator mubba = new MnemonicUnpooledByteBufAllocator(true, bdmalloc); + PooledByteBufAllocatorL.setUpMnemonicUnpooledByteBufAllocator(mubba); + } + + @AfterClass + public static void tearDownAfterClass() { + PooledByteBufAllocatorL.clearMnemonicUnpooledByteBufAllocator(); + if (null != bdmalloc) { + bdmalloc.close(); + } + } + + @Before + public void init() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + private final static Charset utf8Charset = Charset.forName("UTF-8"); + private final static byte[] STR1 = "AAAAA1".getBytes(utf8Charset); + private final static byte[] STR2 = "BBBBBBBBB2".getBytes(utf8Charset); + private final static byte[] STR3 = "CCCC3".getBytes(utf8Charset); + private final static byte[] STR4 = "DDDDDDDD4".getBytes(utf8Charset); + private final static byte[] STR5 = "EEE5".getBytes(utf8Charset); + private final static byte[] STR6 = "FFFFF6".getBytes(utf8Charset); + private final static int MAX_VALUE_COUNT = + Integer.getInteger("arrow.vector.max_allocation_bytes", Integer.MAX_VALUE)/4; + private final static int MAX_VALUE_COUNT_8BYTE = MAX_VALUE_COUNT/2; + + @After + public void terminate() throws Exception { + allocator.close(); + } + + /* + * Tests for Fixed-Width vectors + * + * Covered types as of now + * + * -- UInt4Vector + * -- IntVector + * -- Float4Vector + * -- Float8Vector + * + * -- NullableUInt4Vector + * -- NullableIntVector + * -- NullableFloat4Vector + * + * TODO: + * + * -- SmallIntVector + * -- BigIntVector + * -- TinyIntVector + */ + + @Test /* UInt4Vector */ + public void testFixedType1() { + + // Create a new value vector for 1024 integers. + try (final UInt4Vector vector = new UInt4Vector(EMPTY_SCHEMA_PATH, allocator)) { + + boolean error = false; + int initialCapacity = 0; + final UInt4Vector.Mutator mutator = vector.getMutator(); + final UInt4Vector.Accessor accessor = vector.getAccessor(); + + vector.allocateNew(1024); + initialCapacity = vector.getValueCapacity(); + assertEquals(1024, initialCapacity); + + // Put and set a few values + mutator.setSafe(0, 100); + mutator.setSafe(1, 101); + mutator.setSafe(100, 102); + mutator.setSafe(1022, 103); + mutator.setSafe(1023, 104); + + assertEquals(100, accessor.get(0)); + assertEquals(101, accessor.get(1)); + assertEquals(102, accessor.get(100)); + assertEquals(103, accessor.get(1022)); + assertEquals(104, accessor.get(1023)); + + try { + mutator.set(1024, 10000); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + try { + accessor.get(1024); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* this should trigger a realloc() */ + mutator.setSafe(1024, 10000); + + /* underlying buffer should now be able to store double the number of values */ + assertEquals(initialCapacity * 2, vector.getValueCapacity()); + + /* check vector data after realloc */ + assertEquals(100, accessor.get(0)); + assertEquals(101, accessor.get(1)); + assertEquals(102, accessor.get(100)); + assertEquals(103, accessor.get(1022)); + assertEquals(104, accessor.get(1023)); + assertEquals(10000, accessor.get(1024)); + + /* reset the vector */ + vector.reset(); + + /* capacity shouldn't change after reset */ + assertEquals(initialCapacity * 2, vector.getValueCapacity()); + + /* vector data should have been zeroed out */ + for(int i = 0; i < (initialCapacity * 2); i++) { + assertEquals("non-zero data not expected at index: " + i, 0, accessor.get(i)); + } + } + } + + @Test /* IntVector */ + public void testFixedType2() { + try (final IntVector intVector = new IntVector(EMPTY_SCHEMA_PATH, allocator)) { + final IntVector.Mutator mutator = intVector.getMutator(); + final IntVector.Accessor accessor = intVector.getAccessor(); + boolean error = false; + int initialCapacity = 16; + + /* we should not throw exception for these values of capacity */ + intVector.setInitialCapacity(MAX_VALUE_COUNT - 1); + intVector.setInitialCapacity(MAX_VALUE_COUNT); + + try { + intVector.setInitialCapacity(MAX_VALUE_COUNT + 1); + } + catch (OversizedAllocationException oe) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + intVector.setInitialCapacity(initialCapacity); + /* no memory allocation has happened yet so capacity of underlying buffer should be 0 */ + assertEquals(0, intVector.getValueCapacity()); + + /* allocate 64 bytes (16 * 4) */ + intVector.allocateNew(); + /* underlying buffer should be able to store 16 values */ + assertEquals(initialCapacity, intVector.getValueCapacity()); + + /* populate the vector */ + int j = 1; + for(int i = 0; i < 16; i += 2) { + mutator.set(i, j); + j++; + } + + try { + mutator.set(16, 9); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* check vector contents */ + j = 1; + for(int i = 0; i < 16; i += 2) { + assertEquals("unexpected value at index: " + i, j, accessor.get(i)); + j++; + } + + try { + accessor.get(16); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* this should trigger a realloc() */ + mutator.setSafe(16, 9); + + /* underlying buffer should now be able to store double the number of values */ + assertEquals(initialCapacity * 2, intVector.getValueCapacity()); + + /* vector data should still be intact after realloc */ + j = 1; + for(int i = 0; i <= 16; i += 2) { + assertEquals("unexpected value at index: " + i, j, accessor.get(i)); + j++; + } + + /* reset the vector */ + intVector.reset(); + + /* capacity shouldn't change after reset */ + assertEquals(initialCapacity * 2, intVector.getValueCapacity()); + + /* vector data should have been zeroed out */ + for(int i = 0; i < (initialCapacity * 2); i++) { + assertEquals("non-zero data not expected at index: " + i, 0, accessor.get(i)); + } + } + } + + @Test /* Float4Vector */ + public void testFixedType3() { + try (final Float4Vector floatVector = new Float4Vector(EMPTY_SCHEMA_PATH, allocator)) { + final Float4Vector.Mutator mutator = floatVector.getMutator(); + final Float4Vector.Accessor accessor = floatVector.getAccessor(); + boolean error = false; + int initialCapacity = 16; + + /* we should not throw exception for these values of capacity */ + floatVector.setInitialCapacity(MAX_VALUE_COUNT - 1); + floatVector.setInitialCapacity(MAX_VALUE_COUNT); + + try { + floatVector.setInitialCapacity(MAX_VALUE_COUNT + 1); + } + catch (OversizedAllocationException oe) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + floatVector.setInitialCapacity(initialCapacity); + /* no memory allocation has happened yet so capacity of underlying buffer should be 0 */ + assertEquals(0, floatVector.getValueCapacity()); + + /* allocate 64 bytes (16 * 4) */ + floatVector.allocateNew(); + /* underlying buffer should be able to store 16 values */ + assertEquals(initialCapacity, floatVector.getValueCapacity()); + + floatVector.zeroVector(); + + /* populate the vector */ + mutator.set(0, 1.5f); + mutator.set(2, 2.5f); + mutator.set(4, 3.3f); + mutator.set(6, 4.8f); + mutator.set(8, 5.6f); + mutator.set(10, 6.6f); + mutator.set(12, 7.8f); + mutator.set(14, 8.5f); + + try { + mutator.set(16, 9.5f); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* check vector contents */ + assertEquals(1.5f, accessor.get(0), 0); + assertEquals(2.5f, accessor.get(2), 0); + assertEquals(3.3f, accessor.get(4), 0); + assertEquals(4.8f, accessor.get(6), 0); + assertEquals(5.6f, accessor.get(8), 0); + assertEquals(6.6f, accessor.get(10), 0); + assertEquals(7.8f, accessor.get(12), 0); + assertEquals(8.5f, accessor.get(14), 0); + + try { + accessor.get(16); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* this should trigger a realloc() */ + mutator.setSafe(16, 9.5f); + + /* underlying buffer should now be able to store double the number of values */ + assertEquals(initialCapacity * 2, floatVector.getValueCapacity()); + + /* vector data should still be intact after realloc */ + assertEquals(1.5f, accessor.get(0), 0); + assertEquals(2.5f, accessor.get(2), 0); + assertEquals(3.3f, accessor.get(4), 0); + assertEquals(4.8f, accessor.get(6), 0); + assertEquals(5.6f, accessor.get(8), 0); + assertEquals(6.6f, accessor.get(10), 0); + assertEquals(7.8f, accessor.get(12), 0); + assertEquals(8.5f, accessor.get(14), 0); + assertEquals(9.5f, accessor.get(16), 0); + + /* reset the vector */ + floatVector.reset(); + + /* capacity shouldn't change after reset */ + assertEquals(initialCapacity * 2, floatVector.getValueCapacity()); + + /* vector data should be zeroed out */ + for(int i = 0; i < (initialCapacity * 2); i++) { + assertEquals("non-zero data not expected at index: " + i, 0, accessor.get(i), 0); + } + } + } + + @Test /* Float8Vector */ + public void testFixedType4() { + try (final Float8Vector floatVector = new Float8Vector(EMPTY_SCHEMA_PATH, allocator)) { + final Float8Vector.Mutator mutator = floatVector.getMutator(); + final Float8Vector.Accessor accessor = floatVector.getAccessor(); + boolean error = false; + int initialCapacity = 16; + + /* we should not throw exception for these values of capacity */ + floatVector.setInitialCapacity(MAX_VALUE_COUNT_8BYTE - 1); + floatVector.setInitialCapacity(MAX_VALUE_COUNT_8BYTE); + + try { + floatVector.setInitialCapacity(MAX_VALUE_COUNT_8BYTE + 1); + } + catch (OversizedAllocationException oe) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + floatVector.setInitialCapacity(initialCapacity); + /* no memory allocation has happened yet so capacity of underlying buffer should be 0 */ + assertEquals(0, floatVector.getValueCapacity()); + + /* allocate 128 bytes (16 * 8) */ + floatVector.allocateNew(); + /* underlying buffer should be able to store 16 values */ + assertEquals(initialCapacity, floatVector.getValueCapacity()); + + /* populate the vector */ + mutator.set(0, 1.55); + mutator.set(2, 2.53); + mutator.set(4, 3.36); + mutator.set(6, 4.82); + mutator.set(8, 5.67); + mutator.set(10, 6.67); + mutator.set(12, 7.87); + mutator.set(14, 8.56); + + try { + mutator.set(16, 9.53); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* check vector contents */ + assertEquals(1.55, accessor.get(0), 0); + assertEquals(2.53, accessor.get(2), 0); + assertEquals(3.36, accessor.get(4), 0); + assertEquals(4.82, accessor.get(6), 0); + assertEquals(5.67, accessor.get(8), 0); + assertEquals(6.67, accessor.get(10), 0); + assertEquals(7.87, accessor.get(12), 0); + assertEquals(8.56, accessor.get(14), 0); + + try { + accessor.get(16); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* this should trigger a realloc() */ + mutator.setSafe(16, 9.53); + + /* underlying buffer should now be able to store double the number of values */ + assertEquals(initialCapacity * 2, floatVector.getValueCapacity()); + + /* vector data should still be intact after realloc */ + assertEquals(1.55, accessor.get(0), 0); + assertEquals(2.53, accessor.get(2), 0); + assertEquals(3.36, accessor.get(4), 0); + assertEquals(4.82, accessor.get(6), 0); + assertEquals(5.67, accessor.get(8), 0); + assertEquals(6.67, accessor.get(10), 0); + assertEquals(7.87, accessor.get(12), 0); + assertEquals(8.56, accessor.get(14), 0); + assertEquals(9.53, accessor.get(16), 0); + + /* reset the vector */ + floatVector.reset(); + + /* capacity shouldn't change after reset */ + assertEquals(initialCapacity * 2, floatVector.getValueCapacity()); + + /* vector data should be zeroed out */ + for(int i = 0; i < (initialCapacity * 2); i++) { + assertEquals("non-zero data not expected at index: " + i, 0, accessor.get(i), 0); + } + } + } + + @Test /* NullableUInt4Vector */ + public void testNullableFixedType1() { + + // Create a new value vector for 1024 integers. + try (final NullableUInt4Vector vector = newVector(NullableUInt4Vector.class, EMPTY_SCHEMA_PATH, new ArrowType.Int(32, false), allocator);) { + final NullableUInt4Vector.Mutator mutator = vector.getMutator(); + final NullableUInt4Vector.Accessor accessor = vector.getAccessor(); + boolean error = false; + int initialCapacity = 1024; + + vector.setInitialCapacity(initialCapacity); + /* no memory allocation has happened yet */ + assertEquals(0, vector.getValueCapacity()); + + vector.allocateNew(); + assertEquals(initialCapacity, vector.getValueCapacity()); + + // Put and set a few values + mutator.set(0, 100); + mutator.set(1, 101); + mutator.set(100, 102); + mutator.set(1022, 103); + mutator.set(1023, 104); + + /* check vector contents */ + assertEquals(100, accessor.get(0)); + assertEquals(101, accessor.get(1)); + assertEquals(102, accessor.get(100)); + assertEquals(103, accessor.get(1022)); + assertEquals(104, accessor.get(1023)); + + int val = 0; + + /* check unset bits/null values */ + for (int i = 2, j = 101; i <= 99 || j <= 1021; i++, j++) { + if (i <= 99) { + assertTrue(accessor.isNull(i)); + } + if(j <= 1021) { + assertTrue(accessor.isNull(j)); + } + } + + try { + mutator.set(1024, 10000); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + try { + accessor.get(1024); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* should trigger a realloc of the underlying bitvector and valuevector */ + mutator.setSafe(1024, 10000); + + /* check new capacity */ + assertEquals(initialCapacity * 2, vector.getValueCapacity()); + + /* vector contents should still be intact after realloc */ + assertEquals(100, accessor.get(0)); + assertEquals(101, accessor.get(1)); + assertEquals(102, accessor.get(100)); + assertEquals(103, accessor.get(1022)); + assertEquals(104, accessor.get(1023)); + assertEquals(10000, accessor.get(1024)); + + val = 0; + + /* check unset bits/null values */ + for (int i = 2, j = 101; i < 99 || j < 1021; i++, j++) { + if (i <= 99) { + assertTrue(accessor.isNull(i)); + } + if(j <= 1021) { + assertTrue(accessor.isNull(j)); + } + } + + /* reset the vector */ + vector.reset(); + + /* capacity shouldn't change after reset */ + assertEquals(initialCapacity * 2, vector.getValueCapacity()); + + /* vector data should be zeroed out */ + for(int i = 0; i < (initialCapacity * 2); i++) { + assertTrue("non-null data not expected at index: " + i, accessor.isNull(i)); + } + } + } + + @Test /* NullableFloat4Vector */ + public void testNullableFixedType2() { + // Create a new value vector for 1024 integers + try (final NullableFloat4Vector vector = newVector(NullableFloat4Vector.class, EMPTY_SCHEMA_PATH, MinorType.FLOAT4, allocator);) { + final NullableFloat4Vector.Mutator mutator = vector.getMutator(); + final NullableFloat4Vector.Accessor accessor = vector.getAccessor(); + boolean error = false; + int initialCapacity = 16; + + vector.setInitialCapacity(initialCapacity); + /* no memory allocation has happened yet */ + assertEquals(0, vector.getValueCapacity()); + + vector.allocateNew(); + assertEquals(initialCapacity, vector.getValueCapacity()); + + /* populate the vector */ + mutator.set(0, 100.5f); + mutator.set(2, 201.5f); + mutator.set(4, 300.3f); + mutator.set(6, 423.8f); + mutator.set(8, 555.6f); + mutator.set(10, 66.6f); + mutator.set(12, 78.8f); + mutator.set(14, 89.5f); + + try { + mutator.set(16, 90.5f); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* check vector contents */ + assertEquals(100.5f, accessor.get(0), 0); + assertTrue(accessor.isNull(1)); + assertEquals(201.5f, accessor.get(2), 0); + assertTrue(accessor.isNull(3)); + assertEquals(300.3f, accessor.get(4), 0); + assertTrue(accessor.isNull(5)); + assertEquals(423.8f, accessor.get(6), 0); + assertTrue(accessor.isNull(7)); + assertEquals(555.6f, accessor.get(8), 0); + assertTrue(accessor.isNull(9)); + assertEquals(66.6f, accessor.get(10), 0); + assertTrue(accessor.isNull(11)); + assertEquals(78.8f, accessor.get(12), 0); + assertTrue(accessor.isNull(13)); + assertEquals(89.5f, accessor.get(14), 0); + assertTrue(accessor.isNull(15)); + + try { + accessor.get(16); + } + catch (IndexOutOfBoundsException ie) { + error = true; + } + finally { + assertTrue(error); + error = false; + } + + /* this should trigger a realloc() */ + mutator.setSafe(16, 90.5f); + + /* underlying buffer should now be able to store double the number of values */ + assertEquals(initialCapacity * 2, vector.getValueCapacity()); + + /* vector data should still be intact after realloc */ + assertEquals(100.5f, accessor.get(0), 0); + assertTrue(accessor.isNull(1)); + assertEquals(201.5f, accessor.get(2), 0); + assertTrue(accessor.isNull(3)); + assertEquals(300.3f, accessor.get(4), 0); + assertTrue(accessor.isNull(5)); + assertEquals(423.8f, accessor.get(6), 0); + assertTrue(accessor.isNull(7)); + assertEquals(555.6f, accessor.get(8), 0); + assertTrue(accessor.isNull(9)); + assertEquals(66.6f, accessor.get(10), 0); + assertTrue(accessor.isNull(11)); + assertEquals(78.8f, accessor.get(12), 0); + assertTrue(accessor.isNull(13)); + assertEquals(89.5f, accessor.get(14), 0); + assertTrue(accessor.isNull(15)); + assertEquals(90.5f, accessor.get(16), 0); + + /* reset the vector */ + vector.reset(); + + /* capacity shouldn't change after reset */ + assertEquals(initialCapacity * 2, vector.getValueCapacity()); + + /* vector data should be zeroed out */ + for(int i = 0; i < (initialCapacity * 2); i++) { + assertTrue("non-null data not expected at index: " + i, accessor.isNull(i)); + } + } + } + + @Test /* NullableIntVector */ + public void testNullableFixedType3() { + // Create a new value vector for 1024 integers + try (final NullableIntVector vector = newVector(NullableIntVector.class, EMPTY_SCHEMA_PATH, MinorType.INT, allocator)) { + final NullableIntVector.Mutator mutator = vector.getMutator(); + final NullableIntVector.Accessor accessor = vector.getAccessor(); + boolean error = false; + int initialCapacity = 1024; + + /* no memory allocation has happened yet so capacity of underlying buffer should be 0 */ + assertEquals(0, vector.getValueCapacity()); + /* allocate space for 4KB data (1024 * 4) */ + vector.allocateNew(initialCapacity); + /* underlying buffer should be able to store 16 values */ + assertEquals(initialCapacity, vector.getValueCapacity()); + + mutator.set(0, 1); + mutator.set(1, 2); + mutator.set(100, 3); + mutator.set(1022, 4); + mutator.set(1023, 5); + + /* check vector contents */ + int j = 1; + for(int i = 0; i <= 1023; i++) { + if((i >= 2 && i <= 99) || (i >= 101 && i <= 1021)) { + assertTrue("non-null data not expected at index: " + i, accessor.isNull(i)); + } + else { + assertFalse("null data not expected at index: " + i, accessor.isNull(i)); + assertEquals("unexpected value at index: " + i, j, accessor.get(i)); + j++; + } + } + + mutator.setValueCount(1024); + Field field = vector.getField(); + TypeLayout typeLayout = field.getTypeLayout(); + + List buffers = vector.getFieldBuffers(); + + assertEquals(2, typeLayout.getVectors().size()); + assertEquals(2, buffers.size()); + + ArrowBuf validityVectorBuf = buffers.get(0); + + /* bitvector tracks 1024 integers --> 1024 bits --> 128 bytes */ + assertEquals(128, validityVectorBuf.readableBytes()); + assertEquals(3, validityVectorBuf.getByte(0)); // 1st and second bit defined + for (int i = 1; i < 12; i++) { + assertEquals(0, validityVectorBuf.getByte(i)); // nothing defined until 100 + } + assertEquals(16, validityVectorBuf.getByte(12)); // 100th bit is defined (12 * 8 + 4) + for (int i = 13; i < 127; i++) { + assertEquals(0, validityVectorBuf.getByte(i)); // nothing defined between 100th and 1022nd + } + assertEquals(-64, validityVectorBuf.getByte(127)); // 1022nd and 1023rd bit defined + + /* this should trigger a realloc() */ + mutator.setSafe(1024, 6); + + /* underlying buffer should now be able to store double the number of values */ + assertEquals(initialCapacity * 2, vector.getValueCapacity()); + + /* vector data should still be intact after realloc */ + j = 1; + for(int i = 0; i < (initialCapacity * 2); i++) { + if((i > 1024) || (i >= 2 && i <= 99) || (i >= 101 && i <= 1021)) { + assertTrue("non-null data not expected at index: " + i, accessor.isNull(i)); + } + else { + assertFalse("null data not expected at index: " + i, accessor.isNull(i)); + assertEquals("unexpected value at index: " + i, j, accessor.get(i)); + j++; + } + } + + /* reset the vector */ + vector.reset(); + + /* capacity shouldn't change after reset */ + assertEquals(initialCapacity * 2, vector.getValueCapacity()); + + /* vector data should have been zeroed out */ + for(int i = 0; i < (initialCapacity * 2); i++) { + assertTrue("non-null data not expected at index: " + i, accessor.isNull(i)); + } + + vector.allocateNew(4096); + // vector has been erased + for(int i = 0; i < 4096; i++) { + assertTrue("non-null data not expected at index: " + i, accessor.isNull(i)); + } + } + } + + /* + * Tests for Variable Width Vectors + * + * Covered types as of now + * + * -- NullableVarCharVector + * -- NullableVarBinaryVector + * + * TODO: + * + * -- VarCharVector + * -- VarBinaryVector + */ + + @Test /* NullableVarCharVector */ + public void testNullableVarType1() { + + // Create a new value vector for 1024 integers. + try (final NullableVarCharVector vector = newNullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) { + final NullableVarCharVector.Mutator m = vector.getMutator(); + vector.allocateNew(1024 * 10, 1024); + + m.set(0, STR1); + m.set(1, STR2); + m.set(2, STR3); + m.setSafe(3, STR3, 1, STR3.length - 1); + m.setSafe(4, STR3, 2, STR3.length - 2); + ByteBuffer STR3ByteBuffer = ByteBuffer.wrap(STR3); + m.setSafe(5, STR3ByteBuffer, 1, STR3.length - 1); + m.setSafe(6, STR3ByteBuffer, 2, STR3.length - 2); + + // Check the sample strings. + final NullableVarCharVector.Accessor accessor = vector.getAccessor(); + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(1)); + assertArrayEquals(STR3, accessor.get(2)); + assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(3)); + assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(4)); + assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(5)); + assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(6)); + + // Ensure null value throws. + boolean b = false; + try { + vector.getAccessor().get(7); + } catch (IllegalStateException e) { + b = true; + } finally { + assertTrue(b); + } + } + } + + @Test /* NullableVarBinaryVector */ + public void testNullableVarType2() { + + // Create a new value vector for 1024 integers. + try (final NullableVarBinaryVector vector = newNullableVarBinaryVector(EMPTY_SCHEMA_PATH, allocator)) { + final NullableVarBinaryVector.Mutator m = vector.getMutator(); + vector.allocateNew(1024 * 10, 1024); + + m.set(0, STR1); + m.set(1, STR2); + m.set(2, STR3); + m.setSafe(3, STR3, 1, STR3.length - 1); + m.setSafe(4, STR3, 2, STR3.length - 2); + ByteBuffer STR3ByteBuffer = ByteBuffer.wrap(STR3); + m.setSafe(5, STR3ByteBuffer, 1, STR3.length - 1); + m.setSafe(6, STR3ByteBuffer, 2, STR3.length - 2); + + // Check the sample strings. + final NullableVarBinaryVector.Accessor accessor = vector.getAccessor(); + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(1)); + assertArrayEquals(STR3, accessor.get(2)); + assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(3)); + assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(4)); + assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(5)); + assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(6)); + + // Ensure null value throws. + boolean b = false; + try { + vector.getAccessor().get(7); + } catch (IllegalStateException e) { + b = true; + } finally { + assertTrue(b); + } + } + } + + + /* + * generic tests + * + * -- lastSet() and setValueCount() + * -- fillEmpties() + * -- VectorLoader and VectorUnloader + * -- some realloc tests + * + * TODO: + * + * The realloc() related tests below should be moved up and we need to + * add realloc related tests (edge cases) for more vector types. + */ + + @Test /* Float8Vector */ + public void testReallocAfterVectorTransfer1() { + try (final Float8Vector vector = new Float8Vector(EMPTY_SCHEMA_PATH, allocator)) { + final Float8Vector.Mutator mutator = vector.getMutator(); + final Float8Vector.Accessor accessor = vector.getAccessor(); + final int initialDefaultCapacity = 4096; + boolean error = false; + + /* use the default capacity; 4096*8 => 32KB */ + vector.allocateNew(); + + assertEquals(initialDefaultCapacity, vector.getValueCapacity()); + + double baseValue = 100.375; + + for (int i = 0; i < initialDefaultCapacity; i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + /* the above setSafe calls should not have triggered a realloc as + * we are within the capacity. check the vector contents + */ + assertEquals(initialDefaultCapacity, vector.getValueCapacity()); + + for (int i = 0; i < initialDefaultCapacity; i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* this should trigger a realloc */ + mutator.setSafe(initialDefaultCapacity, baseValue + (double)initialDefaultCapacity); + assertEquals(initialDefaultCapacity * 2, vector.getValueCapacity()); + + for (int i = initialDefaultCapacity + 1; i < (initialDefaultCapacity * 2); i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + for (int i = 0; i < (initialDefaultCapacity * 2); i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* this should trigger a realloc */ + mutator.setSafe(initialDefaultCapacity * 2, baseValue + (double)(initialDefaultCapacity * 2)); + assertEquals(initialDefaultCapacity * 4, vector.getValueCapacity()); + + for (int i = (initialDefaultCapacity * 2) + 1; i < (initialDefaultCapacity * 4); i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + for (int i = 0; i < (initialDefaultCapacity * 4); i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* at this point we are working with a 128KB buffer data for this + * vector. now let's transfer this vector + */ + + TransferPair transferPair = vector.getTransferPair(allocator); + transferPair.transfer(); + + Float8Vector toVector = (Float8Vector)transferPair.getTo(); + + /* now let's realloc the toVector */ + toVector.reAlloc(); + assertEquals(initialDefaultCapacity * 8, toVector.getValueCapacity()); + + final Float8Vector.Accessor toAccessor = toVector.getAccessor(); + + for (int i = 0; i < (initialDefaultCapacity * 8); i++) { + double value = toAccessor.get(i); + if (i < (initialDefaultCapacity * 4)) { + assertEquals(baseValue + (double)i, value, 0); + } + else { + assertEquals(0, value, 0); + } + } + + toVector.close(); + } + } + + @Test /* NullableFloat8Vector */ + public void testReallocAfterVectorTransfer2() { + try (final NullableFloat8Vector vector = new NullableFloat8Vector(EMPTY_SCHEMA_PATH, allocator)) { + final NullableFloat8Vector.Mutator mutator = vector.getMutator(); + final NullableFloat8Vector.Accessor accessor = vector.getAccessor(); + final int initialDefaultCapacity = 4096; + boolean error = false; + + vector.allocateNew(initialDefaultCapacity); + + assertEquals(initialDefaultCapacity, vector.getValueCapacity()); + + double baseValue = 100.375; + + for (int i = 0; i < initialDefaultCapacity; i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + /* the above setSafe calls should not have triggered a realloc as + * we are within the capacity. check the vector contents + */ + assertEquals(initialDefaultCapacity, vector.getValueCapacity()); + + for (int i = 0; i < initialDefaultCapacity; i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* this should trigger a realloc */ + mutator.setSafe(initialDefaultCapacity, baseValue + (double)initialDefaultCapacity); + assertEquals(initialDefaultCapacity * 2, vector.getValueCapacity()); + + for (int i = initialDefaultCapacity + 1; i < (initialDefaultCapacity * 2); i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + for (int i = 0; i < (initialDefaultCapacity * 2); i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* this should trigger a realloc */ + mutator.setSafe(initialDefaultCapacity * 2, baseValue + (double)(initialDefaultCapacity * 2)); + assertEquals(initialDefaultCapacity * 4, vector.getValueCapacity()); + + for (int i = (initialDefaultCapacity * 2) + 1; i < (initialDefaultCapacity * 4); i++) { + mutator.setSafe(i, baseValue + (double)i); + } + + for (int i = 0; i < (initialDefaultCapacity * 4); i++) { + double value = accessor.get(i); + assertEquals(baseValue + (double)i, value, 0); + } + + /* at this point we are working with a 128KB buffer data for this + * vector. now let's transfer this vector + */ + + TransferPair transferPair = vector.getTransferPair(allocator); + transferPair.transfer(); + + NullableFloat8Vector toVector = (NullableFloat8Vector)transferPair.getTo(); + final NullableFloat8Vector.Accessor toAccessor = toVector.getAccessor(); + + /* check toVector contents before realloc */ + for (int i = 0; i < (initialDefaultCapacity * 4); i++) { + assertFalse("unexpected null value at index: " + i, toAccessor.isNull(i)); + double value = toAccessor.get(i); + assertEquals("unexpected value at index: " + i, baseValue + (double)i, value, 0); + } + + /* now let's realloc the toVector and check contents again */ + toVector.reAlloc(); + assertEquals(initialDefaultCapacity * 8, toVector.getValueCapacity()); + + for (int i = 0; i < (initialDefaultCapacity * 8); i++) { + if (i < (initialDefaultCapacity * 4)) { + assertFalse("unexpected null value at index: " + i, toAccessor.isNull(i)); + double value = toAccessor.get(i); + assertEquals("unexpected value at index: " + i, baseValue + (double)i, value, 0); + } + else { + assertTrue("unexpected non-null value at index: " + i, toAccessor.isNull(i)); + } + } + + toVector.close(); + } + } + + @Test /* NullableVarCharVector */ + public void testReallocAfterVectorTransfer3() { + try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) { + final NullableVarCharVector.Mutator mutator = vector.getMutator(); + final NullableVarCharVector.Accessor accessor = vector.getAccessor(); + + /* 4096 values with 10 byte per record */ + vector.allocateNew(4096 * 10, 4096); + int valueCapacity = vector.getValueCapacity(); + + /* populate the vector */ + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + mutator.set(i, STR1); + } + else { + mutator.set(i, STR2); + } + } + + /* Check the vector output */ + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + assertArrayEquals(STR1, accessor.get(i)); + } + else { + assertArrayEquals(STR2, accessor.get(i)); + } + } + + /* trigger first realloc */ + mutator.setSafe(valueCapacity, STR2, 0, STR2.length); + + /* populate the remaining vector */ + for (int i = valueCapacity; i < vector.getValueCapacity(); i++) { + if ((i & 1) == 1) { + mutator.set(i, STR1); + } + else { + mutator.set(i, STR2); + } + } + + /* Check the vector output */ + valueCapacity = vector.getValueCapacity(); + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + assertArrayEquals(STR1, accessor.get(i)); + } + else { + assertArrayEquals(STR2, accessor.get(i)); + } + } + + /* trigger second realloc */ + mutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length); + + /* populate the remaining vector */ + for (int i = valueCapacity; i < vector.getValueCapacity(); i++) { + if ((i & 1) == 1) { + mutator.set(i, STR1); + } + else { + mutator.set(i, STR2); + } + } + + /* Check the vector output */ + valueCapacity = vector.getValueCapacity(); + for (int i = 0; i < valueCapacity; i++) { + if ((i & 1) == 1) { + assertArrayEquals(STR1, accessor.get(i)); + } + else { + assertArrayEquals(STR2, accessor.get(i)); + } + } + + /* we are potentially working with 4x the size of vector buffer + * that we initially started with. Now let's transfer the vector. + */ + + TransferPair transferPair = vector.getTransferPair(allocator); + transferPair.transfer(); + NullableVarCharVector toVector = (NullableVarCharVector)transferPair.getTo(); + NullableVarCharVector.Mutator toMutator = toVector.getMutator(); + NullableVarCharVector.Accessor toAccessor = toVector.getAccessor(); + + valueCapacity = toVector.getValueCapacity(); + + /* trigger a realloc of this toVector */ + toMutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length); + + toVector.close(); + } + } + + @Test + public void testReAllocNullableFixedWidthVector() { + // Create a new value vector for 1024 integers + try (final NullableFloat4Vector vector = newVector(NullableFloat4Vector.class, EMPTY_SCHEMA_PATH, MinorType.FLOAT4, allocator)) { + final NullableFloat4Vector.Mutator m = vector.getMutator(); + vector.allocateNew(1024); + + assertEquals(1024, vector.getValueCapacity()); + + // Put values in indexes that fall within the initial allocation + m.setSafe(0, 100.1f); + m.setSafe(100, 102.3f); + m.setSafe(1023, 104.5f); + + // Now try to put values in space that falls beyond the initial allocation + m.setSafe(2000, 105.5f); + + // Check valueCapacity is more than initial allocation + assertEquals(1024 * 2, vector.getValueCapacity()); + + final NullableFloat4Vector.Accessor accessor = vector.getAccessor(); + assertEquals(100.1f, accessor.get(0), 0); + assertEquals(102.3f, accessor.get(100), 0); + assertEquals(104.5f, accessor.get(1023), 0); + assertEquals(105.5f, accessor.get(2000), 0); + + // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors + // as we don't call setSafe for null values, but we do call setValueCount when all values are inserted into the + // vector + m.setValueCount(vector.getValueCapacity() + 200); + } + } + + @Test + public void testReAllocNullableVariableWidthVector() { + // Create a new value vector for 1024 integers + try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) { + final NullableVarCharVector.Mutator m = vector.getMutator(); + vector.allocateNew(); + + int initialCapacity = vector.getValueCapacity(); + + // Put values in indexes that fall within the initial allocation + m.setSafe(0, STR1, 0, STR1.length); + m.setSafe(initialCapacity - 1, STR2, 0, STR2.length); + + // Now try to put values in space that falls beyond the initial allocation + m.setSafe(initialCapacity + 200, STR3, 0, STR3.length); + + // Check valueCapacity is more than initial allocation + assertEquals((initialCapacity + 1) * 2 - 1, vector.getValueCapacity()); + + final NullableVarCharVector.Accessor accessor = vector.getAccessor(); + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(initialCapacity - 1)); + assertArrayEquals(STR3, accessor.get(initialCapacity + 200)); + + // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors + // as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed. + m.setValueCount(vector.getValueCapacity() + 200); + } + } + + @Test + public void testFillEmptiesNotOverfill() { + try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) { + vector.allocateNew(); + + vector.getMutator().setSafe(4094, "hello".getBytes(), 0, 5); + vector.getMutator().setValueCount(4095); + + assertEquals(4096 * 4, vector.getFieldBuffers().get(1).capacity()); + } + } + + @Test + public void testCopyFromWithNulls() { + try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator); + final NullableVarCharVector vector2 = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) { + vector.allocateNew(); + + for (int i = 0; i < 4095; i++) { + if (i % 3 == 0) { + continue; + } + byte[] b = Integer.toString(i).getBytes(); + vector.getMutator().setSafe(i, b, 0, b.length); + } + + vector.getMutator().setValueCount(4095); + + vector2.allocateNew(); + + for (int i = 0; i < 4095; i++) { + vector2.copyFromSafe(i, i, vector); + } + + vector2.getMutator().setValueCount(4095); + + for (int i = 0; i < 4095; i++) { + if (i % 3 == 0) { + assertNull(vector2.getAccessor().getObject(i)); + } else { + assertEquals(Integer.toString(i), vector2.getAccessor().getObject(i).toString()); + } + } + } + } + + @Test + public void testSetLastSetUsage() { + try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) { + + final NullableVarCharVector.Mutator mutator = vector.getMutator(); + + vector.allocateNew(1024 * 10, 1024); + + setBytes(0, STR1, vector); + setBytes(1, STR2, vector); + setBytes(2, STR3, vector); + setBytes(3, STR4, vector); + setBytes(4, STR5, vector); + setBytes(5, STR6, vector); + + /* Check current lastSet */ + assertEquals(Integer.toString(-1), Integer.toString(mutator.getLastSet())); + + /* Check the vector output */ + final NullableVarCharVector.Accessor accessor = vector.getAccessor(); + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(1)); + assertArrayEquals(STR3, accessor.get(2)); + assertArrayEquals(STR4, accessor.get(3)); + assertArrayEquals(STR5, accessor.get(4)); + assertArrayEquals(STR6, accessor.get(5)); + + /* + * If we don't do setLastSe(5) before setValueCount(), then the latter will corrupt + * the value vector by filling in all positions [0,valuecount-1] will empty byte arrays. + * Run the test by commenting out next line and we should see incorrect vector output. + */ + mutator.setLastSet(5); + mutator.setValueCount(20); + + /* Check the vector output again */ + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(1)); + assertArrayEquals(STR3, accessor.get(2)); + assertArrayEquals(STR4, accessor.get(3)); + assertArrayEquals(STR5, accessor.get(4)); + assertArrayEquals(STR6, accessor.get(5)); + } + } + + @Test + public void testVectorLoadUnload() { + + try (final NullableVarCharVector vector1 = new NullableVarCharVector("myvector", allocator)) { + + final NullableVarCharVector.Mutator mutator1 = vector1.getMutator(); + + vector1.allocateNew(1024 * 10, 1024); + + mutator1.set(0, STR1); + mutator1.set(1, STR2); + mutator1.set(2, STR3); + mutator1.set(3, STR4); + mutator1.set(4, STR5); + mutator1.set(5, STR6); + assertEquals(Integer.toString(5), Integer.toString(mutator1.getLastSet())); + mutator1.setValueCount(15); + assertEquals(Integer.toString(14), Integer.toString(mutator1.getLastSet())); + + /* Check the vector output */ + final NullableVarCharVector.Accessor accessor1 = vector1.getAccessor(); + assertArrayEquals(STR1, accessor1.get(0)); + assertArrayEquals(STR2, accessor1.get(1)); + assertArrayEquals(STR3, accessor1.get(2)); + assertArrayEquals(STR4, accessor1.get(3)); + assertArrayEquals(STR5, accessor1.get(4)); + assertArrayEquals(STR6, accessor1.get(5)); + + Field field = vector1.getField(); + String fieldName = field.getName(); + + List fields = new ArrayList(); + List fieldVectors = new ArrayList(); + + fields.add(field); + fieldVectors.add(vector1); + + Schema schema = new Schema(fields); + + VectorSchemaRoot schemaRoot1 = new VectorSchemaRoot(schema, fieldVectors, accessor1.getValueCount()); + VectorUnloader vectorUnloader = new VectorUnloader(schemaRoot1); + + try ( + ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); + BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("new vector", 0, Long.MAX_VALUE); + VectorSchemaRoot schemaRoot2 = VectorSchemaRoot.create(schema, finalVectorsAllocator); + ) { + + VectorLoader vectorLoader = new VectorLoader(schemaRoot2); + vectorLoader.load(recordBatch); + + NullableVarCharVector vector2 = (NullableVarCharVector) schemaRoot2.getVector(fieldName); + NullableVarCharVector.Mutator mutator2 = vector2.getMutator(); + + /* + * lastSet would have internally been set by VectorLoader.load() when it invokes + * loadFieldBuffers. + */ + assertEquals(Integer.toString(14), Integer.toString(mutator2.getLastSet())); + mutator2.setValueCount(25); + assertEquals(Integer.toString(24), Integer.toString(mutator2.getLastSet())); + + /* Check the vector output */ + final NullableVarCharVector.Accessor accessor2 = vector2.getAccessor(); + assertArrayEquals(STR1, accessor2.get(0)); + assertArrayEquals(STR2, accessor2.get(1)); + assertArrayEquals(STR3, accessor2.get(2)); + assertArrayEquals(STR4, accessor2.get(3)); + assertArrayEquals(STR5, accessor2.get(4)); + assertArrayEquals(STR6, accessor2.get(5)); + } + } + } + + @Test + public void testFillEmptiesUsage() { + try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) { + + final NullableVarCharVector.Mutator mutator = vector.getMutator(); + + vector.allocateNew(1024 * 10, 1024); + + setBytes(0, STR1, vector); + setBytes(1, STR2, vector); + setBytes(2, STR3, vector); + setBytes(3, STR4, vector); + setBytes(4, STR5, vector); + setBytes(5, STR6, vector); + + /* Check current lastSet */ + assertEquals(Integer.toString(-1), Integer.toString(mutator.getLastSet())); + + /* Check the vector output */ + final NullableVarCharVector.Accessor accessor = vector.getAccessor(); + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(1)); + assertArrayEquals(STR3, accessor.get(2)); + assertArrayEquals(STR4, accessor.get(3)); + assertArrayEquals(STR5, accessor.get(4)); + assertArrayEquals(STR6, accessor.get(5)); + + mutator.setLastSet(5); + /* fill empty byte arrays from index [6, 9] */ + mutator.fillEmpties(10); + + /* Check current lastSet */ + assertEquals(Integer.toString(9), Integer.toString(mutator.getLastSet())); + + /* Check the vector output */ + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(1)); + assertArrayEquals(STR3, accessor.get(2)); + assertArrayEquals(STR4, accessor.get(3)); + assertArrayEquals(STR5, accessor.get(4)); + assertArrayEquals(STR6, accessor.get(5)); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(6))); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(7))); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(8))); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(9))); + + setBytes(10, STR1, vector); + setBytes(11, STR2, vector); + + mutator.setLastSet(11); + /* fill empty byte arrays from index [12, 14] */ + mutator.setValueCount(15); + + /* Check current lastSet */ + assertEquals(Integer.toString(14), Integer.toString(mutator.getLastSet())); + + /* Check the vector output */ + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(1)); + assertArrayEquals(STR3, accessor.get(2)); + assertArrayEquals(STR4, accessor.get(3)); + assertArrayEquals(STR5, accessor.get(4)); + assertArrayEquals(STR6, accessor.get(5)); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(6))); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(7))); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(8))); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(9))); + assertArrayEquals(STR1, accessor.get(10)); + assertArrayEquals(STR2, accessor.get(11)); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(12))); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(13))); + assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(14))); + + /* Check offsets */ + final UInt4Vector.Accessor offsetAccessor = vector.values.offsetVector.getAccessor(); + assertEquals(Integer.toString(0), Integer.toString(offsetAccessor.get(0))); + assertEquals(Integer.toString(6), Integer.toString(offsetAccessor.get(1))); + assertEquals(Integer.toString(16), Integer.toString(offsetAccessor.get(2))); + assertEquals(Integer.toString(21), Integer.toString(offsetAccessor.get(3))); + assertEquals(Integer.toString(30), Integer.toString(offsetAccessor.get(4))); + assertEquals(Integer.toString(34), Integer.toString(offsetAccessor.get(5))); + + assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(6))); + assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(7))); + assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(8))); + assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(9))); + assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(10))); + + assertEquals(Integer.toString(46), Integer.toString(offsetAccessor.get(11))); + assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(12))); + + assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(13))); + assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(14))); + assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(15))); + } + } + + @Test /* NullableVarCharVector */ + public void testGetBufferAddress1() { + + try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) { + + final NullableVarCharVector.Mutator mutator = vector.getMutator(); + final NullableVarCharVector.Accessor accessor = vector.getAccessor(); + + vector.allocateNew(1024 * 10, 1024); + + /* populate the vector */ + mutator.set(0, STR1); + mutator.set(1, STR2); + mutator.set(2, STR3); + mutator.set(3, STR4); + mutator.set(4, STR5); + mutator.set(5, STR6); + + mutator.setValueCount(15); + + /* check the vector output */ + assertArrayEquals(STR1, accessor.get(0)); + assertArrayEquals(STR2, accessor.get(1)); + assertArrayEquals(STR3, accessor.get(2)); + assertArrayEquals(STR4, accessor.get(3)); + assertArrayEquals(STR5, accessor.get(4)); + assertArrayEquals(STR6, accessor.get(5)); + + List buffers = vector.getFieldBuffers(); + long bitAddress = vector.getValidityBufferAddress(); + long offsetAddress = vector.getOffsetBufferAddress(); + long dataAddress = vector.getDataBufferAddress(); + + assertEquals(3, buffers.size()); + assertEquals(bitAddress, buffers.get(0).memoryAddress()); + assertEquals(offsetAddress, buffers.get(1).memoryAddress()); + assertEquals(dataAddress, buffers.get(2).memoryAddress()); + } + } + + @Test /* NullableIntVector */ + public void testGetBufferAddress2() { + + try (final NullableIntVector vector = new NullableIntVector("myvector", allocator)) { + + final NullableIntVector.Mutator mutator = vector.getMutator(); + final NullableIntVector.Accessor accessor = vector.getAccessor(); + boolean error = false; + + vector.allocateNew(16); + + /* populate the vector */ + for(int i = 0; i < 16; i += 2) { + mutator.set(i, i+10); + } + + /* check the vector output */ + for(int i = 0; i < 16; i += 2) { + assertEquals(i+10, accessor.get(i)); + } + + List buffers = vector.getFieldBuffers(); + long bitAddress = vector.getValidityBufferAddress(); + long dataAddress = vector.getDataBufferAddress(); + + try { + long offsetAddress = vector.getOffsetBufferAddress(); + } + catch (UnsupportedOperationException ue) { + error = true; + } + finally { + assertTrue(error); + } + + assertEquals(2, buffers.size()); + assertEquals(bitAddress, buffers.get(0).memoryAddress()); + assertEquals(dataAddress, buffers.get(1).memoryAddress()); + } + } + + @Test + public void testMultipleClose() { + BufferAllocator vectorAllocator = allocator.newChildAllocator("vector_allocator", 0, Long.MAX_VALUE); + NullableIntVector vector = newVector(NullableIntVector.class, EMPTY_SCHEMA_PATH, MinorType.INT, vectorAllocator); + vector.close(); + vectorAllocator.close(); + vector.close(); + vectorAllocator.close(); + } + + public static void setBytes(int index, byte[] bytes, NullableVarCharVector vector) { + final int currentOffset = vector.values.offsetVector.getAccessor().get(index); + + vector.bits.getMutator().setToOne(index); + vector.values.offsetVector.getMutator().set(index + 1, currentOffset + bytes.length); + vector.values.data.setBytes(currentOffset, bytes, 0, bytes.length); + } +}