Skip to content

Commit

Permalink
DRILL-6687: Improve RemovingRecordBatch to do transfer when all recor…
Browse files Browse the repository at this point in the history
…ds needs to be copied

Add optimization in SelectionVector2 to enable RemovingRecordBatch to transfer ValueVectors from incoming to output container when all records needs to be copied.
Modified FilterRecordBatch and LimitRecordBatch to play by this optimization
  • Loading branch information
Sorabh Hamirwasia authored and sohami committed Aug 14, 2018
1 parent 6ad0f9f commit 9725d4d
Show file tree
Hide file tree
Showing 20 changed files with 293 additions and 151 deletions.
Expand Up @@ -47,7 +47,7 @@
import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
import org.apache.drill.exec.physical.impl.svremover.Copier; import org.apache.drill.exec.physical.impl.svremover.Copier;
import org.apache.drill.exec.physical.impl.svremover.GenericSV4Copier; import org.apache.drill.exec.physical.impl.svremover.GenericCopierFactory;
import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
Expand Down Expand Up @@ -360,7 +360,7 @@ private void purge() throws SchemaChangeException {
SelectionVector4 selectionVector4 = priorityQueue.getSv4(); SelectionVector4 selectionVector4 = priorityQueue.getSv4();
SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
if (copier == null) { if (copier == null) {
copier = GenericSV4Copier.createCopier(batch, newContainer, null); copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null);
} else { } else {
for (VectorWrapper<?> i : batch) { for (VectorWrapper<?> i : batch) {


Expand Down Expand Up @@ -468,7 +468,7 @@ public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTran
@SuppressWarnings("resource") @SuppressWarnings("resource")
final SelectionVector4 selectionVector4 = priorityQueue.getSv4(); final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
copier = GenericSV4Copier.createCopier(batch, newContainer, null); copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null);
@SuppressWarnings("resource") @SuppressWarnings("resource")
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator()); SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
try { try {
Expand Down
Expand Up @@ -67,11 +67,18 @@ public void filterBatch(int recordCount) throws SchemaChangeException{
if (! outgoingSelectionVector.allocateNewSafe(recordCount)) { if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
throw new OutOfMemoryException("Unable to allocate filter batch"); throw new OutOfMemoryException("Unable to allocate filter batch");
} }

switch(svMode){ switch(svMode){
case NONE: case NONE:
// Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire
// batch if possible at once rather than row-by-row
outgoingSelectionVector.setBatchActualRecordCount(recordCount);
filterBatchNoSV(recordCount); filterBatchNoSV(recordCount);
break; break;
case TWO_BYTE: case TWO_BYTE:
// Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire
// batch if possible at once rather than row-by-row
outgoingSelectionVector.setBatchActualRecordCount(incomingSelectionVector.getBatchActualRecordCount());
filterBatchSV2(recordCount); filterBatchSV2(recordCount);
break; break;
default: default:
Expand Down
Expand Up @@ -194,6 +194,7 @@ protected IterOutcome doWork() {


// clear memory for incoming sv (if any) // clear memory for incoming sv (if any)
if (incomingSv != null) { if (incomingSv != null) {
outgoingSv.setBatchActualRecordCount(incomingSv.getBatchActualRecordCount());
incomingSv.clear(); incomingSv.clear();
} }


Expand Down
Expand Up @@ -19,7 +19,6 @@


import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types; import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.VectorWrapper;
Expand All @@ -31,7 +30,7 @@ public abstract class AbstractCopier implements Copier {
protected VectorContainer outgoing; protected VectorContainer outgoing;


@Override @Override
public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { public void setup(RecordBatch incoming, VectorContainer outgoing) {
this.outgoing = outgoing; this.outgoing = outgoing;


final int count = outgoing.getNumberOfColumns(); final int count = outgoing.getNumberOfColumns();
Expand All @@ -43,7 +42,7 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC
} }


@Override @Override
public int copyRecords(int index, int recordCount) throws SchemaChangeException { public int copyRecords(int index, int recordCount) {
for(VectorWrapper<?> out : outgoing){ for(VectorWrapper<?> out : outgoing){
TypeProtos.MajorType type = out.getField().getType(); TypeProtos.MajorType type = out.getField().getType();
if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) { if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
Expand All @@ -57,7 +56,7 @@ public int copyRecords(int index, int recordCount) throws SchemaChangeException
} }


@Override @Override
public int appendRecord(int index) throws SchemaChangeException { public int appendRecord(int index) {
int outgoingPosition = outgoing.getRecordCount(); int outgoingPosition = outgoing.getRecordCount();
copyEntryIndirect(index, outgoingPosition); copyEntryIndirect(index, outgoingPosition);
outgoingPosition++; outgoingPosition++;
Expand All @@ -66,11 +65,11 @@ public int appendRecord(int index) throws SchemaChangeException {
} }


@Override @Override
public int appendRecords(int index, int recordCount) throws SchemaChangeException { public int appendRecords(int index, int recordCount) {
return insertRecords(outgoing.getRecordCount(), index, recordCount); return insertRecords(outgoing.getRecordCount(), index, recordCount);
} }


private int insertRecords(int outgoingPosition, int index, int recordCount) throws SchemaChangeException { private int insertRecords(int outgoingPosition, int index, int recordCount) {
final int endIndex = index + recordCount; final int endIndex = index + recordCount;


for(int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++){ for(int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++){
Expand All @@ -81,15 +80,15 @@ private int insertRecords(int outgoingPosition, int index, int recordCount) thro
return outgoingPosition; return outgoingPosition;
} }


private void updateCounts(int numRecords) { protected void updateCounts(int numRecords) {
outgoing.setRecordCount(numRecords); outgoing.setRecordCount(numRecords);


for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) { for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) {
vvOut[vectorIndex].getMutator().setValueCount(numRecords); vvOut[vectorIndex].getMutator().setValueCount(numRecords);
} }
} }


public abstract void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException; public abstract void copyEntryIndirect(int inIndex, int outIndex);


public abstract void copyEntry(int inIndex, int outIndex) throws SchemaChangeException; public abstract void copyEntry(int inIndex, int outIndex);
} }
Expand Up @@ -17,19 +17,23 @@
*/ */
package org.apache.drill.exec.physical.impl.svremover; package org.apache.drill.exec.physical.impl.svremover;


import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.ValueVector;


import java.util.ArrayList;
import java.util.List;

public abstract class AbstractSV2Copier extends AbstractCopier { public abstract class AbstractSV2Copier extends AbstractCopier {
protected ValueVector[] vvIn; protected ValueVector[] vvIn;
private SelectionVector2 sv2; private SelectionVector2 sv2;
protected List<TransferPair> transferPairs = new ArrayList<>();


@Override @Override
public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { public void setup(RecordBatch incoming, VectorContainer outgoing) {
super.setup(incoming, outgoing); super.setup(incoming, outgoing);
this.sv2 = incoming.getSelectionVector2(); this.sv2 = incoming.getSelectionVector2();


Expand All @@ -46,7 +50,20 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC
} }
} }


public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException { public void copyEntryIndirect(int inIndex, int outIndex) {
copyEntry(sv2.getIndex(inIndex), outIndex); copyEntry(sv2.getIndex(inIndex), outIndex);
} }

@Override
public int copyRecords(int index, int recordCount) {
if (sv2.doFullTransfer()) {
for (TransferPair pair : transferPairs) {
pair.transfer();
}
updateCounts(recordCount);
return recordCount;
}

return super.copyRecords(index, recordCount);
}
} }
Expand Up @@ -17,7 +17,6 @@
*/ */
package org.apache.drill.exec.physical.impl.svremover; package org.apache.drill.exec.physical.impl.svremover;


import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.VectorWrapper;
Expand All @@ -31,7 +30,7 @@ public abstract class AbstractSV4Copier extends AbstractCopier {
private SelectionVector4 sv4; private SelectionVector4 sv4;


@Override @Override
public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException{ public void setup(RecordBatch incoming, VectorContainer outgoing) {
super.setup(incoming, outgoing); super.setup(incoming, outgoing);
this.sv4 = incoming.getSelectionVector4(); this.sv4 = incoming.getSelectionVector4();


Expand All @@ -48,7 +47,7 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC
} }
} }


public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException { public void copyEntryIndirect(int inIndex, int outIndex) {
copyEntry(sv4.get(inIndex), outIndex); copyEntry(sv4.get(inIndex), outIndex);
} }
} }
Expand Up @@ -17,13 +17,12 @@
*/ */
package org.apache.drill.exec.physical.impl.svremover; package org.apache.drill.exec.physical.impl.svremover;


import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorContainer;


public interface Copier { public interface Copier {
void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException; void setup(RecordBatch incoming, VectorContainer outgoing);
int copyRecords(int index, int recordCount) throws SchemaChangeException; int copyRecords(int index, int recordCount);
int appendRecord(int index) throws SchemaChangeException; int appendRecord(int index);
int appendRecords(int index, int recordCount) throws SchemaChangeException; int appendRecords(int index, int recordCount);
} }
Expand Up @@ -17,7 +17,6 @@
*/ */
package org.apache.drill.exec.physical.impl.svremover; package org.apache.drill.exec.physical.impl.svremover;


import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.VectorWrapper;
Expand All @@ -30,7 +29,7 @@ public class GenericCopier implements Copier {
private VectorContainer outgoing; private VectorContainer outgoing;


@Override @Override
public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { public void setup(RecordBatch incoming, VectorContainer outgoing) {
this.outgoing = outgoing; this.outgoing = outgoing;


final int count = outgoing.getNumberOfColumns(); final int count = outgoing.getNumberOfColumns();
Expand All @@ -53,12 +52,12 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC
} }


@Override @Override
public int copyRecords(int index, int recordCount) throws SchemaChangeException { public int copyRecords(int index, int recordCount) {
return insertRecords(0, index, recordCount); return insertRecords(0, index, recordCount);
} }


@Override @Override
public int appendRecord(int index) throws SchemaChangeException { public int appendRecord(int index) {
int outgoingPosition = outgoing.getRecordCount(); int outgoingPosition = outgoing.getRecordCount();
for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) { for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) {
vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index); vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index);
Expand All @@ -69,11 +68,11 @@ public int appendRecord(int index) throws SchemaChangeException {
} }


@Override @Override
public int appendRecords(int index, int recordCount) throws SchemaChangeException { public int appendRecords(int index, int recordCount) {
return insertRecords(outgoing.getRecordCount(), index, recordCount); return insertRecords(outgoing.getRecordCount(), index, recordCount);
} }


private int insertRecords(int outgoingPosition, int startIndex, int recordCount) throws SchemaChangeException { private int insertRecords(int outgoingPosition, int startIndex, int recordCount) {
final int endIndex = startIndex + recordCount; final int endIndex = startIndex + recordCount;


for (int index = startIndex; index < endIndex; index++, outgoingPosition++) { for (int index = startIndex; index < endIndex; index++, outgoingPosition++) {
Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.svremover;

import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.SchemaChangeCallBack;

public class GenericCopierFactory {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenericCopierFactory.class);

public static Copier createAndSetupCopier(RecordBatch incoming, VectorContainer outputContainer,
SchemaChangeCallBack callBack) {
Copier copier;

switch(incoming.getSchema().getSelectionVectorMode()){
case NONE:
copier = new StraightCopier(incoming, outputContainer, callBack);
break;
case TWO_BYTE:
copier = new GenericSV2Copier(incoming, outputContainer, callBack);
break;
case FOUR_BYTE:
copier = new GenericSV4Copier(incoming, outputContainer, callBack);
break;
default:
throw new UnsupportedOperationException();
}

copier.setup(incoming, outputContainer);
return copier;
}

public static Copier createAndSetupNonSVGenericCopier(RecordBatch incoming, VectorContainer outputContainer) {
Copier copier = new GenericCopier();
copier.setup(incoming, outputContainer);
return copier;
}
}
Expand Up @@ -17,11 +17,24 @@
*/ */
package org.apache.drill.exec.physical.impl.svremover; package org.apache.drill.exec.physical.impl.svremover;


import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.SchemaChangeCallBack;


public class GenericSV2Copier extends AbstractSV2Copier { public class GenericSV2Copier extends AbstractSV2Copier {

public GenericSV2Copier(RecordBatch incomingBatch, VectorContainer outputContainer,
SchemaChangeCallBack callBack) {
for(VectorWrapper<?> vv : incomingBatch){
TransferPair pair = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack));
transferPairs.add(pair);
}
}

@Override @Override
public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException { public void copyEntry(int inIndex, int outIndex) {
for ( int i = 0; i < vvIn.length; i++ ) { for ( int i = 0; i < vvIn.length; i++ ) {
vvOut[i].copyEntry(outIndex, vvIn[i], inIndex); vvOut[i].copyEntry(outIndex, vvIn[i], inIndex);
} }
Expand Down
Expand Up @@ -17,33 +17,30 @@
*/ */
package org.apache.drill.exec.physical.impl.svremover; package org.apache.drill.exec.physical.impl.svremover;


import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.ValueVector;


public class GenericSV4Copier extends AbstractSV4Copier { public class GenericSV4Copier extends AbstractSV4Copier {

public GenericSV4Copier(RecordBatch incomingBatch, VectorContainer outputContainer,
SchemaChangeCallBack callBack) {
for(VectorWrapper<?> vv : incomingBatch){
@SuppressWarnings("resource")
ValueVector v = vv.getValueVectors()[0];
v.makeTransferPair(outputContainer.addOrGet(v.getField(), callBack));
}
}

@Override @Override
public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException { public void copyEntry(int inIndex, int outIndex) {
int inOffset = inIndex & 0xFFFF; int inOffset = inIndex & 0xFFFF;
int inVector = inIndex >>> 16; int inVector = inIndex >>> 16;
for ( int i = 0; i < vvIn.length; i++ ) { for ( int i = 0; i < vvIn.length; i++ ) {
ValueVector[] vectorsFromIncoming = vvIn[i].getValueVectors(); ValueVector[] vectorsFromIncoming = vvIn[i].getValueVectors();
vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset); vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset);
} }
} }

public static Copier createCopier(RecordBatch batch, VectorContainer container, SchemaChangeCallBack callBack) throws SchemaChangeException {
for(VectorWrapper<?> vv : batch){
@SuppressWarnings("resource")
ValueVector v = vv.getValueVectors()[0];
v.makeTransferPair(container.addOrGet(v.getField(), callBack));
}

Copier copier = new GenericSV4Copier();
copier.setup(batch, container);
return copier;
}
} }

0 comments on commit 9725d4d

Please sign in to comment.