Skip to content

Commit

Permalink
ARROW-4147: [Java] reduce heap usage for varwidth vectors (#3298)
Browse files Browse the repository at this point in the history
* ARROW-4147: reduce heap usage for varwidth vectors

- some code reorg to avoid duplication
- changed the default initial alloc from 4096 to 3970

* ARROW-4147: [Java] Address review comments

* ARROW-4147: remove check on width to be <= 16:

* ARROW-4147: allow initial valueCount to be 0.

* ARROW-4147: Fix incorrect comment on initial alloc
  • Loading branch information
Pindikura Ravindra authored and siddharthteotia committed Jan 9, 2019
1 parent 420c949 commit bfe6865
Show file tree
Hide file tree
Showing 9 changed files with 799 additions and 648 deletions.
Expand Up @@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.List;

import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
Expand All @@ -43,8 +42,7 @@ public abstract class BaseFixedWidthVector extends BaseValueVector
implements FixedWidthVector, FieldVector, VectorDefinitionSetter {
private final int typeWidth;

protected int valueAllocationSizeInBytes;
protected int validityAllocationSizeInBytes;
protected int initialValueAllocation;

protected final Field field;
private int allocationMonitor;
Expand All @@ -61,14 +59,7 @@ public BaseFixedWidthVector(final String name, final BufferAllocator allocator,
allocationMonitor = 0;
validityBuffer = allocator.getEmpty();
valueBuffer = allocator.getEmpty();
if (typeWidth > 0) {
valueAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * typeWidth;
validityAllocationSizeInBytes = getValidityBufferSizeFromCount(INITIAL_VALUE_ALLOCATION);
} else {
/* specialized handling for BitVector */
valueAllocationSizeInBytes = getValidityBufferSizeFromCount(INITIAL_VALUE_ALLOCATION);
validityAllocationSizeInBytes = valueAllocationSizeInBytes;
}
initialValueAllocation = INITIAL_VALUE_ALLOCATION;
}


Expand Down Expand Up @@ -159,12 +150,8 @@ public ArrowBuf getOffsetBuffer() {
*/
@Override
public void setInitialCapacity(int valueCount) {
final long size = (long) valueCount * typeWidth;
if (size > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
}
valueAllocationSizeInBytes = (int) size;
validityAllocationSizeInBytes = getValidityBufferSizeFromCount(valueCount);
computeAndCheckBufferSize(valueCount);
initialValueAllocation = valueCount;
}

/**
Expand Down Expand Up @@ -267,18 +254,13 @@ public void allocateNew() {
*/
@Override
public boolean allocateNewSafe() {
long curAllocationSizeValue = valueAllocationSizeInBytes;
long curAllocationSizeValidity = validityAllocationSizeInBytes;

if (align(curAllocationSizeValue) + curAllocationSizeValidity > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory exceeds limit");
}
computeAndCheckBufferSize(initialValueAllocation);

/* we are doing a new allocation -- release the current buffers */
clear();

try {
allocateBytes(curAllocationSizeValue, curAllocationSizeValidity);
allocateBytes(initialValueAllocation);
} catch (Exception e) {
clear();
return false;
Expand All @@ -295,33 +277,30 @@ public boolean allocateNewSafe() {
* @throws org.apache.arrow.memory.OutOfMemoryException on error
*/
public void allocateNew(int valueCount) {
long valueBufferSize = valueCount * typeWidth;
long validityBufferSize = getValidityBufferSizeFromCount(valueCount);
if (typeWidth == 0) {
/* specialized handling for BitVector */
valueBufferSize = validityBufferSize;
}

if (align(valueBufferSize) + validityBufferSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
}
computeAndCheckBufferSize(valueCount);

/* we are doing a new allocation -- release the current buffers */
clear();

try {
allocateBytes(valueBufferSize, validityBufferSize);
allocateBytes(valueCount);
} catch (Exception e) {
clear();
throw e;
}
}

/*
* align to a 8-byte value.
* Compute the buffer size required for 'valueCount', and check if it's within bounds.
*/
private long align(long size) {
return ((size + 7) / 8) * 8;
private long computeAndCheckBufferSize(int valueCount) {
  final long requiredBytes = computeCombinedBufferSize(valueCount, typeWidth);

  // Happy path first: the combined buffer fits within the configured limit.
  if (requiredBytes <= MAX_ALLOCATION_SIZE) {
    return requiredBytes;
  }

  // Too large: report the requested capacity, the computed size and the limit.
  final StringBuilder message = new StringBuilder("Memory required for vector capacity ")
      .append(valueCount)
      .append(" is (")
      .append(requiredBytes)
      .append("), which is more than max allowed (")
      .append(MAX_ALLOCATION_SIZE)
      .append(")");
  throw new OversizedAllocationException(message.toString());
}

/**
Expand All @@ -333,25 +312,11 @@ private long align(long size) {
* within the bounds of max allocation allowed and any other error
* conditions.
*/
private void allocateBytes(final long valueBufferSize, final long validityBufferSize) {
int valueBufferSlice = (int)align(valueBufferSize);
int validityBufferSlice = (int)validityBufferSize;

/* allocate combined buffer */
ArrowBuf buffer = allocator.buffer(valueBufferSlice + validityBufferSlice);

valueAllocationSizeInBytes = valueBufferSlice;
valueBuffer = buffer.slice(0, valueBufferSlice);
valueBuffer.retain();
valueBuffer.readerIndex(0);

validityAllocationSizeInBytes = validityBufferSlice;
validityBuffer = buffer.slice(valueBufferSlice, validityBufferSlice);
validityBuffer.retain();
validityBuffer.readerIndex(0);
private void allocateBytes(int valueCount) {
DataAndValidityBuffers buffers = allocFixedDataAndValidityBufs(valueCount, typeWidth);
valueBuffer = buffers.getDataBuf();
validityBuffer = buffers.getValidityBuf();
zeroVector();

buffer.release();
}

/**
Expand All @@ -363,7 +328,6 @@ private void allocateBytes(final long valueBufferSize, final long validityBuffer
private void allocateValidityBuffer(final int validityBufferSize) {
validityBuffer = allocator.buffer(validityBufferSize);
validityBuffer.readerIndex(0);
validityAllocationSizeInBytes = validityBufferSize;
}

/**
Expand Down Expand Up @@ -439,50 +403,28 @@ public ArrowBuf[] getBuffers(boolean clear) {
*/
@Override
public void reAlloc() {
int valueBaseSize = Integer.max(valueBuffer.capacity(), valueAllocationSizeInBytes);
long newValueBufferSlice = align(valueBaseSize * 2L);
long newValidityBufferSlice;
if (typeWidth > 0) {
long targetValueBufferSize = align(BaseAllocator.nextPowerOfTwo(newValueBufferSlice));
long targetValueCount = targetValueBufferSize / typeWidth;
targetValueBufferSize -= getValidityBufferSizeFromCount((int) targetValueCount);
if (newValueBufferSlice < targetValueBufferSize) {
newValueBufferSlice = targetValueBufferSize;
int targetValueCount = getValueCapacity() * 2;
if (targetValueCount == 0) {
if (initialValueAllocation > 0) {
targetValueCount = initialValueAllocation * 2;
} else {
targetValueCount = INITIAL_VALUE_ALLOCATION * 2;
}

newValidityBufferSlice = getValidityBufferSizeFromCount((int)(newValueBufferSlice / typeWidth));
} else {
newValidityBufferSlice = newValueBufferSlice;
}

long newAllocationSize = newValueBufferSlice + newValidityBufferSlice;
assert newAllocationSize >= 1;

if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer");
}
computeAndCheckBufferSize(targetValueCount);

final ArrowBuf newBuffer = allocator.buffer((int) newAllocationSize);
final ArrowBuf newValueBuffer = newBuffer.slice(0, (int)newValueBufferSlice);
DataAndValidityBuffers buffers = allocFixedDataAndValidityBufs(targetValueCount, typeWidth);
final ArrowBuf newValueBuffer = buffers.getDataBuf();
newValueBuffer.setBytes(0, valueBuffer, 0, valueBuffer.capacity());
newValueBuffer.setZero(valueBuffer.capacity(), (int)newValueBufferSlice - valueBuffer.capacity());
newValueBuffer.retain();
newValueBuffer.readerIndex(0);
newValueBuffer.setZero(valueBuffer.capacity(), newValueBuffer.capacity() - valueBuffer.capacity());
valueBuffer.release();
valueBuffer = newValueBuffer;
valueAllocationSizeInBytes = (int)newValueBufferSlice;

final ArrowBuf newValidityBuffer = newBuffer.slice((int)newValueBufferSlice,
(int)newValidityBufferSlice);
final ArrowBuf newValidityBuffer = buffers.getValidityBuf();
newValidityBuffer.setBytes(0, validityBuffer, 0, validityBuffer.capacity());
newValidityBuffer.setZero(validityBuffer.capacity(), (int)newValidityBufferSlice - validityBuffer.capacity());
newValidityBuffer.retain();
newValidityBuffer.readerIndex(0);
newValidityBuffer.setZero(validityBuffer.capacity(), newValidityBuffer.capacity() - validityBuffer.capacity());
validityBuffer.release();
validityBuffer = newValidityBuffer;
validityAllocationSizeInBytes = (int)newValidityBufferSlice;

newBuffer.release();
}

@Override
Expand Down Expand Up @@ -535,9 +477,6 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
valueBuffer = dataBuffer.retain(allocator);

valueCount = fieldNode.getLength();

valueAllocationSizeInBytes = valueBuffer.capacity();
validityAllocationSizeInBytes = validityBuffer.capacity();
}

/**
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.Iterator;

import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.util.TransferPair;
Expand All @@ -33,7 +34,14 @@ public abstract class BaseValueVector implements ValueVector {

public static final String MAX_ALLOCATION_SIZE_PROPERTY = "arrow.vector.max_allocation_bytes";
public static final int MAX_ALLOCATION_SIZE = Integer.getInteger(MAX_ALLOCATION_SIZE_PROPERTY, Integer.MAX_VALUE);
public static final int INITIAL_VALUE_ALLOCATION = 4096;
/*
* For all fixed width vectors, the value and validity buffers are sliced from a single buffer.
* Similarly, for variable width vectors, the offsets and validity buffers are sliced from a
* single buffer. To ensure the single buffer is power-of-2 size, the initial value allocation
* should be less than power-of-2. For IntVectors, this comes to 3970*4 (15880) for the data
* buffer and 504 bytes for the validity buffer, totalling 16384 bytes (2^14).
*/
public static final int INITIAL_VALUE_ALLOCATION = 3970;

protected final BufferAllocator allocator;
protected final String name;
Expand Down Expand Up @@ -98,5 +106,94 @@ protected ArrowBuf releaseBuffer(ArrowBuf buffer) {
protected static int getValidityBufferSizeFromCount(final int valueCount) {
  // One validity bit per value, rounded up to whole bytes. The +7 bias is done in long so it
  // cannot overflow, and the arithmetic shift floors, which together give ceil(valueCount / 8)
  // for every int input.
  final long biased = valueCount + 7L;
  return (int) (biased >> 3);
}

/* Round 'size' up to the next multiple of 8; exact multiples of 8 are returned unchanged. */
private static long roundUp8(long size) {
  final long biased = size + 7;
  // Dropping the remainder is the same as dividing by 8 and multiplying back by 8.
  return biased - (biased % 8);
}

/**
 * Compute the size, in bytes, of the single buffer from which both the validity buffer and the
 * value buffer for 'valueCount' values are sliced.
 *
 * <p>Each of the two parts is rounded up to a multiple of 8 bytes, and the sum is then passed
 * through {@code BaseAllocator.nextPowerOfTwo}.
 *
 * @param valueCount number of values the buffers must hold; must be >= 0
 * @param typeWidth width of one value in bytes; 0 selects the boolean layout in which the value
 *                  buffer has the same size as the validity buffer; must be >= 0
 * @return the combined buffer size in bytes; this can exceed the int range, so callers are
 *         expected to bounds-check it before allocating
 */
protected long computeCombinedBufferSize(int valueCount, int typeWidth) {
  Preconditions.checkArgument(valueCount >= 0, "valueCount must be >= 0");
  Preconditions.checkArgument(typeWidth >= 0, "typeWidth must be >= 0");

  // compute size of validity buffer.
  long bufferSize = roundUp8(getValidityBufferSizeFromCount(valueCount));

  // add the size of the value buffer.
  if (typeWidth == 0) {
    // for boolean type, value-buffer and validity-buffer are of same size.
    bufferSize *= 2;
  } else {
    // Widen before multiplying: an int product would wrap for large requests and produce a
    // small (or negative) size that slips past the max-allocation check done by callers.
    bufferSize += roundUp8((long) valueCount * typeWidth);
  }
  return BaseAllocator.nextPowerOfTwo(bufferSize);
}

/**
 * Immutable holder for the pair of buffers produced by a single combined allocation: the data
 * (value) buffer and the validity buffer.
 */
class DataAndValidityBuffers {
  // Assigned once in the constructor and never changed afterwards.
  private final ArrowBuf dataBuf;
  private final ArrowBuf validityBuf;

  /**
   * Wrap the two buffers.
   *
   * @param dataBuf buffer holding the values
   * @param validityBuf buffer holding the validity bits
   */
  DataAndValidityBuffers(ArrowBuf dataBuf, ArrowBuf validityBuf) {
    this.dataBuf = dataBuf;
    this.validityBuf = validityBuf;
  }

  /**
   * Get the data buffer.
   *
   * @return the buffer passed as 'dataBuf' at construction
   */
  public ArrowBuf getDataBuf() {
    return dataBuf;
  }

  /**
   * Get the validity buffer.
   *
   * @return the buffer passed as 'validityBuf' at construction
   */
  public ArrowBuf getValidityBuf() {
    return validityBuf;
  }

}

/**
 * Allocate one combined buffer large enough for 'valueCount' values of 'typeWidth' bytes each
 * and slice it into a data buffer followed by a validity buffer.
 *
 * <p>The combined size comes from {@code computeCombinedBufferSize}, which may be larger than
 * strictly required; for non-zero widths the slices are enlarged to use as much of that
 * allocation as fits.
 *
 * @param valueCount minimum number of values the returned buffers must hold
 * @param typeWidth width of one value in bytes; 0 selects the boolean layout in which both
 *                  slices are half of the combined buffer
 * @return the data and validity slices; each slice has been retained and the combined buffer's
 *         own reference released (presumably leaving the slices as the only owners -- confirm
 *         against the ArrowBuf reference-counting contract)
 */
protected DataAndValidityBuffers allocFixedDataAndValidityBufs(int valueCount, int typeWidth) {
  final long bufferSize = computeCombinedBufferSize(valueCount, typeWidth);
  // computeAndCheckBufferSize() only rejects sizes strictly greater than the limit, so a size
  // exactly equal to MAX_ALLOCATION_SIZE is legal and must not trip this assertion.
  assert bufferSize <= MAX_ALLOCATION_SIZE;

  int validityBufferSize;
  int dataBufferSize;
  if (typeWidth == 0) {
    validityBufferSize = dataBufferSize = (int) (bufferSize / 2);
  } else {
    // Due to roundup to power-of-2 allocation, the bufferSize could be greater than the
    // requested size. Utilize the allocated buffer fully.
    // Each value costs typeWidth bytes plus one validity bit, i.e. (8 * typeWidth + 1) bits;
    // start from that estimate and step down until both rounded-up slices fit.
    int actualCount = (int) ((bufferSize * 8.0) / (8 * typeWidth + 1));
    while (true) {
      validityBufferSize = (int) roundUp8(getValidityBufferSizeFromCount(actualCount));
      // Multiply in long so the product cannot wrap before it is rounded.
      dataBufferSize = (int) roundUp8((long) actualCount * typeWidth);
      if ((long) validityBufferSize + dataBufferSize <= bufferSize) {
        break;
      }
      --actualCount;
    }
  }

  /* allocate combined buffer */
  final ArrowBuf combinedBuffer = allocator.buffer((int) bufferSize);

  /* slice into requested lengths: data first, validity immediately after it */
  final ArrowBuf dataBuf = combinedBuffer.slice(0, dataBufferSize);
  dataBuf.retain();
  dataBuf.readerIndex(0);
  dataBuf.writerIndex(0);

  final ArrowBuf validityBuf = combinedBuffer.slice(dataBufferSize, validityBufferSize);
  validityBuf.retain();
  validityBuf.readerIndex(0);
  validityBuf.writerIndex(0);

  /* drop the reference taken by the allocation itself; each slice holds its own via retain() */
  combinedBuffer.release();
  return new DataAndValidityBuffers(dataBuf, validityBuf);
}
}

0 comments on commit bfe6865

Please sign in to comment.