356 changes: 187 additions & 169 deletions sql/core/benchmarks/DataSourceReadBenchmark-jdk11-results.txt

Large diffs are not rendered by default.

356 changes: 187 additions & 169 deletions sql/core/benchmarks/DataSourceReadBenchmark-results.txt

Large diffs are not rendered by default.

VectorizedPlainValuesReader.java
@@ -53,19 +53,52 @@ public void skip() {
throw new UnsupportedOperationException();
}

+ private void updateCurrentByte() {
+   try {
+     currentByte = (byte) in.read();
+   } catch (IOException e) {
+     throw new ParquetDecodingException("Failed to read a byte", e);
+   }
+ }
+
  @Override
  public final void readBooleans(int total, WritableColumnVector c, int rowId) {
-   // TODO: properly vectorize this
-   for (int i = 0; i < total; i++) {
-     c.putBoolean(rowId + i, readBoolean());
+   int i = 0;
+   if (bitOffset > 0) {
+     i = Math.min(8 - bitOffset, total);
+     c.putBooleans(rowId, i, currentByte, bitOffset);
+     bitOffset = (bitOffset + i) & 7;
    }
+   for (; i + 7 < total; i += 8) {
+     updateCurrentByte();
+     c.putBooleans(rowId + i, currentByte);
+   }
+   if (i < total) {
+     updateCurrentByte();
+     bitOffset = total - i;
+     c.putBooleans(rowId + i, bitOffset, currentByte, 0);
+   }
  }
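[Note] The fast path above relies on Parquet's PLAIN encoding packing eight booleans into each byte, least-significant bit first. A minimal standalone sketch of that unpacking, with a hypothetical helper name that is not part of this PR:

static boolean[] unpackPlainBooleans(byte b) {
  boolean[] out = new boolean[8];
  for (int bit = 0; bit < 8; bit++) {
    out[bit] = ((b >>> bit) & 1) != 0; // bit 0 holds the first value in the stream
  }
  return out;
}
// unpackPlainBooleans((byte) 0b0000_0101) returns {true, false, true, false, false, false, false, false}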

  @Override
  public final void skipBooleans(int total) {
-   // TODO: properly vectorize this
-   for (int i = 0; i < total; i++) {
-     readBoolean();
+   int i = 0;
+   if (bitOffset > 0) {
+     i = Math.min(8 - bitOffset, total);
+     bitOffset = (bitOffset + i) & 7;
+   }
+   if (i + 7 < total) {
+     int numBytesToSkip = (total - i) / 8;
+     try {
+       in.skipFully(numBytesToSkip);
+     } catch (IOException e) {
+       throw new ParquetDecodingException("Failed to skip bytes", e);
+     }
+     i += numBytesToSkip * 8;
+   }
+   if (i < total) {
+     updateCurrentByte();
+     bitOffset = total - i;
    }
  }
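[Note] A worked trace of the skip arithmetic, with assumed inputs: for total = 20 and bitOffset = 3, the first branch consumes the 5 bits left in currentByte (i = 5, bitOffset wraps to 0), the second branch skips (20 - 5) / 8 = 1 whole byte via in.skipFully (i = 13), and the last branch reads one more byte and sets bitOffset = 7, so the next readBoolean() starts at the final bit of that byte: 5 + 8 + 7 = 20 booleans skipped.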

@@ -276,13 +309,8 @@ public void skipShorts(int total) {

  @Override
  public final boolean readBoolean() {
-   // TODO: vectorize decoding and keep boolean[] instead of currentByte
    if (bitOffset == 0) {
-     try {
-       currentByte = (byte) in.read();
-     } catch (IOException e) {
-       throw new ParquetDecodingException("Failed to read a byte", e);
-     }
+     updateCurrentByte();
    }

    boolean v = (currentByte & (1 << bitOffset)) != 0;
OffHeapColumnVector.java
@@ -152,6 +152,18 @@ public void putBooleans(int rowId, int count, boolean value) {
}
}

@Override
public void putBooleans(int rowId, byte src) {
Platform.putByte(null, data + rowId, (byte)(src & 1));
Platform.putByte(null, data + rowId + 1, (byte)(src >>> 1 & 1));
Platform.putByte(null, data + rowId + 2, (byte)(src >>> 2 & 1));
Platform.putByte(null, data + rowId + 3, (byte)(src >>> 3 & 1));
Platform.putByte(null, data + rowId + 4, (byte)(src >>> 4 & 1));
Platform.putByte(null, data + rowId + 5, (byte)(src >>> 5 & 1));
Platform.putByte(null, data + rowId + 6, (byte)(src >>> 6 & 1));
Platform.putByte(null, data + rowId + 7, (byte)(src >>> 7 & 1));
}

@Override
public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }

OnHeapColumnVector.java
@@ -147,6 +147,18 @@ public void putBooleans(int rowId, int count, boolean value) {
}
}

@Override
public void putBooleans(int rowId, byte src) {
byteData[rowId] = (byte)(src & 1);
byteData[rowId + 1] = (byte)(src >>> 1 & 1);
byteData[rowId + 2] = (byte)(src >>> 2 & 1);
byteData[rowId + 3] = (byte)(src >>> 3 & 1);
byteData[rowId + 4] = (byte)(src >>> 4 & 1);
byteData[rowId + 5] = (byte)(src >>> 5 & 1);
byteData[rowId + 6] = (byte)(src >>> 6 & 1);
byteData[rowId + 7] = (byte)(src >>> 7 & 1);
}

@Override
public boolean getBoolean(int rowId) {
return byteData[rowId] == 1;
WritableColumnVector.java
@@ -46,6 +46,7 @@
* WritableColumnVector are intended to be reused.
*/
public abstract class WritableColumnVector extends ColumnVector {
private final byte[] byte8 = new byte[8];

/**
* Resets this column for writing. The currently stored values are no longer accessible.
@@ -201,6 +202,29 @@ public WritableColumnVector reserveDictionaryIds(int capacity) {
*/
public abstract void putBooleans(int rowId, int count, boolean value);

/**
* Copies bits [srcIndex, srcIndex + count) of src into rows [rowId, rowId + count).
* src must contain 8 bit-packed booleans, least-significant bit first.
*/
public void putBooleans(int rowId, int count, byte src, int srcIndex) {
[Review comment, Member] This looks a bit dangerous as a public API: count cannot be more than 7, but there is no assert on it. Can we add one?

[Member] Thought about this too. Since this is a hot path I think we can use an assert but not a Preconditions check. putBytes is also doing something very similar and there is no assertion there.

[Contributor, author] Thanks, added!
assert ((srcIndex + count) <= 8);
byte8[0] = (byte)(src & 1);
byte8[1] = (byte)(src >>> 1 & 1);
byte8[2] = (byte)(src >>> 2 & 1);
byte8[3] = (byte)(src >>> 3 & 1);
byte8[4] = (byte)(src >>> 4 & 1);
byte8[5] = (byte)(src >>> 5 & 1);
byte8[6] = (byte)(src >>> 6 & 1);
byte8[7] = (byte)(src >>> 7 & 1);
putBytes(rowId, count, byte8, srcIndex);
}
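[Note] On the assert-versus-Preconditions point above: Java assertions are disabled at runtime unless the JVM is started with -ea (-enableassertions), so this check is free in production while still catching out-of-range arguments in tests; a Guava Preconditions check would execute unconditionally on this hot path.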

/**
* Copies all 8 bits of src into rows [rowId, rowId + 8).
* src must contain 8 bit-packed booleans, least-significant bit first.
*/
public abstract void putBooleans(int rowId, byte src);
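[Note] To illustrate the bit-to-row mapping with hypothetical values: putBooleans(0, (byte) 0b1010_0011) writes rows 0..7 as {true, true, false, false, false, true, false, true}, i.e. bit i of src lands in row rowId + i.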

/**
* Sets `value` to the value at rowId.
*/
@@ -470,6 +494,18 @@ public final int appendBooleans(int count, boolean v) {
return result;
}

/**
* Appends bits [offset, offset + count) of src.
* src must contain 8 bit-packed booleans, least-significant bit first.
*/
public final int appendBooleans(int count, byte src, int offset) {
reserve(elementsAppended + count);
int result = elementsAppended;
putBooleans(elementsAppended, count, src, offset);
elementsAppended += count;
return result;
}
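[Note] A usage sketch under the same LSB-first assumption: appendBooleans(3, (byte) 0b0000_0110, 0) appends the rows {false, true, true} and, like the other append* methods, returns the row index at which the appended run starts.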

public final int appendByte(byte v) {
reserve(elementsAppended + 1);
putByte(elementsAppended, v);
DataSourceReadBenchmark.scala
@@ -119,31 +119,36 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {

prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1"))

+     val query = dataType match {
+       case BooleanType => "sum(cast(id as bigint))"
+       case _ => "sum(id)"
+     }
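[Note] Spark's sum() aggregate does not accept a BooleanType column, so the boolean benchmark counts the true values by casting to bigint first.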

sqlBenchmark.addCase("SQL CSV") { _ =>
spark.sql("select sum(id) from csvTable").noop()
spark.sql(s"select $query from csvTable").noop()
}

sqlBenchmark.addCase("SQL Json") { _ =>
spark.sql("select sum(id) from jsonTable").noop()
spark.sql(s"select $query from jsonTable").noop()
}

sqlBenchmark.addCase("SQL Parquet Vectorized") { _ =>
spark.sql("select sum(id) from parquetTable").noop()
spark.sql(s"select $query from parquetTable").noop()
}

sqlBenchmark.addCase("SQL Parquet MR") { _ =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("select sum(id) from parquetTable").noop()
spark.sql(s"select $query from parquetTable").noop()
}
}

sqlBenchmark.addCase("SQL ORC Vectorized") { _ =>
spark.sql("SELECT sum(id) FROM orcTable").noop()
spark.sql(s"SELECT $query FROM orcTable").noop()
}

sqlBenchmark.addCase("SQL ORC MR") { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
spark.sql("SELECT sum(id) FROM orcTable").noop()
spark.sql(s"SELECT $query FROM orcTable").noop()
}
}

@@ -157,6 +162,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
var longSum = 0L
var doubleSum = 0.0
val aggregateValue: (ColumnVector, Int) => Unit = dataType match {
case BooleanType => (col: ColumnVector, i: Int) => if (col.getBoolean(i)) longSum += 1L
case ByteType => (col: ColumnVector, i: Int) => longSum += col.getByte(i)
case ShortType => (col: ColumnVector, i: Int) => longSum += col.getShort(i)
case IntegerType => (col: ColumnVector, i: Int) => longSum += col.getInt(i)
@@ -191,6 +197,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
var longSum = 0L
var doubleSum = 0.0
val aggregateValue: (InternalRow) => Unit = dataType match {
case BooleanType => (col: InternalRow) => if (col.getBoolean(0)) longSum += 1L
case ByteType => (col: InternalRow) => longSum += col.getByte(0)
case ShortType => (col: InternalRow) => longSum += col.getShort(0)
case IntegerType => (col: InternalRow) => longSum += col.getInt(0)
@@ -542,7 +549,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark("SQL Single Numeric Column Scan") {
-     Seq(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType).foreach {
+     Seq(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType).foreach {
dataType => numericScanBenchmark(1024 * 1024 * 15, dataType)
}
}
ParquetEncodingSuite.scala
@@ -29,14 +29,15 @@ import org.apache.spark.sql.test.SharedSparkSession
class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSession {
import testImplicits._

- val ROW = ((1).toByte, 2, 3L, "abc", Period.of(1, 1, 0), Duration.ofMillis(100))
+ val ROW = ((1).toByte, 2, 3L, "abc", Period.of(1, 1, 0), Duration.ofMillis(100), true)
  val NULL_ROW = (
    null.asInstanceOf[java.lang.Byte],
    null.asInstanceOf[Integer],
    null.asInstanceOf[java.lang.Long],
    null.asInstanceOf[String],
    null.asInstanceOf[Period],
-   null.asInstanceOf[Duration])
+   null.asInstanceOf[Duration],
+   null.asInstanceOf[java.lang.Boolean])

test("All Types Dictionary") {
(1 :: 1000 :: Nil).foreach { n => {
@@ -59,6 +60,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSession {
assert(batch.column(3).getUTF8String(i).toString == "abc")
assert(batch.column(4).getInt(i) == 13)
assert(batch.column(5).getLong(i) == 100000)
assert(batch.column(6).getBoolean(i) == true)
i += 1
}
reader.close()
@@ -88,6 +90,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSession {
assert(batch.column(3).isNullAt(i))
assert(batch.column(4).isNullAt(i))
assert(batch.column(5).isNullAt(i))
assert(batch.column(6).isNullAt(i))
i += 1
}
reader.close()
ColumnarBatchSuite.scala
@@ -25,9 +25,11 @@ import java.util.NoSuchElementException

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.implicitConversions
import scala.util.Random

import org.apache.arrow.vector.IntVector
import org.apache.parquet.bytes.ByteBufferInputStream

import org.apache.spark.SparkFunSuite
import org.apache.spark.memory.MemoryMode
@@ -36,6 +38,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapBuilder, DateTimeUtils, GenericArrayData, MapData}
import org.apache.spark.sql.execution.RowToColumnConverter
import org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnarBatchRow, ColumnVector}
@@ -130,6 +133,97 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
}

testVector("Boolean APIs", 1024, BooleanType) {
column =>
val reference = mutable.ArrayBuffer.empty[Boolean]

var values = Array(true, false, true, false, false)
var bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte
column.appendBooleans(2, bits, 0)
reference ++= values.slice(0, 2)

column.appendBooleans(3, bits, 2)
reference ++= values.slice(2, 5)

column.appendBooleans(6, true)
reference ++= Array.fill(6)(true)

column.appendBoolean(false)
reference += false

var idx = column.elementsAppended

values = Array(true, true, false, true, false, true, false, true)
bits = values.foldRight(0)((b, i) => i << 1 | (if (b) 1 else 0)).toByte
column.putBooleans(idx, 2, bits, 0)
reference ++= values.slice(0, 2)
idx += 2

column.putBooleans(idx, 3, bits, 2)
reference ++= values.slice(2, 5)
idx += 3

column.putBooleans(idx, bits)
reference ++= values
idx += 8

column.putBoolean(idx, false)
reference += false
idx += 1

column.putBooleans(idx, 3, true)
reference ++= Array.fill(3)(true)
idx += 3

implicit def intToByte(i: Int): Byte = i.toByte
val buf = ByteBuffer.wrap(Array(0x33, 0x5A, 0xA5, 0xCC, 0x0F, 0xF0, 0xEE, 0x77, 0x88))
val reader = new VectorizedPlainValuesReader()
reader.initFromPage(0, ByteBufferInputStream.wrap(buf))

reader.skipBooleans(1) // bit index 0

column.putBoolean(idx, reader.readBoolean) // bit index 1
reference += true
idx += 1

column.putBoolean(idx, reader.readBoolean) // bit index 2
reference += false
idx += 1

reader.skipBooleans(5) // bit index [3, 7]

column.putBoolean(idx, reader.readBoolean) // bit index 8
reference += false
idx += 1

reader.skipBooleans(8) // bit index [9, 16]
reader.skipBooleans(0) // no-op

column.putBoolean(idx, reader.readBoolean) // bit index 17
reference += false
idx += 1

reader.skipBooleans(16) // bit index [18, 33]

reader.readBooleans(4, column, idx) // bit index [34, 37]
reference ++= Array(true, true, false, false)
idx += 4

reader.readBooleans(11, column, idx) // bit index [38, 48]
reference ++= Array(false, false, false, false, false, false, true, true, true, true, false)
idx += 11

reader.skipBooleans(7) // bit index [49, 55]

reader.readBooleans(9, column, idx) // bit index [56, 64]
reference ++= Array(true, true, true, false, true, true, true, false, false)
idx += 9

reference.zipWithIndex.foreach { v =>
assert(v._1 == column.getBoolean(v._2), "VectorType=" + column.getClass.getSimpleName)
}
}
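[Note] A sanity check on the expected values above, derived from the test's own constants: 0x33 is 0b0011_0011 and 0x5A is 0b0101_1010, read least-significant bit first, so stream bits 1 and 2 are 1 and 0 (the two readBoolean() calls return true then false), and bit 8, the first bit of 0x5A, is 0 (false).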

testVector("Byte APIs", 1024, ByteType) {
column =>
val reference = mutable.ArrayBuffer.empty[Byte]
Expand Down