Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ trait CheckAnalysis extends PredicateHelper {
// of the result of cascading resolution failures.
plan.foreachUp {

// Skip checking analysis for view, for it's only a wrapper for sub child plan and it will be
// eliminated in EliminateView rule
case v: View => checkAnalysis(v.child)

case p if p.analyzed => // Skip already analyzed sub-plans

case u: UnresolvedRelation =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck}
import org.apache.spark.sql.streaming.StreamingQueryManager
Expand Down Expand Up @@ -188,6 +189,15 @@ abstract class BaseSessionStateBuilder(
customCheckRules

override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name)

override def checkAnalysis(plan: LogicalPlan): Unit = {
// We should check its innerChildren for InsertIntoDataSourceDirCommand
val planToCheck = plan match {
case e: InsertIntoDataSourceDirCommand => e.query
case _ => plan
}
super.checkAnalysis(planToCheck)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we resolve this issue in the InsertIntoDataSourceDirCommand side?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu Yes, I agree that resolving this issue inside InsertIntoDataSourceDirCommand is better, but as I have already asked in the jira: https://issues.apache.org/jira/browse/SPARK-28195, I'm not sure whether there is some special consideration behind making the children of InsertIntoDataSourceDirCommand empty and using innerChildren instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You meant this issue only happened when spark.sql.runSQLOnFiles=true? What if spark.sql.runSQLOnFiles=false?

Copy link
Author

@liupc liupc Jul 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu This issue affects more aspects; what I pointed out in the jira is just the "table or view not found" case — in fact, this issue may cause many other problems.
The root cause is that the children of InsertIntoDataSourceDirCommand are empty, so many analysis rules that run after InsertIntoDataSourceDirCommand is inserted (in the DataSourceAnalysis rule) may not take effect, and the same applies to CheckAnalysis.
I think we can fix it better inside InsertIntoDataSourceDirCommand, but I should first make it clear why we set its children to empty and use innerChildren instead. Is there any PR or issue for that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should investigate it by yourself. I don't like the current approach either. Apparently the issue is minor, but the fix is pretty invasive. It doesn't need to touch the SessionStateBuilder side at all.

}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.test.SharedSQLContext

class CommandAnalysisSuite extends PlanTest with SharedSQLContext {
val parser = new SparkSqlParser(conf)
val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf)
// Analyzer that unwraps InsertIntoDataSourceDirCommand before running analysis
// checks: the command keeps its query as an inner child, so the query must be
// extracted and validated directly; every other plan is checked unchanged.
val analyzer = new Analyzer(catalog, conf) {
  override def checkAnalysis(plan: LogicalPlan): Unit = {
    val target = plan match {
      case cmd: InsertIntoDataSourceDirCommand => cmd.query
      case other => other
    }
    super.checkAnalysis(target)
  }
}

test("SPARK-28195: checkAnalysis should work for InsertIntoDataSourceDirCommand") {
val query = "insert overwrite directory '/path' using parquet select * from table1"
val exception = intercept[AnalysisException](
analyzer.executeAndCheck(parser.parsePlan(query), new QueryPlanningTracker))
assert(exception.getMessage.contains("Table or view not found: table1"))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this into InsertSuite?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's OK

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck}
import org.apache.spark.sql.hive.client.HiveClient
Expand Down Expand Up @@ -94,6 +95,15 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
customCheckRules

override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name)

override def checkAnalysis(plan: LogicalPlan): Unit = {
  // InsertIntoDataSourceDirCommand stores its query as an inner child, so
  // unwrap it and run the analysis checks on the query itself; any other
  // plan is passed through to the default check unchanged.
  val target = plan match {
    case cmd: InsertIntoDataSourceDirCommand => cmd.query
    case other => other
  }
  super.checkAnalysis(target)
}
}

/**
Expand Down