
[SPARK-37259][SQL] Support CTE and TempTable queries with MSSQL JDBC #34693

Closed

Conversation

peter-toth
Contributor

@peter-toth peter-toth commented Nov 23, 2021

What changes were proposed in this pull request?

Currently, CTE queries from Spark are not supported with MSSQL server via JDBC. This is because MSSQL server doesn't support the nested CTE syntax (SELECT * FROM (WITH t AS (...) SELECT ... FROM t) WHERE 1=0) that Spark builds from the original query (options.tableOrQuery) in JDBCRDD.resolveTable() and in JDBCRDD.compute().
Unfortunately, it is non-trivial to split an arbitrary query into "with" and "regular" clauses in MsSqlServerDialect. So instead, I'm proposing a new general JDBC option "withClause" that users can use if they have complex queries with CTEs:

val withClause = "WITH t AS (SELECT x, y FROM tbl)"
val query = "SELECT * FROM t WHERE x > 10"
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("withClause", withClause)
  .option("query", query)
  .load()

This change also works with MSSQL's temp table syntax:

val withClause = "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl WHERE x > 10) t)"
val query = "SELECT * FROM #TempTable"
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("withClause", withClause)
  .option("query", query)
  .load()
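
For illustration, with this option Spark prepends the withClause to the wrapped queries it builds internally. Reusing the CTE example above, the schema probe would look roughly like this (the exact string Spark emits may differ slightly):

val schemaProbe = s"$withClause SELECT * FROM ($query) WHERE 1=0"
// => WITH t AS (SELECT x, y FROM tbl) SELECT * FROM (SELECT * FROM t WHERE x > 10) WHERE 1=0
// MSSQL accepts this form, unlike the fully nested one.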

Why are the changes needed?

To support CTE queries with MSSQL.

Does this PR introduce any user-facing change?

Yes, CTE queries are supported from now on.

How was this patch tested?

Added new integration UTs.
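
For reference, the added tests follow roughly this shape (a sketch, not the exact suite code; jdbcUrl and tbl are placeholders for the test fixture):

test("query JDBC source with a WITH clause") {
  val df = spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("withClause", "WITH t AS (SELECT x, y FROM tbl)")
    .option("query", "SELECT * FROM t WHERE x > 10")
    .load()
  // Both schema resolution and the actual read must succeed.
  assert(df.columns.toSeq == Seq("x", "y"))
}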

@github-actions github-actions bot added the SQL label Nov 23, 2021
@SparkQA

SparkQA commented Nov 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50020/

@SparkQA

SparkQA commented Nov 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50020/

@peter-toth
Contributor Author

peter-toth commented Nov 23, 2021

This change also seems to work with MSSQL's temp table syntax:

val withClause = "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl WHERE x > 10) t)"
val query = "SELECT * FROM #TempTable"
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("withClause", withClause)
  .option("query", query)
  .load()

@SparkQA

SparkQA commented Nov 23, 2021

Test build #145548 has finished for PR 34693 at commit e2c9577.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@peter-toth
Contributor Author

Hmm, failures in ExpressionsSchemaSuite look unrelated...

@sumeetgajjar
Contributor

This change also seems to work with MSSQL's temp table syntax:

val withClause = "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl WHERE x > 10) t)"
val query = "SELECT * FROM #TempTable"
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("withClause", withClause)
  .option("query", query)
  .load()

Since it also works with temp table syntax, do you think it would be a good idea to include it in the title of this PR and modify the title to "Support CTE and TempTable queries with MSSQL JDBC"?

@@ -325,6 +325,6 @@ private[sql] case class JDBCRelation(
   override def toString: String = {
     val partitioningInfo = if (parts.nonEmpty) s" [numPartitions=${parts.length}]" else ""
     // credentials should not be included in the plan output, table information is sufficient.
-    s"JDBCRelation(${jdbcOptions.tableOrQuery})" + partitioningInfo
+    s"JDBCRelation(${jdbcOptions.withClause}${jdbcOptions.tableOrQuery})" + partitioningInfo
Contributor

Since partitioningInfo is also a string, should we create the final string as
s"JDBCRelation(${jdbcOptions.withClause}${jdbcOptions.tableOrQuery})$partitioningInfo"?

Contributor Author

Fixed in b53ef47

.option("dbtable", dbtable)
.load()
assert(df.collect.toSet === expectedResult)
}
Contributor

Since it already works with temp table syntax, should we also add the corresponding UTs?

Contributor Author

Ok. Added in dea730a

@peter-toth
Contributor Author

peter-toth commented Nov 24, 2021

This change also seems to work with MSSQL's temp table syntax:

val withClause = "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl WHERE x > 10) t)"
val query = "SELECT * FROM #TempTable"
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("withClause", withClause)
  .option("query", query)
  .load()

Since it also works with temp table syntax, do you think it would be a good idea to include it in the title of this PR and modify the title to "Support CTE and TempTable queries with MSSQL JDBC"?

Thanks, makes sense. I've modified the PR title and description and added a new test in dea730a.

Actually, I wonder if it makes sense to rename withClause to a more general queryPrefix option.

@peter-toth peter-toth changed the title [SPARK-37259][SQL] Support CTE queries with MSSQL JDBC [SPARK-37259][SQL] Support CTE and TempTable queries with MSSQL JDBC Nov 24, 2021
@SparkQA

SparkQA commented Nov 24, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50044/

@SparkQA

SparkQA commented Nov 24, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50045/

@SparkQA

SparkQA commented Nov 24, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50045/

@SparkQA

SparkQA commented Nov 24, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50044/

@SparkQA

SparkQA commented Nov 24, 2021

Test build #145572 has finished for PR 34693 at commit b53ef47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 24, 2021

Test build #145571 has finished for PR 34693 at commit dea730a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@KevinAppelBofa

@peter-toth thank you for working on this. I was able to get a Spark 3.3.0-SNAPSHOT compiled and test the changes you made. I first ran both sample queries and those worked; then I ran the temp table query and that also works, and it was easy to split into the withClause and query. I am running into an issue getting the CTE query to run, though: I have tried to split it up a few ways but I keep getting the same error, which is below. I'm going to try to add a log warning to dump out the query it is trying to run to get the schema and see if I can get that to run directly in the SQL server. This was the issue I ran into originally: I was able to get the test CTE to work, and doing a $CTEQUERY where 1=0; was working, but in this more complex CTE I can't find a spot to add the 1=0 to get back only a schema.

py4j.protocol.Py4JJavaError: An error occurred while calling o85.load.
: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the keyword 'WITH'.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1632)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:602)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:524)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7418)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3272)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:247)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:222)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeQuery(SQLServerPreparedStatement.java:446)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:69)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:59)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:240)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:209)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:209)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:170)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)

@KevinAppelBofa

@peter-toth I was able to get the CTE query to work using the split method you have. It took a little trial and error to find the right place to split, but it produces the same results as the other method that uses the useRawQuery option, and appears to have a similar run time.

@attilapiros
Contributor

attilapiros commented Dec 2, 2021

Unfortunately, it is non-trivial to split an arbitrary query into "with" and "regular" clauses in MsSqlServerDialect.

Could you please show us some example queries where the split is non-trivial?

@peter-toth
Contributor Author

Unfortunately, it is non-trivial to split an arbitrary query into "with" and "regular" clauses in MsSqlServerDialect.

Could you please show us some example queries where the split is non-trivial?

For example:

  WITH t AS (SELECT x FROM tbl), t2 AS (SELECT y FROM tbl2) SELECT * FROM t WHERE x > (SELECT max(y) FROM t2)

You need to split this into WITH t AS (SELECT x FROM tbl), t2 AS (SELECT y FROM tbl2) and SELECT * FROM t WHERE x > (SELECT max(y) FROM t2). Obviously, there can be more brackets in the query, and some might not be paired/closed if they appear in strings, like: t2 AS (SELECT y, ')' AS dummy FROM tbl2). We could use ANTLR to parse the query just like we use it to parse Spark SQL statements, but I'm not sure it is worth it...
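
To make the failure mode concrete, here is a hypothetical naive splitter (not part of this PR) that counts parentheses to find the first top-level SELECT:

def splitCte(sql: String): (String, String) = {
  var depth = 0
  var i = 0
  while (i < sql.length) {
    sql(i) match {
      case '(' => depth += 1
      case ')' => depth -= 1
      case _ if depth == 0 && sql.regionMatches(true, i, "SELECT", 0, 6) =>
        // Assume the first SELECT at nesting depth 0 ends the WITH clause.
        return (sql.substring(0, i), sql.substring(i))
      case _ =>
    }
    i += 1
  }
  ("", sql)
}

On t2 AS (SELECT y, ')' AS dummy FROM tbl2) the ')' inside the string literal unbalances the count, so the split lands in the wrong place; handling quoting correctly is exactly where a real parser becomes necessary.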

@attilapiros
Contributor

We could use ANTLR to parse the query just like we use it to parse Spark SQL statements but I'm not sure it is worth it...

I agree, that would be too much here, and without complex parsing logic we might fail at the splitting.

@KevinAppelBofa

KevinAppelBofa commented Dec 2, 2021

@attilapiros Finding where the WITH piece stops and where the SELECT begins is how I found the place to split. In the test query

query2 = """
WITH DummyCTE AS
(
SELECT 1 as DummyCOL
)
SELECT *
FROM DummyCTE
"""

This splits into

withClause = """
WITH DummyCTE AS
(
SELECT 1 as DummyCOL
)

"""
query = """
SELECT *
FROM DummyCTE
"""

The actual query we are running is more complex, a bunch of chained WITHs together. In that one I took the same approach: I found where the actual WITH part ends, stuck that into the withClause, and put the rest into the query.

The same technique works for the temp table query: split it up so that the part generating the temp table goes into the withClause and the rest goes into the query.

query3 = """
(SELECT *
INTO #Temp1a
FROM
(SELECT @@VERSION as version) data
)

(SELECT *
FROM
#Temp1a)
"""

Turns into

withClause = """
(SELECT *
INTO #Temp1a
FROM
(SELECT @@VERSION as version) data
)
"""

query = """
(SELECT *
FROM
#Temp1a)
"""

@attilapiros
Contributor

@attilapiros Finding where the WITH piece stops and where the SELECT begins is how I found the place to split.

Thanks, I see that. But it is still really hard to automate. So I think what Peter came up with is the best we have right now.

I was thinking about how to avoid SELECT * FROM $table WHERE 1=0. One of my ideas was simply replacing every SELECT (ignoring case) with SELECT top(0), as that could be done even inside string literals without changing the schema. But if top was already used somewhere, this ugly hack fails. And this is just one part of the problem of getting the schema without running the query. The other part (in JDBCRDD.compute()), where the partitioning and pushed-down group by are handled, is even harder to crack.
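
A sketch of that rejected idea (hypothetical, never implemented):

def topZeroHack(sql: String): String =
  sql.replaceAll("(?i)\\bSELECT\\b", "SELECT top(0)")

// WITH t AS (SELECT x FROM tbl) SELECT * FROM t becomes
// WITH t AS (SELECT top(0) x FROM tbl) SELECT top(0) * FROM t,
// which returns zero rows but the right schema. However, a query that already
// uses TOP, e.g. SELECT top(5) x FROM tbl, would become the invalid
// SELECT top(0) top(5) x FROM tbl.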

So based on this LGTM.

cc @viirya, @HyukjinKwon

Contributor

@sumeetgajjar sumeetgajjar left a comment

Thanks for incorporating the suggestions.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 13, 2022
@github-actions github-actions bot closed this Mar 14, 2022
@WaterKnight1998

Any news here?

@peter-toth
Contributor Author

I rebased the PR on top of master and already pushed it to my branch, but for some reason I can't reopen the PR.

@HyukjinKwon, @allisonwang-db, @cloud-fan could you please take a look and judge whether it makes sense to reopen this PR?

@cloud-fan
Contributor

I'm not sure what the actual requirement is. To use the JDBC option that directly passes a SQL string to a JDBC database, we must make sure the SQL is fully supported by the database. Or people should run the SQL with Spark and only read/write JDBC tables.

If we only want to run a custom SQL statement before running the query, can we use the sessionInitStatement option?
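
For context, sessionInitStatement executes a custom statement on each opened connection before reading data; a minimal usage sketch (jdbcUrl and the init statement are placeholders):

val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("sessionInitStatement", "SET NOCOUNT ON") // runs separately, once per connection
  .option("query", "SELECT * FROM tbl")
  .load()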

@KevinAppelBofa

@cloud-fan the requirement is to be able to use Spark JDBC to access Microsoft SQL Server and use features that are unique to SQL Server, such as temp tables or common table expressions (WITH statements). Oracle and MySQL also support CTEs, but their dialects allow the statement to begin with a SELECT.
SQL Server only accepts a CTE one way, starting with WITH, and because of how Spark wraps the query this is what causes the issue. I had opened a case with Microsoft and got to their Spark team, but they were not able to provide any feedback or commits on how to fix Spark to handle this.

This fix that Peter created allows both of these items to work, the CTE query and also the temp table query. In both cases it is very difficult to split the SQL into parts to place into sessionInitStatement versus being able to run the query as is.

The sample queries look like:
query2 = """
WITH DummyCTE AS
(
SELECT 1 as DummyCOL
)
SELECT *
FROM DummyCTE
"""

query3 = """
(SELECT *
INTO #Temp1a
FROM
(SELECT @@Version as version) data
)
(SELECT *
FROM
#Temp1a)
"""

@peter-toth
Contributor Author

peter-toth commented Apr 27, 2022

I think the requirement is to be able to use CTE queries with JDBC sources on MSSQL.
Currently it doesn't work because Spark wraps the original query into a SELECT statement (e.g. when it queries the schema, Spark wraps the query into SELECT * FROM (<query>) WHERE 1=0), and this construct is not supported by MSSQL.

  • sessionInitStatement is a separate statement, so you can't put the WITH clause there and keep the SELECT clause in query.
  • We could put the whole CTE query into sessionInitStatement as CREATE TEMPORARY VIEW v AS <query> and use SELECT * FROM v in query, but temporary views are also not supported by MSSQL.
  • We could improve Spark to identify CTE queries and assemble the schema query in a way that is compatible with MSSQL, but splitting an arbitrary query into WITH and SELECT clauses programmatically is not that simple.
  • This PR offers a new withClause option where the user can split the query manually. (I should probably call it queryPrefix as it also works with MSSQL's temp table syntax.)

@cloud-fan
Contributor

Why does Spark JDBC source issue SELECT * FROM (<query>) WHERE 1=0 instead of simply <query>? Sorry I'm not very familiar with the details. cc @huaxingao @beliefer

@huaxingao
Contributor

@cloud-fan
When JDBC resolves a relation, it needs to get the schema of the relation first. JDBC uses this query

s"SELECT * FROM $table WHERE 1=0"

to discover the schema of the relation. In the user's case, the table is WITH t AS (SELECT x, y FROM tbl) SELECT * FROM t WHERE x > 10 so the query to get the schema is

SELECT * FROM (WITH t AS (SELECT x, y FROM tbl) SELECT * FROM t WHERE x > 10) WHERE 1=0

but MSSQL server doesn't support the syntax SELECT * FROM (WITH t AS (...) SELECT ... FROM t) WHERE 1=0.
This PR offers a withClause option so the user can split the query manually. The query will be changed to the following after the split:

WITH t AS (SELECT x, y FROM tbl) SELECT * FROM (SELECT * FROM t WHERE x > 10) WHERE 1=0
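
Under the hood, the probe only needs the ResultSet metadata, not any rows; a simplified sketch of that flow using plain JDBC (not Spark's exact code; jdbcUrl is a placeholder):

import java.sql.DriverManager

val conn = DriverManager.getConnection(jdbcUrl)
val stmt = conn.prepareStatement("SELECT * FROM tbl WHERE 1=0")
val rs = stmt.executeQuery() // returns immediately with zero rows
val meta = rs.getMetaData    // column names and types are still available
(1 to meta.getColumnCount).foreach { i =>
  println(s"${meta.getColumnName(i)}: ${meta.getColumnTypeName(i)}")
}
rs.close(); stmt.close(); conn.close()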

@cloud-fan
Contributor

Can we send WITH t AS (SELECT x, y FROM tbl) SELECT * FROM t WHERE x > 10 directly to the database and get the schema?

@beliefer
Contributor

beliefer commented Apr 28, 2022

I investigated the behavior of PostgreSQL.
WITH t AS (select dept, name, salary from employee) SELECT * FROM t; works well.
SELECT * FROM (WITH t AS (select dept, name, salary from employee) SELECT * FROM t) WHERE 1=0; fails.
The option "withClause" looks strange. If so, why not use WITH t AS (SELECT x, y FROM tbl) SELECT * FROM t WHERE x > 10 directly?

@beliefer
Contributor

Can we send WITH t AS (SELECT x, y FROM tbl) SELECT * FROM t WHERE x > 10 directly to the database and get the schema?

+1

@beliefer
Contributor

The test case shown below works well.

  test("investigate ctes") {
    checkAnswer(sql("WITH t AS (SELECT id, name FROM h2.test.empty_table) SELECT * FROM t"), Seq())
    checkAnswer(sql("WITH t AS (SELECT id, name FROM h2.test.people) SELECT * FROM t"),
      Seq(Row(1, "fred"), Row(2, "mary")))
  }

@peter-toth
Contributor Author

Why does Spark JDBC source issue SELECT * FROM (<query>) WHERE 1=0 instead of simply <query>?

Because that way we let the MSSQL (or other database's) optimizer kick in and return an empty result set with the schema very quickly.

Can we send WITH t AS (SELECT x, y FROM tbl) SELECT * FROM t WHERE x > 10 directly to the database and get the schema?

Well, we could do that at the cost of losing the above optimization, but besides the "schema query", Spark also wraps the original query in other places. For example, when the query is actually executed: https://github.com/apache/spark/pull/34693/files#diff-ecf5b374060c1222d3a0a1295b4ec2cb5d07603db460273484b1753e1cab9f90L370-L371 so that JDBC sources can support different pushdowns and partitioning.
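
Roughly how the executed query gets assembled (a simplified sketch with placeholder values, not Spark's exact internals):

val columnList = "x, y"                                  // pruned columns
val tableOrQuery = "(SELECT * FROM t WHERE x > 10) sub"  // hypothetical alias
val whereClause = "WHERE x < 100"                        // pushed-down filters / partition predicate
val sqlText = s"SELECT $columnList FROM $tableOrQuery $whereClause"
// If tableOrQuery contained a WITH clause, it would again end up nested inside
// FROM (...), which MSSQL rejects; prepending the user-supplied withClause to
// sqlText avoids this.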

@cloud-fan
Contributor

Makes sense. I'm convinced; let's add something like a prepareQuery JDBC option, with clear documentation.

@peter-toth
Contributor Author

Sounds good, I can do it early next week.
How can I reopen this PR? Or shall I open a new one?

@cloud-fan cloud-fan removed the Stale label Apr 28, 2022
@cloud-fan
Contributor

I can't reopen the PR; I think we need to create a new one.

@peter-toth
Contributor Author

I opened a new PR here: #36440
