
[SPARK-32013][SQL] Support query execution before reading DataFrame and before/after writing DataFrame over JDBC #28953

Closed
wants to merge 27 commits

Conversation

moomindani (Contributor) commented Jun 30, 2020

What changes were proposed in this pull request?

This pull request adds support for executing SQL statements before reading/writing a DataFrame and after writing a DataFrame over JDBC.
It introduces two new options, preActions and postActions, in the DataFrame JDBC configuration.

SQL statements specified in preActions will be executed before reading/writing the DataFrame via JDBC.
SQL statements specified in postActions will be executed after writing the DataFrame via JDBC.

Note: postActions is only supported in the JDBC writer, not in the JDBC reader, because it is rarely needed there and I was not able to find a good place to implement it.

Why are the changes needed?

For ETL workloads, there is a common requirement to execute SQL statements before/after reading/writing over JDBC.
Here are some examples:

  • Create a view with specific conditions
  • Delete/Update some records
  • Truncate a table (already possible with the truncate option)
  • Execute a stored procedure (also requested in SPARK-32014)

See the JIRA for details: https://issues.apache.org/jira/browse/SPARK-32013

Does this PR introduce any user-facing change?

Yes.
With this feature, users can run arbitrary SQL statements before reading/writing and after writing a DataFrame over JDBC.

How to use it

Here are two examples that show how to use the new options.

  • Read

    val df = spark.read.format("jdbc")
      .option("url", "jdbc:someUrl")
      .option("dbtable", "TEST.PEOPLE")
      .option("preActions", "delete from test.people where name = 'fred'")
      .load()

  • Write

    df.write.format("jdbc")
      .option("url", "jdbc:someUrl")
      .option("dbtable", "TEST.CUSTOMQUERY")
      .option("postActions", "insert into TEST.CUSTOMQUERY values ('fred', 1)")
      .options(properties.asScala)
      .save()

It does not affect any existing behavior; it only adds new options for this feature.

How was this patch tested?

I added test cases to JDBCSuite.scala and JDBCWriteSuite.scala and confirmed that all the tests pass.

dilipbiswal (Contributor)

@moomindani Thanks. Could we illustrate the usage of these two options with examples in the PR description? I think it will help the reviewers.

moomindani (Contributor, Author) commented Jun 30, 2020

@dilipbiswal Sure, I added it to the PR description.

maropu (Member) commented Jun 30, 2020

ok to test

maropu (Member) commented Jun 30, 2020

For ETL workloads, there is a common requirement to execute SQL statements before/after reading/writing over JDBC.
Here are some examples:
Create a view with specific conditions
Delete/Update some records
Truncate a table (already possible with the truncate option)
Execute a stored procedure (also requested in SPARK-32014)

@gatorsmile WDYT about the use cases described above? I'm not sure whether they are reasonable because I've never met users making such requests.

val statement = conn.prepareStatement(queryString)
try {
  statement.setQueryTimeout(options.queryTimeout)
  val hasResultSet = statement.execute()
Contributor

@moomindani Is there a reason to allow a query here? I am asking since no one can consume this result set. Or are we allowing it just because we cannot parse the input to know what kind of statement it is?

moomindani (Contributor, Author) Jul 1, 2020

It is to support stored procedures in different engines (related to https://issues.apache.org/jira/browse/SPARK-32014).
For example, MySQL does not return result sets from stored procedures. In that case, we could use executeUpdate().
However, some other databases (like Snowflake in the related SIM) return result sets from stored procedures. To support such use cases, we need to consider both executeQuery() and executeUpdate(). That's why I used execute() here.
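For illustration, here is a minimal sketch of that idea: a hypothetical helper (runAction, not part of this PR) that runs one pre/post action over a plain JDBC Connection with execute() and simply discards any result set:

  import java.sql.Connection

  // Hypothetical helper (not from this PR): run one pre/post action statement and
  // ignore any result set it may produce (e.g. a stored procedure that returns rows).
  def runAction(conn: Connection, sql: String, queryTimeout: Int): Unit = {
    val statement = conn.prepareStatement(sql)
    try {
      statement.setQueryTimeout(queryTimeout)
      // execute() handles DDL, DML and result-returning statements alike;
      // executeUpdate() would throw for statements that return a result set.
      val hasResultSet = statement.execute()
      if (hasResultSet) {
        statement.getResultSet.close() // result sets from actions are ignored by design
      }
    } finally {
      statement.close()
    }
  }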

dilipbiswal (Contributor) Jul 1, 2020

@moomindani I see. But if we call a stored procedure that returns a cursor in the pre/post action, what do we do with the result set returned from it? I am thinking users would call such a stored procedure because they want to do something with the results, no? There are also some engines that can return multiple result sets from a stored procedure.

https://stackoverflow.com/questions/53621106/db2-multiple-result-set-stored-procedure

cc @maropu

moomindani (Contributor, Author) Jul 2, 2020

Currently, result sets are always ignored if preActions or postActions return them. As you said, a cursor cannot be used as input for reading a DataFrame.

By design, preActions and postActions are intended for DDL statements and DML statements (INSERT/UPDATE/DELETE, not SELECT) that do not return result sets.
The reason I used execute instead of executeUpdate is just to avoid exceptions from stored procedures that return result sets, not to use result sets as input for a DataFrame.

So there are two options: (a) document that result sets are always ignored in preActions and postActions when a statement returns them, or (b) narrow the supported use cases to statements that do not return result sets by using executeUpdate instead of execute.

My idea was (a), but I want to follow the community's opinion.

Contributor

@moomindani
IMHO, the best behavior would be to only allow DDL or DML (insert/update/delete) in pre and post actions. But if that's not achievable, then we have to clearly document what is allowed and the ramifications of users specifying other kinds of statements. Let's get input from @maropu and @HyukjinKwon to decide on the best course forward.

Contributor

@moomindani Option (a) sounds okay to me, as it's simpler. We can always improve the behavior later.
WDYT @maropu?

Member

Yea, (a) sounds fine to me.

Only DDL or DML (insert/update/delete) are allowed.

Btw, how do we control this? What if SELECT queries are given?

Contributor

@maropu
My expectation is that the JDBC driver would reject statements other than DML (insert/update/delete). Looking at this link https://stackoverflow.com/questions/24577358/how-to-add-select-statements-in-addbatch, it seems like executeBatch may also reject DDLs.

@moomindani Can you please confirm whether we can execute DDLs in executeBatch?

Contributor Author

Btw, how do we control this? What if SELECT queries are given?

I confirmed that it throws an exception like Method is not allowed for a query.

Can you please confirm whether we can execute DDLs in executeBatch?

Sure, let me confirm it and add a test that uses DDL.
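(For illustration, a minimal sketch of the behavior confirmed above, assuming the H2 in-memory driver used elsewhere in the JDBC test suites; this is not code from the PR:)

  import java.sql.DriverManager

  // executeUpdate accepts DDL and DML but rejects SELECT, which is how option (b)
  // would keep result-returning queries out of preActions/postActions.
  val conn = DriverManager.getConnection("jdbc:h2:mem:testdb")
  try {
    val stmt = conn.createStatement()
    stmt.executeUpdate("CREATE TABLE people (name VARCHAR(32))") // DDL is accepted
    stmt.executeUpdate("INSERT INTO people VALUES ('fred')")     // DML is accepted
    stmt.executeUpdate("SELECT * FROM people")                   // throws, e.g. "Method is not allowed for a query"
  } finally {
    conn.close()
  }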

Contributor Author

I added some tests for DDLs. As far as I tested, addBatch()/executeBatch() also worked for DDLs.
It seems to work for MySQL Connector/J as well, since executeBatch just calls executeUpdate internally: https://github.com/mysql/mysql-connector-j/blob/dbd865c9fc68701efc70197ea1e65002520bed91/src/main/user-impl/java/com/mysql/cj/jdbc/StatementImpl.java#L859
I also found a blog post that used addBatch()/executeBatch() with MySQL: https://www.roseindia.net/tutorial/java/jdbc/jdbcbatch/jdbcbatchdatabasetablecreation.html

However, I could not find any clear documentation that says DDLs are supported in executeBatch(). What I could find is that DDLs are supported in executeUpdate() and executeLargeUpdate():
https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/Statement.html

Should we move from (a) to (b) as the safer approach?
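(For reference, a minimal standalone sketch of the batched DDL/DML execution being discussed, again assuming an H2 in-memory database; whether DDL is allowed in executeBatch() remains driver-dependent, which is the open question here:)

  import java.sql.DriverManager

  val conn = DriverManager.getConnection("jdbc:h2:mem:testdb")
  try {
    val stmt = conn.createStatement()
    stmt.addBatch("CREATE TABLE people (name VARCHAR(32), id INT)") // DDL
    stmt.addBatch("INSERT INTO people VALUES ('fred', 1)")          // DML
    // executeBatch() returns one update count per batched statement.
    val counts = stmt.executeBatch()
    println(counts.mkString(", "))
  } finally {
    conn.close()
  }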

dilipbiswal (Contributor)

@moomindani Thanks a lot for working on it. It would be nice to wait for an answer to @maropu's question.

maropu (Member) commented Jun 30, 2020

ok to test

SparkQA commented Jul 1, 2020

Test build #124703 has finished for PR 28953 at commit 47ef483.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu (Member) commented Jul 1, 2020

retest this please

SparkQA commented Jul 1, 2020

Test build #124713 has finished for PR 28953 at commit 47ef483.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jul 1, 2020

Test build #124745 has finished for PR 28953 at commit 0db0376.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jul 1, 2020

Test build #124715 has finished for PR 28953 at commit f2e68a2.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

dilipbiswal (Contributor)

retest this please

SparkQA commented Jul 1, 2020

Test build #124748 has finished for PR 28953 at commit 0db0376.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HyukjinKwon (Member)

retest this please

SparkQA commented Jul 2, 2020

Test build #124864 has finished for PR 28953 at commit 0db0376.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 3, 2020

Test build #126957 has finished for PR 28953 at commit 311a89e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 3, 2020

Test build #126966 has finished for PR 28953 at commit ae7afd2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 4, 2020

Test build #127052 has finished for PR 28953 at commit fbac1f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 19, 2020

Test build #127620 has finished for PR 28953 at commit 13f0dfc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 19, 2020

Test build #127632 has finished for PR 28953 at commit 62d7fc0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 19, 2020

Test build #127649 has finished for PR 28953 at commit 979cec5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Aug 20, 2020

Test build #127681 has finished for PR 28953 at commit 3244c30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

moomindani (Contributor, Author)

I resolved the recent conflicts.
Folks, can you review this and move it forward? I think there are no blockers now.

gatorsmile (Member) left a comment

This is not JDBC specific. All the data sources might require pre/post execution hooks/triggers/actions. We need to consider this request in the API design of DSV2.

cc @cloud-fan

moomindani (Contributor, Author)

@gatorsmile Thank you for your comment.
I understand that this kind of hook is not specific to JDBC; it can be generalized to all data sources.
However, for JDBC, it should be executed on the data source side. This part can be specific to JDBC (or other data sources that include an execution engine).

Of course we can have this discussion as part of the DSV2 API design; however, can we proceed with this PR separately?

moomindani (Contributor, Author)

@gatorsmile Just a reminder: can you take a look?

cloud-fan (Contributor)

This sounds JDBC specific, as we can't use SQL to define actions for data sources like Parquet.

To clarify your use case: do you really need hooks, or do you just need the ability to run arbitrary SQL against the JDBC server with Spark?

moomindani (Contributor, Author)

@cloud-fan We need hooks right before/after JDBC reads/writes, especially for ETL purposes. It is also a frequently requested feature from our customers.

github-actions bot commented Apr 5, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Apr 5, 2021
@github-actions github-actions bot closed this Apr 6, 2021
abhishekboga

Hello everyone, I know this PR has been closed, but it would be really helpful to get a resolution on the same topic. I am trying to call a Snowflake stored procedure in an AWS Glue job using PySpark. Below is the code:
df = (spark.read.format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("query", "call SP_UTILIZATION_INSIGHTS_LOOKUP_TABLE()")
    .option("preActions", "call SP_UTILIZATION_INSIGHTS_LOOKUP_TABLE()")
    .load())

During execution I'm getting an error: SQL compilation error at 'call'.

In the log files I found that it basically generates a select statement like 'Select * from (call SP_UTILIZATION_INSIGHTS_LOOKUP_TABLE()) where 1=0', and this is what fails.

Any help here?

cloud-fan (Contributor)

@peter-toth I vaguely remember that you added a new JDBC option to support this case?

peter-toth (Contributor)

This was my PR: #36440, but I'm not sure it helps in this case, as the new prepareQuery property becomes part of the statement sent to the server, while in this case the stored procedure call (.option("preActions", "call SP_UTILIZATION_INSIGHTS_LOOKUP_TABLE()")) should be a separate statement.

@abhishekboga, I think you could probably use the sessionInitStatement property to call the stored procedure before running your query...

abhishekboga

@peter-toth, can you share an example of using sessionInitStatement to call the stored procedure?

peter-toth (Contributor)

Can you please try:

df = (spark.read.format(SNOWFLAKE_SOURCE_NAME)
  .option("sessionInitStatement", "call SP_UTILIZATION_INSIGHTS_LOOKUP_TABLE()")
  .option("query", "SELECT * FROM your_table_that_is_populated")
  .load())

abhishekboga

@peter-toth, I have tried your example but it doesn't call the SP that populates the table. What I was expecting was that the SP would populate the table and then that table would be used in the "query", but it didn't happen.

abhishekboga

@peter-toth, any update on the above query? It's not executing the SP.

peter-toth (Contributor) commented Nov 8, 2022

@abhishekboga, sorry, I can't test this feature with Snowflake, but I added the following simple unit test using H2 to sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala:

  test("SPARK-32013: option sessionInitStatement, call stored function") {
    val initSQL =
      """CREATE ALIAS NEXT_PRIME AS '
        |String nextPrime(String value) {
        |  return new BigInteger(value).nextProbablePrime().toString();
        |}';
        |INSERT INTO TEST.seq VALUES(NEXT_PRIME(1000))
        |""".stripMargin
    val df = spark.read.format("jdbc")
      .option("url", urlWithUserAndPass)
      .option("query", "SELECT * FROM TEST.seq WHERE ID > 1000")
      .option("sessionInitStatement", initSQL)
      .load()
    assert(df.collect() === Array(Row(1009)))
  }

and it does work.

Please find more examples in https://issues.apache.org/jira/browse/SPARK-21519.
