Skip to content

Commit

Permalink
DRILL-2178: Update outgoing record batch size and allocation in Parti…
Browse files Browse the repository at this point in the history
…tionSender.

Also:
 + Add setInitialCapacity() method to the ValueVector interface to set the initial
   capacity of memory allocated in the first allocateNew() call.
 + Send an empty batch for fast schema instead of flushing the OutgoingRecordBatches,
   which throws away allocated memory and reallocates it again.
 + Remove the v.getValueVector().makeTransferPair(outgoingVector) hack, as the child
   schema population bug for complex schemas was fixed in DRILL-1885.
 + Cleanup/refactor PartitionSender related code.
  • Loading branch information
vkorukanti committed Mar 1, 2015
1 parent c8a241b commit 9c0738d
Show file tree
Hide file tree
Showing 17 changed files with 253 additions and 237 deletions.
Expand Up @@ -63,6 +63,10 @@ public Mutator getMutator(){
return mutator;
}

/**
 * Presets the value count used by the first allocateNew() call.
 * Only the stored count is updated here; no memory is allocated
 * until allocateNew() runs.
 */
@Override
public void setInitialCapacity(int numRecords) {
allocationValueCount = numRecords;
}

public void allocateNew() {
if(!allocateNewSafe()){
Expand Down
Expand Up @@ -100,6 +100,12 @@ public DrillBuf getData(){
return values;
}

/**
 * Presets the capacity used by the first allocateNew() call by delegating to
 * both child vectors. NOTE(review): `bits` appears to be the null-indicator
 * vector and `values` the data vector of this nullable wrapper — confirm
 * against the template's field declarations.
 */
@Override
public void setInitialCapacity(int numRecords) {
bits.setInitialCapacity(numRecords);
values.setInitialCapacity(numRecords);
}

<#if type.major == "VarLen">
@Override
public SerializedField getMetadata() {
Expand Down
Expand Up @@ -174,6 +174,12 @@ public void copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector
}
}

/**
 * Presets the capacity used by the first allocateNew() call of this repeated
 * vector: n records need n + 1 offset entries, and the inner values vector is
 * sized from an average-elements-per-record estimate.
 */
@Override
public void setInitialCapacity(int numRecords) {
// One extra offset entry marks the end position of the final record.
offsets.setInitialCapacity(numRecords + 1);
// DEFAULT_REPEAT_PER_RECORD is a heuristic average cardinality; for very large
// numRecords this int multiply could overflow — TODO confirm upstream bounds.
values.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
}

public boolean allocateNewSafe(){
if(!offsets.allocateNewSafe()) return false;
offsets.zeroVector();
Expand Down
Expand Up @@ -50,7 +50,8 @@
public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${minor.class}Vector.class);

private static final int INITIAL_BYTE_COUNT = 32768;
private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
private static final int INITIAL_BYTE_COUNT = 4096 * DEFAULT_RECORD_BYTE_COUNT;
private static final int MIN_BYTE_COUNT = 4096;

private final UInt${type.width}Vector offsetVector;
Expand Down Expand Up @@ -241,6 +242,12 @@ public void copyValueSafe(int fromIndex, int toIndex) {
}
}

/**
 * Presets the capacity used by the first allocateNew() call of this
 * variable-width vector: the data-buffer byte budget is estimated at
 * DEFAULT_RECORD_BYTE_COUNT (8) bytes per record, and the offset vector is
 * sized for n + 1 entries (end offset of the last value).
 */
@Override
public void setInitialCapacity(int numRecords) {
// Byte budget only — actual allocation happens later in allocateNew().
allocationTotalByteCount = numRecords * DEFAULT_RECORD_BYTE_COUNT;
offsetVector.setInitialCapacity(numRecords + 1);
}

