Skip to content

Commit

Permalink
DRILL-2088: ReAlloc in FixedWidthVector.setValueCount if current buff…
Browse files Browse the repository at this point in the history
…er capacity is less than the valueCount.
  • Loading branch information
vkorukanti committed Feb 5, 2015
1 parent 90a2835 commit 8e8b181
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 73 deletions.
Expand Up @@ -803,6 +803,9 @@ public void setValueCount(int valueCount) {
int currentValueCapacity = getValueCapacity();
${minor.class}Vector.this.valueCount = valueCount;
int idx = (${type.width} * valueCount);
while(valueCount > getValueCapacity()) {
reAlloc();
}
if (valueCount > 0 && currentValueCapacity > valueCount * 2) {
incrementAllocationMonitor();
} else if (allocationMonitor > 0) {
Expand Down
Expand Up @@ -44,8 +44,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
private int allocationMonitor = 0;

private int valueCapacity;

public BitVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
}
Expand All @@ -62,6 +60,11 @@ private int getSizeFromCount(int valueCount) {
return (int) Math.ceil(valueCount / 8.0);
}

@Override
public int getValueCapacity() {
return data.capacity() * 8;
}

private int getByteIndex(int index) {
return (int) Math.floor(index / 8.0);
}
Expand All @@ -83,7 +86,6 @@ public boolean allocateNewSafe() {
}

clear();
valueCapacity = allocationValueCount;
int valueSize = getSizeFromCount(allocationValueCount);
data = allocator.buffer(valueSize);
if (data == null) {
Expand All @@ -101,7 +103,6 @@ public boolean allocateNewSafe() {
*/
public void allocateNew(int valueCount) {
clear();
valueCapacity = valueCount;
int valueSize = getSizeFromCount(valueCount);
data = allocator.buffer(valueSize);
zeroVector();
Expand All @@ -117,7 +118,6 @@ public void reAlloc() {
newBuf.setBytes(0, data, 0, data.capacity());
data.release();
data = newBuf;
valueCapacity = allocationValueCount;
}

/**
Expand Down Expand Up @@ -158,11 +158,6 @@ public void load(SerializedField metadata, DrillBuf buffer) {
assert metadata.getBufferLength() == loaded;
}

@Override
public int getValueCapacity() {
return valueCapacity;
}

public Mutator getMutator() {
return new Mutator();
}
Expand Down Expand Up @@ -381,7 +376,6 @@ public final void setValueCount(int valueCount) {
} else if (allocationMonitor > 0) {
allocationMonitor = 0;
}
data.readerIndex(data.writerIndex());
VectorTrimmer.trim(data, idx);
}

Expand Down
Expand Up @@ -18,71 +18,20 @@
package org.apache.drill.exec.physical.impl.svremover;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import mockit.Injectable;
import mockit.NonStrictExpectations;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.compile.CodeCompiler;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.physical.impl.SimpleRootExec;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.BaseTestQuery;
import org.junit.Test;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Charsets;
import com.google.common.io.Files;

public class TestSVRemover extends ExecTest {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSVRemover.class);
DrillConfig c = DrillConfig.create();


public class TestSVRemover extends BaseTestQuery {
@Test
public void testSelectionVectorRemoval(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
// System.out.println(System.getProperty("java.class.path"));


new NonStrictExpectations(){{
bitContext.getMetrics(); result = new MetricRegistry();
bitContext.getAllocator(); result = new TopLevelAllocator();
bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
bitContext.getConfig(); result = c;
bitContext.getCompiler(); result = CodeCompiler.getTestCompiler(c);
}};


PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/remover/test1.json"), Charsets.UTF_8));
FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
FragmentContext context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry);
SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
while(exec.next()){
int count = exec.getRecordCount();
for(ValueVector v : exec){
ValueVector.Accessor a = v.getAccessor();
assertEquals(count, a.getValueCount());
}
}

if(context.getFailureCause() != null){
throw context.getFailureCause();
}
assertTrue(!context.isFailed());

public void testSelectionVectorRemoval() throws Exception {
int numOutputRecords = testPhysical(getFile("remover/test1.json"));
assertEquals(50, numOutputRecords);
}

@Test
public void testSVRWithNoFilter() throws Exception {
int numOutputRecords = testPhysical(getFile("remover/sv_with_no_filter.json"));
assertEquals(100, numOutputRecords);
}
}
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.record.vector;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -308,4 +309,90 @@ public void testBitVector() {
assertEquals(0, v.getAccessor().get(3));
}


@Test
public void testReAllocNullableFixedWidthVector() throws Exception {
// Build an optional float field definition
MajorType floatType = MajorType.newBuilder()
.setMinorType(MinorType.FLOAT4)
.setMode(DataMode.OPTIONAL)
.setWidth(4).build();

MaterializedField field = MaterializedField.create(
SerializedField.newBuilder()
.setMajorType(floatType)
.build());

// Create a new value vector for 1024 integers
NullableFloat4Vector v = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator);
NullableFloat4Vector.Mutator m = v.getMutator();
v.allocateNew(1024);

assertEquals(1024, v.getValueCapacity());

// Put values in indexes that fall within the initial allocation
m.setSafe(0, 100.1f);
m.setSafe(100, 102.3f);
m.setSafe(1023, 104.5f);

// Now try to put values in space that falls beyond the initial allocation
m.setSafe(2000, 105.5f);

// Check valueCapacity is more than initial allocation
assertEquals(1024*2, v.getValueCapacity());

assertEquals(100.1f, v.getAccessor().get(0), 0);
assertEquals(102.3f, v.getAccessor().get(100), 0);
assertEquals(104.5f, v.getAccessor().get(1023), 0);
assertEquals(105.5f, v.getAccessor().get(2000), 0);


// Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
// as we don't call setSafe for null values, but we do call setValueCount when all values are inserted into the
// vector
m.setValueCount(v.getValueCapacity() + 200);
}

@Test
public void testReAllocNullableVariableWidthVector() throws Exception {
// Build an optional float field definition
MajorType floatType = MajorType.newBuilder()
.setMinorType(MinorType.VARCHAR)
.setMode(DataMode.OPTIONAL)
.setWidth(4).build();

MaterializedField field = MaterializedField.create(
SerializedField.newBuilder()
.setMajorType(floatType)
.build());

// Create a new value vector for 1024 integers
NullableVarCharVector v = (NullableVarCharVector) TypeHelper.getNewVector(field, allocator);
NullableVarCharVector.Mutator m = v.getMutator();
v.allocateNew();

int initialCapacity = v.getValueCapacity();

// Put values in indexes that fall within the initial allocation
byte[] str1 = new String("AAAAA1").getBytes(Charset.forName("UTF-8"));
byte[] str2 = new String("BBBBBBBBB2").getBytes(Charset.forName("UTF-8"));
byte[] str3 = new String("CCCC3").getBytes(Charset.forName("UTF-8"));

m.setSafe(0, str1, 0, str1.length);
m.setSafe(initialCapacity - 1, str2, 0, str2.length);

// Now try to put values in space that falls beyond the initial allocation
m.setSafe(initialCapacity + 200, str3, 0, str3.length);

// Check valueCapacity is more than initial allocation
assertEquals((initialCapacity+1)*2-1, v.getValueCapacity());

assertArrayEquals(str1, v.getAccessor().get(0));
assertArrayEquals(str2, v.getAccessor().get(initialCapacity-1));
assertArrayEquals(str3, v.getAccessor().get(initialCapacity + 200));

// Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
// as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed.
m.setValueCount(v.getValueCapacity() + 200);
}
}
33 changes: 33 additions & 0 deletions exec/java-exec/src/test/resources/remover/sv_with_no_filter.json
@@ -0,0 +1,33 @@
{
head:{
type:"APACHE_DRILL_PHYSICAL",
version:"1",
generator:{
type:"manual"
}
},
graph:[
{
@id:1,
pop:"mock-scan",
url: "http://apache.org",
entries:[
{records: 100, types: [
{name: "blue", type: "BIT", mode: "REQUIRED"},
{name: "red", type: "BIGINT", mode: "REQUIRED"},
{name: "green", type: "INT", mode: "OPTIONAL"}
]}
]
},
{
@id:2,
child: 1,
pop:"selection-vector-remover"
},
{
@id: 3,
child: 2,
pop: "screen"
}
]
}
2 changes: 1 addition & 1 deletion exec/java-exec/src/test/resources/remover/test1.json
Expand Up @@ -9,7 +9,7 @@
graph:[
{
@id:1,
pop:"mock-sub-scan",
pop:"mock-scan",
url: "http://apache.org",
entries:[
{records: 100, types: [
Expand Down

0 comments on commit 8e8b181

Please sign in to comment.