
ARROW-1807: [Java] consolidate bufs to reduce heap #3121

Merged
merged 1 commit into from Dec 15, 2018
@@ -22,11 +22,8 @@
import java.util.IdentityHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.arrow.memory.BaseAllocator.Verbosity;
import org.apache.arrow.memory.util.AutoCloseableLock;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.util.Preconditions;

@@ -73,9 +70,6 @@ public class AllocationManager {
// ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap
// see JIRA for details
private final LowCostIdentityHashMap<BaseAllocator, BufferLedger> map = new LowCostIdentityHashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
private final long amCreationTime = System.nanoTime();

private volatile BufferLedger owningLedger;
@@ -115,30 +109,16 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
"A buffer can only be associated between two allocators that share the same root.");
}

try (AutoCloseableLock read = readLock.open()) {

final BufferLedger ledger = map.get(allocator);
synchronized (this) {
BufferLedger ledger = map.get(allocator);
if (ledger != null) {
if (retain) {
ledger.inc();
}
return ledger;
}

}
try (AutoCloseableLock write = writeLock.open()) {
// we have to recheck existing ledger since a second reader => writer could be competing
// with us.

final BufferLedger existingLedger = map.get(allocator);
if (existingLedger != null) {
if (retain) {
existingLedger.inc();
}
return existingLedger;
}

final BufferLedger ledger = new BufferLedger(allocator);
ledger = new BufferLedger(allocator);
if (retain) {
ledger.inc();
}
@@ -153,7 +133,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
* The way that a particular BufferLedger communicates back to the AllocationManager that it
* no longer needs to hold
* a reference to a particular piece of memory.
* Can only be called when you already hold the writeLock.
* Can only be called when you already hold the lock.
*/
private void release(final BufferLedger ledger) {
final BaseAllocator allocator = ledger.getAllocator();
@@ -250,7 +230,7 @@ public boolean transferBalance(final BufferLedger target) {
// since two balance transfers out from the allocator manager could cause incorrect
// accounting, we need to ensure
// that this won't happen by synchronizing on the allocator manager instance.
try (AutoCloseableLock write = writeLock.open()) {
synchronized (this) {
if (owningLedger != this) {
return true;
}
@@ -330,7 +310,7 @@ public int decrement(int decrement) {
allocator.assertOpen();

final int outcome;
try (AutoCloseableLock write = writeLock.open()) {
synchronized (this) {
outcome = bufRefCnt.addAndGet(-decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
@@ -431,7 +411,7 @@ public int getSize() {
* @return Amount of accounted (owned) memory associated with this ledger.
*/
public int getAccountedSize() {
try (AutoCloseableLock read = readLock.open()) {
synchronized (this) {
if (owningLedger == this) {
return size;
} else {
@@ -270,7 +270,7 @@ public boolean allocateNewSafe() {
long curAllocationSizeValue = valueAllocationSizeInBytes;
long curAllocationSizeValidity = validityAllocationSizeInBytes;

if (curAllocationSizeValue > MAX_ALLOCATION_SIZE) {
if (align(curAllocationSizeValue) + curAllocationSizeValidity > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory exceeds limit");
}

@@ -302,7 +302,7 @@ public void allocateNew(int valueCount) {
valueBufferSize = validityBufferSize;
}

if (valueBufferSize > MAX_ALLOCATION_SIZE) {
if (align(valueBufferSize) + validityBufferSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
}

@@ -317,6 +317,13 @@ public void allocateNew(int valueCount) {
}
}

/*
 * Align to an 8-byte boundary (round size up to the nearest multiple of 8).
 */
private long align(long size) {
return ((size + 7) / 8) * 8;
}

/**
* Actual memory allocation is done by this function. All the calculations
* and knowledge about what size to allocate are up to the callers of this
@@ -327,14 +334,24 @@ public void allocateNew(int valueCount) {
* conditions.
*/
private void allocateBytes(final long valueBufferSize, final long validityBufferSize) {
/* allocate data buffer */
int curSize = (int) valueBufferSize;
valueBuffer = allocator.buffer(curSize);
int valueBufferSlice = (int)align(valueBufferSize);
int validityBufferSlice = (int)validityBufferSize;

/* allocate combined buffer */
ArrowBuf buffer = allocator.buffer(valueBufferSlice + validityBufferSlice);
Contributor

Can you explain a bit why this reduces heap usage? We still end up with two ArrowBuf objects, but does the fact that they are slices of a single ArrowBuf allow them to share some heap data structures?

Contributor Author (@pravindra), Dec 8, 2018

Yes.

The current heap cost (shallow size × count) for the buffers in an IntVector is:

io.netty.buffer.ArrowBuf -> 2 * 109 = 218
io.netty.buffer.PooledUnsafeDirectBuf -> 2 * 116 = 232
io.netty.buffer.UnsafeDirectLittleEndian -> 2 * 48 = 96
io.netty.util.Recycler$DefaultHandle -> 2 * 41 = 82
arrow.memory.AllocationManager -> 2 * 100 = 200
arrow.memory.AllocationManager$BufferLedger -> 2 * 80 = 160
java.util.concurrent.locks.ReentrantReadWriteLock* -> 2 * 180 = 360
arrow.memory.AutoCloseableLock -> 4 * 24 = 96
arrow.memory.LowCostIdentityHashMap -> 2 * 32 = 64

Before Total = 1508 bytes

My change removes the locks and shares all of the objects above except the ArrowBufs:

io.netty.buffer.ArrowBuf -> 2 * 109 = 218
io.netty.buffer.PooledUnsafeDirectBuf -> 1 * 116 = 116
io.netty.buffer.UnsafeDirectLittleEndian -> 1 * 48 = 48
io.netty.util.Recycler$DefaultHandle -> 1 * 41 = 41
arrow.memory.AllocationManager -> 1 * 76 = 76
arrow.memory.AllocationManager$BufferLedger -> 1 * 80 = 80
arrow.memory.LowCostIdentityHashMap -> 1 * 32 = 32

After Total = 611 bytes
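
For illustration, here is a minimal sketch of the slice-and-share pattern (my example, not code from this PR), assuming the Arrow Java API of this era (RootAllocator, and io.netty.buffer.ArrowBuf with slice/retain/release):

  import org.apache.arrow.memory.BufferAllocator;
  import org.apache.arrow.memory.RootAllocator;

  import io.netty.buffer.ArrowBuf;

  public class SliceShareSketch {
    public static void main(String[] args) {
      try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
        // One allocation => one PooledUnsafeDirectBuf, one
        // UnsafeDirectLittleEndian and one AllocationManager/BufferLedger.
        ArrowBuf combined = allocator.buffer(16384 + 512);

        // The slices are distinct ArrowBuf objects but share everything
        // underneath them, including the ledger's reference count.
        ArrowBuf value = combined.slice(0, 16384);
        value.retain();
        ArrowBuf validity = combined.slice(16384, 512);
        validity.retain();

        // Drop the parent; the slices keep the underlying memory alive.
        combined.release();

        // ... hand value/validity to a vector ...

        value.release();
        validity.release(); // refcount reaches zero, memory is returned
      }
    }
  }

Each slice adds only its own shallow ArrowBuf size; everything below the slices is counted once.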

Contributor

+1

We recently did the exact same optimization inside the Dremio code by slicing a single large ArrowBuf and handing the resulting buffers over to vectors. This reduced the heap overhead (due to the large volume of ArrowBufs) from 1GB to 250MB for very memory-intensive aggregation queries.

https://github.com/dremio/dremio-oss/blob/master/sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate/vectorized/AccumulatorSet.java#L93

This reduced heap overhead drastically, but the downside was that OOMs became frequent, since we were asking for a very large buffer (due to combining both into one, plus the allocator's power-of-2 semantics). So we implemented two additional variants of the algorithm to optimize usage of both heap and direct memory.
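
As a back-of-the-envelope illustration of that power-of-2 downside (hypothetical numbers for a 4096-value IntVector; nextPowerOfTwo here is my stand-in for the allocator's rounding, not PR code):

  public class PowerOfTwoCostSketch {
    // Stand-in for BaseAllocator.nextPowerOfTwo-style rounding.
    static long nextPowerOfTwo(long v) {
      return v <= 1 ? 1 : Long.highestOneBit(v - 1) << 1;
    }

    public static void main(String[] args) {
      long valueSize = 16384;  // data buffer for 4096 ints
      long validitySize = 512; // one bit per value, rounded up to bytes

      long separate = nextPowerOfTwo(valueSize) + nextPowerOfTwo(validitySize);
      long combined = nextPowerOfTwo(valueSize + validitySize);

      System.out.println("two separate requests: " + separate + " bytes"); // 16896
      System.out.println("one combined request:  " + combined + " bytes"); // 32768
    }
  }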

I don't think those concerns are applicable here in the context of a single vector.

However, as I recall, when we created this JIRA with the goal of reducing heap usage in vectors, the proposal was to have just a single buffer, as opposed to having two buffers sliced from a single buffer and then getting rid of the latter.

For example, in the case of fixed-width vectors, we can pack validity and data into a single buffer. For variable-width vectors, we can pack offsets and validity into a single buffer. Similarly, for lists, we can combine offsets and validity into one buffer.

I am wondering if that is even needed now, since the heap reduction due to the sliced-buffer technique is significant.

Contributor Author

I am wondering if that is even needed now, since the heap reduction due to the sliced-buffer technique is significant.

Yes, @siddharthteotia. We could save another 80-odd bytes on the heap by using a single buffer, but it adds complexity to the code for the following reasons (see the sketch after this list):

  • There are places in the code where the caller assumes a FixedWidthVector contains two Arrow buffers (one for validity and one for values).
  • When we do a splitAndTransfer of an existing FixedWidthVector, the validity and data portions will not be contiguous. If the split happens at an unaligned boundary, we allocate a new validity buffer but retain the value buffer. So there would be too many cases to deal with (both contiguous, both in the same buffer but not contiguous, or in different buffers).
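
A hypothetical sketch of the unaligned-boundary case from the second bullet (my illustration, not PR code):

  public class SplitAlignmentSketch {
    public static void main(String[] args) {
      int startIndex = 12; // split point passed to splitAndTransfer
      int typeWidth = 4;   // e.g. IntVector

      // The value buffer can always be sliced at a clean byte offset.
      int valueByteOffset = startIndex * typeWidth; // 48

      // The validity bitmap holds one bit per value, so a split index that
      // is not a multiple of 8 lands mid-byte and cannot be expressed as a
      // slice of the parent's validity buffer.
      boolean canSliceValidity = (startIndex % 8 == 0); // false for 12

      System.out.println("value slice offset: " + valueByteOffset);
      System.out.println("validity sliceable: " + canSliceValidity);
      // When false, a fresh validity buffer is allocated and the bits are
      // copied with a shift, so the child's validity and value portions end
      // up in different allocations -- one more case a single-buffer layout
      // would have to handle.
    }
  }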

Contributor

Yes, combining two buffers into one would require a lot of changes -- both in Arrow and in downstream consumers that assume the number of buffers.

I am merging this.


valueAllocationSizeInBytes = valueBufferSlice;
valueBuffer = buffer.slice(0, valueBufferSlice);
valueBuffer.retain();
valueBuffer.readerIndex(0);
valueAllocationSizeInBytes = curSize;
/* allocate validity buffer */
allocateValidityBuffer((int) validityBufferSize);

validityAllocationSizeInBytes = validityBufferSlice;
validityBuffer = buffer.slice(valueBufferSlice, validityBufferSlice);
validityBuffer.retain();
validityBuffer.readerIndex(0);
zeroVector();

buffer.release();
}

/**
@@ -422,43 +439,50 @@ public ArrowBuf[] getBuffers(boolean clear) {
*/
@Override
public void reAlloc() {
valueBuffer = reallocBufferHelper(valueBuffer, true);
validityBuffer = reallocBufferHelper(validityBuffer, false);
}

/**
* Helper method for reallocating a particular internal buffer
* Returns the new buffer.
*/
private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean dataBuffer) {
final int currentBufferCapacity = buffer.capacity();
long baseSize = (dataBuffer ? valueAllocationSizeInBytes
: validityAllocationSizeInBytes);
int valueBaseSize = Integer.max(valueBuffer.capacity(), valueAllocationSizeInBytes);
long newValueBufferSlice = align(valueBaseSize * 2L);
long newValidityBufferSlice;
if (typeWidth > 0) {
long targetValueBufferSize = align(BaseAllocator.nextPowerOfTwo(newValueBufferSlice));
long targetValueCount = targetValueBufferSize / typeWidth;
targetValueBufferSize -= getValidityBufferSizeFromCount((int) targetValueCount);
if (newValueBufferSlice < targetValueBufferSize) {
newValueBufferSlice = targetValueBufferSize;
}

if (baseSize < (long) currentBufferCapacity) {
baseSize = (long) currentBufferCapacity;
newValidityBufferSlice = getValidityBufferSizeFromCount((int)(newValueBufferSlice / typeWidth));
} else {
newValidityBufferSlice = newValueBufferSlice;
}

long newAllocationSize = baseSize * 2L;
newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
long newAllocationSize = newValueBufferSlice + newValidityBufferSlice;
assert newAllocationSize >= 1;

if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer");
}

final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
buffer.release(1);
buffer = newBuf;
if (dataBuffer) {
valueAllocationSizeInBytes = (int) newAllocationSize;
} else {
validityAllocationSizeInBytes = (int) newAllocationSize;
}
final ArrowBuf newBuffer = allocator.buffer((int) newAllocationSize);
final ArrowBuf newValueBuffer = newBuffer.slice(0, (int)newValueBufferSlice);
newValueBuffer.setBytes(0, valueBuffer, 0, valueBuffer.capacity());
newValueBuffer.setZero(valueBuffer.capacity(), (int)newValueBufferSlice - valueBuffer.capacity());
newValueBuffer.retain();
newValueBuffer.readerIndex(0);
valueBuffer.release();
valueBuffer = newValueBuffer;
valueAllocationSizeInBytes = (int)newValueBufferSlice;

final ArrowBuf newValidityBuffer = newBuffer.slice((int)newValueBufferSlice,
(int)newValidityBufferSlice);
newValidityBuffer.setBytes(0, validityBuffer, 0, validityBuffer.capacity());
newValidityBuffer.setZero(validityBuffer.capacity(), (int)newValidityBufferSlice - validityBuffer.capacity());
newValidityBuffer.retain();
newValidityBuffer.readerIndex(0);
validityBuffer.release();
validityBuffer = newValidityBuffer;
validityAllocationSizeInBytes = (int)newValidityBufferSlice;

return buffer;
newBuffer.release();
}

@Override
@@ -40,15 +40,14 @@ public void testTransferFixedWidth() {
IntVector v1 = new IntVector("v1", childAllocator1);
v1.allocateNew();
v1.setValueCount(4095);
long totalAllocatedMemory = childAllocator1.getAllocatedMemory();

IntVector v2 = new IntVector("v2", childAllocator2);

v1.makeTransferPair(v2).transfer();

assertEquals(0, childAllocator1.getAllocatedMemory());
int expectedBitVector = 512;
int expectedValueVector = 4096 * 4;
assertEquals(expectedBitVector + expectedValueVector, childAllocator2.getAllocatedMemory());
assertEquals(totalAllocatedMemory, childAllocator2.getAllocatedMemory());
}

@Test
@@ -774,13 +774,13 @@ public void testSetInitialCapacity() {
vector.setInitialCapacity(512);
vector.allocateNew();
assertEquals(512, vector.getValueCapacity());
assertEquals(4096, vector.getDataVector().getValueCapacity());
assertTrue(vector.getDataVector().getValueCapacity() >= 512 * 5);

/* use density as 4 */
vector.setInitialCapacity(512, 4);
vector.allocateNew();
assertEquals(512, vector.getValueCapacity());
assertEquals(512 * 4, vector.getDataVector().getValueCapacity());
assertTrue(vector.getDataVector().getValueCapacity() >= 512 * 4);

/**
* inner value capacity we pass to data vector is 512 * 0.1 => 51
Expand All @@ -793,7 +793,7 @@ public void testSetInitialCapacity() {
vector.setInitialCapacity(512, 0.1);
vector.allocateNew();
assertEquals(512, vector.getValueCapacity());
assertEquals(64, vector.getDataVector().getValueCapacity());
assertTrue(vector.getDataVector().getValueCapacity() >= 51);

/**
* inner value capacity we pass to data vector is 512 * 0.01 => 5
Expand All @@ -806,7 +806,7 @@ public void testSetInitialCapacity() {
vector.setInitialCapacity(512, 0.01);
vector.allocateNew();
assertEquals(512, vector.getValueCapacity());
assertEquals(8, vector.getDataVector().getValueCapacity());
assertTrue(vector.getDataVector().getValueCapacity() >= 5);

/**
* inner value capacity we pass to data vector is 5 * 0.1 => 0
Expand All @@ -822,7 +822,7 @@ public void testSetInitialCapacity() {
vector.setInitialCapacity(5, 0.1);
vector.allocateNew();
assertEquals(7, vector.getValueCapacity());
assertEquals(1, vector.getDataVector().getValueCapacity());
assertTrue(vector.getDataVector().getValueCapacity() >= 1);
}
}
