Skip to content

Commit

Permalink
address comments
Browse files (browse the repository at this point in the history)
  • Loading branch information
cloud-fan committed Jan 19, 2018
1 parent 37c82e6 commit 3972093
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -688,17 +688,13 @@ class CodegenContext {
/**
* Returns the specialized code to access a value from a column vector for a given `DataType`.
*/
def getValue(vector: String, rowId: String, dataType: DataType): String = {
val jt = javaType(dataType)
dataType match {
case _ if isPrimitiveType(jt) =>
s"$vector.get${primitiveTypeName(jt)}($rowId)"
case t: DecimalType =>
s"$vector.getDecimal($rowId, ${t.precision}, ${t.scale})"
case StringType =>
s"$vector.getUTF8String($rowId)"
case _ =>
throw new IllegalArgumentException(s"cannot generate code for unsupported type: $dataType")
def getValueFromVector(vector: String, dataType: DataType, rowId: String): String = {
if (dataType.isInstanceOf[StructType]) {
// `ColumnVector.getStruct` differs from `InternalRow.getStruct`: it takes only a single
// row id (`rowId`) argument.
s"$vector.getStruct($rowId)"
} else {
getValue(vector, dataType, rowId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ public int numNulls() {
@Override
public void close() {
if (childColumns != null) {
for (ArrowColumnVector childColumn : childColumns) {
childColumn.close();
for (int i = 0; i < childColumns.length; i++) {
childColumns[i].close();
childColumns[i] = null;
}
}
accessor.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public final ColumnarRow getStruct(int rowId) {
* Returns the array for rowId.
*/
public final ColumnarArray getArray(int rowId) {
return new ColumnarArray(getChild(0), getArrayOffset(rowId), getArrayLength(rowId));
return new ColumnarArray(arrayData(), getArrayOffset(rowId), getArrayLength(rowId));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}


Expand Down Expand Up @@ -50,14 +50,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
dataType: DataType,
nullable: Boolean): ExprCode = {
val javaType = ctx.javaType(dataType)
val value = if (dataType.isInstanceOf[StructType]) {
// `ColumnVector.getStruct` is different from `InternalRow.getStruct`, it only takes an
// `ordinal` parameter.
s"$columnVar.getStruct($ordinal)"
} else {
ctx.getValue(columnVar, dataType, ordinal)
}

val value = ctx.getValueFromVector(columnVar, dataType, ordinal)
val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
val valueVar = ctx.freshName("value")
val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,7 @@ class VectorizedHashMapGenerator(
groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
// `ColumnVector.getStruct` differs from `InternalRow.getStruct`: it takes only a single
// row id argument.
val value = if (key.dataType.isInstanceOf[StructType]) {
s"vectors[$ordinal].getStruct(buckets[idx])"
} else {
ctx.getValue(s"vectors[$ordinal]", "buckets[idx]", key.dataType)
}
val value = ctx.getValue(s"vectors[$ordinal]", key.dataType, "buckets[idx]")
s"(${ctx.genEqual(key.dataType, value, key.name)})"
}.mkString(" && ")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,17 @@ object ColumnarBatchBenchmark {
Int Read/Write: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Java Array 177 / 181 1856.4 0.5 1.0X
ByteBuffer Unsafe 318 / 322 1032.0 1.0 0.6X
ByteBuffer API 1411 / 1418 232.2 4.3 0.1X
DirectByteBuffer 467 / 474 701.8 1.4 0.4X
Unsafe Buffer 178 / 185 1843.6 0.5 1.0X
Column(on heap) 178 / 184 1840.8 0.5 1.0X
Column(off heap) 341 / 344 961.8 1.0 0.5X
Column(off heap direct) 178 / 184 1845.4 0.5 1.0X
UnsafeRow (on heap) 378 / 389 866.3 1.2 0.5X
UnsafeRow (off heap) 393 / 402 834.0 1.2 0.4X
Column On Heap Append 309 / 318 1059.1 0.9 0.6X
Java Array 177 / 183 1851.1 0.5 1.0X
ByteBuffer Unsafe 314 / 330 1043.7 1.0 0.6X
ByteBuffer API 1298 / 1307 252.4 4.0 0.1X
DirectByteBuffer 465 / 483 704.2 1.4 0.4X
Unsafe Buffer 179 / 183 1835.5 0.5 1.0X
Column(on heap) 181 / 186 1815.2 0.6 1.0X
Column(off heap) 344 / 349 951.7 1.1 0.5X
Column(off heap direct) 178 / 186 1838.6 0.5 1.0X
UnsafeRow (on heap) 388 / 394 844.8 1.2 0.5X
UnsafeRow (off heap) 400 / 403 819.4 1.2 0.4X
Column On Heap Append 315 / 325 1041.8 1.0 0.6X
*/
val benchmark = new Benchmark("Int Read/Write", count * iters)
benchmark.addCase("Java Array")(javaArray)
Expand Down Expand Up @@ -337,8 +337,8 @@ object ColumnarBatchBenchmark {
Boolean Read/Write: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Bitset 726 / 727 462.4 2.2 1.0X
Byte Array 530 / 542 632.7 1.6 1.4X
Bitset 741 / 747 452.6 2.2 1.0X
Byte Array 531 / 542 631.6 1.6 1.4X
*/
benchmark.run()
}
Expand Down Expand Up @@ -394,8 +394,8 @@ object ColumnarBatchBenchmark {
String Read/Write: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
On Heap 332 / 338 49.3 20.3 1.0X
Off Heap 466 / 467 35.2 28.4 0.7X
On Heap 351 / 362 46.6 21.4 1.0X
Off Heap 456 / 466 35.9 27.8 0.8X
*/
val benchmark = new Benchmark("String Read/Write", count * iters)
benchmark.addCase("On Heap")(column(MemoryMode.ON_HEAP))
Expand Down Expand Up @@ -479,10 +479,10 @@ object ColumnarBatchBenchmark {
Array Vector Read: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
On Heap Read Size Only 415 / 422 394.7 2.5 1.0X
Off Heap Read Size Only 394 / 402 415.9 2.4 1.1X
On Heap Read Elements 2558 / 2593 64.0 15.6 0.2X
Off Heap Read Elements 3316 / 3317 49.4 20.2 0.1X
On Heap Read Size Only 416 / 423 393.5 2.5 1.0X
Off Heap Read Size Only 396 / 404 413.6 2.4 1.1X
On Heap Read Elements 2569 / 2590 63.8 15.7 0.2X
Off Heap Read Elements 3302 / 3333 49.6 20.2 0.1X
*/
benchmark.run
}
Expand Down

0 comments on commit 3972093

Please sign in to comment.