From 9725d4dfd25e9ac6302122463832f3334dd615fa Mon Sep 17 00:00:00 2001 From: Sorabh Hamirwasia Date: Thu, 9 Aug 2018 19:21:42 -0700 Subject: [PATCH] DRILL-6687: Improve RemovingRecordBatch to do transfer when all records need to be copied Add optimization in SelectionVector2 to enable RemovingRecordBatch to transfer ValueVectors from incoming to output container when all records need to be copied. Modified FilterRecordBatch and LimitRecordBatch to support this optimization. --- .../exec/physical/impl/TopN/TopNBatch.java | 6 +- .../physical/impl/filter/FilterTemplate2.java | 7 ++ .../physical/impl/limit/LimitRecordBatch.java | 1 + .../impl/svremover/AbstractCopier.java | 17 ++-- .../impl/svremover/AbstractSV2Copier.java | 23 +++++- .../impl/svremover/AbstractSV4Copier.java | 5 +- .../exec/physical/impl/svremover/Copier.java | 9 +-- .../impl/svremover/GenericCopier.java | 11 ++- .../impl/svremover/GenericCopierFactory.java | 54 +++++++++++++ .../impl/svremover/GenericSV2Copier.java | 17 +++- .../impl/svremover/GenericSV4Copier.java | 25 +++--- .../impl/svremover/RemovingRecordBatch.java | 80 +------------------ .../impl/svremover/StraightCopier.java | 69 ++++++++++++++++ .../record/selection/SelectionVector2.java | 20 +++++ .../svremover/AbstractGenericCopierTest.java | 35 ++++---- .../impl/svremover/GenericCopierTest.java | 8 +- .../svremover/GenericSV2BatchCopierTest.java | 40 ++++++++++ .../impl/svremover/GenericSV2CopierTest.java | 7 +- .../impl/svremover/GenericSV4CopierTest.java | 9 +-- .../drill/test/rowSet/IndirectRowSet.java | 1 + 20 files changed, 293 insertions(+), 151 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 4fc0d1596a5..2763f5903c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -47,7 +47,7 @@ import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; import org.apache.drill.exec.physical.impl.svremover.Copier; -import org.apache.drill.exec.physical.impl.svremover.GenericSV4Copier; +import org.apache.drill.exec.physical.impl.svremover.GenericCopierFactory; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -360,7 +360,7 @@ private void purge() throws SchemaChangeException { SelectionVector4 selectionVector4 = priorityQueue.getSv4(); SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); if (copier == null) { - copier = GenericSV4Copier.createCopier(batch, newContainer, null); + copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null); } else { for (VectorWrapper i : batch) { @@ -468,7 +468,7 @@ public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTran @SuppressWarnings("resource") final SelectionVector4 selectionVector4 = priorityQueue.getSv4(); final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); - copier = GenericSV4Copier.createCopier(batch, newContainer, null); + copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null); @SuppressWarnings("resource") SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator()); try { diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index 6d1f03462ab..7b0183bdab8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -67,11 +67,18 @@ public void filterBatch(int recordCount) throws SchemaChangeException{ if (! outgoingSelectionVector.allocateNewSafe(recordCount)) { throw new OutOfMemoryException("Unable to allocate filter batch"); } + switch(svMode){ case NONE: + // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire + // batch if possible at once rather than row-by-row + outgoingSelectionVector.setBatchActualRecordCount(recordCount); filterBatchNoSV(recordCount); break; case TWO_BYTE: + // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire + // batch if possible at once rather than row-by-row + outgoingSelectionVector.setBatchActualRecordCount(incomingSelectionVector.getBatchActualRecordCount()); filterBatchSV2(recordCount); break; default: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 06f0fdbee0d..a86271483ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -194,6 +194,7 @@ protected IterOutcome doWork() { // clear memory for incoming sv (if any) if (incomingSv != null) { + outgoingSv.setBatchActualRecordCount(incomingSv.getBatchActualRecordCount()); incomingSv.clear(); } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java index ddea4684f2a..47ec1cb14b9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java @@ -19,7 +19,6 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -31,7 +30,7 @@ public abstract class AbstractCopier implements Copier { protected VectorContainer outgoing; @Override - public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + public void setup(RecordBatch incoming, VectorContainer outgoing) { this.outgoing = outgoing; final int count = outgoing.getNumberOfColumns(); @@ -43,7 +42,7 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC } @Override - public int copyRecords(int index, int recordCount) throws SchemaChangeException { + public int copyRecords(int index, int recordCount) { for(VectorWrapper out : outgoing){ TypeProtos.MajorType type = out.getField().getType(); if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) { @@ -57,7 +56,7 @@ public int copyRecords(int index, int recordCount) throws SchemaChangeException } @Override - public int appendRecord(int index) throws SchemaChangeException { + public int appendRecord(int index) { int outgoingPosition = outgoing.getRecordCount(); copyEntryIndirect(index, outgoingPosition); outgoingPosition++; @@ -66,11 +65,11 @@ public int appendRecord(int index) throws SchemaChangeException { } @Override - public int appendRecords(int index, int 
recordCount) throws SchemaChangeException { + public int appendRecords(int index, int recordCount) { return insertRecords(outgoing.getRecordCount(), index, recordCount); } - private int insertRecords(int outgoingPosition, int index, int recordCount) throws SchemaChangeException { + private int insertRecords(int outgoingPosition, int index, int recordCount) { final int endIndex = index + recordCount; for(int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++){ @@ -81,7 +80,7 @@ private int insertRecords(int outgoingPosition, int index, int recordCount) thro return outgoingPosition; } - private void updateCounts(int numRecords) { + protected void updateCounts(int numRecords) { outgoing.setRecordCount(numRecords); for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) { @@ -89,7 +88,7 @@ private void updateCounts(int numRecords) { } } - public abstract void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException; + public abstract void copyEntryIndirect(int inIndex, int outIndex); - public abstract void copyEntry(int inIndex, int outIndex) throws SchemaChangeException; + public abstract void copyEntry(int inIndex, int outIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java index 321d9a87d69..ec712e5d458 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java @@ -17,19 +17,23 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; 
import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.vector.ValueVector; +import java.util.ArrayList; +import java.util.List; + public abstract class AbstractSV2Copier extends AbstractCopier { protected ValueVector[] vvIn; private SelectionVector2 sv2; + protected List transferPairs = new ArrayList<>(); @Override - public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + public void setup(RecordBatch incoming, VectorContainer outgoing) { super.setup(incoming, outgoing); this.sv2 = incoming.getSelectionVector2(); @@ -46,7 +50,20 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC } } - public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException { + public void copyEntryIndirect(int inIndex, int outIndex) { copyEntry(sv2.getIndex(inIndex), outIndex); } + + @Override + public int copyRecords(int index, int recordCount) { + if (sv2.doFullTransfer()) { + for (TransferPair pair : transferPairs) { + pair.transfer(); + } + updateCounts(recordCount); + return recordCount; + } + + return super.copyRecords(index, recordCount); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java index cd6af07b378..56e258659e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -31,7 +30,7 @@ public abstract class AbstractSV4Copier extends 
AbstractCopier { private SelectionVector4 sv4; @Override - public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException{ + public void setup(RecordBatch incoming, VectorContainer outgoing) { super.setup(incoming, outgoing); this.sv4 = incoming.getSelectionVector4(); @@ -48,7 +47,7 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC } } - public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException { + public void copyEntryIndirect(int inIndex, int outIndex) { copyEntry(sv4.get(inIndex), outIndex); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java index bc31252827f..92dea7021d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java @@ -17,13 +17,12 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; public interface Copier { - void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException; - int copyRecords(int index, int recordCount) throws SchemaChangeException; - int appendRecord(int index) throws SchemaChangeException; - int appendRecords(int index, int recordCount) throws SchemaChangeException; + void setup(RecordBatch incoming, VectorContainer outgoing); + int copyRecords(int index, int recordCount); + int appendRecord(int index); + int appendRecords(int index, int recordCount); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java index 
de048dcb7e5..72516e058ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -30,7 +29,7 @@ public class GenericCopier implements Copier { private VectorContainer outgoing; @Override - public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException { + public void setup(RecordBatch incoming, VectorContainer outgoing) { this.outgoing = outgoing; final int count = outgoing.getNumberOfColumns(); @@ -53,12 +52,12 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC } @Override - public int copyRecords(int index, int recordCount) throws SchemaChangeException { + public int copyRecords(int index, int recordCount) { return insertRecords(0, index, recordCount); } @Override - public int appendRecord(int index) throws SchemaChangeException { + public int appendRecord(int index) { int outgoingPosition = outgoing.getRecordCount(); for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) { vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index); @@ -69,11 +68,11 @@ public int appendRecord(int index) throws SchemaChangeException { } @Override - public int appendRecords(int index, int recordCount) throws SchemaChangeException { + public int appendRecords(int index, int recordCount) { return insertRecords(outgoing.getRecordCount(), index, recordCount); } - private int insertRecords(int outgoingPosition, int startIndex, int recordCount) throws SchemaChangeException { + private int insertRecords(int outgoingPosition, int startIndex, int recordCount) { 
final int endIndex = startIndex + recordCount; for (int index = startIndex; index < endIndex; index++, outgoingPosition++) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java new file mode 100644 index 00000000000..cd6dd02c968 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.SchemaChangeCallBack; + +public class GenericCopierFactory { + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenericCopierFactory.class); + + public static Copier createAndSetupCopier(RecordBatch incoming, VectorContainer outputContainer, + SchemaChangeCallBack callBack) { + Copier copier; + + switch(incoming.getSchema().getSelectionVectorMode()){ + case NONE: + copier = new StraightCopier(incoming, outputContainer, callBack); + break; + case TWO_BYTE: + copier = new GenericSV2Copier(incoming, outputContainer, callBack); + break; + case FOUR_BYTE: + copier = new GenericSV4Copier(incoming, outputContainer, callBack); + break; + default: + throw new UnsupportedOperationException(); + } + + copier.setup(incoming, outputContainer); + return copier; + } + + public static Copier createAndSetupNonSVGenericCopier(RecordBatch incoming, VectorContainer outputContainer) { + Copier copier = new GenericCopier(); + copier.setup(incoming, outputContainer); + return copier; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java index a375f451d02..f607e8c7d20 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java @@ -17,11 +17,24 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.VectorContainer; +import 
org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.SchemaChangeCallBack; public class GenericSV2Copier extends AbstractSV2Copier { + + public GenericSV2Copier(RecordBatch incomingBatch, VectorContainer outputContainer, + SchemaChangeCallBack callBack) { + for(VectorWrapper vv : incomingBatch){ + TransferPair pair = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack)); + transferPairs.add(pair); + } + } + @Override - public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException { + public void copyEntry(int inIndex, int outIndex) { for ( int i = 0; i < vvIn.length; i++ ) { vvOut[i].copyEntry(outIndex, vvIn[i], inIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java index 1f3d28bdc4b..c676841f822 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -25,8 +24,18 @@ import org.apache.drill.exec.vector.ValueVector; public class GenericSV4Copier extends AbstractSV4Copier { + + public GenericSV4Copier(RecordBatch incomingBatch, VectorContainer outputContainer, + SchemaChangeCallBack callBack) { + for(VectorWrapper vv : incomingBatch){ + @SuppressWarnings("resource") + ValueVector v = vv.getValueVectors()[0]; + v.makeTransferPair(outputContainer.addOrGet(v.getField(), callBack)); + } + } + @Override - public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException { + 
public void copyEntry(int inIndex, int outIndex) { int inOffset = inIndex & 0xFFFF; int inVector = inIndex >>> 16; for ( int i = 0; i < vvIn.length; i++ ) { @@ -34,16 +43,4 @@ public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException { vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset); } } - - public static Copier createCopier(RecordBatch batch, VectorContainer container, SchemaChangeCallBack callBack) throws SchemaChangeException { - for(VectorWrapper vv : batch){ - @SuppressWarnings("resource") - ValueVector v = vv.getValueVectors()[0]; - v.makeTransferPair(container.addOrGet(v.getField(), callBack)); - } - - Copier copier = new GenericSV4Copier(); - copier.setup(batch, container); - return copier; - } } \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index acfdc878aa6..1471d5e558f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import java.util.List; - import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; @@ -26,14 +24,9 @@ import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TransferPair; -import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; -import com.google.common.base.Preconditions; -import 
com.google.common.collect.Lists; - public class RemovingRecordBatch extends AbstractSingleRecordBatch{ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class); @@ -56,19 +49,7 @@ protected boolean setupNewSchema() throws SchemaChangeException { // send OK to downstream instead. Since the output of RemovingRecordBatch is always going to be a regular container // change in incoming container type is not actual schema change. container.zeroVectors(); - switch(incoming.getSchema().getSelectionVectorMode()){ - case NONE: - this.copier = getStraightCopier(); - break; - case TWO_BYTE: - this.copier = create2Copier(); - break; - case FOUR_BYTE: - this.copier = create4Copier(); - break; - default: - throw new UnsupportedOperationException(); - } + copier = GenericCopierFactory.createAndSetupCopier(incoming, container, callBack); // If there is an actual schema change then below condition will be true and it will send OK_NEW_SCHEMA // downstream too @@ -84,7 +65,7 @@ protected boolean setupNewSchema() throws SchemaChangeException { protected IterOutcome doWork() { try { copier.copyRecords(0, incoming.getRecordCount()); - } catch (SchemaChangeException e) { + } catch (Exception e) { throw new IllegalStateException(e); } finally { if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) { @@ -107,63 +88,6 @@ public void close() { super.close(); } - private class StraightCopier implements Copier{ - - private List pairs = Lists.newArrayList(); - - @Override - public void setup(RecordBatch incoming, VectorContainer outgoing){ - for(VectorWrapper vv : incoming){ - TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); - pairs.add(tp); - } - } - - @Override - public int copyRecords(int index, int recordCount) { - assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch"; - for(TransferPair tp : pairs){ - 
tp.transfer(); - } - - container.setRecordCount(incoming.getRecordCount()); - return recordCount; - } - - @Override - public int appendRecord(int index) throws SchemaChangeException { - throw new UnsupportedOperationException(); - } - - @Override - public int appendRecords(int index, int recordCount) throws SchemaChangeException { - throw new UnsupportedOperationException(); - } - } - - private Copier getStraightCopier(){ - StraightCopier copier = new StraightCopier(); - copier.setup(incoming, container); - return copier; - } - - private Copier create2Copier() throws SchemaChangeException { - Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); - - for(VectorWrapper vv : incoming){ - vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); - } - - Copier copier = new GenericSV2Copier(); - copier.setup(incoming, container); - return copier; - } - - private Copier create4Copier() throws SchemaChangeException { - Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); - return GenericSV4Copier.createCopier(incoming, container, callBack); - } - @Override public WritableBatch getWritableBatch() { return WritableBatch.get(this); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java new file mode 100644 index 00000000000..33f2a964b98 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.svremover; + +import com.google.common.collect.Lists; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.SchemaChangeCallBack; + +import java.util.List; + +public class StraightCopier implements Copier { + private List pairs = Lists.newArrayList(); + private RecordBatch incoming; + private VectorContainer outputContainer; + private SchemaChangeCallBack callBack; + + public StraightCopier(RecordBatch incomingBatch, VectorContainer outputContainer, SchemaChangeCallBack callBack) { + this.incoming = incomingBatch; + this.outputContainer = outputContainer; + this.callBack = callBack; + } + + @Override + public void setup(RecordBatch incoming, VectorContainer outgoing) { + for(VectorWrapper vv : incoming){ + TransferPair tp = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack)); + pairs.add(tp); + } + } + + @Override + public int copyRecords(int index, int recordCount) { + assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch"; + for(TransferPair tp : pairs){ + tp.transfer(); + } + + outputContainer.setRecordCount(incoming.getRecordCount()); + return recordCount; + } + + 
@Override + public int appendRecord(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int appendRecords(int index, int recordCount) { + throw new UnsupportedOperationException(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index 72441485874..1d9c5dae64a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -37,6 +37,7 @@ public class SelectionVector2 implements AutoCloseable { private final BufferAllocator allocator; private int recordCount; + private int batchActualRecordCount = -1; private DrillBuf buffer = DeadBuf.DEAD_BUFFER; public static final int RECORD_SIZE = 2; @@ -61,6 +62,11 @@ public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count) { recordCount = count; } + public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count, int actualRecordCount) { + this(allocator, buf, count); + this.batchActualRecordCount = actualRecordCount; + } + public int getCount() { return recordCount; } @@ -127,6 +133,7 @@ public void allocateNew(int size) { public SelectionVector2 clone() { SelectionVector2 newSV = new SelectionVector2(allocator); newSV.recordCount = recordCount; + newSV.batchActualRecordCount = batchActualRecordCount; newSV.buffer = buffer; /* Since buffer and newSV.buffer essentially point to the @@ -143,6 +150,7 @@ public void clear() { buffer.release(); buffer = DeadBuf.DEAD_BUFFER; recordCount = 0; + batchActualRecordCount = -1; } } @@ -151,6 +159,18 @@ public void setRecordCount(int recordCount){ this.recordCount = recordCount; } + public boolean doFullTransfer() { + return (recordCount == batchActualRecordCount); + } + + public void setBatchActualRecordCount(int actualRecordCount) { + 
this.batchActualRecordCount = actualRecordCount; + } + + public int getBatchActualRecordCount() { + return batchActualRecordCount; + } + @Override public void close() { clear(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java index 01263b1d242..7d444b4ed50 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java @@ -17,34 +17,34 @@ */ package org.apache.drill.exec.physical.impl.svremover; -import com.google.common.collect.Lists; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.RootAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBatch; import org.apache.drill.test.rowSet.RowSetBuilder; import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; import org.junit.Test; -import java.util.List; - public abstract class AbstractGenericCopierTest { @Test public void testCopyRecords() throws SchemaChangeException { try (RootAllocator allocator = new RootAllocator(10_000_000)) { - final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); final 
RowSet srcRowSet = createSrcRowSet(allocator); final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build(); final VectorContainer destContainer = destRowSet.container(); - final Copier copier = createCopier(); + final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null); final RowSet expectedRowSet = createExpectedRowset(allocator); - copier.setup(new RowSetBatch(srcRowSet), destContainer); copier.copyRecords(0, 3); try { @@ -65,14 +65,13 @@ public void testCopyRecords() throws SchemaChangeException { @Test public void testAppendRecords() throws SchemaChangeException { try (RootAllocator allocator = new RootAllocator(10_000_000)) { - final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); final RowSet srcRowSet = createSrcRowSet(allocator); final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build(); final VectorContainer destContainer = destRowSet.container(); - final Copier copier = createCopier(); + final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null); final RowSet expectedRowSet = createExpectedRowset(allocator); - copier.setup(new RowSetBatch(srcRowSet), destContainer); copier.appendRecord(0); copier.appendRecords(1, 2); @@ -93,7 +92,10 @@ public void testAppendRecords() throws SchemaChangeException { public abstract RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException; - public abstract Copier createCopier(); + public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer, + SchemaChangeCallBack callback) { + return GenericCopierFactory.createAndSetupCopier(incoming, outputContainer, callback); + } public static Object[] row1() { return new Object[]{110, "green", new float[]{5.5f, 2.3f}, new String[]{"1a", "1b"}}; @@ -115,7 +117,7 @@ public static Object[] row5() { return new Object[]{106, "black", new 
float[]{.75f}, new String[]{"4a"}}; } - public static RowSet createExpectedRowset(RootAllocator allocator) { + private RowSet createExpectedRowset(RootAllocator allocator) { return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE)) .addRow(row1()) .addRow(row2()) @@ -123,14 +125,17 @@ public static RowSet createExpectedRowset(RootAllocator allocator) { .build(); } - public static BatchSchema createTestSchema(BatchSchema.SelectionVectorMode mode) { + protected TupleMetadata createTestSchema(BatchSchema.SelectionVectorMode mode) { MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT)); MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR)); MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4)); MaterializedField colD = MaterializedField.create("colD", Types.repeated(TypeProtos.MinorType.VARCHAR)); - List cols = Lists.newArrayList(colA, colB, colC, colD); - BatchSchema batchSchema = new BatchSchema(mode, cols); - return batchSchema; + return new SchemaBuilder().add(colA) + .add(colB) + .add(colC) + .add(colD) + .withSVMode(mode) + .buildSchema(); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java index f946f81f47a..d6c38e72482 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java @@ -19,6 +19,9 @@ import org.apache.drill.exec.memory.RootAllocator; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.SchemaChangeCallBack; import 
org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBuilder; @@ -35,7 +38,8 @@ public RowSet createSrcRowSet(RootAllocator allocator) { } @Override - public Copier createCopier() { - return new GenericCopier(); + public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer, + SchemaChangeCallBack callback) { + return GenericCopierFactory.createAndSetupNonSVGenericCopier(incoming, outputContainer); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java new file mode 100644 index 00000000000..748e0d06b61 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.svremover; + +import org.apache.drill.exec.memory.RootAllocator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; + +/** + * Verifies optimization in SV2 such that when total record to copy is same as number of records in the + * underlying batch for SV2 then SV2 will do transfer rather than row by row copy + */ +public class GenericSV2BatchCopierTest extends AbstractGenericCopierTest { + + @Override + public RowSet createSrcRowSet(RootAllocator allocator) { + return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE)) + .addSelection(true, row1()) + .addRow(row2()) + .addSelection(true, row3()) + .withSv2() + .build(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java index 428124d3299..b2f0e5105bc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java @@ -21,7 +21,9 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBuilder; + public class GenericSV2CopierTest extends AbstractGenericCopierTest { + @Override public RowSet createSrcRowSet(RootAllocator allocator) { return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE)) @@ -33,9 +35,4 @@ public RowSet createSrcRowSet(RootAllocator allocator) { .withSv2() .build(); } - - @Override - public Copier createCopier() { - return new GenericSV2Copier(); - } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java index 447ad3ae9d0..a5f5bb7852c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java @@ -23,15 +23,17 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.test.rowSet.HyperRowSetImpl; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetBuilder; public class GenericSV4CopierTest extends AbstractGenericCopierTest { + @Override public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException { - final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); + final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE); final DrillBuf drillBuf = allocator.buffer(4 * 3); final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, Character.MAX_VALUE); @@ -57,9 +59,4 @@ public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeExcept return new HyperRowSetImpl(hyperContainer, sv4); } - - @Override - public Copier createCopier() { - return new GenericSV4Copier(); - } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java index f0ebdc073be..878aa25e611 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java @@ -96,6 +96,7 @@ private static SelectionVector2 makeSv2(BufferAllocator allocator, 
VectorContain destIndex++; } sv2.setRecordCount(rowCount); + sv2.setBatchActualRecordCount(container.getRecordCount()); container.buildSchema(SelectionVectorMode.TWO_BYTE); return sv2; }