
[BUG] DPP is not working in Databricks env #3143

Closed
viadea opened this issue Aug 4, 2021 · 7 comments · Fixed by #6919
Labels
bug Something isn't working

Comments

viadea commented Aug 4, 2021

Describe the bug

DPP (dynamic partition pruning) is not working in the Databricks env.
Found this issue when analyzing NDS query performance on Databricks.

Steps/Code to reproduce bug

Below is a minimal reproduction:

import org.apache.spark.sql.functions.col
spark.range(1000).select(col("id"), col("id").as("k")).write.partitionBy("k").format("parquet").mode("overwrite").save("/tmp/hao/myfact")
spark.range(100).select(col("id"), col("id").as("k")).write.format("parquet").mode("overwrite").save("/tmp/hao/mydim")
spark.read.parquet("/tmp/hao/myfact").createOrReplaceTempView("fact")
spark.read.parquet("/tmp/hao/mydim").createOrReplaceTempView("dim")
spark.sql("SELECT fact.id, fact.k FROM fact JOIN dim ON fact.k = dim.k AND dim.id < 2").collect

GPU's physical plan:

Location: InMemoryFileIndex [dbfs:/tmp/hao/myfact]
PartitionFilters: [isnotnull(k#1265), dynamicpruningexpression(true)]

CPU's physical plan:

Location: InMemoryFileIndex [dbfs:/tmp/hao/myfact]
PartitionFilters: [isnotnull(k#2201), dynamicpruningexpression(cast(k#2201 as bigint) IN dynamicpruning#2211)]

As you can see, even though the dynamicpruningexpression keyword is present, the filter is always literally true, so no partitions are actually pruned.
In a Spark standalone cluster there is no such issue for the GPU run.
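The difference between the two plans above can be checked mechanically. Below is a hypothetical helper (not part of the plugin) that inspects a physical-plan string: a partition filter of `dynamicpruningexpression(true)` means DPP was planned but replaced with a no-op, which is exactly the symptom on Databricks.

```scala
// Hypothetical diagnostic helper, not part of the RAPIDS plugin.
// Given the text of a physical plan, report whether dynamic partition
// pruning is actually effective. A no-op filter renders as
// dynamicpruningexpression(true) in the plan text.
object DppCheck {
  def isDppEffective(planText: String): Boolean = {
    val hasDpp = planText.contains("dynamicpruningexpression(")
    val isNoOp = planText.contains("dynamicpruningexpression(true)")
    hasDpp && !isNoOp
  }
}
```

Running it over the two `PartitionFilters` lines above would flag the GPU plan as ineffective and the CPU plan as effective.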

Expected behavior

DPP should happen in Databricks env.

Environment details (please complete the following information)

  • Environment location: Cloud (Databricks)
  • Databricks runtime: 8.2 ML GPU
  • RAPIDS: 21.10 snapshot / 21.06 GA

Additional context

Workaround:
Disable DPP by setting:
spark.sql.optimizer.dynamicPartitionPruning.enabled=false
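The same workaround can also be applied to a live session without restarting it, via Spark's runtime-config API (a sketch assuming an active `spark` session):

```scala
// Disable dynamic partition pruning for the current session only.
// spark.sql.optimizer.dynamicPartitionPruning.enabled is a standard
// Spark SQL conf and can be changed at runtime.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
```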

@viadea viadea added bug Something isn't working ? - Needs Triage Need team to review and classify labels Aug 4, 2021
@tgravescs

is the CPU run here with AQE off or on?


viadea commented Aug 5, 2021

The CPU run had AQE on, while the GPU run can only use AQE off. This is because if I enable AQE for the GPU run, another bug is triggered that crashes the cluster, so I have to restart it.


viadea commented Aug 5, 2021

@tgravescs I just quickly tested a CPU run on Databricks. Basically my test shows that DPP happens even when AQE is off.
DPP is controlled by spark.sql.optimizer.dynamicPartitionPruning.enabled, and only setting that parameter to false makes DPP go away.
So I do not think AQE is the trigger for this issue.

@tgravescs

for some reason the GPU plan on Databricks is missing the SubqueryBroadcast which is used with DPP:


:- GpuFileGpuScan parquet [id#60L,k#61] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/tgraves/myfact], PartitionFilters: [isnotnull(k#61), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<id:bigint>
+- GpuBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#191]
   +- GpuProject [k#65L]
      +- GpuCoalesceBatches TargetSize(2147483647)
         +- GpuFilter ((gpuisnotnull(id#64L) AND (id#64L < 2)) AND gpuisnotnull(k#65L))
            +- GpuFileGpuScan parquet [id#64L,k#65L] Batched: true, DataFilters: [isnotnull(id#64L), (id#64L < 2), isnotnull(k#65L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/tgraves/mydim], PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>

normally this would look like:

 +- GpuBroadcastHashJoin [cast(k#39 as bigint)], [k#43L], Inner, GpuBuildRight
      :- GpuFileGpuScan parquet [id#38L,k#39] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/tgraves/myfact], PartitionFilters: [isnotnull(k#39), dynamicpruningexpression(cast(k#39 as bigint) IN dynamicpruning#48)], PushedFilters: [], ReadSchema: struct<id:bigint>
      :     +- SubqueryBroadcast dynamicpruning#48, 0, [k#43L], [id=#235]
      :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#234]
      :           +- GpuColumnarToRow false
      :              +- GpuProject [k#43L]
      :                 +- GpuCoalesceBatches targetsize(2147483647)
      :                    +- GpuFilter ((gpuisnotnull(id#42L) AND (id#42L < 2)) AND gpuisnotnull(k#43L))
      :                       +- GpuFileGpuScan parquet [id#42L,k#43L] Batched: true, DataFilters: [isnotnull(id#42L), (id#42L < 2), isnotnull(k#43L)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/tgraves/mydim], PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>

Somehow the SubqueryBroadcast isn't there.

tgravescs commented Aug 5, 2021

Yeah, so the plan that we get from Databricks doesn't even have the SubqueryBroadcast in it like Apache Spark's does. They must be inserting it at some later point: the CPU side eventually gets it inserted, but only sometime after we see the plan.

@tgravescs

note, turning off our GpuBroadcastExchange replacement makes DPP work on Databricks:

spark.conf.set("spark.rapids.sql.exec.BroadcastExchangeExec", "false")

== Physical Plan ==
GpuColumnarToRow (7)
+- GpuProject (6)
   +- GpuRowToColumnar (5)
      +- * BroadcastHashJoin Inner BuildRight (4)
         :- GpuColumnarToRow (2)
         :  +- GpuScan parquet  (1)
         +- ReusedExchange (3)

===== Subqueries =====

Subquery:1 Hosting operator id = 1 Hosting Expression = cast(k#18 as bigint) IN dynamicpruning#28
BroadcastExchange (13)
+- GpuColumnarToRow (12)
   +- GpuProject (11)
      +- GpuCoalesceBatches (10)
         +- GpuFilter (9)
            +- GpuScan parquet  (8)

@Salonijain27 Salonijain27 removed the ? - Needs Triage Need team to review and classify label Aug 17, 2021
@tgravescs

Please note you can work around this issue by disabling DPP with spark.sql.optimizer.dynamicPartitionPruning.enabled=false
