From f3ca6c73e86928d0a087fbfd36de968ae873bbe3 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 25 Jan 2018 22:30:57 +0800
Subject: [PATCH 1/2] simplify ColumnVector.getArray

---
 .../datasources/orc/OrcColumnVector.java      | 13 +--
 .../vectorized/WritableColumnVector.java      | 13 ++-
 .../sql/vectorized/ArrowColumnVector.java     | 49 ++++------
 .../spark/sql/vectorized/ColumnVector.java    | 88 ++++++++++---------
 .../spark/sql/vectorized/ColumnarArray.java   |  2 +
 .../spark/sql/vectorized/ColumnarBatch.java   |  2 +
 .../spark/sql/vectorized/ColumnarRow.java     |  2 +
 .../vectorized/ColumnarBatchBenchmark.scala   | 14 ++-
 8 files changed, 88 insertions(+), 95 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index aaf2a380034a9..5078bc7922ee2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -24,6 +24,7 @@
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -145,16 +146,6 @@ public double getDouble(int rowId) {
     return doubleData.vector[getRowIndex(rowId)];
   }
 
-  @Override
-  public int getArrayLength(int rowId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getArrayOffset(int rowId) {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public Decimal getDecimal(int rowId, int precision, int scale) {
     BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue();
@@ -177,7 +168,7 @@ public byte[] getBinary(int rowId) {
   }
 
   @Override
-  public org.apache.spark.sql.vectorized.ColumnVector arrayData() {
+  public ColumnarArray getArray(int rowId) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index ca4f00985c2a3..a8ec8ef2aadf8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -24,6 +24,7 @@
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -602,7 +603,17 @@ public final int appendStruct(boolean isNull) {
   // `WritableColumnVector` puts the data of array in the first child column vector, and puts the
   // array offsets and lengths in the current column vector.
   @Override
-  public WritableColumnVector arrayData() { return childColumns[0]; }
+  public final ColumnarArray getArray(int rowId) {
+    return new ColumnarArray(arrayData(), getArrayOffset(rowId), getArrayLength(rowId));
+  }
+
+  public WritableColumnVector arrayData() {
+    return childColumns[0];
+  }
+
+  public abstract int getArrayLength(int rowId);
+
+  public abstract int getArrayOffset(int rowId);
 
   @Override
   public WritableColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index ca7a4751450d4..6c7ac7262aedc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -17,17 +17,21 @@
 
 package org.apache.spark.sql.vectorized;
 
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.complex.*;
 import org.apache.arrow.vector.holders.NullableVarCharHolder;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.arrow.ArrowUtils;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * A column vector backed by Apache Arrow.
+ * A column vector backed by Apache Arrow. Currently time interval type and map type are not
+ * supported.
  */
+@InterfaceStability.Evolving
 public final class ArrowColumnVector extends ColumnVector {
 
   private final ArrowVectorAccessor accessor;
@@ -90,16 +94,6 @@ public double getDouble(int rowId) {
     return accessor.getDouble(rowId);
   }
 
-  @Override
-  public int getArrayLength(int rowId) {
-    return accessor.getArrayLength(rowId);
-  }
-
-  @Override
-  public int getArrayOffset(int rowId) {
-    return accessor.getArrayOffset(rowId);
-  }
-
   @Override
   public Decimal getDecimal(int rowId, int precision, int scale) {
     return accessor.getDecimal(rowId, precision, scale);
@@ -116,7 +110,9 @@ public byte[] getBinary(int rowId) {
   }
 
   @Override
-  public ArrowColumnVector arrayData() { return childColumns[0]; }
+  public ColumnarArray getArray(int rowId) {
+    return accessor.getArray(rowId);
+  }
 
   @Override
   public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
@@ -151,9 +147,6 @@ public ArrowColumnVector(ValueVector vector) {
     } else if (vector instanceof ListVector) {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
-
-      childColumns = new ArrowColumnVector[1];
-      childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
     } else if (vector instanceof NullableMapVector) {
       NullableMapVector mapVector = (NullableMapVector) vector;
       accessor = new StructAccessor(mapVector);
@@ -180,10 +173,6 @@ boolean isNullAt(int rowId) {
       return vector.isNull(rowId);
     }
 
-    final int getValueCount() {
-      return vector.getValueCount();
-    }
-
     final int getNullCount() {
       return vector.getNullCount();
     }
@@ -232,11 +221,7 @@ byte[] getBinary(int rowId) {
       throw new UnsupportedOperationException();
     }
 
-    int getArrayLength(int rowId) {
-      throw new UnsupportedOperationException();
-    }
-
-    int getArrayOffset(int rowId) {
+    ColumnarArray getArray(int rowId) {
       throw new UnsupportedOperationException();
     }
   }
@@ -433,10 +418,14 @@ final long getLong(int rowId) {
   private static class ArrayAccessor extends ArrowVectorAccessor {
 
     private final ListVector accessor;
+    private final ArrowBuf offsets;
+    private final ArrowColumnVector arrayData;
 
     ArrayAccessor(ListVector vector) {
       super(vector);
       this.accessor = vector;
+      this.offsets = vector.getOffsetBuffer();
+      this.arrayData = new ArrowColumnVector(vector.getDataVector());
     }
 
     @Override
@@ -450,13 +439,11 @@ final boolean isNullAt(int rowId) {
     }
 
     @Override
-    final int getArrayLength(int rowId) {
-      return accessor.getInnerValueCountAt(rowId);
-    }
-
-    @Override
-    final int getArrayOffset(int rowId) {
-      return accessor.getOffsetBuffer().getInt(rowId * accessor.OFFSET_WIDTH);
+    final ColumnarArray getArray(int rowId) {
+      int index = rowId * accessor.OFFSET_WIDTH;
+      int start = offsets.getInt(index);
+      int end = offsets.getInt(index + accessor.OFFSET_WIDTH);
+      return new ColumnarArray(arrayData, start, end - start);
     }
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index f9936214035b6..4b955ceddd0f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.vectorized;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -29,11 +30,14 @@
  * Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
  * in this ColumnVector.
  *
+ * Spark only calls the specific `get` method that matches the data type of this
+ * {@link ColumnVector}, e.g. if it's int type, Spark is guaranteed to only call
+ * {@link #getInt(int)} or {@link #getInts(int, int)}.
+ *
  * ColumnVector supports all the data types including nested types. To handle nested types,
- * ColumnVector can have children and is a tree structure. For struct type, it stores the actual
- * data of each field in the corresponding child ColumnVector, and only stores null information in
- * the parent ColumnVector. For array type, it stores the actual array elements in the child
- * ColumnVector, and stores null information, array offsets and lengths in the parent ColumnVector.
+ * ColumnVector can have children and is a tree structure. Please refer to {@link #getStruct(int)},
+ * {@link #getArray(int)} and {@link #getMap(int)} for details about how to implement nested
+ * types.
  *
  * ColumnVector is expected to be reused during the entire data loading process, to avoid allocating
  * memory again and again.
@@ -43,6 +47,7 @@
  * format. Since it is expected to reuse the ColumnVector instance while loading data, the storage
  * footprint is negligible.
  */
+@InterfaceStability.Evolving
 public abstract class ColumnVector implements AutoCloseable {
 
   /**
@@ -70,12 +75,12 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract boolean isNullAt(int rowId);
 
   /**
-   * Returns the value for rowId.
+   * Returns the boolean type value for rowId.
    */
   public abstract boolean getBoolean(int rowId);
 
   /**
-   * Gets values from [rowId, rowId + count)
+   * Gets boolean type values from [rowId, rowId + count)
    */
   public boolean[] getBooleans(int rowId, int count) {
     boolean[] res = new boolean[count];
@@ -86,12 +91,12 @@ public boolean[] getBooleans(int rowId, int count) {
   }
 
   /**
-   * Returns the value for rowId.
+   * Returns the byte type value for rowId.
    */
   public abstract byte getByte(int rowId);
 
   /**
-   * Gets values from [rowId, rowId + count)
+   * Gets byte type values from [rowId, rowId + count)
    */
   public byte[] getBytes(int rowId, int count) {
     byte[] res = new byte[count];
@@ -102,12 +107,12 @@ public byte[] getBytes(int rowId, int count) {
   }
 
   /**
-   * Returns the value for rowId.
+   * Returns the short type value for rowId.
    */
   public abstract short getShort(int rowId);
 
   /**
-   * Gets values from [rowId, rowId + count)
+   * Gets short type values from [rowId, rowId + count)
    */
   public short[] getShorts(int rowId, int count) {
     short[] res = new short[count];
@@ -118,12 +123,12 @@ public short[] getShorts(int rowId, int count) {
   }
 
   /**
-   * Returns the value for rowId.
+   * Returns the int type value for rowId.
    */
   public abstract int getInt(int rowId);
 
   /**
-   * Gets values from [rowId, rowId + count)
+   * Gets int type values from [rowId, rowId + count)
    */
   public int[] getInts(int rowId, int count) {
     int[] res = new int[count];
@@ -134,12 +139,12 @@ public int[] getInts(int rowId, int count) {
   }
 
   /**
-   * Returns the value for rowId.
+   * Returns the long type value for rowId.
    */
   public abstract long getLong(int rowId);
 
   /**
-   * Gets values from [rowId, rowId + count)
+   * Gets long type values from [rowId, rowId + count)
    */
   public long[] getLongs(int rowId, int count) {
     long[] res = new long[count];
@@ -150,12 +155,12 @@ public long[] getLongs(int rowId, int count) {
   }
 
   /**
-   * Returns the value for rowId.
+   * Returns the float type value for rowId.
    */
   public abstract float getFloat(int rowId);
 
   /**
-   * Gets values from [rowId, rowId + count)
+   * Gets float type values from [rowId, rowId + count)
    */
   public float[] getFloats(int rowId, int count) {
     float[] res = new float[count];
@@ -166,12 +171,12 @@ public float[] getFloats(int rowId, int count) {
   }
 
   /**
-   * Returns the value for rowId.
+   * Returns the double type value for rowId.
    */
   public abstract double getDouble(int rowId);
 
   /**
-   * Gets values from [rowId, rowId + count)
+   * Gets double type values from [rowId, rowId + count)
    */
   public double[] getDoubles(int rowId, int count) {
     double[] res = new double[count];
@@ -182,57 +187,54 @@ public double[] getDoubles(int rowId, int count) {
   }
 
   /**
-   * Returns the length of the array for rowId.
-   */
-  public abstract int getArrayLength(int rowId);
-
-  /**
-   * Returns the offset of the array for rowId.
-   */
-  public abstract int getArrayOffset(int rowId);
-
-  /**
-   * Returns the struct for rowId.
+   * Returns the struct type value for rowId.
+   *
+   * To support struct type, implementations must implement {@link #getChild(int)} and make this
+   * vector a tree structure. The number of child vectors must be the same as the number of fields
+   * of the struct type, and each child vector is responsible for storing the data of its
+   * corresponding struct field.
    */
   public final ColumnarRow getStruct(int rowId) {
     return new ColumnarRow(this, rowId);
   }
 
   /**
-   * Returns the array for rowId.
+   * Returns the array type value for rowId.
+   *
+   * To support array type, implementations must construct a {@link ColumnarArray} and return it in
+   * this method. {@link ColumnarArray} requires a {@link ColumnVector} that stores the data of all
+   * the elements of all the arrays in this vector, and an offset and a length that point to a range
+   * in that {@link ColumnVector}; the range represents the array for rowId. Implementations
+   * are free to decide where to put the data vector and offsets and lengths. For example, we can
+   * use the first child vector as the data vector, and store offsets and lengths in two int arrays
+   * in this vector.
    */
-  public final ColumnarArray getArray(int rowId) {
-    return new ColumnarArray(arrayData(), getArrayOffset(rowId), getArrayLength(rowId));
-  }
+  public abstract ColumnarArray getArray(int rowId);
 
   /**
-   * Returns the map for rowId.
+   * Returns the map type value for rowId.
    */
   public MapData getMap(int ordinal) {
     throw new UnsupportedOperationException();
   }
 
   /**
-   * Returns the decimal for rowId.
+   * Returns the decimal type value for rowId.
    */
   public abstract Decimal getDecimal(int rowId, int precision, int scale);
 
   /**
-   * Returns the UTF8String for rowId. Note that the returned UTF8String may point to the data of
-   * this column vector, please copy it if you want to keep it after this column vector is freed.
+   * Returns the string type value for rowId. Note that the returned UTF8String may point to the
+   * data of this column vector; please copy it if you want to keep it after this column vector is
+   * freed.
    */
   public abstract UTF8String getUTF8String(int rowId);
 
   /**
-   * Returns the byte array for rowId.
+   * Returns the binary type value for rowId.
    */
   public abstract byte[] getBinary(int rowId);
 
-  /**
-   * Returns the data for the underlying array.
-   */
-  public abstract ColumnVector arrayData();
-
   /**
    * Returns the ordinal's child column vector.
    */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index 522c39580389f..0d2c3ec8648d3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.vectorized;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.*;
@@ -25,6 +26,7 @@
 /**
  * Array abstraction in {@link ColumnVector}.
  */
+@InterfaceStability.Evolving
 public final class ColumnarArray extends ArrayData {
   // The data for this array. This array contains elements from
   // data[offset] to data[offset + length).
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index 4dc826cf60c15..d206c1df42abb 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -18,6 +18,7 @@
 
 import java.util.*;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
 
@@ -26,6 +27,7 @@
  * batch so that Spark can access the data row by row. Instance of it is meant to be reused during
  * the entire data loading process.
  */
+@InterfaceStability.Evolving
 public final class ColumnarBatch {
   private int numRows;
   private final ColumnVector[] columns;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index 2e59085a82768..25db7e09d20d0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.vectorized;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.MapData;
@@ -26,6 +27,7 @@
 /**
  * Row abstraction in {@link ColumnVector}.
  */
+@InterfaceStability.Evolving
 public final class ColumnarRow extends InternalRow {
   // The data for this row.
   // E.g. the value of 3rd int field is `data.getChild(3).getInt(rowId)`.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index ad74fb99b0c73..1f31aa45a1220 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.datasources.parquet
+package org.apache.spark.sql.execution.vectorized
 
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
@@ -23,8 +23,6 @@ import scala.util.Random
 
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
-import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.types.{ArrayType, BinaryType, IntegerType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.Benchmark
@@ -434,7 +432,6 @@ object ColumnarBatchBenchmark {
     }
 
     def readArrays(onHeap: Boolean): Unit = {
-      System.gc()
       val vector = if (onHeap) onHeapVector else offHeapVector
 
       var sum = 0L
@@ -448,7 +445,6 @@ object ColumnarBatchBenchmark {
     }
 
     def readArrayElements(onHeap: Boolean): Unit = {
-      System.gc()
       val vector = if (onHeap) onHeapVector else offHeapVector
 
       var sum = 0L
@@ -479,10 +475,10 @@ object ColumnarBatchBenchmark {
 
     Array Vector Read:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     ------------------------------------------------------------------------------------------------
-    On Heap Read Size Only                    416 /  423        393.5           2.5       1.0X
-    Off Heap Read Size Only                   396 /  404        413.6           2.4       1.1X
-    On Heap Read Elements                    2569 / 2590         63.8          15.7       0.2X
-    Off Heap Read Elements                   3302 / 3333         49.6          20.2       0.1X
+    On Heap Read Size Only                    426 /  437        384.9           2.6       1.0X
+    Off Heap Read Size Only                   406 /  421        404.0           2.5       1.0X
+    On Heap Read Elements                    2636 / 2642         62.2          16.1       0.2X
+    Off Heap Read Elements                   3770 / 3774         43.5          23.0       0.1X
    */
    benchmark.run
  }

From 957d040a47c5a60bdbb19d0490dc748f66680376 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 26 Jan 2018 15:55:38 +0800
Subject: [PATCH 2/2] address comments

---
 .../org/apache/spark/sql/vectorized/ArrowColumnVector.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 6c7ac7262aedc..9803c3dec6de2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -418,13 +418,11 @@ final long getLong(int rowId) {
   private static class ArrayAccessor extends ArrowVectorAccessor {
 
     private final ListVector accessor;
-    private final ArrowBuf offsets;
     private final ArrowColumnVector arrayData;
 
     ArrayAccessor(ListVector vector) {
       super(vector);
       this.accessor = vector;
-      this.offsets = vector.getOffsetBuffer();
      this.arrayData = new ArrowColumnVector(vector.getDataVector());
     }
 
@@ -440,6 +438,7 @@ final boolean isNullAt(int rowId) {
 
     @Override
     final ColumnarArray getArray(int rowId) {
+      ArrowBuf offsets = accessor.getOffsetBuffer();
       int index = rowId * accessor.OFFSET_WIDTH;
       int start = offsets.getInt(index);
       int end = offsets.getInt(index + accessor.OFFSET_WIDTH);
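A minimal usage sketch of the API after these two patches: it fills a two-row array<int> vector and reads it back through the single getArray() call that replaces the old arrayData()/getArrayOffset()/getArrayLength() trio on ColumnVector. This is not part of the patch; the class name GetArrayExample is hypothetical, and the writer-side calls (reserve, putInt, putArray) assume the WritableColumnVector API on this branch.

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnarArray;

public class GetArrayExample {
  public static void main(String[] args) {
    // Two rows of array<int>: element data lives in the first child (arrayData) vector,
    // offsets and lengths live in the parent vector, as WritableColumnVector documents.
    OnHeapColumnVector vector =
        new OnHeapColumnVector(2, DataTypes.createArrayType(DataTypes.IntegerType));

    // Row 0 -> [1, 2], row 1 -> [3].
    vector.arrayData().reserve(3);
    vector.arrayData().putInt(0, 1);
    vector.arrayData().putInt(1, 2);
    vector.arrayData().putInt(2, 3);
    vector.putArray(0, 0, 2);  // offset 0, length 2
    vector.putArray(1, 2, 1);  // offset 2, length 1

    // Read side: one call returns a ColumnarArray view over the child vector;
    // the offset/length bookkeeping is no longer visible to callers.
    ColumnarArray row0 = vector.getArray(0);
    System.out.println(row0.numElements());                     // 2
    System.out.println(row0.getInt(0) + ", " + row0.getInt(1)); // 1, 2
    System.out.println(vector.getArray(1).getInt(0));           // 3

    vector.close();
  }
}

This is the same read path the updated ColumnarBatchBenchmark exercises in its Array Vector Read results.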