[SPARK-30508][SQL] Add SparkSession.executeCommand API for external datasource #27199
Conversation
Test build #116696 has finished for PR 27199 at commit
cc @cloud-fan
Test build #116697 has finished for PR 27199 at commit
Jenkins, retest this please.
Test build #116705 has started for PR 27199 at commit
override def run(sparkSession: SparkSession): Seq[Row] = {
  val output = provider.executeCommand(command, parameters)
  Seq(Row(output.mkString("\n")))
we can output each string as one row: output.map(Row(_))
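The suggestion above (emit one row per output string instead of joining everything into a single row) can be illustrated outside Spark. In this sketch a "row" is stood in for by a plain single-element list, since Spark's Row is not available here; the names are illustrative only:

```java
import java.util.List;
import java.util.stream.Collectors;

public class RowMapping {
    public static void main(String[] args) {
        List<String> output = List.of("line1", "line2", "line3");

        // Original approach in the PR's Scala code: a single row holding all
        // output joined by newlines, i.e. Seq(Row(output.mkString("\n"))).
        String joined = String.join("\n", output);
        System.out.println("single joined row: " + joined.replace("\n", "\\n"));

        // Suggested approach: one row per output string, i.e. output.map(Row(_))
        // in Scala. Here each "row" is modeled as a one-element list.
        List<List<String>> rows = output.stream()
            .map(List::of)
            .collect(Collectors.toList());
        System.out.println("row count: " + rows.size());
    }
}
```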
Test build #116711 has finished for PR 27199 at commit
Test build #116714 has finished for PR 27199 at commit
@Unstable
public interface ExternalCommandRunnableProvider {
  /**
   * Execute a random DDL/DML command inside an external execution engine rather than Spark,
Does this assume that the command runs on the driver side only, or a datasource can execute it in parallel on all available executors, and combine the results?
Yeah, this only happens on the driver. But ideally the driver won't run the command itself; it delegates it to an external execution engine, depending on the data source. For example, JDBC could establish a connection and run the command on the external DBMS.
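The driver-side delegation described above can be sketched in a minimal, self-contained way. This is not Spark's actual API: the CommandRunner interface and the in-memory FakeExternalEngine below are hypothetical stand-ins for the provider interface and for a DBMS reached over a JDBC connection:

```java
import java.util.HashMap;
import java.util.Map;

public class DriverDelegationSketch {
    // Hypothetical provider-side interface: the driver calls this, but the
    // command itself runs in the external engine, not in Spark.
    interface CommandRunner {
        String[] executeCommand(String command, Map<String, String> parameters);
    }

    // Fake "external engine": stands in for a DBMS that a JDBC-backed
    // implementation would talk to over a connection.
    static class FakeExternalEngine {
        private final Map<String, String> state = new HashMap<>();
        String run(String command) {
            // "SET key value" mutates engine state; anything else is looked up
            // in the state, falling back to an error message.
            String[] parts = command.split(" ");
            if (parts.length == 3 && parts[0].equals("SET")) {
                state.put(parts[1], parts[2]);
                return "OK";
            }
            return state.getOrDefault(command, "unknown command: " + command);
        }
    }

    static class FakeEngineRunner implements CommandRunner {
        private final FakeExternalEngine engine = new FakeExternalEngine();
        @Override
        public String[] executeCommand(String command, Map<String, String> parameters) {
            // The driver only forwards the command; the engine executes it.
            return new String[] { engine.run(command) };
        }
    }

    public static void main(String[] args) {
        CommandRunner runner = new FakeEngineRunner();
        System.out.println(runner.executeCommand("SET mode fast", Map.of())[0]); // prints OK
    }
}
```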
Resolved review threads (outdated) on:
sql/catalyst/src/main/java/org/apache/spark/sql/connector/ExternalCommandRunnableProvider.java
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@cloud-fan @MaxGekk @HyukjinKwon updated, thanks.
Test build #116809 has finished for PR 27199 at commit
Test build #116904 has finished for PR 27199 at commit
Test build #116901 has finished for PR 27199 at commit
Jenkins, retest this please.
Test build #116919 has finished for PR 27199 at commit
 * @since 3.0.0
 */
@Unstable
public interface ExternalCommandRunnableProvider {
how about ExternalCommandRunnerProvider
@Unstable
public interface ExternalCommandRunnableProvider {
  /**
   * Execute a random command inside an external execution engine rather than Spark.
"a random command" -> "an arbitrary string command"
val df = spark.executeCommand("hello", "cmdSource", parameters)
// executeCommand should execute the command eagerly
assert(System.getProperty("command") === "world")
val output1 = df.collect()(0).getString(0)
can we check the result with checkAnswer?
Test build #117218 has finished for PR 27199 at commit
import java.util.Map;

/**
 * @since 3.0.0
can we add some classdoc?
@@ -2,6 +2,7 @@ org.apache.spark.sql.sources.FakeSourceOne
 org.apache.spark.sql.sources.FakeSourceTwo
 org.apache.spark.sql.sources.FakeSourceThree
 org.apache.spark.sql.sources.FakeSourceFour
+org.apache.spark.sql.sources.CommandRunnableDataSource
we don't need to register it as we are not testing the short name. We can just use the full class name when calling spark.executeCommand
System.setProperty("command", "hello")
val parameters = Map("one" -> "1", "two" -> "2").asJava
assert(System.getProperty("command") === "hello")
val df = spark.executeCommand("hello", "cmdSource", parameters)
here we can use classOf[CommandRunnableDataSource].getName instead of cmdSource
Test build #117682 has finished for PR 27199 at commit
LGTM. Test build #117671 has passed all the tests. I am merging it to master.
Test build #117671 has finished for PR 27199 at commit
What changes were proposed in this pull request?

This PR adds a SparkSession.executeCommand API for external data sources to execute an arbitrary command. Note that the command doesn't execute in Spark, but inside an external execution engine, depending on the data source. It is executed eagerly once executeCommand is called, and the returned DataFrame contains the output of the command (if any).

Why are the changes needed?

This can be useful when a user wants to execute some commands outside of Spark, for example, executing a custom DDL/DML command for JDBC, creating an index for Elasticsearch, creating cores for Solr, and so on (as @HyukjinKwon suggested).

Previously, a user needed to use an option to achieve this, e.g. spark.read.format("xxxSource").option("command", "xxxCommand").load(), which is rather cumbersome. With this change, the same goal is much more convenient to achieve.

Does this PR introduce any user-facing change?

Yes, a new API on SparkSession and a new interface ExternalCommandRunnableProvider.

How was this patch tested?

Added a new test suite.
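The eager-execution semantics described in the PR description can be illustrated with a small, self-contained sketch. The names below (ReaderWithOption, executeCommand) are hypothetical stand-ins, not Spark's API: in the old option-based workaround the command only runs when load() is called, while an executeCommand-style API runs it immediately and returns the output:

```java
import java.util.ArrayList;
import java.util.List;

public class EagerVsLazySketch {
    // Records every command the "external engine" has executed.
    static List<String> executedCommands = new ArrayList<>();

    // Old workaround: the command is stashed as a reader option and only runs
    // when the terminal load() call happens (lazy).
    static class ReaderWithOption {
        private String command;
        ReaderWithOption option(String key, String value) {
            if (key.equals("command")) this.command = value;
            return this;
        }
        void load() {
            if (command != null) executedCommands.add(command);
        }
    }

    // New style: executeCommand runs the command right away (eager)
    // and returns its output.
    static List<String> executeCommand(String command) {
        executedCommands.add(command);
        return List.of("output of " + command);
    }

    public static void main(String[] args) {
        ReaderWithOption reader = new ReaderWithOption().option("command", "CREATE INDEX idx");
        System.out.println("after option(): " + executedCommands); // still empty
        reader.load();
        System.out.println("after load(): " + executedCommands);

        List<String> out = executeCommand("REINDEX idx");
        System.out.println("eager output: " + out);
    }
}
```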