Skip to content

Commit

Permalink
DRILL-3155: Part 2: Clear allocated memory for composite vectors if o…
Browse files Browse the repository at this point in the history
…ne of the allocations fails
  • Loading branch information
mehant committed Jun 3, 2015
1 parent 21de138 commit 09e46df
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 47 deletions.
61 changes: 51 additions & 10 deletions exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
Expand Up @@ -124,8 +124,21 @@ public void allocateNew() {


@Override @Override
public boolean allocateNewSafe() { public boolean allocateNewSafe() {
if(!values.allocateNewSafe()) return false; /* Boolean to keep track if all the memory allocations were successful
if(!bits.allocateNewSafe()) return false; * Used in the case of composite vectors when we need to allocate multiple
* buffers for multiple vectors. If one of the allocations failed we need to
* clear all the memory that we allocated
*/
boolean success = false;
try {
if(!values.allocateNewSafe()) return false;
if(!bits.allocateNewSafe()) return false;
success = true;
} finally {
if (!success) {
clear();
}
}
bits.zeroVector(); bits.zeroVector();
mutator.reset(); mutator.reset();
accessor.reset(); accessor.reset();
Expand All @@ -134,8 +147,13 @@ public boolean allocateNewSafe() {


@Override @Override
public void allocateNew(int totalBytes, int valueCount) { public void allocateNew(int totalBytes, int valueCount) {
values.allocateNew(totalBytes, valueCount); try {
bits.allocateNew(valueCount); values.allocateNew(totalBytes, valueCount);
bits.allocateNew(valueCount);
} catch(OutOfMemoryRuntimeException e){
clear();
throw e;
}
bits.zeroVector(); bits.zeroVector();
mutator.reset(); mutator.reset();
accessor.reset(); accessor.reset();
Expand Down Expand Up @@ -175,8 +193,13 @@ public int getCurrentSizeInBytes(){


@Override @Override
public void allocateNew() { public void allocateNew() {
values.allocateNew(); try {
bits.allocateNew(); values.allocateNew();
bits.allocateNew();
} catch(OutOfMemoryRuntimeException e) {
clear();
throw e;
}
bits.zeroVector(); bits.zeroVector();
mutator.reset(); mutator.reset();
accessor.reset(); accessor.reset();
Expand All @@ -185,8 +208,21 @@ public void allocateNew() {


@Override @Override
public boolean allocateNewSafe() { public boolean allocateNewSafe() {
if(!values.allocateNewSafe()) return false; /* Boolean to keep track if all the memory allocations were successful
if(!bits.allocateNewSafe()) return false; * Used in the case of composite vectors when we need to allocate multiple
* buffers for multiple vectors. If one of the allocations failed we need to
* clear all the memory that we allocated
*/
boolean success = false;
try {
if(!values.allocateNewSafe()) return false;
if(!bits.allocateNewSafe()) return false;
success = true;
} finally {
if (!success) {
clear();
}
}
bits.zeroVector(); bits.zeroVector();
mutator.reset(); mutator.reset();
accessor.reset(); accessor.reset();
Expand All @@ -195,8 +231,13 @@ public boolean allocateNewSafe() {


@Override @Override
public void allocateNew(int valueCount) { public void allocateNew(int valueCount) {
values.allocateNew(valueCount); try {
bits.allocateNew(valueCount); values.allocateNew(valueCount);
bits.allocateNew(valueCount);
} catch(OutOfMemoryRuntimeException e) {
clear();
throw e;
}
bits.zeroVector(); bits.zeroVector();
mutator.reset(); mutator.reset();
accessor.reset(); accessor.reset();
Expand Down
Expand Up @@ -177,18 +177,36 @@ public void copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector
} }




public boolean allocateNewSafe(){ public boolean allocateNewSafe() {
if(!offsets.allocateNewSafe()) return false; /* boolean to keep track if all the memory allocation were successful
* Used in the case of composite vectors when we need to allocate multiple
* buffers for multiple vectors. If one of the allocations failed we need to
* clear all the memory that we allocated
*/
boolean success = false;
try {
if(!offsets.allocateNewSafe()) return false;
if(!values.allocateNewSafe()) return false;
success = true;
} finally {
if (!success) {
clear();
}
}
offsets.zeroVector(); offsets.zeroVector();
if(!values.allocateNewSafe()) return false;
mutator.reset(); mutator.reset();
return true; return true;
} }


public void allocateNew() { public void allocateNew() {
offsets.allocateNew(); try {
offsets.allocateNew();
values.allocateNew();
} catch (OutOfMemoryRuntimeException e) {
clear();
throw e;
}
offsets.zeroVector(); offsets.zeroVector();
values.allocateNew();
mutator.reset(); mutator.reset();
} }


Expand All @@ -200,9 +218,14 @@ protected SerializedField.Builder getMetadataBuilder() {
} }


public void allocateNew(int totalBytes, int valueCount, int innerValueCount) { public void allocateNew(int totalBytes, int valueCount, int innerValueCount) {
offsets.allocateNew(valueCount+1); try {
offsets.allocateNew(valueCount+1);
values.allocateNew(totalBytes, innerValueCount);
} catch (OutOfMemoryRuntimeException e) {
clear();
throw e;
}
offsets.zeroVector(); offsets.zeroVector();
values.allocateNew(totalBytes, innerValueCount);
mutator.reset(); mutator.reset();
} }


Expand Down Expand Up @@ -230,9 +253,20 @@ public int getByteCapacity(){


public void allocateNew(int valueCount, int innerValueCount) { public void allocateNew(int valueCount, int innerValueCount) {
clear(); clear();
offsets.allocateNew(valueCount+1); /* boolean to keep track if all the memory allocation were successful
* Used in the case of composite vectors when we need to allocate multiple
* buffers for multiple vectors. If one of the allocations failed we need to
* clear all the memory that we allocated
*/
boolean success = false;
try {
offsets.allocateNew(valueCount+1);
values.allocateNew(innerValueCount);
} catch(OutOfMemoryRuntimeException e){
clear();
throw e;
}
offsets.zeroVector(); offsets.zeroVector();
values.allocateNew(innerValueCount);
mutator.reset(); mutator.reset();
} }


Expand Down
Expand Up @@ -18,6 +18,7 @@


import java.lang.Override; import java.lang.Override;


import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.VariableWidthVector;
Expand Down Expand Up @@ -282,33 +283,48 @@ public boolean allocateNewSafe() {
allocationMonitor = 0; allocationMonitor = 0;
} }


DrillBuf newBuf = allocator.buffer(allocationTotalByteCount); /* Boolean to keep track if all the memory allocations were successful
if(newBuf == null){ * Used in the case of composite vectors when we need to allocate multiple
return false; * buffers for multiple vectors. If one of the allocations failed we need to
* clear all the memory that we allocated
*/
boolean success = false;
try {
DrillBuf newBuf = allocator.buffer(allocationTotalByteCount);
if (newBuf == null) {
return false;
}
this.data = newBuf;
if (!offsetVector.allocateNewSafe()) {
return false;
}
success = true;
} finally {
if (!success) {
clear();
}
} }

this.data = newBuf;
data.readerIndex(0); data.readerIndex(0);

if(!offsetVector.allocateNewSafe()){
return false;
}
offsetVector.zeroVector(); offsetVector.zeroVector();
return true; return true;
} }


public void allocateNew(int totalBytes, int valueCount) { public void allocateNew(int totalBytes, int valueCount) {
clear(); clear();
assert totalBytes >= 0; assert totalBytes >= 0;
DrillBuf newBuf = allocator.buffer(totalBytes); try {
if(newBuf == null){ DrillBuf newBuf = allocator.buffer(totalBytes);
throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", totalBytes)); if (newBuf == null) {
throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", totalBytes));
}
this.data = newBuf;
offsetVector.allocateNew(valueCount + 1);
} catch (OutOfMemoryRuntimeException e) {
clear();
throw e;
} }

this.data = newBuf;
data.readerIndex(0); data.readerIndex(0);
allocationTotalByteCount = totalBytes; allocationTotalByteCount = totalBytes;
offsetVector.allocateNew(valueCount+1);
offsetVector.zeroVector(); offsetVector.zeroVector();
} }


Expand Down
Expand Up @@ -52,9 +52,23 @@ protected AbstractMapVector(MaterializedField field, BufferAllocator allocator,


@Override @Override
public boolean allocateNewSafe() { public boolean allocateNewSafe() {
for (ValueVector v : vectors.values()) { /* boolean to keep track if all the memory allocation were successful
if (!v.allocateNewSafe()) { * Used in the case of composite vectors when we need to allocate multiple
return false; * buffers for multiple vectors. If one of the allocations failed we need to
* clear all the memory that we allocated
*/
boolean success = false;
try {

for (ValueVector v : vectors.values()) {
if (!v.allocateNewSafe()) {
return false;
}
}
success = true;
} finally {
if (!success) {
clear();
} }
} }
return true; return true;
Expand Down
Expand Up @@ -61,13 +61,27 @@ protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator alloc


@Override @Override
public boolean allocateNewSafe() { public boolean allocateNewSafe() {
if (!offsets.allocateNewSafe()) { /* boolean to keep track if all the memory allocation were successful
return false; * Used in the case of composite vectors when we need to allocate multiple
* buffers for multiple vectors. If one of the allocations failed we need to
* clear all the memory that we allocated
*/
boolean success = false;
try {
if (!offsets.allocateNewSafe()) {
return false;
}
success = vector.allocateNewSafe();
} finally {
if (!success) {
clear();
}
} }
offsets.zeroVector(); offsets.zeroVector();
return vector.allocateNewSafe(); return success;
} }



@Override @Override
public UInt4Vector getOffsetVector() { public UInt4Vector getOffsetVector() {
return offsets; return offsets;
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.drill.exec.expr.holders.ComplexHolder; import org.apache.drill.exec.expr.holders.ComplexHolder;
import org.apache.drill.exec.expr.holders.RepeatedMapHolder; import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TransferPair;
Expand Down Expand Up @@ -103,11 +104,16 @@ public RepeatedMapReaderImpl getReader() {
@Override @Override
public void allocateNew(int groupCount, int innerValueCount) { public void allocateNew(int groupCount, int innerValueCount) {
clear(); clear();
offsets.allocateNew(groupCount+1); try {
offsets.zeroVector(); offsets.allocateNew(groupCount + 1);
for (ValueVector v : getChildren()) { for (ValueVector v : getChildren()) {
AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, innerValueCount); AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, innerValueCount);
}
} catch (OutOfMemoryRuntimeException e){
clear();
throw e;
} }
offsets.zeroVector();
mutator.reset(); mutator.reset();
} }


Expand Down Expand Up @@ -216,11 +222,24 @@ public TransferPair getTransferPair(FieldReference ref) {


@Override @Override
public boolean allocateNewSafe() { public boolean allocateNewSafe() {
if (!offsets.allocateNewSafe()) { /* boolean to keep track if all the memory allocation were successful
return false; * Used in the case of composite vectors when we need to allocate multiple
* buffers for multiple vectors. If one of the allocations failed we need to
* clear all the memory that we allocated
*/
boolean success = false;
try {
if (!offsets.allocateNewSafe()) {
return false;
}
success = super.allocateNewSafe();
} finally {
if (!success) {
clear();
}
} }
offsets.zeroVector(); offsets.zeroVector();
return super.allocateNewSafe(); return success;
} }


protected static class SingleMapTransferPair implements TransferPair { protected static class SingleMapTransferPair implements TransferPair {
Expand Down

0 comments on commit 09e46df

Please sign in to comment.