Skip to content

Commit

Permalink
DRILL-6138: Move RecordBatchSizer to org.apache.drill.exec.record package
Browse files Browse the repository at this point in the history

This closes #1115
  • Loading branch information
ppadma authored and parthchandra committed Feb 9, 2018
1 parent 26dcca0 commit f0bd129
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 28 deletions.
Expand Up @@ -53,7 +53,7 @@
import org.apache.drill.exec.physical.impl.common.HashTableStats; import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer; import org.apache.drill.exec.physical.impl.common.IndexPointer;


import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;


import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.planner.physical.AggPrelBase; import org.apache.drill.exec.planner.physical.AggPrelBase;
Expand Down
Expand Up @@ -41,7 +41,7 @@
import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.MaterializedField;
Expand Down
Expand Up @@ -22,7 +22,7 @@


import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.physical.impl.xsort.MSortTemplate; import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch; import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask; import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
Expand Down
Expand Up @@ -20,7 +20,6 @@
import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
import org.apache.drill.exec.physical.rowSet.model.MetadataProvider; import org.apache.drill.exec.physical.rowSet.model.MetadataProvider;
import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataCreator; import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataCreator;
import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval; import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
Expand All @@ -38,8 +37,8 @@
* row count and the size information provided in column metadata. * row count and the size information provided in column metadata.
* <p> * <p>
* @see {@link AllocationHelper} - the class which this one replaces * @see {@link AllocationHelper} - the class which this one replaces
* @see {@link VectorInitializer} - an earlier cut at implementation * @see {@link org.apache.drill.exec.record.VectorInitializer} - an earlier cut at implementation
* based on data from the {@link RecordBatchSizer} * based on data from the {@link org.apache.drill.exec.record.RecordBatchSizer}
*/ */


// TODO: Does not yet handle lists; lists are a simple extension // TODO: Does not yet handle lists; lists are a simple extension
Expand Down
Expand Up @@ -15,23 +15,17 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.drill.exec.physical.impl.spill; package org.apache.drill.exec.record;


import java.util.ArrayList;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.Map;


import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.ValueVector;
Expand Down Expand Up @@ -260,7 +254,7 @@ public static ColumnSize getColumn(ValueVector v, String prefix) {


public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB


private List<ColumnSize> columnSizes = new ArrayList<>(); private Map<String, ColumnSize> columnSizes = CaseInsensitiveMap.newHashMap();


/** /**
* Number of records (rows) in the batch. * Number of records (rows) in the batch.
Expand Down Expand Up @@ -394,7 +388,7 @@ private int roundUpToPowerOf2(int arg) {
private void measureColumn(ValueVector v, String prefix) { private void measureColumn(ValueVector v, String prefix) {


ColumnSize colSize = new ColumnSize(v, prefix); ColumnSize colSize = new ColumnSize(v, prefix);
columnSizes.add(colSize); columnSizes.put(v.getField().getName(), colSize);
stdRowWidth += colSize.stdSize; stdRowWidth += colSize.stdSize;
netBatchSize += colSize.dataSize; netBatchSize += colSize.dataSize;
maxSize = Math.max(maxSize, colSize.dataSize); maxSize = Math.max(maxSize, colSize.dataSize);
Expand Down Expand Up @@ -458,7 +452,7 @@ public static int safeDivide(long num, long denom) {
public int stdRowWidth() { return stdRowWidth; } public int stdRowWidth() { return stdRowWidth; }
public int grossRowWidth() { return grossRowWidth; } public int grossRowWidth() { return grossRowWidth; }
public int netRowWidth() { return netRowWidth; } public int netRowWidth() { return netRowWidth; }
public List<ColumnSize> columns() { return columnSizes; } public Map<String, ColumnSize> columns() { return columnSizes; }


/** /**
* Compute the "real" width of the row, taking into account each varchar column size * Compute the "real" width of the row, taking into account each varchar column size
Expand All @@ -477,7 +471,7 @@ public static int safeDivide(long num, long denom) {
public String toString() { public String toString() {
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder();
buf.append("Actual batch schema & sizes {\n"); buf.append("Actual batch schema & sizes {\n");
for (ColumnSize colSize : columnSizes) { for (ColumnSize colSize : columnSizes.values()) {
buf.append(" "); buf.append(" ");
buf.append(colSize.toString()); buf.append(colSize.toString());
buf.append("\n"); buf.append("\n");
Expand Down Expand Up @@ -508,7 +502,7 @@ public String toString() {


public VectorInitializer buildVectorInitializer() { public VectorInitializer buildVectorInitializer() {
VectorInitializer initializer = new VectorInitializer(); VectorInitializer initializer = new VectorInitializer();
for (ColumnSize colSize : columnSizes) { for (ColumnSize colSize : columnSizes.values()) {
colSize.buildVectorInitializer(initializer); colSize.buildVectorInitializer(initializer);
} }
return initializer; return initializer;
Expand Down
Expand Up @@ -23,8 +23,8 @@


import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer.ColumnSize; import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.VectorInitializer; import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.exec.record.VectorInitializer.AllocationHint; import org.apache.drill.exec.record.VectorInitializer.AllocationHint;
Expand Down Expand Up @@ -71,7 +71,7 @@ public void testSizer() {


RecordBatchSizer sizer = new RecordBatchSizer(rows.container()); RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
assertEquals(2, sizer.columns().size()); assertEquals(2, sizer.columns().size());
ColumnSize bCol = sizer.columns().get(1); ColumnSize bCol = sizer.columns().get("b");
assertEquals(0.1, bCol.estElementCountPerArray, 0.01); assertEquals(0.1, bCol.estElementCountPerArray, 0.01);
assertEquals(1, bCol.elementCount); assertEquals(1, bCol.elementCount);


Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.util.JsonStringArrayList; import org.apache.drill.exec.util.JsonStringArrayList;
Expand Down
Expand Up @@ -40,8 +40,7 @@
import org.apache.drill.exec.HyperVectorValueIterator; import org.apache.drill.exec.HyperVectorValueIterator;
import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema;
Expand Down
Expand Up @@ -17,7 +17,7 @@
*/ */
package org.apache.drill.test.rowSet; package org.apache.drill.test.rowSet;


import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval; import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval;
import org.apache.drill.exec.physical.rowSet.model.single.BaseReaderBuilder; import org.apache.drill.exec.physical.rowSet.model.single.BaseReaderBuilder;
Expand Down
Expand Up @@ -20,7 +20,7 @@
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; import org.apache.drill.exec.physical.rowSet.model.ReaderIndex;
import org.apache.drill.exec.physical.rowSet.model.SchemaInference; import org.apache.drill.exec.physical.rowSet.model.SchemaInference;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
Expand Down

0 comments on commit f0bd129

Please sign in to comment.