public void allocateNew() {
if(!allocateNewSafe()){
throw new OutOfMemoryRuntimeException("Failure while allocating buffer.");
Expand Down
Expand Up @@ -64,20 +64,4 @@ public QueryResult getHeader() {
/** Renders the header and buffer list for debugging/log output. */
public String toString() {
  StringBuilder description = new StringBuilder("QueryWritableBatch [header=");
  description.append(header)
      .append(", buffers=")
      .append(Arrays.toString(buffers))
      .append(']');
  return description.toString();
}

/**
 * Builds a data-free batch that carries only the schema: every field of the
 * given schema is serialized into the batch definition, but no value buffers
 * are attached.
 *
 * @param queryId     id of the query this batch belongs to
 * @param rowCount    row count to advertise in the header
 * @param isLastChunk whether this is the final chunk of the query results
 * @param schema      schema whose fields are serialized into the header
 * @return a QueryWritableBatch wrapping the schema-only header
 */
public static QueryWritableBatch getEmptyBatchWithSchema(QueryId queryId, int rowCount, boolean isLastChunk, BatchSchema schema) {
  List<SerializedField> serializedFields = Lists.newArrayList();
  for (MaterializedField materializedField : schema) {
    serializedFields.add(materializedField.getAsBuilder().build());
  }
  RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(serializedFields).build();
  QueryResult headerOnly = QueryResult.newBuilder()
      .setQueryId(queryId)
      .setRowCount(rowCount)
      .setDef(batchDef)
      .setIsLastChunk(isLastChunk)
      .build();
  return new QueryWritableBatch(headerOnly);
}

}
Expand Up @@ -90,7 +90,6 @@ public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
HashPartitionSender operator) throws OutOfMemoryException {
super(context, new OperatorContext(operator, context, null, false), operator);
//super(context, operator);
this.incoming = incoming;
this.operator = operator;
this.context = context;
Expand All @@ -101,24 +100,6 @@ public PartitionSenderRootExec(FragmentContext context,
this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
}

/**
 * Reports whether every receiver has finished: true only when no slot in
 * {@code remainingReceivers} still holds zero.
 */
private boolean done() {
  final int receiverCount = remainingReceivers.length();
  for (int idx = 0; idx < receiverCount; idx++) {
    // A zero slot means that receiver has not been marked finished yet.
    if (remainingReceivers.get(idx) == 0) {
      return false;
    }
  }
  return true;
}

/**
 * Creates the partitioner and pushes a schema-only flush downstream so
 * receivers learn the schema early; any I/O failure is surfaced as a
 * SchemaChangeException.
 *
 * @throws SchemaChangeException if the partitioner setup or the flush fails
 */
private void buildSchema() throws SchemaChangeException {
  createPartitioner();
  try {
    // flush(isLastBatch=false, schemaChanged=true) — presumably sends empty
    // schema-bearing batches; confirm flag semantics against Partitioner.
    partitioner.flushOutgoingBatches(false, true);
  } catch (IOException ioException) {
    // Preserve the original cause for callers diagnosing the failure.
    throw new SchemaChangeException(ioException);
  }
}

@Override
public boolean innerNext() {

Expand Down Expand Up @@ -147,7 +128,7 @@ public boolean innerNext() {
if(partitioner != null) {
partitioner.flushOutgoingBatches(true, false);
} else {
sendEmptyBatch();
sendEmptyBatch(true);
}
} catch (IOException e) {
incoming.kill(false);
Expand All @@ -170,10 +151,11 @@ public boolean innerNext() {
partitioner.clear();
}
createPartitioner();
// flush to send schema downstream

if (first) {
// Send an empty batch for fast schema
first = false;
partitioner.flushOutgoingBatches(false, true);
sendEmptyBatch(false);
}
} catch (IOException e) {
incoming.kill(false);
Expand Down Expand Up @@ -233,7 +215,6 @@ private void createPartitioner() throws SchemaChangeException {

try {
// compile and setup generated code
// partitioner = context.getImplementationClassMultipleOutput(cg);
partitioner = context.getImplementationClass(cg);
partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext, statusHandler);

Expand Down Expand Up @@ -285,27 +266,28 @@ public void stop() {
incoming.cleanup();
}

public void sendEmptyBatch() {
public void sendEmptyBatch(boolean isLast) {
FragmentHandle handle = context.getHandle();
int fieldId = 0;
StatusHandler statusHandler = new StatusHandler(sendCount, context);
for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
DataTunnel tunnel = context.getDataTunnel(endpoint, opposite);
FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyLastWithSchema(
handle.getQueryId(),
handle.getMajorFragmentId(),
handle.getMinorFragmentId(),
operator.getOppositeMajorFragmentId(),
fieldId,
incoming.getSchema());
FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyBatchWithSchema(
isLast,
handle.getQueryId(),
handle.getMajorFragmentId(),
handle.getMinorFragmentId(),
operator.getOppositeMajorFragmentId(),
fieldId,
incoming.getSchema());
stats.startWait();
try {
tunnel.sendRecordBatch(statusHandler, writableBatch);
} finally {
stats.stopWait();
}
this.sendCount.increment();
sendCount.increment();
fieldId++;
}
}
Expand Down

0 comments on commit 9c0738d

Please sign in to comment.