[SPARK-23149][SQL] polish ColumnarBatch

## What changes were proposed in this pull request?

Several cleanups in `ColumnarBatch`:
* remove `schema`. The `ColumnVector`s inside a `ColumnarBatch` already carry the data type information, so the batch does not need its own `schema`.
* remove `capacity`. `ColumnarBatch` is just a wrapper around `ColumnVector`s, not a builder, so it does not need a capacity property.
* remove `DEFAULT_BATCH_SIZE`. As a wrapper, `ColumnarBatch` cannot decide the batch size; that is up to the reader (e.g. the Parquet reader, ORC reader, or cached table reader), and the default batch size should likewise be defined by the reader. See the sketch after this list.
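
The following is a minimal sketch (not part of this patch) of how a reader constructs a batch after this change. It assumes the Spark 2.3-era packages shown in the diff; the class name `BatchWrappingExample`, the method `emptyBatch`, and the `CAPACITY` constant are illustrative only.

```java
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class BatchWrappingExample {
  // The reader, not ColumnarBatch, now owns the batch size (hypothetical value, mirroring the diff).
  private static final int CAPACITY = 4 * 1024;

  static ColumnarBatch emptyBatch(StructType schema) {
    // The reader allocates vectors with its own capacity and schema...
    WritableColumnVector[] vectors = OnHeapColumnVector.allocateColumns(CAPACITY, schema);
    // ...and the batch merely wraps them. Before this patch the call was
    // `new ColumnarBatch(schema, vectors, CAPACITY)`; schema and capacity now stay with the reader.
    ColumnarBatch batch = new ColumnarBatch(vectors);
    batch.setNumRows(0);
    return batch;
  }

  public static void main(String[] args) {
    StructType schema = new StructType()
        .add("id", DataTypes.LongType)
        .add("name", DataTypes.StringType);
    System.out.println("numRows = " + emptyBatch(schema).numRows());
  }
}
```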

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20316 from cloud-fan/columnar-batch.

(cherry picked from commit d8aaa77)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
cloud-fan authored and gatorsmile committed Jan 19, 2018
1 parent 8d6845c commit 55efeffd774a776806f379df5b2209af05270cc4
@@ -49,18 +49,8 @@
* After creating, `initialize` and `initBatch` should be called sequentially.
*/
public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {

/**
* The default size of batch. We use this value for ORC reader to make it consistent with Spark's
* columnar batch, because their default batch sizes are different like the following:
*
* - ORC's VectorizedRowBatch.DEFAULT_SIZE = 1024
* - Spark's ColumnarBatch.DEFAULT_BATCH_SIZE = 4 * 1024
*/
private static final int DEFAULT_SIZE = 4 * 1024;

// ORC File Reader
private Reader reader;
// TODO: make this configurable.
private static final int CAPACITY = 4 * 1024;

// Vectorized ORC Row Batch
private VectorizedRowBatch batch;
@@ -98,22 +88,22 @@ public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark) {


@Override
public Void getCurrentKey() throws IOException, InterruptedException {
public Void getCurrentKey() {
return null;
}

@Override
public ColumnarBatch getCurrentValue() throws IOException, InterruptedException {
public ColumnarBatch getCurrentValue() {
return columnarBatch;
}

@Override
public float getProgress() throws IOException, InterruptedException {
public float getProgress() throws IOException {
return recordReader.getProgress();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
public boolean nextKeyValue() throws IOException {
return nextBatch();
}

@@ -134,16 +124,15 @@ public void close() throws IOException {
* Please note that `initBatch` is needed to be called after this.
*/
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
public void initialize(
InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
FileSplit fileSplit = (FileSplit)inputSplit;
Configuration conf = taskAttemptContext.getConfiguration();
reader = OrcFile.createReader(
Reader reader = OrcFile.createReader(
fileSplit.getPath(),
OrcFile.readerOptions(conf)
.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
.filesystem(fileSplit.getPath().getFileSystem(conf)));

Reader.Options options =
OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength());
recordReader = reader.rows(options);
@@ -159,7 +148,7 @@ public void initBatch(
StructField[] requiredFields,
StructType partitionSchema,
InternalRow partitionValues) {
batch = orcSchema.createRowBatch(DEFAULT_SIZE);
batch = orcSchema.createRowBatch(CAPACITY);
assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.

this.requiredFields = requiredFields;
@@ -171,19 +160,17 @@ public void initBatch(
resultSchema = resultSchema.add(f);
}

int capacity = DEFAULT_SIZE;

if (copyToSpark) {
if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
columnVectors = OffHeapColumnVector.allocateColumns(CAPACITY, resultSchema);
} else {
columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
columnVectors = OnHeapColumnVector.allocateColumns(CAPACITY, resultSchema);
}

// Initialize the missing columns once.
for (int i = 0; i < requiredFields.length; i++) {
if (requestedColIds[i] == -1) {
columnVectors[i].putNulls(0, capacity);
columnVectors[i].putNulls(0, CAPACITY);
columnVectors[i].setIsConstant();
}
}
@@ -196,7 +183,7 @@ public void initBatch(
}
}

columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
columnarBatch = new ColumnarBatch(columnVectors);
} else {
// Just wrap the ORC column vector instead of copying it to Spark column vector.
orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
@@ -206,8 +193,8 @@ public void initBatch(
int colId = requestedColIds[i];
// Initialize the missing columns once.
if (colId == -1) {
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
missingCol.putNulls(0, capacity);
OnHeapColumnVector missingCol = new OnHeapColumnVector(CAPACITY, dt);
missingCol.putNulls(0, CAPACITY);
missingCol.setIsConstant();
orcVectorWrappers[i] = missingCol;
} else {
@@ -219,14 +206,14 @@ public void initBatch(
int partitionIdx = requiredFields.length;
for (int i = 0; i < partitionValues.numFields(); i++) {
DataType dt = partitionSchema.fields()[i].dataType();
OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
OnHeapColumnVector partitionCol = new OnHeapColumnVector(CAPACITY, dt);
ColumnVectorUtils.populate(partitionCol, partitionValues, i);
partitionCol.setIsConstant();
orcVectorWrappers[partitionIdx + i] = partitionCol;
}
}

columnarBatch = new ColumnarBatch(resultSchema, orcVectorWrappers, capacity);
columnarBatch = new ColumnarBatch(orcVectorWrappers);
}
}

@@ -170,7 +170,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
* Returns the list of files at 'path' recursively. This skips files that are ignored normally
* by MapReduce.
*/
public static List<String> listDirectory(File path) throws IOException {
public static List<String> listDirectory(File path) {
List<String> result = new ArrayList<>();
if (path.isDirectory()) {
for (File f: path.listFiles()) {
@@ -231,7 +231,7 @@ protected void initialize(String path, List<String> columns) throws IOException
}

@Override
public Void getCurrentKey() throws IOException, InterruptedException {
public Void getCurrentKey() {
return null;
}

@@ -259,7 +259,7 @@ public ValuesReaderIntIterator(ValuesReader delegate) {
}

@Override
int nextInt() throws IOException {
int nextInt() {
return delegate.readInteger();
}
}
@@ -279,15 +279,15 @@ int nextInt() throws IOException {

protected static final class NullIntIterator extends IntIterator {
@Override
int nextInt() throws IOException { return 0; }
int nextInt() { return 0; }
}

/**
* Creates a reader for definition and repetition levels, returning an optimized one if
* the levels are not needed.
*/
protected static IntIterator createRLEIterator(int maxLevel, BytesInput bytes,
ColumnDescriptor descriptor) throws IOException {
protected static IntIterator createRLEIterator(
int maxLevel, BytesInput bytes, ColumnDescriptor descriptor) throws IOException {
try {
if (maxLevel == 0) return new NullIntIterator();
return new RLEIntIterator(
@@ -50,6 +50,9 @@
* TODO: make this always return ColumnarBatches.
*/
public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
// TODO: make this configurable.
private static final int CAPACITY = 4 * 1024;

/**
* Batch of rows that we assemble and the current index we've returned. Every time this
* batch is used up (batchIdx == numBatched), we populated the batch.
@@ -152,7 +155,7 @@ public void close() throws IOException {
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
public boolean nextKeyValue() throws IOException {
resultBatch();

if (returnColumnarBatch) return nextBatch();
@@ -165,13 +168,13 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
}

@Override
public Object getCurrentValue() throws IOException, InterruptedException {
public Object getCurrentValue() {
if (returnColumnarBatch) return columnarBatch;
return columnarBatch.getRow(batchIdx - 1);
}

@Override
public float getProgress() throws IOException, InterruptedException {
public float getProgress() {
return (float) rowsReturned / totalRowCount;
}

@@ -181,7 +184,7 @@ public float getProgress() throws IOException, InterruptedException {
// Columns 0,1: data columns
// Column 2: partitionValues[0]
// Column 3: partitionValues[1]
public void initBatch(
private void initBatch(
MemoryMode memMode,
StructType partitionColumns,
InternalRow partitionValues) {
@@ -195,13 +198,12 @@ public void initBatch(
}
}

int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
if (memMode == MemoryMode.OFF_HEAP) {
columnVectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
columnVectors = OffHeapColumnVector.allocateColumns(CAPACITY, batchSchema);
} else {
columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
columnVectors = OnHeapColumnVector.allocateColumns(CAPACITY, batchSchema);
}
columnarBatch = new ColumnarBatch(batchSchema, columnVectors, capacity);
columnarBatch = new ColumnarBatch(columnVectors);
if (partitionColumns != null) {
int partitionIdx = sparkSchema.fields().length;
for (int i = 0; i < partitionColumns.fields().length; i++) {
@@ -213,13 +215,13 @@ public void initBatch(
// Initialize missing columns with nulls.
for (int i = 0; i < missingColumns.length; i++) {
if (missingColumns[i]) {
columnVectors[i].putNulls(0, columnarBatch.capacity());
columnVectors[i].putNulls(0, CAPACITY);
columnVectors[i].setIsConstant();
}
}
}

public void initBatch() {
private void initBatch() {
initBatch(MEMORY_MODE, null, null);
}

@@ -255,7 +257,7 @@ public boolean nextBatch() throws IOException {
if (rowsReturned >= totalRowCount) return false;
checkEndOfRowGroup();

int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
int num = (int) Math.min((long) CAPACITY, totalCountLoadedSoFar - rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
if (columnReaders[i] == null) continue;
columnReaders[i].readBatch(num, columnVectors[i]);
@@ -118,19 +118,19 @@ private static void appendValue(WritableColumnVector dst, DataType t, Object o)
}
} else {
if (t == DataTypes.BooleanType) {
dst.appendBoolean(((Boolean)o).booleanValue());
dst.appendBoolean((Boolean) o);
} else if (t == DataTypes.ByteType) {
dst.appendByte(((Byte) o).byteValue());
dst.appendByte((Byte) o);
} else if (t == DataTypes.ShortType) {
dst.appendShort(((Short)o).shortValue());
dst.appendShort((Short) o);
} else if (t == DataTypes.IntegerType) {
dst.appendInt(((Integer)o).intValue());
dst.appendInt((Integer) o);
} else if (t == DataTypes.LongType) {
dst.appendLong(((Long)o).longValue());
dst.appendLong((Long) o);
} else if (t == DataTypes.FloatType) {
dst.appendFloat(((Float)o).floatValue());
dst.appendFloat((Float) o);
} else if (t == DataTypes.DoubleType) {
dst.appendDouble(((Double)o).doubleValue());
dst.appendDouble((Double) o);
} else if (t == DataTypes.StringType) {
byte[] b =((String)o).getBytes(StandardCharsets.UTF_8);
dst.appendByteArray(b, 0, b.length);
@@ -192,7 +192,7 @@ private static void appendValue(WritableColumnVector dst, DataType t, Row src, i
*/
public static ColumnarBatch toBatch(
StructType schema, MemoryMode memMode, Iterator<Row> row) {
int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
int capacity = 4 * 1024;
WritableColumnVector[] columnVectors;
if (memMode == MemoryMode.OFF_HEAP) {
columnVectors = OffHeapColumnVector.allocateColumns(capacity, schema);
@@ -208,7 +208,7 @@ public static ColumnarBatch toBatch(
}
n++;
}
ColumnarBatch batch = new ColumnarBatch(schema, columnVectors, capacity);
ColumnarBatch batch = new ColumnarBatch(columnVectors);
batch.setNumRows(n);
return batch;
}
@@ -20,18 +20,13 @@

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
import org.apache.spark.sql.types.StructType;

/**
* This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this
* batch so that Spark can access the data row by row. Instance of it is meant to be reused during
* the entire data loading process.
*/
public final class ColumnarBatch {
public static final int DEFAULT_BATCH_SIZE = 4 * 1024;

private final StructType schema;
private final int capacity;
private int numRows;
private final ColumnVector[] columns;

@@ -82,7 +77,6 @@ public void remove() {
* Sets the number of rows in this batch.
*/
public void setNumRows(int numRows) {
assert(numRows <= this.capacity);
this.numRows = numRows;
}

@@ -96,16 +90,6 @@ public void setNumRows(int numRows) {
*/
public int numRows() { return numRows; }

/**
* Returns the schema that makes up this batch.
*/
public StructType schema() { return schema; }

/**
* Returns the max capacity (in number of rows) for this batch.
*/
public int capacity() { return capacity; }

/**
* Returns the column at `ordinal`.
*/
@@ -120,10 +104,8 @@ public InternalRow getRow(int rowId) {
return row;
}

public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
this.schema = schema;
public ColumnarBatch(ColumnVector[] columns) {
this.columns = columns;
this.capacity = capacity;
this.row = new MutableColumnarRow(columns);
}
}
@@ -94,7 +94,7 @@ class VectorizedHashMapGenerator(
|
| public $generatedClassName() {
| vectors = ${classOf[OnHeapColumnVector].getName}.allocateColumns(capacity, schema);
| batch = new ${classOf[ColumnarBatch].getName}(schema, vectors, capacity);
| batch = new ${classOf[ColumnarBatch].getName}(vectors);
|
| // Generates a projection to return the aggregate buffer only.
| ${classOf[OnHeapColumnVector].getName}[] aggBufferVectors =
@@ -175,7 +175,7 @@ private[sql] object ArrowConverters {
new ArrowColumnVector(vector).asInstanceOf[ColumnVector]
}.toArray

val batch = new ColumnarBatch(schemaRead, columns, root.getRowCount)
val batch = new ColumnarBatch(columns)
batch.setNumRows(root.getRowCount)
batch.rowIterator().asScala
}
