Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a test for supported types of SortMergeJoin in DataFusion #365

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,13 @@ object CometConf {
.booleanConf
.createWithDefault(false)

val COMET_SORTMERGEJOIN_CHECK_KEY_TYPES: ConfigEntry[Boolean] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @planga82
I'm not sure if we need to change the conf for the unit test purpose

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @comphead . Any idea how to do this in a cleaner way? We could move this constant to other place and avoid putting it in CometConf. In that way we avoid to expose a testing configuration. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh to test variety of supported datatypes I liked the @andygrove approach introduced in #351, wondering if we can do that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to review it. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not the same idea and not applicable here. I'll close this PR and try to think of another way to do it without adding configuration. Thanks again!

conf(s"$COMET_EXEC_CONFIG_PREFIX.sortmergejoin.check.key.types")
.doc("Enable key type checking in SortMergeJoin" +
"By default, this config is true. This config is only used for unit test.")
.booleanConf
.createWithDefault(true)

}

object ConfigHelpers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2424,7 +2424,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {

// Checks if the join keys are supported by DataFusion SortMergeJoin.
val errorMsgs = join.leftKeys.flatMap { key =>
if (!supportedSortMergeJoinEqualType(key.dataType)) {
if (CometConf.COMET_SORTMERGEJOIN_CHECK_KEY_TYPES.get(op.conf)
&& !supportedSortMergeJoinEqualType(key.dataType)) {
Some(s"Unsupported join key type ${key.dataType} on key: ${key.sql}")
} else {
None
Expand Down
53 changes: 53 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ package org.apache.comet.exec
import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.SparkException
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, TimestampType}

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
import org.apache.comet.serde.QueryPlanSerde

class CometJoinSuite extends CometTestBase {

Expand Down Expand Up @@ -251,4 +254,54 @@ class CometJoinSuite extends CometTestBase {
}
}
}

test("SortMergeJoin native execution with TimestampType should fail") {
// This test will be deleted when this type is supported
assert(!QueryPlanSerde.supportedSortMergeJoinEqualType(TimestampType))
withSQLConf(
CometConf.COMET_SORTMERGEJOIN_CHECK_KEY_TYPES.key -> "false",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("t1", "t2") {
sql("CREATE TABLE t1(name STRING, time TIMESTAMP) USING PARQUET")
sql("INSERT OVERWRITE t1 VALUES('a', timestamp'2019-01-01 11:11:11')")

sql("CREATE TABLE t2(name STRING, time TIMESTAMP) USING PARQUET")
sql("INSERT OVERWRITE t2 VALUES('a', timestamp'2019-01-01 11:11:11')")

val e = intercept[SparkException] {
sql("SELECT * FROM t1 JOIN t2 ON t1.time = t2.time").collect()
}.getMessage
assert(
e.contains("This feature is not implemented: " +
"Unsupported data type in sort merge join comparator"))
}
}
}

test("SortMergeJoin native execution with Binary should fail") {
// This test will be deleted when this type is supported
assert(!QueryPlanSerde.supportedSortMergeJoinEqualType(BinaryType))
withSQLConf(
CometConf.COMET_SORTMERGEJOIN_CHECK_KEY_TYPES.key -> "false",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {

withTable("t1", "t2") {
sql("CREATE TABLE t1(name STRING, bin BINARY) USING PARQUET")
sql("INSERT OVERWRITE t1 VALUES('a', X'1')")

sql("CREATE TABLE t2(name STRING, bin BINARY) USING PARQUET")
sql("INSERT OVERWRITE t2 VALUES('b', X'1')")

val e = intercept[SparkException] {
sql("SELECT * FROM t1 JOIN t2 ON t1.bin = t2.bin").collect()
}.getMessage
assert(
e.contains("This feature is not implemented: " +
"Unsupported data type in sort merge join comparator"))
}
}
}
}