Skip to content

Commit

Permalink
DRILL-2719: ValueVector#getBuffers(clear) must consistently clear vec…
Browse files Browse the repository at this point in the history
…tors & retain buffers
  • Loading branch information
Hanifi Gunes authored and mehant committed Apr 16, 2015
1 parent a7e9590 commit caf779d
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 18 deletions.
Expand Up @@ -79,8 +79,11 @@ public void setCurrentValueCount(int count) {


@Override @Override
public DrillBuf[] getBuffers(boolean clear) { public DrillBuf[] getBuffers(boolean clear) {
DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(clear), values.getBuffers(clear), DrillBuf.class); DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(false), values.getBuffers(false), DrillBuf.class);
if (clear) { if (clear) {
for (DrillBuf buffer:buffers) {
buffer.retain();
}
clear(); clear();
} }
return buffers; return buffers;
Expand Down
Expand Up @@ -285,8 +285,11 @@ public void load(SerializedField metadata, DrillBuf buffer) {


@Override @Override
public DrillBuf[] getBuffers(boolean clear) { public DrillBuf[] getBuffers(boolean clear) {
DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(clear), values.getBuffers(clear), DrillBuf.class); DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), values.getBuffers(false), DrillBuf.class);
if (clear) { if (clear) {
for (DrillBuf buffer:buffers) {
buffer.retain();
}
clear(); clear();
} }
return buffers; return buffers;
Expand Down
Expand Up @@ -148,8 +148,12 @@ public void clear() {


@Override @Override
public DrillBuf[] getBuffers(boolean clear) { public DrillBuf[] getBuffers(boolean clear) {
DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(clear), super.getBuffers(clear), DrillBuf.class); DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(false), super.getBuffers(false), DrillBuf.class);
if (clear) { if (clear) {
// does not make much sense but we have to retain buffers even when clear is set. refactor this interface.
for (DrillBuf buffer:buffers) {
buffer.retain();
}
clear(); clear();
} }
return buffers; return buffers;
Expand Down
Expand Up @@ -23,7 +23,6 @@


import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.DeadBuf;
import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField;


import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
Expand All @@ -36,22 +35,16 @@ public abstract class BaseDataValueVector extends BaseValueVector{


public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) { public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator); super(field, allocator);

this.data = allocator.getEmpty();
} }


/** /**
* Release the underlying DrillBuf and reset the ValueVector * Release the underlying DrillBuf and reset the ValueVector
*/ */
@Override @Override
public void clear() { public void clear() {
if (data == null) { data.release();
data = DeadBuf.DEAD_BUFFER; data = allocator.getEmpty();
}
if (data != DeadBuf.DEAD_BUFFER) {
data.release();
data = data.getAllocator().getEmpty();
valueCount = 0;
}
} }


public void setCurrentValueCount(int count) { public void setCurrentValueCount(int count) {
Expand All @@ -66,7 +59,7 @@ public int getCurrentValueCount() {
@Override @Override
public DrillBuf[] getBuffers(boolean clear) { public DrillBuf[] getBuffers(boolean clear) {
DrillBuf[] out; DrillBuf[] out;
if (valueCount == 0) { if (getBufferSize() == 0) {
out = new DrillBuf[0]; out = new DrillBuf[0];
} else { } else {
out = new DrillBuf[]{data}; out = new DrillBuf[]{data};
Expand All @@ -82,7 +75,7 @@ public DrillBuf[] getBuffers(boolean clear) {
} }


public int getBufferSize() { public int getBufferSize() {
if (valueCount == 0) { if (getAccessor().getValueCount() == 0) {
return 0; return 0;
} }
return data.writerIndex(); return data.writerIndex();
Expand Down
Expand Up @@ -216,9 +216,15 @@ public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
public DrillBuf[] getBuffers(boolean clear) { public DrillBuf[] getBuffers(boolean clear) {
List<DrillBuf> buffers = Lists.newArrayList(); List<DrillBuf> buffers = Lists.newArrayList();


for (ValueVector v : vectors.values()) { for (ValueVector vector : vectors.values()) {
for (DrillBuf buf : v.getBuffers(clear)) { for (DrillBuf buf : vector.getBuffers(false)) {
buffers.add(buf); buffers.add(buf);
if (clear) {
buf.retain();
}
}
if (clear) {
vector.clear();
} }
} }


Expand Down
Expand Up @@ -345,7 +345,15 @@ public RepeatedListAccessor getAccessor() {


@Override @Override
public DrillBuf[] getBuffers(boolean clear) { public DrillBuf[] getBuffers(boolean clear) {
return ArrayUtils.addAll(offsets.getBuffers(clear), vector.getBuffers(clear)); DrillBuf[] buffers = ArrayUtils.addAll(offsets.getBuffers(false), vector.getBuffers(false));
if (clear) {
// does not make much sense but we have to retain buffers even when clear is set. refactor this interface.
for (DrillBuf buffer:buffers) {
buffer.retain();
}
clear();
}
return buffers;
} }


protected void setVector(ValueVector newVector) { protected void setVector(ValueVector newVector) {
Expand Down
Expand Up @@ -43,11 +43,13 @@ public class TestEmptyPopulator extends ExecTest {
private EmptyValuePopulator populator; private EmptyValuePopulator populator;


private final ByteBuffer buffer = ByteBuffer.allocateDirect(BUF_SIZE); private final ByteBuffer buffer = ByteBuffer.allocateDirect(BUF_SIZE);
private final ByteBuffer empty = ByteBuffer.allocateDirect(0);




@Before @Before
public void initialize() { public void initialize() {
Mockito.when(allocator.buffer(Mockito.anyInt())).thenReturn(DrillBuf.wrapByteBuffer(buffer)); Mockito.when(allocator.buffer(Mockito.anyInt())).thenReturn(DrillBuf.wrapByteBuffer(buffer));
Mockito.when(allocator.getEmpty()).thenReturn(DrillBuf.wrapByteBuffer(empty));
offsets = new UInt4Vector(null, allocator); offsets = new UInt4Vector(null, allocator);
offsets.allocateNewSafe(); offsets.allocateNewSafe();
accessor = offsets.getAccessor(); accessor = offsets.getAccessor();
Expand Down

0 comments on commit caf779d

Please sign in to comment.