Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
* TODO: Fix this so that preallocation can never be released back to general pool until allocator is closed.
*/
public class AtomicRemainder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AtomicRemainder.class);

private static final boolean DEBUG = true;
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AtomicRemainder.class);

private final AtomicRemainder parent;
private final AtomicLong availableShared;
Expand Down Expand Up @@ -66,7 +64,7 @@ public long getUsed() {
/**
* Allow an allocator to constrain the remainder to a particular limit that is lower than the initTotal.
* If limit is larger than initTotal, then the function will do nothing and the hasLimit flag will not be set.
* @param limit
* @param limit new remainder limit
*/
public void setLimit(long limit) {
if(limit<initTotal){
Expand All @@ -76,16 +74,17 @@ public void setLimit(long limit) {

}
/**
* Automatically allocate memory. This is used when an actual allocation happened to be larger than requested. This
* memory has already been used up so it must be accurately accounted for in future allocations.
* Automatically allocate memory. This is used when an actual allocation happened to be larger than requested, or when
* a buffer has it's ownership passed to another allocator.<br>
* This memory has already been used up so it must be accurately accounted for in future allocations.
*
* @param size
* @param size extra allocated memory that needs to be accounted for
*/
public boolean forceGet(long size) {
if (get(size, this.applyFragmentLimit)) {
return true;
} else {
availableShared.addAndGet(size);
availableShared.addAndGet(-size);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this will be subtracting size bytes, should there be a check for availableShared >=0 ? I am not quite sure what's supposed to happen if this value drops below 0 at this point...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can become negative if the allocator takes ownership of a buffer and exceeds it's maximum allocated memory. Negative values are handled properly, at least that's my understanding

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is likely handled..I will defer to Chris on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call is used when transferring ownership from the RPC layer to a receiving fragment -- and you can't refuse it -- the RPC layer has to hand it off. The problem is that the accounting for buffer ownership transfer and sharing is questionable in the original allocator. This is probably the best that can be done for now.

if (parent != null) {
parent.forceGet(size);
}
Expand All @@ -108,22 +107,21 @@ public boolean get(long size, boolean applyFragmentLimitForChild) {
parent.returnAllocation(size);
}
StackTraceElement[] ste = (new Throwable()).getStackTrace();
StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();
for (StackTraceElement s : ste) {
sb.append(s.toString());
sb.append("\n");
}
logger.warn("No more memory. Fragment limit ("+this.limit +
" bytes) reached. Trying to allocate "+size+ " bytes. "+getUsed()+" bytes already allocated.\n"+sb.toString());
logger.warn("No more memory. Fragment limit ({} bytes) reached. Trying to allocate {} bytes. {} bytes " +
"already allocated.\n{}", limit, size, getUsed(), sb.toString());
return false;
}

// attempt to get shared memory, if fails, return false.
long outcome = availableShared.addAndGet(-size);
// assert outcome <= initShared;
if (outcome < 0) {
long newAvailableShared = availableShared.addAndGet(size);
// assert newAvailableShared <= initShared;
availableShared.addAndGet(size);
if (parent != null) {
parent.returnAllocation(size);
}
Expand Down Expand Up @@ -163,7 +161,9 @@ public boolean get(long size, boolean applyFragmentLimitForChild) {
// we failed to get space from available shared. Return allocations to initial state.
availablePrivate.addAndGet(size);
availableShared.addAndGet(additionalSpaceNeeded);
parent.returnAllocation(additionalSpaceNeeded);
if (parent != null) {
parent.returnAllocation(additionalSpaceNeeded);
}
return false;
}
}
Expand All @@ -175,7 +175,7 @@ public boolean get(long size, boolean applyFragmentLimitForChild) {
/**
* Return the memory accounting to the allocation pool. Make sure to first maintain hold of the preallocated memory.
*
* @param size
* @param size amount of memory returned
*/
public void returnAllocation(long size) {
long privateSize = availablePrivate.get();
Expand All @@ -188,7 +188,6 @@ public void returnAllocation(long size) {
if (parent != null) {
parent.returnAllocation(sharedChange);
}
assert getUsed() <= initTotal;
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ public class TestAllocators {

private final static String planFile="/physical_allocator_test.json";

@Test
public void testTransfer() throws Exception {
final Properties props = new Properties() {
{
put(ExecConstants.TOP_LEVEL_MAX_ALLOC, "1000000");
put(ExecConstants.ERROR_ON_MEMORY_LEAK, "true");
}
};
final DrillConfig config = DrillConfig.create(props);
BufferAllocator a = RootAllocatorFactory.newRoot(config);
BufferAllocator b = RootAllocatorFactory.newRoot(config);

DrillBuf buf1 = a.buffer(1_000_000);
DrillBuf buf2 = b.buffer(1_000);
b.takeOwnership(buf1);

buf1.release();
buf2.release();

a.close();
b.close();
}

@Test
public void testAllocators() throws Exception {
// Setup a drillbit (initializes a root allocator)
Expand Down