[VL] Support C2R and R2C between broadcast relations #4544
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on GitHub Issues? https://github.com/oap-project/gluten/issues Then could you also rename the commit message and pull request title in the following format?
See also:
Run Gluten Clickhouse CI
case bhj: BroadcastHashJoinExec =>
  // FIXME Hongze: In following codes we perform a lot of if-else conditions to
  // make sure the broadcast exchange and broadcast hash-join are of same type,
  // either vanilla or columnar. In order to simplify the codes we have to do
  // some tricks around C2R and R2C to make them adapt to columnar broadcast.
  // Currently their doBroadcast() methods just propagate child's broadcast
  // payloads which is not right in speaking of columnar.
  if (!enableColumnarBroadcastJoin) {
    TransformHints.tagNotTransformable(
      bhj,
      "columnar BroadcastJoin is not enabled in BroadcastHashJoinExec")
  } else {
    val isBhjTransformable: ValidationResult = {
      val transformer = BackendsApiManager.getSparkPlanExecApiInstance
        .genBroadcastHashJoinExecTransformer(
          bhj.leftKeys,
          bhj.rightKeys,
          bhj.joinType,
          bhj.buildSide,
          bhj.condition,
          bhj.left,
          bhj.right,
          isNullAwareAntiJoin = bhj.isNullAwareAntiJoin)
      transformer.doValidate()
    }
    val buildSidePlan = bhj.buildSide match {
      case BuildLeft => bhj.left
      case BuildRight => bhj.right
    }

    val maybeExchange = buildSidePlan
      .find {
        case BroadcastExchangeExec(_, _) => true
        case _ => false
      }
      .map(_.asInstanceOf[BroadcastExchangeExec])

    maybeExchange match {
      case Some(exchange @ BroadcastExchangeExec(mode, child)) =>
        TransformHints.tag(bhj, isBhjTransformable.toTransformHint)
        if (!isBhjTransformable.isValid) {
          TransformHints.tagNotTransformable(exchange, isBhjTransformable)
        }
      case None =>
        // we are in AQE, find the hidden exchange
        // FIXME did we consider the case that AQE: OFF && Reuse: ON ?
        var maybeHiddenExchange: Option[BroadcastExchangeLike] = None
        breakable {
          buildSidePlan.foreach {
            case e: BroadcastExchangeLike =>
              maybeHiddenExchange = Some(e)
              break
            case t: BroadcastQueryStageExec =>
              t.plan.foreach {
                case e2: BroadcastExchangeLike =>
                  maybeHiddenExchange = Some(e2)
                  break
                case r: ReusedExchangeExec =>
                  r.child match {
                    case e2: BroadcastExchangeLike =>
                      maybeHiddenExchange = Some(e2)
                      break
                    case _ =>
                  }
                case _ =>
              }
            case _ =>
          }
        }
        // restriction to force the hidden exchange to be found
        val exchange = maybeHiddenExchange.get
        // to conform to the underlying exchange's type, columnar or vanilla
        exchange match {
          case BroadcastExchangeExec(mode, child) =>
            TransformHints.tagNotTransformable(
              bhj,
              "it's a materialized broadcast exchange or reused broadcast exchange")
          case ColumnarBroadcastExchangeExec(mode, child) =>
            if (!isBhjTransformable.isValid) {
              throw new IllegalStateException(
                s"BroadcastExchange has already been" +
                  s" transformed to columnar version but BHJ is determined as" +
                  s" non-transformable: ${bhj.toString()}")
            }
            TransformHints.tagTransformable(bhj)
        }
      }
    }
  }
} catch {
  case e: UnsupportedOperationException =>
    TransformHints.tagNotTransformable(
      p,
      s"${e.getMessage}, original Spark plan is " +
        s"${p.getClass}(${p.children.toList.map(_.getClass)})")
}
plan
Code in this file is moved from gluten-core to backends-clickhouse.
@@ -1005,6 +1005,7 @@ class VeloxTestSettings extends BackendTestSettings {
      "SPARK-9083: sort with non-deterministic expressions"
    )
    enableSuite[GlutenDataFrameTimeWindowingSuite]
      .exclude("time window joins") // FIXME hongze
The test fails with:
java.lang.UnsupportedOperationException: Input schema contains unsupported type when convert row to columnar for StructType(StructField(window,StructType(StructField(start,TimestampNTZType,true),StructField(end,TimestampNTZType,true)),false),StructField(othervalue,IntegerType,false)) due to do not support data type: TimestampNTZType
at io.glutenproject.execution.RowToVeloxColumnarExec.$anonfun$doExecuteColumnarInternal$1(RowToVeloxColumnarExec.scala:52)
at scala.Option.foreach(Option.scala:407)
at io.glutenproject.execution.RowToVeloxColumnarExec.doExecuteColumnarInternal(RowToVeloxColumnarExec.scala:49)
at io.glutenproject.execution.RowToColumnarExecBase.doExecuteColumnar(RowToColumnarExecBase.scala:62)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:221)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:217)
at io.glutenproject.backendsapi.velox.SparkPlanExecApiImpl.createBroadcastRelation(SparkPlanExecApiImpl.scala:332)
at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$2(ColumnarBroadcastExchangeExec.scala:79)
at io.glutenproject.utils.Arm$.withResource(Arm.scala:25)
at io.glutenproject.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$1(ColumnarBroadcastExchangeExec.scala:69)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Will fix in a later patch.
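The failure pattern above (rejecting a schema whose leaves contain an unsupported type) can be sketched in isolation. This is an illustrative model only: the stand-in `DataType` hierarchy and the `SchemaCheck` object are hypothetical simplifications of Spark's types, not the actual check in RowToVeloxColumnarExec.

```scala
// Stand-in model of a nested schema, mirroring the shape in the error message:
// StructType(window: StructType(start, end: TimestampNTZType), othervalue: IntegerType)
sealed trait DataType
case object IntegerType extends DataType
case object TimestampNTZType extends DataType
final case class StructType(fields: Seq[(String, DataType)]) extends DataType

object SchemaCheck {
  // Assumption for this sketch: only TimestampNTZType is unsupported.
  private val unsupported: Set[DataType] = Set(TimestampNTZType)

  // Walk the schema recursively and report the first unsupported leaf type, if any.
  def firstUnsupported(dt: DataType): Option[DataType] = dt match {
    case StructType(fields) =>
      fields.view.flatMap { case (_, t) => firstUnsupported(t) }.headOption
    case leaf if unsupported(leaf) => Some(leaf)
    case _ => None
  }
}
```

A converter built this way can fail fast with a descriptive message before any row-to-columnar work begins, which is exactly how the stack trace above surfaces the problem.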
@zzcclp Please review CH's changes, thanks.
@zhztheplayer the changes look good to me. Added some queries and minor suggestions.
Thanks.
- Surbhi
@@ -152,6 +157,20 @@ case class ShuffledHashJoinExecTransformer(
    copy(left = newLeft, right = newRight)
  }

case class VeloxBroadcastBuildSideRDD(
This case class is not specific to hash join and can be moved to a generic place or to a new file.
I'm moving it out of this file, since it can be reused for BNLJ.
    from: Broadcast[F],
    fn: Iterator[InternalRow] => Iterator[ColumnarBatch]): Broadcast[T] = {
  // HashedRelation to ColumnarBuildSideRelation.
  val fromBroadcast = from.asInstanceOf[Broadcast[HashedRelation]]
Can you please add a check here for BroadcastMode? A HashedRelation will only be present in the case of HashedRelationBroadcastMode; it will be Array[InternalRow] in the case of IdentityBroadcastMode.
Just checked your comment: FIXME: Add checking for broadcast mode.
Sure. Will add some checks for mode.
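A minimal sketch of what such a mode check could look like, before blindly casting the broadcast payload. This is an assumption-laden illustration: the `BroadcastMode` hierarchy here is a stand-in for Spark's, and `ModeCheck.describePayload` is a hypothetical helper, not code from this PR.

```scala
// Stand-in model of Spark's broadcast modes (real ones live in
// org.apache.spark.sql.catalyst.plans.physical / execution.joins).
sealed trait BroadcastMode
case object IdentityBroadcastMode extends BroadcastMode
final case class HashedRelationBroadcastMode(keys: Seq[String]) extends BroadcastMode

object ModeCheck {
  // Fail fast on an unexpected mode instead of an unchecked asInstanceOf cast.
  def describePayload(mode: BroadcastMode): String = mode match {
    case HashedRelationBroadcastMode(_) =>
      // Only here is it safe to treat the payload as Broadcast[HashedRelation].
      "payload is a HashedRelation"
    case IdentityBroadcastMode =>
      // IdentityBroadcastMode broadcasts Array[InternalRow], not a HashedRelation.
      throw new IllegalStateException(
        "IdentityBroadcastMode is not supported here; payload would be Array[InternalRow]")
  }
}
```

The point of the check is to turn a confusing ClassCastException at runtime into an explicit, self-describing error at the point where the assumption is made.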
case IdentityBroadcastMode =>
  throw new IllegalStateException("Unreachable code")
case HashedRelationBroadcastMode(_, _) =>
  val serialized: Array[ColumnarBatchSerializeResult] = child
There is no hash table created in Gluten for the broadcast relation. IdentityBroadcastMode and HashedRelationBroadcastMode are actually the same in the case of Gluten. Please check the comment below in ColumnarBroadcastExchangeExec:
// this created relation ignores HashedRelationBroadcastMode isNullAware, because we
// cannot get child output rows, then compare the hash key is null, if not null,
// compare the isNullAware, so gluten will not generate HashedRelationWithAllNullKeys
// or EmptyHashedRelation, this difference will cause performance regression in some
// cases.
This match block for mode can be removed safely when BNLJ is supported. Please correct me if my understanding is wrong.
The assumption sounds correct to me. Currently Gluten doesn't handle anything about IdentityBroadcastMode, so it's reasonable to throw when it sees one. If BNLJ requires the same code as BHJ in this code block, then the branch for IdentityBroadcastMode can be removed.
  .filter(_.numRows() != 0)
  .map(
    b => {
      ColumnarBatches.retain(b)
Can there be out-of-memory scenarios? The original broadcast will remain in heap memory, and additional memory will be allotted until the serialization is finished.
If I understand correctly, you are considering the amplified memory consumption from the original hash relation + converted columnar batches + serialized byte arrays?
If so, it could become a problem when the broadcast threshold is set to a large number. We can optimize it in further development iterations with some benchmarks.
super.sparkConf
  .set("spark.sql.sources.useV1SourceList", "parquet")
  .set("spark.sql.autoBroadcastJoinThreshold", "30M")
  .set("spark.gluten.sql.columnar.broadcastJoin", "false")
Should there be one more test case where both the configs broadcastJoin and broadcastExchange are enabled?
The other existing suites in the same file should already cover the case you mentioned.
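For reference, a suite covering the both-enabled combination would differ only in its conf override. This is a hypothetical fragment modeled on the conf snippet quoted above; it assumes the same suite base class and is not code from this PR.

```scala
// Hypothetical conf override: both columnar broadcast configs enabled,
// mirroring the shape of the suite conf quoted above.
override protected def sparkConf: SparkConf =
  super.sparkConf
    .set("spark.sql.sources.useV1SourceList", "parquet")
    .set("spark.sql.autoBroadcastJoinThreshold", "30M")
    .set("spark.gluten.sql.columnar.broadcastExchange", "true")
    .set("spark.gluten.sql.columnar.broadcastJoin", "true")
```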
LGTM, we will support this feature for CH backend later
Thanks!
/Benchmark Velox

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
The patch removes the restriction that broadcast hash join and broadcast exchange must be enabled and validated at the same time. spark.gluten.sql.columnar.broadcastExchange and spark.gluten.sql.columnar.broadcastJoin can then be turned on/off individually. C2Rs / R2Cs will work as expected to convert between the vanilla Spark broadcast relation and Gluten's (Velox, as of now) broadcast relation.
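To make the independence of the two flags concrete, here is an illustrative conf fragment (assumes `org.apache.spark.SparkConf`; the combination shown is one of several now valid, and C2R/R2C transitions bridge the relation types at runtime):

```scala
import org.apache.spark.SparkConf

// Illustrative only: columnar broadcast exchange on while columnar broadcast
// join stays off. Before this patch, mixing the two was disallowed.
val conf = new SparkConf()
  .set("spark.gluten.sql.columnar.broadcastExchange", "true")
  .set("spark.gluten.sql.columnar.broadcastJoin", "false")
```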
By doing this, the related broadcast exchange+join coherent validation rules are removed from the core module. They will continue to exist in the backends-clickhouse module until we implement this feature for the CH backend.
Required for #4533