[SPARK-13805][SQL] Generate code that get a value in each column from ColumnVector when ColumnarBatch is used #11636
Conversation
Test build #52846 has finished for PR 11636 at commit
This is great. Can you include the generated code snippet with a few columns?
@nongli, here is another example. Spark code with two columns:

````scala
sqlContext.conf.setConfString(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
sqlContext.conf.setConfString(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
val values = 10
withTempPath { dir =>
  withTempTable("t1", "tempTable") {
    sqlContext.range(values).registerTempTable("t1")
    sqlContext.sql("select id % 2 as p, cast(id as INT) as id from t1")
      .write.partitionBy("p").parquet(dir.getCanonicalPath)
    sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
    sqlContext.sql("select sum(p), sum(id) from tempTable").collect
  }
}
````

Code snippet generated by this PR:
````java
...
/* 073 */   private void rdd_processBatches() throws java.io.IOException {
/* 074 */     while (true) {
/* 075 */       int numRows = rdd_batch.numRows();
/* 076 */       if (rdd_batchIdx == 0) rdd_metricValue.add(numRows);
/* 077 */
/* 078 */       while (!shouldStop() && rdd_batchIdx < numRows) {
/* 079 */         org.apache.spark.sql.execution.vectorized.ColumnVector rdd_col0 = rdd_batch.column(0);org.apache.spark.sql.execution.vectorized.ColumnVector rdd_col1 = rdd_batch.column(1);
/* 080 */         /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false),(sum(cast(id#3 as bigint)),mode=Partial,isDistinct=false)], output=[sum#13L,sum#14L]) */
/* 081 */         /* input[0, int] */
/* 082 */         boolean rdd_isNull = rdd_col0.getIsNull(rdd_batchIdx);
/* 083 */         int rdd_value = rdd_isNull ? -1 : (rdd_col0.getInt(rdd_batchIdx));
/* 084 */         /* input[1, int] */
/* 085 */         boolean rdd_isNull1 = rdd_col1.getIsNull(rdd_batchIdx);
/* 086 */         int rdd_value1 = rdd_isNull1 ? -1 : (rdd_col1.getInt(rdd_batchIdx));
/* 087 */
/* 088 */         // do aggregate
...
````
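The access pattern in the generated loop can be illustrated with a hand-written sketch. `FakeColumnVector` below is a minimal stand-in for Spark's `ColumnVector`, not the real class: each column is fetched once, then values are read by row index, instead of materializing an `InternalRow` per row.

```java
// Runs the same column-major loop shape as the generated code above.
class ColumnarSum {
    // Mirrors the generated loop: null check first, then a typed per-column read.
    static long sum(FakeColumnVector col, int numRows) {
        long total = 0;
        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
            boolean isNull = col.getIsNull(rowIdx);
            int value = isNull ? -1 : col.getInt(rowIdx);
            if (!isNull) {
                total += value;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        FakeColumnVector col = new FakeColumnVector(
            new int[]{1, 2, 3, 4}, new boolean[]{false, false, true, false});
        // 1 + 2 + 4; the null at row 2 is skipped.
        System.out.println(sum(col, 4));
    }
}

// Illustrative stand-in for Spark's ColumnVector; only the two accessors
// used by the generated snippet are modeled here.
class FakeColumnVector {
    private final int[] values;
    private final boolean[] nulls;

    FakeColumnVector(int[] values, boolean[] nulls) {
        this.values = values;
        this.nulls = nulls;
    }

    boolean getIsNull(int rowId) { return nulls[rowId]; }
    int getInt(int rowId) { return values[rowId]; }
}
```

The point of the sketch: hoisting the column reference out of the per-row work replaces a virtual row object per iteration with plain indexed reads, which is what buys the speedup measured below.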
Test build #52991 has finished for PR 11636 at commit
Test build #52992 has finished for PR 11636 at commit
cc @davies
@@ -158,9 +158,13 @@ class CodegenContext {
/** The variable name of the input row in generated code. */
final var INPUT_ROW = "i"

/** The variable name of the input col in generated code. */
var INPUT_COLORDINAL = "idx"
INPUT_COL_ORDINAL
@nongli, is it possible to change the name of the API from
@kiszk Feel free to change that API
val value = if (!ctx.isColumnarType(ctx.INPUT_ROW)) {
  ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
} else {
  ctx.getValue(ctx.INPUT_ROW, dataType, ctx.INPUT_COLORDINAL)
Can we move these into DataSourceScan? (Because it's only used there)
One possible idea is to prepare a `getValue()` function that can be overridden, as follows. In my opinion, the following code is not easy to read. What do you think?

In `BoundExpression`:

````scala
def getValue(...): String = { ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString) }
override genCode(...): String = {
  ...
  val value = getValue(...)
  ...
}
````

In `DataSourceScan`, we pass a `BoundExpression` that has its own `getValue()`, as follows:

````scala
...
val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true) {
  def getValue(...): String = {
    val value = if (!ctx.isColumnarType(ctx.INPUT_ROW)) {
      ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
    } else {
      ctx.getValue(ctx.INPUT_ROW, dataType, ctx.INPUT_COLORDINAL)
    }
  }
})
...
````
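The override pattern proposed here can be shown in a self-contained Java sketch. The class and method names (`BoundExpressionSketch`, `getValue`, `genCode`) are illustrative stand-ins for the Spark classes discussed above, not the actual API: a base expression emits row-based access by default, and a scan node supplies a columnar-aware override.

```java
// Demonstrates an overridable getValue() hook in a code-generating expression.
class OverrideDemo {
    public static void main(String[] args) {
        // Default behavior: row-based access.
        System.out.println(new BoundExpressionSketch(0).genCode());

        // A scan node overrides getValue() to read from a column vector instead,
        // without changing the surrounding genCode() logic.
        BoundExpressionSketch columnar = new BoundExpressionSketch(0) {
            @Override
            String getValue() { return "col0.getInt(rowIdx)"; }
        };
        System.out.println(columnar.genCode());
    }
}

// Illustrative stand-in for a bound expression in a code generator.
class BoundExpressionSketch {
    final int ordinal;

    BoundExpressionSketch(int ordinal) { this.ordinal = ordinal; }

    // Default: emit row-based access, e.g. "row.getInt(0)".
    String getValue() { return "row.getInt(" + ordinal + ")"; }

    // The shared code path calls the hook, so subclasses only swap the accessor.
    String genCode() { return "int value = " + getValue() + ";"; }
}
```

This keeps the branching out of the shared `genCode()` path, which is the readability concern raised above; the trade-off, as noted in the following replies, is that the indirection may not be worth it when only one place needs it.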
In DataSourceScan, we do NOT need BoundReference to generate the code to access ColumnarBatch; we can generate the code directly.
Is it good to introduce a new `InputReference`, which is similar to `BoundReference`, only for the code to access `ColumnarBatch`?
Since it's only used in one place, I'd like to narrow down the changes; it's easier to maintain.
Test build #53324 has finished for PR 11636 at commit
@@ -199,7 +210,8 @@ class CodegenContext {
case StringType => s"$input.getUTF8String($ordinal)"
case BinaryType => s"$input.getBinary($ordinal)"
case CalendarIntervalType => s"$input.getInterval($ordinal)"
case t: StructType => s"$input.getStruct($ordinal, ${t.size})"
case t: StructType => if (!isColumnarType(input)) { s"$input.getStruct($ordinal, ${t.size})" }
Right now, the parquet reader does not support nested types (Array, Map, Struct), it's fine to not have this special case in this PR.
While this `getStruct()` for `ColumnVector` may not be called at runtime now, the code generator always produces two versions, for `InternalRow` and `ColumnVector`. Thus, this code is necessary to avoid a compilation error for now. In the future, it will be necessary to correctly handle nested types.
Why not make them have the same APIs?
Sure, I provided the same API, `ColumnVector.getStruct(int, int)`.
Test build #53466 has finished for PR 11636 at commit
Test build #53468 has finished for PR 11636 at commit
Test build #53470 has finished for PR 11636 at commit
Test build #53500 has finished for PR 11636 at commit
Test build #53537 has finished for PR 11636 at commit
val exprCols = output.zipWithIndex.map(
  x => new InputReference(x._2, x._1.dataType, x._1.nullable, rowidx))
val exprRows = output.zipWithIndex.map(
  x => new InputReference(x._2, x._1.dataType, x._1.nullable))
Should we use BoundReference here?
Used BoundReference for exprRows
Test build #53584 has finished for PR 11636 at commit
…tor for easy code generation rename getIsNull() to isNullAt() add getStruct(int, int)
…tor for easy code generation rename getIsNull() to isNullAt()
Test build #53674 has finished for PR 11636 at commit
Have you rerun the benchmark with these changes?
Test build #53678 has finished for PR 11636 at commit
${columnLIVAssigns.mkString("", "\n", "\n")}
Do we still need this?
For now, I will not use this. In the future, I may revisit this Scala replacement regarding all possible variables in another PR.
Here are benchmark results with the latest code.

Without this PR

With this PR
LGTM
Test build #53706 has finished for PR 11636 at commit
Merged into master, thanks!
…m ColumnVector when ColumnarBatch is used

## What changes were proposed in this pull request?

This PR generates code that gets a value in each column from ```ColumnVector``` instead of creating ```InternalRow``` when ```ColumnarBatch``` is accessed. This PR improves the benchmark program by up to 15%.

This PR consists of two parts:

1. Get a ```ColumnVector``` by using the ```ColumnarBatch.column()``` method
2. Get a value of each column by using ```rdd_col${COLIDX}.getInt(ROWIDX)``` instead of ```rdd_row.getInt(COLIDX)```

This is a motivating example.

````scala
sqlContext.conf.setConfString(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
sqlContext.conf.setConfString(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
val values = 10
withTempPath { dir =>
  withTempTable("t1", "tempTable") {
    sqlContext.range(values).registerTempTable("t1")
    sqlContext.sql("select id % 2 as p, cast(id as INT) as id from t1")
      .write.partitionBy("p").parquet(dir.getCanonicalPath)
    sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
    sqlContext.sql("select sum(p) from tempTable").collect
  }
}
````

The original code

````java
...
/* 072 */     while (!shouldStop() && rdd_batchIdx < numRows) {
/* 073 */       InternalRow rdd_row = rdd_batch.getRow(rdd_batchIdx++);
/* 074 */       /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
/* 075 */       /* input[0, int] */
/* 076 */       boolean rdd_isNull = rdd_row.isNullAt(0);
/* 077 */       int rdd_value = rdd_isNull ? -1 : (rdd_row.getInt(0));
...
````

The code generated by this PR

````java
/* 072 */     while (!shouldStop() && rdd_batchIdx < numRows) {
/* 073 */       org.apache.spark.sql.execution.vectorized.ColumnVector rdd_col0 = rdd_batch.column(0);
/* 074 */       /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
/* 075 */       /* input[0, int] */
/* 076 */       boolean rdd_isNull = rdd_col0.getIsNull(rdd_batchIdx);
/* 077 */       int rdd_value = rdd_isNull ? -1 : (rdd_col0.getInt(rdd_batchIdx));
...
/* 128 */       rdd_batchIdx++;
/* 129 */     }
/* 130 */     if (shouldStop()) return;
````

Performance

Without this PR

````
model name : Intel(R) Xeon(R) CPU E5-2667 v2 3.30GHz
Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                          434 /  488         36.3          27.6       1.0X
Read partition column                     302 /  346         52.1          19.2       1.4X
Read both columns                         588 /  643         26.8          37.4       0.7X
````

With this PR

````
model name : Intel(R) Xeon(R) CPU E5-2667 v2 3.30GHz
Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                          392 /  516         40.1          24.9       1.0X
Read partition column                     256 /  318         61.4          16.3       1.5X
Read both columns                         523 /  539         30.1          33.3       0.7X
````

## How was this patch tested?

Tested by existing test suites and benchmark

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#11636 from kiszk/SPARK-13805.