
[SPARK-17230] [SQL] Should not pass optimized query into QueryExecution in DataFrameWriter #14797

Closed
wants to merge 4 commits

Conversation

@davies (Contributor) commented Aug 24, 2016

What changes were proposed in this pull request?

Some analyzer rules make assumptions about logical plans, and the optimizer may break those assumptions. We should not pass an optimized query plan into QueryExecution (where it will be analyzed again), otherwise we may hit weird bugs.

For example, we have a rule for decimal calculation that promotes the precision before binary operations and uses PromotePrecision as a placeholder to indicate that the rule should not be applied twice. But an optimizer rule removes this placeholder, which breaks the assumption; the rule is then applied twice and produces wrong results.
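For concreteness, a hedged reproduction sketch of this class of bug through the write path, using only public APIs; the view name, path, and expression are illustrative (borrowed from the CTAS example later in this thread), not the PR's actual regression test.

```scala
// Assumes a running SparkSession named `spark`; names and the path are illustrative.
spark.range(99, 101).createOrReplaceTempView("tab1")
val df = spark.sql(
  "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num FROM tab1")

// The write path wraps the already-optimized query in a command. If that plan is
// fed through QueryExecution (and hence the Analyzer) again after the optimizer
// stripped the PromotePrecision placeholder, the precision promotion can be
// applied twice and overflow the result to null.
df.write.mode("overwrite").parquet("/tmp/spark_17230_repro")
spark.read.parquet("/tmp/spark_17230_repro").show()
```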

Ideally, we should make all the analyzer rules idempotent, but that would require a lot of effort to double-check them one by one (and may not be easy).

An easier approach is to never feed an optimized plan into the Analyzer. This PR fixes the case for RunnableCommand: these commands are optimized, and during execution the passed query is passed into QueryExecution again. This PR makes these queries not part of the children, so they will not be optimized and analyzed again.

Right now, we do not know whether a logical plan has already been optimized. We could introduce a flag for that and make sure an optimized logical plan is never analyzed again.
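To make the "not part of the children" idea concrete, below is a small self-contained Scala toy (not Spark's actual TreeNode/LogicalPlan API): a rewrite that only recurses into `children` never touches a plan that is held in an ordinary field.

```scala
// Toy plan tree, purely illustrative of the mechanism; names are made up.
sealed trait Plan { def children: Seq[Plan] }
case class Relation(name: String) extends Plan { val children: Seq[Plan] = Nil }
case class Project(exprs: Seq[String], child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
}
// A command that carries its query as a plain field instead of a child.
case class WriteCommand(path: String, query: Plan) extends Plan {
  val children: Seq[Plan] = Nil  // `query` is deliberately NOT a child
}

// A naive top-down rewrite that only visits nodes reachable through `children`.
def transformDown(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
  rule.applyOrElse(plan, identity[Plan]) match {
    case Project(exprs, child) => Project(exprs, transformDown(child)(rule))
    case other => other  // a WriteCommand's `query` is never visited
  }
}
```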

How was this patch tested?

Added regression tests.

@davies (Contributor, Author) commented Aug 24, 2016

cc @yhuai @JoshRosen @rxin

@SparkQA commented Aug 24, 2016

Test build #64377 has finished for PR 14797 at commit 25c451a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • logWarning(s\"All labels belong to a single class and fitIntercept=false. It's a \" +
    • * Set thresholds in multiclass (or binary) classification to adjust the probability of
    • class MultinomialLogisticRegression @Since(\"2.1.0\") (
    • logWarning(s\"All labels belong to a single class and fitIntercept=false. It's\" +
    • /** Margin (rawPrediction) for each class label. */
    • /** Score (probability) for each class label. */
    • class MultinomialLogisticRegressionModelWriter(instance: MultinomialLogisticRegressionModel)
    • class ALSWrapperWriter(instance: ALSWrapper) extends MLWriter
    • class ALSWrapperReader extends MLReader[ALSWrapper]
    • class GaussianMixtureWrapperWriter(instance: GaussianMixtureWrapper) extends MLWriter
    • class GaussianMixtureWrapperReader extends MLReader[GaussianMixtureWrapper]
    • class IsotonicRegressionWrapperWriter(instance: IsotonicRegressionWrapper) extends MLWriter
    • class IsotonicRegressionWrapperReader extends MLReader[IsotonicRegressionWrapper]
    • class LDAWrapperWriter(instance: LDAWrapper) extends MLWriter
    • class LDAWrapperReader extends MLReader[LDAWrapper]
    • class MultilayerPerceptronClassifierWrapperWriter(instance: MultilayerPerceptronClassifierWrapper)
    • class JavaClassificationModel(JavaPredictionModel):
    • class LogisticRegressionModel(JavaModel, JavaClassificationModel, JavaMLWritable, JavaMLReadable):
    • class DecisionTreeClassificationModel(DecisionTreeModel, JavaClassificationModel, JavaMLWritable,
    • class RandomForestClassificationModel(TreeEnsembleModel, JavaClassificationModel, JavaMLWritable,
    • class GBTClassificationModel(TreeEnsembleModel, JavaPredictionModel, JavaMLWritable,
    • class NaiveBayesModel(JavaModel, JavaClassificationModel, JavaMLWritable, JavaMLReadable):
    • class MultilayerPerceptronClassificationModel(JavaModel, JavaPredictionModel, JavaMLWritable,
    • class LinearRegressionModel(JavaModel, JavaPredictionModel, JavaMLWritable, JavaMLReadable):
    • class DecisionTreeModel(JavaModel, JavaPredictionModel):
    • class RandomForestRegressionModel(TreeEnsembleModel, JavaPredictionModel, JavaMLWritable,
    • class GBTRegressionModel(TreeEnsembleModel, JavaPredictionModel, JavaMLWritable, JavaMLReadable):
    • class GeneralizedLinearRegressionModel(JavaModel, JavaPredictionModel, JavaMLWritable,
    • class JavaPredictionModel():
    • class SubstituteUnresolvedOrdinals(conf: CatalystConf) extends Rule[LogicalPlan]
    • case class UnresolvedInlineTable(
    • case class UnresolvedTableValuedFunction(functionName: String, functionArgs: Seq[Expression])
    • case class UnresolvedOrdinal(ordinal: Int)
    • case class Literal (value: Any, dataType: DataType) extends LeafExpression with CodegenFallback
    • abstract class PlanExpression[T <: QueryPlan[_]] extends Expression
    • abstract class SubqueryExpression extends PlanExpression[LogicalPlan]
    • case class ListQuery(plan: LogicalPlan, exprId: ExprId = NamedExpression.newExprId)
    • case class Exists(plan: LogicalPlan, exprId: ExprId = NamedExpression.newExprId)
    • case class SubqueryAlias(
    • class QuantileSummaries(
    • case class Stats(value: Double, g: Int, delta: Int)
    • class JDBCOptions(
    • abstract class ExecSubqueryExpression extends PlanExpression[SubqueryExec]
    • class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable

@SparkQA commented Aug 31, 2016

Test build #64682 has finished for PR 14797 at commit 7771c6a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 1, 2016

Test build #64791 has finished for PR 14797 at commit 02b2896.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as a child, to avoid analyzing an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val columns = partitionColumns.map { name =>
Review comment (Contributor):
Can this be done before `val partitionSchema = ...` above? Looks like we're trying to analyze the columns there as well.

@davies (Author) replied:
This is only for write(); it does not have `val partitionSchema = ...` (the other call sites do).
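For readers following along, a hedged sketch of what resolving the partition columns up front might look like; `df`, `partitionColumns`, and the config access path are assumptions taken from the surrounding DataFrameWriter context, not the PR's exact code.

```scala
// Sketch only: resolve each partition column against the analyzed plan once,
// so the command no longer needs the query as a child in order to resolve them.
val analyzed = df.queryExecution.analyzed
val caseSensitive = df.sparkSession.sessionState.conf.caseSensitiveAnalysis
val nameMatches: (String, String) => Boolean =
  if (caseSensitive) _ == _ else _.equalsIgnoreCase(_)
val columns = partitionColumns.map { name =>
  analyzed.output.find(attr => nameMatches(attr.name, name)).getOrElse {
    throw new AnalysisException(
      s"Partition column $name not found among ${analyzed.output.map(_.name).mkString(", ")}")
  }
}
```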

@sameeragarwal (Member):
LGTM

@srinathshankar (Contributor):
Looks fine.

asfgit closed this in ed9c884 on Sep 2, 2016
asfgit pushed a commit that referenced this pull request Sep 2, 2016
[SPARK-17230] [SQL] Should not pass optimized query into QueryExecution in DataFrameWriter


Author: Davies Liu <davies@databricks.com>

Closes #14797 from davies/fix_writer.

(cherry picked from commit ed9c884)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
@davies (Contributor, Author) commented Sep 2, 2016

Merged this into master and the 2.0 branch, thanks!

@gatorsmile (Member):
This does not catch all the cases: in CTAS, we still optimize the query. Following the approach in this PR, I can try to fix that case. Thanks!

@yhuai (Contributor) commented Sep 6, 2016

@gatorsmile do you want to put the regression tests here? Or do you already have a PR?

@gatorsmile (Member):
The test failure in Spark 2.0 has been fixed in #14951. However, after this fix, we still optimize the query in CTAS twice, as reported in this PR. Thus, I plan to fix it soon. Thanks!

@yhuai (Contributor) commented Sep 6, 2016

Thanks! Is there a JIRA?

@gatorsmile (Member):
Just created a JIRA: https://issues.apache.org/jira/browse/SPARK-17409

FYI, Spark 2.0.1 is fixed by this PR, but Spark 2.1.0 still has the issue because the related code has changed.

asfgit pushed a commit that referenced this pull request Sep 14, 2016
### What changes were proposed in this pull request?
As explained in #14797:
> Some analyzer rules make assumptions about logical plans, and the optimizer may break those assumptions. We should not pass an optimized query plan into QueryExecution (where it will be analyzed again), otherwise we may hit weird bugs.
> For example, we have a rule for decimal calculation that promotes the precision before binary operations and uses PromotePrecision as a placeholder to indicate that the rule should not be applied twice. But an optimizer rule removes this placeholder, which breaks the assumption; the rule is then applied twice and produces wrong results.

We should not optimize the query in CTAS more than once. For example,
```Scala
spark.range(99, 101).createOrReplaceTempView("tab1")
val sqlStmt = "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num FROM tab1"
sql(s"CREATE TABLE tab2 USING PARQUET AS $sqlStmt")
checkAnswer(spark.table("tab2"), sql(sqlStmt))
```
Before this PR, the results do not match
```
== Results ==
!== Correct Answer - 2 ==       == Spark Answer - 2 ==
![100,100.000000000000000000]   [100,null]
 [99,99.000000000000000000]     [99,99.000000000000000000]
```
After this PR, the results match.
```
+---+----------------------+
|id |num                   |
+---+----------------------+
|99 |99.000000000000000000 |
|100|100.000000000000000000|
+---+----------------------+
```

In this PR, we do not treat the `query` in CTAS as a child. Thus, the `query` will not be optimized when optimizing the CTAS statement. However, we still need to analyze it for normalizing and verifying the CTAS in the Analyzer; we do that in the analyzer rule `PreprocessDDL`, because so far only this rule needs the analyzed plan of the `query`.

### How was this patch tested?
Added a test

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15048 from gatorsmile/ctasOptimized.
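As a rough sketch of the command shape that commit message describes (simplified and assumed, not the actual class in apache/spark; it also presumes access to Spark's internal command APIs), the `query` is kept as an ordinary field surfaced through `innerChildren` for display, so children-based analyzer/optimizer traversals do not rewrite it:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand

// Hypothetical sketch, not the PR's CTAS command.
case class CreateTableAsSelectSketch(
    table: CatalogTable,
    query: LogicalPlan) extends RunnableCommand {

  // Visible in EXPLAIN output, but not part of `children`, so the optimizer
  // never rewrites `query` while optimizing the command itself.
  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // A real command would analyze and plan `query` here and write the table.
    Seq.empty
  }
}
```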
wgtmac pushed a commit to wgtmac/spark that referenced this pull request Sep 19, 2016
asfgit pushed a commit that referenced this pull request Oct 17, 2016
…15048

### What changes were proposed in this pull request?
This PR is to backport #15048 and #15459.

However, in 2.0 we do not have a unified logical node `CreateTable`, and the analyzer rule `PreWriteCheck` is also different. To minimize the code changes, this PR adds a new rule `AnalyzeCreateTableAsSelect`. Please treat it as a new PR to review. Thanks!


### How was this patch tested?

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15502 from gatorsmile/ctasOptimize2.0.
ghost pushed a commit to dbtsai/spark that referenced this pull request Dec 19, 2017
## What changes were proposed in this pull request?
We could get incorrect results by running DecimalPrecision twice. This PR resolves the original issue found in apache#15048 and apache#14797. After this PR, it becomes easier to change it back to using `children` instead of `innerChildren`.

## How was this patch tested?
The existing test.

Author: gatorsmile <gatorsmile@gmail.com>

Closes apache#20000 from gatorsmile/keepPromotePrecision.