[SPARK-2973][SQL] Lightweight SQL commands without distributed jobs when calling .collect() #2215
QA tests have started for PR 2215 at commit
QA tests have finished for PR 2215 at commit
}
def execute(): RDD[Row] = context.sparkContext.parallelize(executeCollect(), 1)

override def executeCollect(): Array[Row] = sideEffectResult.map(Row(_)).toArray
Is there a reason we can't just define these in the superclass `Command`?
Good idea. Refactored a bit: now `Command.sideEffectResult` returns `Seq[Row]`, and `Command.executeCollect()` simply returns `sideEffectResult.toArray`.
val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
context.sparkContext.parallelize(rows, 1)
}
def execute(): RDD[Row] = context.sparkContext.parallelize(sideEffectResult, 1)
Can this also be in `Command`? The implementation looks the same everywhere.
ok to test
The build failure was caused by an unrelated GraphX test suite. retest this please.
test this please
The build failure was caused by the streaming test suite. retest this please.
ok to test
…hen calling .collect()

By overriding `executeCollect()` in the physical plan classes of all commands, we can avoid kicking off a distributed job when collecting the result of a SQL command, e.g. `sql("SET").collect()`.

Previously, `Command.sideEffectResult` returned a `Seq[Any]`, and the `execute()` method in subclasses of `Command` typically converted that to a `Seq[Row]` and then parallelized it into an RDD. With this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can use it as is and be factored into the `Command` parent class.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes apache#2215 from liancheng/lightweight-commands and squashes the following commits:

3fbef60 [Cheng Lian] Factored execute() method of physical commands to parent class Command
5a0e16c [Cheng Lian] Passes test suites
e0e12e9 [Cheng Lian] Refactored Command.sideEffectResult and Command.executeCollect
995bdd8 [Cheng Lian] Cleaned up DescribeHiveTableCommand
542977c [Cheng Lian] Avoids confusion between logical and physical plan by adding package prefixes
55b2aa5 [Cheng Lian] Avoids distributed jobs when execution SQL commands
Adds logical and physical command classes for the "add jar" command.

Note that this PR conflicts with and should be merged after #2215.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2242 from liancheng/add-jar and squashes the following commits:

e43a2f1 [Cheng Lian] Updates AddJar according to conventions introduced in #2215
b99107f [Cheng Lian] Added test case for ADD JAR command
095b2c7 [Cheng Lian] Also forward ADD JAR command to Hive
9be031b [Cheng Lian] Trims Jar path string
8195056 [Cheng Lian] Added support for the "add jar" command
Adds logical and physical command classes for the "add jar" command.

Note that this PR conflicts with and should be merged after apache#2215.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes apache#2242 from liancheng/add-jar and squashes the following commits:

e43a2f1 [Cheng Lian] Updates AddJar according to conventions introduced in apache#2215
b99107f [Cheng Lian] Added test case for ADD JAR command
095b2c7 [Cheng Lian] Also forward ADD JAR command to Hive
9be031b [Cheng Lian] Trims Jar path string
8195056 [Cheng Lian] Added support for the "add jar" command

Conflicts:
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
By overriding `executeCollect()` in the physical plan classes of all commands, we can avoid kicking off a distributed job when collecting the result of a SQL command, e.g. `sql("SET").collect()`.

Previously, `Command.sideEffectResult` returned a `Seq[Any]`, and the `execute()` method in subclasses of `Command` typically converted that to a `Seq[Row]` and then parallelized it into an RDD. With this PR, `sideEffectResult` is required to return a `Seq[Row]` directly, so that `executeCollect()` can use it as is and be factored into the `Command` parent class.
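
The description above can be illustrated with a minimal, Spark-free sketch. Everything here is an assumption made for illustration: `Row`, `FakeContext`, `FakeRDD`, `PhysicalPlan` and `SetCommand` are simplified, hypothetical stand-ins rather than the classes touched by this PR, and a "job" is modeled as a counter bumped whenever an RDD is collected. It only aims to show why routing `executeCollect()` through `sideEffectResult` skips the job that the default `execute().collect()` path would launch.

```scala
// Hypothetical stand-ins for Spark's Row, SparkContext and RDD (not the real API).
final case class Row(values: Any*)

// Counts "distributed jobs" so the effect of the override is observable.
final class FakeContext {
  var jobsLaunched: Int = 0
  def parallelize(rows: Seq[Row]): FakeRDD = new FakeRDD(this, rows)
}

final class FakeRDD(ctx: FakeContext, rows: Seq[Row]) {
  // In this model, collecting an RDD is what launches a job.
  def collect(): Array[Row] = { ctx.jobsLaunched += 1; rows.toArray }
}

trait PhysicalPlan {
  def context: FakeContext
  def execute(): FakeRDD
  // Default path: goes through the RDD, so it launches a job.
  def executeCollect(): Array[Row] = execute().collect()
}

trait Command extends PhysicalPlan {
  // Subclasses return rows directly, so no per-command conversion is needed.
  protected def sideEffectResult: Seq[Row]

  // Both methods live in the parent; collecting never touches an RDD.
  override def executeCollect(): Array[Row] = sideEffectResult.toArray
  override def execute(): FakeRDD = context.parallelize(sideEffectResult)
}

// Hypothetical command that lists settings, loosely modeled on `SET`.
final class SetCommand(val context: FakeContext, settings: Map[String, String])
    extends Command {
  // lazy val: the side effect runs at most once, however often it is read.
  protected lazy val sideEffectResult: Seq[Row] =
    settings.toSeq.sorted.map { case (k, v) => Row(s"$k=$v") }
}

object LightweightCommandDemo {
  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext
    val cmd = new SetCommand(ctx, Map("spark.sql.shuffle.partitions" -> "10"))
    cmd.executeCollect()
    println(s"jobs launched: ${ctx.jobsLaunched}") // prints "jobs launched: 0"
  }
}
```

In this model, calling `cmd.execute().collect()` instead would still work but would increment the job counter, which corresponds to the behavior the PR avoids for `.collect()` on commands.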