New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution #22198
Conversation
@@ -191,6 +195,39 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { | |||
assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1) | |||
} | |||
|
|||
test("SPARK-25121 Supports multi-part names for broadcast hint resolution") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be better to move the three tests below into DataFrameHintSuite
?
- test("broadcast join hint using broadcast function")
- test("broadcast join hint using Dataset.hint")
- test("SPARK-25121 Supports multi-part names for broadcast hint resolution")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ResolveHintsSuite
is the smallest one for this. Can we add the following test to ResolveHintsSuite
?
test("Supports multi-part table names for broadcast hint resolution") {
checkAnalysis(
UnresolvedHint("MAPJOIN", Seq("default.table", "default.table2"),
table("table").join(table("table2"))),
Join(ResolvedHint(testRelation, HintInfo(broadcast = true)),
ResolvedHint(testRelation2, HintInfo(broadcast = true)), Inner, None),
caseSensitive = false)
}
Test build #95148 has finished for PR 22198 at commit
|
@dilipbiswal @gatorsmile ping |
cc @dongjoon-hyun Try to review this PR? |
@maropu @gatorsmile @dongjoon-hyun I do have a question on the semantics. use hint;
explain extended SELECT /*+ BROADCASTJOIN(hint.s2) */ * FROM s1, s2 where s1.c1 = s2.c1; In this case, aren't we supposed to apply the hint ? even though s2 is not explicitly qualified with the database in the from clause ? Here is the optimized plan i see ..
|
aha, I see. IMO we need to apply the hint in the case, too. I'll fix. |
Test build #95252 has finished for PR 22198 at commit
|
@@ -191,6 +195,39 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { | |||
assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1) | |||
} | |||
|
|||
test("SPARK-25121 Supports multi-part names for broadcast hint resolution") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ResolveHintsSuite
is the smallest one for this. Can we add the following test to ResolveHintsSuite
?
test("Supports multi-part table names for broadcast hint resolution") {
checkAnalysis(
UnresolvedHint("MAPJOIN", Seq("default.table", "default.table2"),
table("table").join(table("table2"))),
Join(ResolvedHint(testRelation, HintInfo(broadcast = true)),
ResolvedHint(testRelation2, HintInfo(broadcast = true)), Inner, None),
caseSensitive = false)
}
@@ -47,20 +49,39 @@ object ResolveHints { | |||
* | |||
* This rule must happen before common table expressions. | |||
*/ | |||
class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] { | |||
class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accordingly, we can use String instead of SessionCatalog.
- class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog) extends Rule[LogicalPlan] {
+ class ResolveBroadcastHints(conf: SQLConf, currentDatabase: String) extends Rule[LogicalPlan] {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can't use String
there because currentDatabase
might be updatable by others?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can instead use getCurrentDatabase: () => String
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya. Right, please ignore this. We need catalog
to lookup global_temp
, too.
tableIdent: IdentifierWithDatabase): Boolean = { | ||
val identifierList = | ||
tableIdent.database.getOrElse(catalog.getCurrentDatabase) :: tableIdent.identifier :: Nil | ||
namePartsWithDatabase(nameParts).corresponds(identifierList)(resolver) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic will make a regression (plan1
in the below) in case of global temporary view. Please add the following test case into GlobalTempViewSuite
and revise the logic to handle both cases correctly.
test("broadcast hint on global temp view") {
import org.apache.spark.sql.catalyst.plans.logical.{ResolvedHint, Join}
withGlobalTempView("v1") {
spark.range(10).createGlobalTempView("v1")
withTempView("v2") {
spark.range(10).createTempView("v2")
Seq(
"SELECT /*+ MAPJOIN(v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id",
"SELECT /*+ MAPJOIN(global_temp.v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id"
).foreach { statement =>
val plan = sql(statement).queryExecution.optimizedPlan
assert(plan.asInstanceOf[Join].left.isInstanceOf[ResolvedHint])
assert(!plan.asInstanceOf[Join].right.isInstanceOf[ResolvedHint])
}
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dongjoon-hyun a little confused about the name resolution here;
"SELECT /*+ MAPJOIN(v1) */ * FROM global_temp.v1, v2 WHERE v1.id = v2.id",
MAPJOIN(v1)
implicitly means global_temp.v1
?
For example;
"SELECT /*+ MAPJOIN(v1) */ * FROM default.v1, global_temp.v1 WHERE default.v1.id = global_temp.v1.id",
In this case, what's the MAPJOIN(v1)
behaviour?
- Apply no hint (current behaviour)
- Apply a hint into
default.v1
only - Apply a hint into both
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
First of all, the above two test cases in
test("broadcast hint on global temp view")
should work as before. In general,global_temp.v1
should be used with the prefixglobal_temp.
. However, before this PR, we cannot putdatabase
name on Hint. So, we allowed exceptional cases; hints on global temporary view (withoutglobal_temp.
prefix). -
For the case you mentioned, I'd like to interpret
MAPJOIN(v1)
todefault.v1 only
because it's the Spark's behavior outside this Hint syntax. And, please add a test case for this, too.
@cloud-fan and @gatorsmile . Could you give us some advice, too? Is it okay to you?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, @maropu . In addition,
- The current behavior of
master
branch (Spark 2.4) isApply a hint into both
. - The legacy behavior of Spark 2.3.1 is raising an AnalysisException for that query.
So, I think it's a good change to become consistent in Spark 2.4.
scala> sql("set spark.sql.autoBroadcastJoinThreshold=-1")
scala> sql("set spark.sql.crossJoin.enabled=true")
scala> sql("drop view v1")
scala> sql("create view v1 as select 'view' id").show
scala> sql("create global temporary view v1 as select 'global_temp_view' id").show
scala> sql("SELECT /*+ MAPJOIN(v1) */ * FROM v1, global_temp.v1 WHERE default.v1.id = global_temp.v1.id").explain(true)
org.apache.spark.sql.AnalysisException: cannot resolve '`default.v1.id`' given input columns: [v1.id, v1.id]; line 1 pos 58;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, yes. I'll refine the pr. thanks.
Thanks, @dongjoon-hyun! I'll check and merge that. |
assert(plan.collect { case p: BroadcastHashJoinExec => p }.size == 0) | ||
|
||
// Uses multi-part table names for broadcast hints | ||
def checkIfHintApplied(tableName: String, hintTableName: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hintTableName
is never used in this func?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, I'll fix.
Test build #95275 has finished for PR 22198 at commit
|
Test build #95279 has finished for PR 22198 at commit
|
retest this please |
Test build #95281 has finished for PR 22198 at commit
|
retest this please |
Test build #95284 has finished for PR 22198 at commit
|
nameParts: Seq[String], | ||
tableIdent: IdentifierWithDatabase): Boolean = { | ||
tableIdent.database match { | ||
case Some(db) if catalog.globalTempViewManager.database == formatDatabaseName(db) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you use resolver
here like the following and remove formatDatabaseName
in line 65~67? Since it's a SessionCatalog
function, let's avoid duplication.
- case Some(db) if catalog.globalTempViewManager.database == formatDatabaseName(db) =>
+ case Some(db) if resolver(catalog.globalTempViewManager.database, db) =>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we need a case-sensitive test. I made another PR to you for that, maropu#3 .
Test build #95322 has finished for PR 22198 at commit
|
case _ => | ||
val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase) | ||
val identifierList = db :: tableIdent.identifier :: Nil | ||
namePartsWithDatabase(nameParts, catalog.getCurrentDatabase) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part will break temporary view
case. In the following case, no table should be broadcasted. Also, could you add more test cases? We need to test table
, global temporary view
, temporary view
, and view
. It seems that we still miss some cases like the following.
scala> :paste
// Entering paste mode (ctrl-D to finish)
sql("set spark.sql.autoBroadcastJoinThreshold=-1")
spark.range(10).write.mode("overwrite").saveAsTable("t")
sql("create temporary view tv as select * from t")
sql("select /*+ mapjoin(default.tv) */ * from t, tv where t.id = tv.id").explain
sql("select * from default.tv")
// Exiting paste mode, now interpreting.
== Physical Plan ==
*(2) BroadcastHashJoin [id#7L], [id#12L], Inner, BuildRight
:- *(2) Project [id#7L]
: +- *(2) Filter isnotnull(id#7L)
: +- *(2) FileScan parquet default.t[id#7L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dongjoon/PR-22198/spark-warehouse/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- *(1) Project [id#12L]
+- *(1) Filter isnotnull(id#12L)
+- *(1) FileScan parquet default.t[id#12L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dongjoon/PR-22198/spark-warehouse/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
org.apache.spark.sql.AnalysisException: Table or view not found: `default`.`tv`; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation `default`.`tv`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I'll try.
@maropu and @dilipbiswal and @gatorsmile . The complexity comes because this PR duplicates the existing name resolution logic. Although we may move From your PR description,
Originally, For
To sum up, until now, we are moving forward to (2), but is (2) really required for SPARK-25121? If we choose (1), it will become simpler and consistent with the original design choice (matching based on unresolved strings). |
@dongjoon-hyun Thanks for nicely summarizing. Actually i was not clear on the semantics when i asked the question :-) and was wondering if we should resolve it like a table identifier or just match it like a string. Do we know how other databases that support hints handle this ? I am actually fine if we go with option 1. |
Thanks for the sum-up. I like simpler one, too. Le me just describe more to make me more understood; IIUC we have the two case: (1) fully-qualified case (1) no ambiguity, as @dongjoon-hyun said, we just exactly map Since I think most users meet this case (2) (they don't add database names there in most case I probably think...), IMHO it is important to support the syntax for usability. Based on the thought, my proposal is that we handle |
Test build #95395 has finished for PR 22198 at commit
|
Ur, @maropu . What I worried was case (1). For |
Aha, I see. It is simple to match identifiers literally. So, let me wait for other developers comments. cc: @gatorsmile |
Test build #95448 has finished for PR 22198 at commit
|
Thanks for understanding, @maropu . Yes. We need to build consensus. @gatorsmile and @cloud-fan . Could you give us a directional advice for this PR? Basically, we are wondering if we need to provide the same name resolution at this Hint layers. Please see for the summary comment. |
Test build #102295 has finished for PR 22198 at commit
|
Could you check @cloud-fan @dongjoon-hyun ? |
Test build #114805 has finished for PR 22198 at commit
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
does this problem still exist? |
I'll check again. |
Since this issue still exists, I'll open a new PR for this issue. |
… resolution ### What changes were proposed in this pull request? This pr fixed code to respect a database name for broadcast table hint resolution. Currently, spark ignores a database name in multi-part names; ``` scala> sql("CREATE DATABASE testDb") scala> spark.range(10).write.saveAsTable("testDb.t") // without this patch scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain == Physical Plan == *(2) Project [id#24L] +- *(2) BroadcastHashJoin [id#24L], [id#26L], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) : +- *(1) Range (0, 10, step=1, splits=4) +- *(2) Project [id#26L] +- *(2) Filter isnotnull(id#26L) +- *(2) FileScan parquet testdb.t[id#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-2.3.1-bin-hadoop2.7/spark-warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> // with this patch scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain == Physical Plan == *(2) Project [id#3L] +- *(2) BroadcastHashJoin [id#3L], [id#5L], Inner, BuildRight :- *(2) Range (0, 10, step=1, splits=4) +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])) +- *(1) Project [id#5L] +- *(1) Filter isnotnull(id#5L) +- *(1) FileScan parquet testdb.t[id#5L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/testdb.db/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> ``` This PR comes from #22198 ### Why are the changes needed? For better usability. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added unit tests. Closes #27935 from maropu/SPARK-25121-2. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… resolution ### What changes were proposed in this pull request? This pr fixed code to respect a database name for broadcast table hint resolution. Currently, spark ignores a database name in multi-part names; ``` scala> sql("CREATE DATABASE testDb") scala> spark.range(10).write.saveAsTable("testDb.t") // without this patch scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain == Physical Plan == *(2) Project [id#24L] +- *(2) BroadcastHashJoin [id#24L], [id#26L], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) : +- *(1) Range (0, 10, step=1, splits=4) +- *(2) Project [id#26L] +- *(2) Filter isnotnull(id#26L) +- *(2) FileScan parquet testdb.t[id#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-2.3.1-bin-hadoop2.7/spark-warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> // with this patch scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain == Physical Plan == *(2) Project [id#3L] +- *(2) BroadcastHashJoin [id#3L], [id#5L], Inner, BuildRight :- *(2) Range (0, 10, step=1, splits=4) +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])) +- *(1) Project [id#5L] +- *(1) Filter isnotnull(id#5L) +- *(1) FileScan parquet testdb.t[id#5L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/testdb.db/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> ``` This PR comes from #22198 ### Why are the changes needed? For better usability. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added unit tests. Closes #27935 from maropu/SPARK-25121-2. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit ca499e9) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
… resolution ### What changes were proposed in this pull request? This pr fixed code to respect a database name for broadcast table hint resolution. Currently, spark ignores a database name in multi-part names; ``` scala> sql("CREATE DATABASE testDb") scala> spark.range(10).write.saveAsTable("testDb.t") // without this patch scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain == Physical Plan == *(2) Project [id#24L] +- *(2) BroadcastHashJoin [id#24L], [id#26L], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) : +- *(1) Range (0, 10, step=1, splits=4) +- *(2) Project [id#26L] +- *(2) Filter isnotnull(id#26L) +- *(2) FileScan parquet testdb.t[id#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-2.3.1-bin-hadoop2.7/spark-warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> // with this patch scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain == Physical Plan == *(2) Project [id#3L] +- *(2) BroadcastHashJoin [id#3L], [id#5L], Inner, BuildRight :- *(2) Range (0, 10, step=1, splits=4) +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])) +- *(1) Project [id#5L] +- *(1) Filter isnotnull(id#5L) +- *(1) FileScan parquet testdb.t[id#5L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/testdb.db/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> ``` This PR comes from apache#22198 ### Why are the changes needed? For better usability. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added unit tests. Closes apache#27935 from maropu/SPARK-25121-2. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
This pr fixed code to respect a database name for broadcast table hint resolution.
Currently, spark ignores a database name in multi-part names;
How was this patch tested?
Added tests in
DataFrameJoinSuite
.