From 0c39389cbcf34255d446aac46d1ed01b795e5835 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 27 Jul 2017 15:31:05 -0700 Subject: [PATCH 01/12] refactored ColumnarBatch to allow creating from ColumnVectors --- .../execution/vectorized/ColumnarBatch.java | 49 ++++++++++++++----- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 34dc3af9b85c8..adc3df488beeb 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -65,15 +65,44 @@ public final class ColumnarBatch { final Row row; public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) { - return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode); + ColumnVector[] columns = allocateVectors(schema, DEFAULT_BATCH_SIZE, memMode); + return create(schema, columns); } public static ColumnarBatch allocate(StructType type) { - return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE); + ColumnVector[] columns = allocateVectors(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE); + return create(type, columns); } public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) { - return new ColumnarBatch(schema, maxRows, memMode); + ColumnVector[] columns = allocateVectors(schema, maxRows, memMode); + return create(schema, columns); + } + + private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) { + ColumnVector[] columns = new ColumnVector[schema.size()]; + for (int i = 0; i < schema.fields().length; ++i) { + StructField field = schema.fields()[i]; + columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode); + } + return columns; + } + + public static ColumnarBatch createReadOnly( + StructType schema, + ReadOnlyColumnVector[] columns, + int numRows) { + for (ReadOnlyColumnVector c: columns) { + assert(c.capacity >= numRows); + } + ColumnarBatch batch = create(schema, columns); + batch.setNumRows(numRows); + return batch; + } + + private static ColumnarBatch create(StructType schema, ColumnVector[] columns) { + assert(columns.length > 0); + return new ColumnarBatch(schema, columns); } /** @@ -505,18 +534,12 @@ public void filterNullsInColumn(int ordinal) { nullFilteredColumns.add(ordinal); } - private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) { + private ColumnarBatch(StructType schema, ColumnVector[] columns) { this.schema = schema; - this.capacity = maxRows; - this.columns = new ColumnVector[schema.size()]; + this.columns = columns; + this.capacity = columns[0].capacity; this.nullFilteredColumns = new HashSet<>(); - this.filteredRows = new boolean[maxRows]; - - for (int i = 0; i < schema.fields().length; ++i) { - StructField field = schema.fields()[i]; - columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode); - } - + this.filteredRows = new boolean[this.capacity]; this.row = new Row(this); } } From a4be6cf0bb0363e394c520b121835e3190c36730 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 31 Jul 2017 12:11:19 -0700 Subject: [PATCH 02/12] Added fromPayloadIterator to use ColumnarBatch for row iteration --- .../sql/execution/arrow/ArrowConverters.scala | 66 ++++++++++++++++++- 1 file changed, 65 insertions(+), 1 deletion(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index 240f38f5bfeb4..f39c9fcd8e716 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.arrow import java.io.ByteArrayOutputStream import java.nio.channels.Channels +import scala.collection.JavaConverters._ + import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector._ import org.apache.arrow.vector.file._ @@ -28,6 +30,7 @@ import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ReadOnlyColumnVector} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -35,7 +38,7 @@ import org.apache.spark.util.Utils /** * Store Arrow data in a form that can be serialized by Spark and served to a Python process. */ -private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Serializable { +private[sql] class ArrowPayload private[sql] (payload: Array[Byte]) extends Serializable { /** * Convert the ArrowPayload to an ArrowRecordBatch. @@ -110,6 +113,67 @@ private[sql] object ArrowConverters { } } + private[sql] def fromPayloadIterator( + payloadIter: Iterator[ArrowPayload], + schema: StructType, + context: TaskContext): Iterator[InternalRow] = { + + val allocator = + ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue) + var reader: ArrowFileReader = null + + new Iterator[InternalRow] { + + context.addTaskCompletionListener { _ => + close() + } + + private var _batch: ColumnarBatch = _ + private var _rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty + + override def hasNext: Boolean = _rowIter.hasNext || { + if (payloadIter.hasNext) { + _rowIter = nextBatch() + true + } else { + close() + false + } + } + + override def next(): InternalRow = _rowIter.next() + + def close(): Unit = { + closeReader() + allocator.close() + } + + private def closeReader(): Unit = { + if (reader != null) { + reader.close() + reader = null + } + } + + private def nextBatch(): Iterator[InternalRow] = { + closeReader() + val in = new ByteArrayReadableSeekableByteChannel(payloadIter.next().asPythonSerializable) + reader = new ArrowFileReader(in, allocator) + reader.loadNextBatch() // throws IOException + val root = reader.getVectorSchemaRoot + + assert(schema.equals(ArrowUtils.fromArrowSchema(root.getSchema)), + s"$schema \n!=\n ${ArrowUtils.fromArrowSchema(root.getSchema)}") + + val columns = root.getFieldVectors.asScala.map { vector => + new ArrowColumnVector(vector).asInstanceOf[ReadOnlyColumnVector] + }.toArray + + ColumnarBatch.createReadOnly(schema, columns, root.getRowCount).rowIterator().asScala + } + } + } + /** * Convert a byte array to an ArrowRecordBatch. 
*/ From f35b92c823db4b02edc6de0208fe17ce8b6c96c6 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 31 Jul 2017 14:11:43 -0700 Subject: [PATCH 03/12] added unit tests for ColumnarBatch with Arrow, and fromPayloadIterator --- .../arrow/ArrowConvertersSuite.scala | 41 +++++++++++++- .../vectorized/ColumnarBatchSuite.scala | 54 +++++++++++++++++++ 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index 4893b52f240ec..8e7840ebea9a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -22,15 +22,18 @@ import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat import java.util.Locale +import scala.collection.JavaConverters._ + import com.google.common.io.Files import org.apache.arrow.memory.RootAllocator -import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot} +import org.apache.arrow.vector.{NullableIntVector, VectorLoader, VectorSchemaRoot} import org.apache.arrow.vector.file.json.JsonFileReader import org.apache.arrow.vector.util.Validator import org.scalatest.BeforeAndAfterAll -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ReadOnlyColumnVector} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -1629,6 +1632,40 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { } } + test("roundtrip payloads") { + val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue) + val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true) + .createVector(allocator).asInstanceOf[NullableIntVector] + vector.allocateNew() + val mutator = vector.getMutator() + + (0 until 10).foreach { i => + mutator.setSafe(i, i) + } + mutator.setNull(10) + mutator.setValueCount(11) + + val schema = StructType(Seq(StructField("int", IntegerType))) + + val columnarBatch = ColumnarBatch.createReadOnly( + schema, Array[ReadOnlyColumnVector](new ArrowColumnVector(vector)), 11) + + val context = TaskContext.empty() + + val payloadIter = ArrowConverters.toPayloadIterator( + columnarBatch.rowIterator().asScala, schema, 0, context) + + val rowIter = ArrowConverters.fromPayloadIterator(payloadIter, schema, context) + + rowIter.zipWithIndex.foreach { case (row, i) => + if (i == 10) { + assert(row.isNullAt(0)) + } else { + assert(row.getInt(0) == i) + } + } + } + /** Test that a converted DataFrame to Arrow record batch equals batch read from JSON file */ private def collectAndValidate(df: DataFrame, json: String, file: String): Unit = { // NOTE: coalesce to single partition because can only load 1 batch in validator diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index ccf7aa7022a2a..76c1f6b74b315 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -25,10 +25,13 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random +import org.apache.arrow.vector.NullableIntVector + import org.apache.spark.SparkFunSuite import org.apache.spark.memory.MemoryMode import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.arrow.ArrowUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.CalendarInterval @@ -1248,4 +1251,55 @@ class ColumnarBatchSuite extends SparkFunSuite { s"vectorized reader")) } } + + test("create read-only batch") { + val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue) + val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true) + .createVector(allocator).asInstanceOf[NullableIntVector] + vector1.allocateNew() + val mutator1 = vector1.getMutator() + val vector2 = ArrowUtils.toArrowField("int2", IntegerType, nullable = true) + .createVector(allocator).asInstanceOf[NullableIntVector] + vector2.allocateNew() + val mutator2 = vector2.getMutator() + + (0 until 10).foreach { i => + mutator1.setSafe(i, i) + mutator2.setSafe(i + 1, i) + } + mutator1.setNull(10) + mutator1.setValueCount(11) + mutator2.setNull(0) + mutator2.setValueCount(11) + + val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2)) + + val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType))) + val batch = ColumnarBatch.createReadOnly( + schema, columnVectors.toArray[ReadOnlyColumnVector], 11) + + assert(batch.numCols() == 2) + assert(batch.numRows() == 11) + + val rowIter = batch.rowIterator().asScala + rowIter.zipWithIndex.foreach { case (row, i) => + if (i == 10) { + assert(row.isNullAt(0)) + } else { + assert(row.getInt(0) == i) + } + if (i == 0) { + assert(row.isNullAt(1)) + } else { + assert(row.getInt(1) == i - 1) + } + } + + intercept[java.lang.AssertionError] { + batch.getRow(100) + } + + columnVectors.foreach(_.close()) + allocator.close() + } } From 43214b1ddbc477809571ab42c449335d406508a6 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 31 Jul 2017 16:21:04 -0700 Subject: [PATCH 04/12] need to account for possible empty schema --- .../sql/execution/vectorized/ColumnarBatch.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index adc3df488beeb..1dfb41bbdeb41 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -66,17 +66,17 @@ public final class ColumnarBatch { public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) { ColumnVector[] columns = allocateVectors(schema, DEFAULT_BATCH_SIZE, memMode); - return create(schema, columns); + return create(schema, columns, DEFAULT_BATCH_SIZE); } public static ColumnarBatch allocate(StructType type) { ColumnVector[] columns = allocateVectors(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE); - return create(type, columns); + return create(type, columns, DEFAULT_BATCH_SIZE); } public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) 
{ ColumnVector[] columns = allocateVectors(schema, maxRows, memMode); - return create(schema, columns); + return create(schema, columns, maxRows); } private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) { @@ -95,14 +95,14 @@ public static ColumnarBatch createReadOnly( for (ReadOnlyColumnVector c: columns) { assert(c.capacity >= numRows); } - ColumnarBatch batch = create(schema, columns); + ColumnarBatch batch = create(schema, columns, numRows); batch.setNumRows(numRows); return batch; } - private static ColumnarBatch create(StructType schema, ColumnVector[] columns) { + private static ColumnarBatch create(StructType schema, ColumnVector[] columns, int capacity) { assert(columns.length > 0); - return new ColumnarBatch(schema, columns); + return new ColumnarBatch(schema, columns, capacity); } /** @@ -534,10 +534,10 @@ public void filterNullsInColumn(int ordinal) { nullFilteredColumns.add(ordinal); } - private ColumnarBatch(StructType schema, ColumnVector[] columns) { + private ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) { this.schema = schema; this.columns = columns; - this.capacity = columns[0].capacity; + this.capacity = capacity; this.nullFilteredColumns = new HashSet<>(); this.filteredRows = new boolean[this.capacity]; this.row = new Row(this); From f906156189ef66c7812792710b71a6e5e9a7bd5d Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 31 Jul 2017 21:32:28 -0700 Subject: [PATCH 05/12] fix assert that columns equals schema length --- .../spark/sql/execution/vectorized/ColumnarBatch.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 1dfb41bbdeb41..a01355cb8dc15 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -65,13 +65,11 @@ public final class ColumnarBatch { final Row row; public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) { - ColumnVector[] columns = allocateVectors(schema, DEFAULT_BATCH_SIZE, memMode); - return create(schema, columns, DEFAULT_BATCH_SIZE); + return allocate(schema, memMode, DEFAULT_BATCH_SIZE); } public static ColumnarBatch allocate(StructType type) { - ColumnVector[] columns = allocateVectors(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE); - return create(type, columns, DEFAULT_BATCH_SIZE); + return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE); } public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) { @@ -101,7 +99,7 @@ public static ColumnarBatch createReadOnly( } private static ColumnarBatch create(StructType schema, ColumnVector[] columns, int capacity) { - assert(columns.length > 0); + assert(schema.length() == columns.length); return new ColumnarBatch(schema, columns, capacity); } From 3d80e54c53298fedb278ab7d1cc95777af0b6643 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 8 Aug 2017 17:21:07 -0700 Subject: [PATCH 06/12] changed fromPayloadIter to return schema also --- .../sql/execution/arrow/ArrowConverters.scala | 66 +++++++++---------- .../arrow/ArrowConvertersSuite.scala | 4 +- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index f39c9fcd8e716..d6229c89e0b24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -115,38 +115,51 @@ private[sql] object ArrowConverters { private[sql] def fromPayloadIterator( payloadIter: Iterator[ArrowPayload], - schema: StructType, - context: TaskContext): Iterator[InternalRow] = { - + context: TaskContext): (Iterator[InternalRow], StructType) = { val allocator = ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue) var reader: ArrowFileReader = null - new Iterator[InternalRow] { + def nextBatch(): (Iterator[InternalRow], StructType) = { + val in = new ByteArrayReadableSeekableByteChannel(payloadIter.next().asPythonSerializable) + reader = new ArrowFileReader(in, allocator) + reader.loadNextBatch() // throws IOException + val root = reader.getVectorSchemaRoot // throws IOException + val schemaRead = ArrowUtils.fromArrowSchema(root.getSchema) + + val columns = root.getFieldVectors.asScala.map { vector => + new ArrowColumnVector(vector).asInstanceOf[ReadOnlyColumnVector] + }.toArray + + (ColumnarBatch.createReadOnly(schemaRead, columns, root.getRowCount).rowIterator().asScala, + schemaRead) + } + + var (rowIter, schemaRead) = if (payloadIter.hasNext) { + nextBatch() + } else { + (Iterator.empty, StructType(Seq.empty)) + } + + val outputIterator = new Iterator[InternalRow] { context.addTaskCompletionListener { _ => - close() + closeReader() + allocator.close() } - private var _batch: ColumnarBatch = _ - private var _rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty - - override def hasNext: Boolean = _rowIter.hasNext || { + override def hasNext: Boolean = rowIter.hasNext || { + closeReader() if (payloadIter.hasNext) { - _rowIter = nextBatch() + rowIter = nextBatch()._1 true } else { - close() + allocator.close() false } } - override def next(): InternalRow = _rowIter.next() - - def close(): Unit = { - closeReader() - allocator.close() - } + override def next(): InternalRow = rowIter.next() private def closeReader(): Unit = { if (reader != null) { @@ -154,24 +167,9 @@ private[sql] object ArrowConverters { reader = null } } - - private def nextBatch(): Iterator[InternalRow] = { - closeReader() - val in = new ByteArrayReadableSeekableByteChannel(payloadIter.next().asPythonSerializable) - reader = new ArrowFileReader(in, allocator) - reader.loadNextBatch() // throws IOException - val root = reader.getVectorSchemaRoot - - assert(schema.equals(ArrowUtils.fromArrowSchema(root.getSchema)), - s"$schema \n!=\n ${ArrowUtils.fromArrowSchema(root.getSchema)}") - - val columns = root.getFieldVectors.asScala.map { vector => - new ArrowColumnVector(vector).asInstanceOf[ReadOnlyColumnVector] - }.toArray - - ColumnarBatch.createReadOnly(schema, columns, root.getRowCount).rowIterator().asScala - } } + + (outputIterator, schemaRead) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index 8e7840ebea9a3..929b35ab688d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -1655,7 +1655,9 @@ class ArrowConvertersSuite extends SharedSQLContext with 
BeforeAndAfterAll { val payloadIter = ArrowConverters.toPayloadIterator( columnarBatch.rowIterator().asScala, schema, 0, context) - val rowIter = ArrowConverters.fromPayloadIterator(payloadIter, schema, context) + val (rowIter, schemaRead) = ArrowConverters.fromPayloadIterator(payloadIter, context) + + assert(schema.equals(schemaRead)) rowIter.zipWithIndex.foreach { case (row, i) => if (i == 10) { From 23d19dfde53d02c37a2c20f67c9816a73bd57cd2 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 8 Aug 2017 17:36:07 -0700 Subject: [PATCH 07/12] minor cleanup on ColumnarBatch constructors --- .../sql/execution/vectorized/ColumnarBatch.java | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index a01355cb8dc15..70321af856980 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -73,11 +73,11 @@ public static ColumnarBatch allocate(StructType type) { } public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) { - ColumnVector[] columns = allocateVectors(schema, maxRows, memMode); - return create(schema, columns, maxRows); + ColumnVector[] columns = allocateCols(schema, maxRows, memMode); + return new ColumnarBatch(schema, columns, maxRows); } - private static ColumnVector[] allocateVectors(StructType schema, int maxRows, MemoryMode memMode) { + private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) { ColumnVector[] columns = new ColumnVector[schema.size()]; for (int i = 0; i < schema.fields().length; ++i) { StructField field = schema.fields()[i]; @@ -90,19 +90,12 @@ public static ColumnarBatch createReadOnly( StructType schema, ReadOnlyColumnVector[] columns, int numRows) { - for (ReadOnlyColumnVector c: columns) { - assert(c.capacity >= numRows); - } - ColumnarBatch batch = create(schema, columns, numRows); + assert(schema.length() == columns.length); + ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows); batch.setNumRows(numRows); return batch; } - private static ColumnarBatch create(StructType schema, ColumnVector[] columns, int capacity) { - assert(schema.length() == columns.length); - return new ColumnarBatch(schema, columns, capacity); - } - /** * Called to close all the columns in this batch. It is not valid to access the data after * calling this. This must be called at the end to clean up memory allocations. From cc81d48bec75182420f83297f7467462d33e4537 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 8 Aug 2017 17:41:32 -0700 Subject: [PATCH 08/12] added description on fromPayloadIterator --- .../apache/spark/sql/execution/arrow/ArrowConverters.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index d6229c89e0b24..1438cfc395fb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -113,6 +113,10 @@ private[sql] object ArrowConverters { } } + /** + * Maps Iterator from ArrowPayload to InternalRow. 
Returns a pair containing the row iterator + * and the schema from the first batch of Arrow data read. + */ private[sql] def fromPayloadIterator( payloadIter: Iterator[ArrowPayload], context: TaskContext): (Iterator[InternalRow], StructType) = { From 9eb929a7139d0ee7b842a547dc1b0d78e200f097 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 24 Aug 2017 23:20:45 -0700 Subject: [PATCH 09/12] updated fromPayloadIterator to work with immutable ColumnVector --- .../sql/execution/arrow/ArrowConverters.scala | 10 ++++++---- .../execution/arrow/ArrowConvertersSuite.scala | 15 ++++++--------- .../execution/vectorized/ColumnarBatchSuite.scala | 4 ++-- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index d48e07ccc8f94..60ba89f0840dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -30,7 +30,7 @@ import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ReadOnlyColumnVector} +import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -132,11 +132,13 @@ private[sql] object ArrowConverters { val schemaRead = ArrowUtils.fromArrowSchema(root.getSchema) val columns = root.getFieldVectors.asScala.map { vector => - new ArrowColumnVector(vector).asInstanceOf[ReadOnlyColumnVector] + new ArrowColumnVector(vector).asInstanceOf[ColumnVector] }.toArray - (ColumnarBatch.createReadOnly(schemaRead, columns, root.getRowCount).rowIterator().asScala, - schemaRead) + val batch = new ColumnarBatch(schemaRead, columns, root.getRowCount) + batch.setNumRows(root.getRowCount) + + (batch.rowIterator().asScala, schemaRead) } var (rowIter, schemaRead) = if (payloadIter.hasNext) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index 929b35ab688d1..e0dcf4928b8cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ReadOnlyColumnVector} +import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -1647,15 +1647,12 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { val schema = StructType(Seq(StructField("int", IntegerType))) - val columnarBatch = ColumnarBatch.createReadOnly( - schema, Array[ReadOnlyColumnVector](new ArrowColumnVector(vector)), 11) + val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11) + batch.setNumRows(11) 
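// --- Editorial sketch (not part of the patch series) ---
// A condensed, hedged illustration of the mechanism these patches introduce: wrap Arrow
// field vectors in ArrowColumnVector, pass them to the ColumnarBatch(schema, columns,
// capacity) constructor used above, set the row count, and read the data back as
// InternalRows. It mirrors the tests added in this PR; the allocator name is made up, and
// it assumes sql-internal access (ArrowUtils is private[sql]) and the Arrow mutator API
// used elsewhere in these patches.
import scala.collection.JavaConverters._
import org.apache.arrow.vector.NullableIntVector
import org.apache.spark.sql.execution.arrow.ArrowUtils
import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val allocator = ArrowUtils.rootAllocator.newChildAllocator("sketch", 0, Long.MaxValue)
val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true)
  .createVector(allocator).asInstanceOf[NullableIntVector]
vector.allocateNew()
val mutator = vector.getMutator()
(0 until 10).foreach(i => mutator.setSafe(i, i))  // rows 0-9 hold their index
mutator.setNull(10)                               // row 10 is null
mutator.setValueCount(11)

val schema = StructType(Seq(StructField("int", IntegerType)))
val batch = new ColumnarBatch(schema, Array[ColumnVector](new ArrowColumnVector(vector)), 11)
batch.setNumRows(11)
batch.rowIterator().asScala.foreach { row =>
  if (!row.isNullAt(0)) assert(row.getInt(0) >= 0)  // Arrow data is readable row by row
}
batch.close()      // also closes the wrapped Arrow vectors
allocator.close()
// --- end editorial sketch ---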
- val context = TaskContext.empty() - - val payloadIter = ArrowConverters.toPayloadIterator( - columnarBatch.rowIterator().asScala, schema, 0, context) - - val (rowIter, schemaRead) = ArrowConverters.fromPayloadIterator(payloadIter, context) + val ctx = TaskContext.empty() + val payloadIter = ArrowConverters.toPayloadIterator(batch.rowIterator().asScala, schema, 0, ctx) + val (rowIter, schemaRead) = ArrowConverters.fromPayloadIterator(payloadIter, ctx) assert(schema.equals(schemaRead)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index c213fed68deda..56c758abe75eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1288,8 +1288,8 @@ class ColumnarBatchSuite extends SparkFunSuite { val columnVectors = Seq(new ArrowColumnVector(vector1), new ArrowColumnVector(vector2)) val schema = StructType(Seq(StructField("int1", IntegerType), StructField("int2", IntegerType))) - val batch = ColumnarBatch.createReadOnly( - schema, columnVectors.toArray[ReadOnlyColumnVector], 11) + val batch = new ColumnarBatch(schema, columnVectors.toArray[ColumnVector], 11) + batch.setNumRows(11) assert(batch.numCols() == 2) assert(batch.numRows() == 11) From a90a71b2e5cc479ae7b173aa1cb6d9e969400899 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Fri, 25 Aug 2017 00:22:32 -0700 Subject: [PATCH 10/12] using ArrowRowIterator trait instead of tuple for schema --- .../sql/execution/arrow/ArrowConverters.scala | 64 ++++++++++--------- .../arrow/ArrowConvertersSuite.scala | 4 +- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index 60ba89f0840dc..561a067a2f81f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -53,6 +53,17 @@ private[sql] class ArrowPayload private[sql] (payload: Array[Byte]) extends Seri def asPythonSerializable: Array[Byte] = payload } +/** + * Iterator interface to iterate over Arrow record batches and return rows + */ +private[sql] trait ArrowRowIterator extends Iterator[InternalRow] { + + /** + * Return the schema loaded from the Arrow record batch being iterated over + */ + def schema: StructType +} + private[sql] object ArrowConverters { /** @@ -119,45 +130,26 @@ private[sql] object ArrowConverters { */ private[sql] def fromPayloadIterator( payloadIter: Iterator[ArrowPayload], - context: TaskContext): (Iterator[InternalRow], StructType) = { + context: TaskContext): ArrowRowIterator = { val allocator = ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue) - var reader: ArrowFileReader = null - - def nextBatch(): (Iterator[InternalRow], StructType) = { - val in = new ByteArrayReadableSeekableByteChannel(payloadIter.next().asPythonSerializable) - reader = new ArrowFileReader(in, allocator) - reader.loadNextBatch() // throws IOException - val root = reader.getVectorSchemaRoot // throws IOException - val schemaRead = ArrowUtils.fromArrowSchema(root.getSchema) - - val columns = root.getFieldVectors.asScala.map { vector => - new 
ArrowColumnVector(vector).asInstanceOf[ColumnVector] - }.toArray - val batch = new ColumnarBatch(schemaRead, columns, root.getRowCount) - batch.setNumRows(root.getRowCount) - - (batch.rowIterator().asScala, schemaRead) - } - - var (rowIter, schemaRead) = if (payloadIter.hasNext) { - nextBatch() - } else { - (Iterator.empty, StructType(Seq.empty)) - } - - val outputIterator = new Iterator[InternalRow] { + new ArrowRowIterator { + private var reader: ArrowFileReader = null + private var schemaRead = StructType(Seq.empty) + private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty context.addTaskCompletionListener { _ => closeReader() allocator.close() } + override def schema: StructType = schemaRead + override def hasNext: Boolean = rowIter.hasNext || { closeReader() if (payloadIter.hasNext) { - rowIter = nextBatch()._1 + rowIter = nextBatch() true } else { allocator.close() @@ -173,9 +165,23 @@ private[sql] object ArrowConverters { reader = null } } - } - (outputIterator, schemaRead) + private def nextBatch(): Iterator[InternalRow] = { + val in = new ByteArrayReadableSeekableByteChannel(payloadIter.next().asPythonSerializable) + reader = new ArrowFileReader(in, allocator) + reader.loadNextBatch() // throws IOException + val root = reader.getVectorSchemaRoot // throws IOException + schemaRead = ArrowUtils.fromArrowSchema(root.getSchema) + + val columns = root.getFieldVectors.asScala.map { vector => + new ArrowColumnVector(vector).asInstanceOf[ColumnVector] + }.toArray + + val batch = new ColumnarBatch(schemaRead, columns, root.getRowCount) + batch.setNumRows(root.getRowCount) + batch.rowIterator().asScala + } + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index e0dcf4928b8cd..908c35197278b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -1652,9 +1652,9 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { val ctx = TaskContext.empty() val payloadIter = ArrowConverters.toPayloadIterator(batch.rowIterator().asScala, schema, 0, ctx) - val (rowIter, schemaRead) = ArrowConverters.fromPayloadIterator(payloadIter, ctx) + val rowIter = ArrowConverters.fromPayloadIterator(payloadIter, ctx) - assert(schema.equals(schemaRead)) + assert(schema.equals(rowIter.schema)) rowIter.zipWithIndex.foreach { case (row, i) => if (i == 10) { From 3fcdec57bb57c39ce9c88997ced7007595ae3ad9 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 29 Aug 2017 10:54:38 -0700 Subject: [PATCH 11/12] forgot to close allocator in test, some cleanup --- .../spark/sql/execution/arrow/ArrowConvertersSuite.scala | 3 +++ .../spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index 908c35197278b..fccc11a1ad186 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -1663,6 +1663,9 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { assert(row.getInt(0) == i) } } + + 
vector.close() + allocator.close() } /** Test that a converted DataFrame to Arrow record batch equals batch read from JSON file */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 56c758abe75eb..1f21d3c0db987 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -1265,7 +1265,7 @@ class ColumnarBatchSuite extends SparkFunSuite { } } - test("create read-only batch") { + test("create columnar batch from Arrow column vectors") { val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue) val vector1 = ArrowUtils.toArrowField("int1", IntegerType, nullable = true) .createVector(allocator).asInstanceOf[NullableIntVector] @@ -1312,7 +1312,7 @@ class ColumnarBatchSuite extends SparkFunSuite { batch.getRow(100) } - columnVectors.foreach(_.close()) + batch.close() allocator.close() } } From ffcbf7522700b31f39ea1e11f681bde05d299594 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 30 Aug 2017 16:45:38 -0700 Subject: [PATCH 12/12] simplified round-trip test data to seq of rows --- .../arrow/ArrowConvertersSuite.scala | 44 +++++++------------ 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index fccc11a1ad186..30422b657742c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -22,18 +22,16 @@ import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat import java.util.Locale -import scala.collection.JavaConverters._ - import com.google.common.io.Files import org.apache.arrow.memory.RootAllocator -import org.apache.arrow.vector.{NullableIntVector, VectorLoader, VectorSchemaRoot} +import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot} import org.apache.arrow.vector.file.json.JsonFileReader import org.apache.arrow.vector.util.Validator import org.scalatest.BeforeAndAfterAll import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{BinaryType, IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -1633,39 +1631,29 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { } test("roundtrip payloads") { - val allocator = ArrowUtils.rootAllocator.newChildAllocator("int", 0, Long.MaxValue) - val vector = ArrowUtils.toArrowField("int", IntegerType, nullable = true) - .createVector(allocator).asInstanceOf[NullableIntVector] - vector.allocateNew() - val mutator = vector.getMutator() - - (0 until 10).foreach { i => - mutator.setSafe(i, i) - } - mutator.setNull(10) - mutator.setValueCount(11) + val inputRows = (0 until 9).map { i => + InternalRow(i) + } :+ InternalRow(null) - val schema = StructType(Seq(StructField("int", IntegerType))) - - val batch = new ColumnarBatch(schema, Array[ColumnVector](new 
ArrowColumnVector(vector)), 11) - batch.setNumRows(11) + val schema = StructType(Seq(StructField("int", IntegerType, nullable = true))) val ctx = TaskContext.empty() - val payloadIter = ArrowConverters.toPayloadIterator(batch.rowIterator().asScala, schema, 0, ctx) - val rowIter = ArrowConverters.fromPayloadIterator(payloadIter, ctx) + val payloadIter = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, ctx) + val outputRowIter = ArrowConverters.fromPayloadIterator(payloadIter, ctx) - assert(schema.equals(rowIter.schema)) + assert(schema.equals(outputRowIter.schema)) - rowIter.zipWithIndex.foreach { case (row, i) => - if (i == 10) { - assert(row.isNullAt(0)) - } else { + var count = 0 + outputRowIter.zipWithIndex.foreach { case (row, i) => + if (i != 9) { assert(row.getInt(0) == i) + } else { + assert(row.isNullAt(0)) } + count += 1 } - vector.close() - allocator.close() + assert(count == inputRows.length) } /** Test that a converted DataFrame to Arrow record batch equals batch read from JSON file */
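// --- Editorial sketch (not part of the patch series) ---
// The round-trip API as it stands after PATCH 12, from a caller's point of view: rows are
// serialized to ArrowPayloads and read back through the ArrowRowIterator added in PATCH 10,
// which also exposes the schema recovered from the first Arrow record batch. Variable names
// are made up; it assumes sql-internal access (ArrowConverters and InternalRow are internal
// APIs) and reuses the same maxRecordsPerBatch value (0) as the tests above.
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
val inputRows = (0 until 9).map(i => InternalRow(i)) :+ InternalRow(null)

val ctx = TaskContext.empty()
val payloads = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, ctx)
val outputRows = ArrowConverters.fromPayloadIterator(payloads, ctx)

// The schema is no longer passed in; it is read back from the Arrow data itself.
assert(outputRows.schema == schema)
assert(outputRows.count(_ => true) == inputRows.length)
// --- end editorial sketch ---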