[SPARK-21165] [SQL] [2.2] Use executedPlan instead of analyzedPlan in INSERT AS SELECT [WIP]

### What changes were proposed in this pull request?

The input query schema of INSERT AS SELECT can be changed by optimization. For example, the output schema of the following query is changed by the rules `SimplifyCasts` and `RemoveRedundantAliases`.
```SQL
 SELECT word, length, cast(first as string) as first FROM view1
```
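
One way to see this effect is to compare the analyzed and optimized plans of the query. The following is a minimal sketch, assuming an active `SparkSession` named `spark` and an existing `view1` whose `first` column is already a string:
```scala
// Sketch only: `spark` and `view1` are assumed to exist.
// Because `first` is already a string, SimplifyCasts drops the cast and
// RemoveRedundantAliases drops the alias, so the optimized plan's output
// attributes no longer line up with those of the analyzed plan.
val df = spark.sql("SELECT word, length, cast(first as string) as first FROM view1")
println(df.queryExecution.analyzed)      // projection still carries the cast/alias on `first`
println(df.queryExecution.optimizedPlan) // cast and alias removed by the optimizer rules
```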

This PR fixes the issue in Spark 2.2. Instead of using the analyzed plan of the input query, it uses the executed plan to determine the attributes in `FileFormatWriter`.
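
Concretely, the resolution now happens by name against the executed plan's output, as in the `FileFormatWriter` hunk below. A minimal sketch of that step (adapted from the diff in this commit; it assumes `sparkSession`, `queryExecution`, and `partitionColumnNames` are in scope, as they are inside `FileFormatWriter.write`):
```scala
// Sketch of the name-based resolution against the executed plan's output
// (adapted from the FileFormatWriter change in this diff; not a drop-in snippet).
val allColumns = queryExecution.executedPlan.output
val resolver = sparkSession.sessionState.conf.resolver
val partitionColumns = partitionColumnNames.map { name =>
  allColumns.find(attr => resolver(attr.name, name)).getOrElse {
    throw new RuntimeException(
      s"Partition column $name not found in schema ${queryExecution.executedPlan.schema}")
  }
}
// Data columns are whatever remains after the partition columns are removed.
val dataColumns = allColumns.filterNot(AttributeSet(partitionColumns).contains)
```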

The related issue in the master branch has been fixed by #18064. After this PR is merged, I will submit a separate PR to merge the test case into the master branch.

### How was this patch tested?
Added a test case.

Author: Xiao Li <gatorsmile@gmail.com>
Author: gatorsmile <gatorsmile@gmail.com>

Closes #18386 from gatorsmile/newRC5.
gatorsmile authored and cloud-fan committed Jun 23, 2017
1 parent b99c0e9 commit b6749ba
Showing 8 changed files with 48 additions and 51 deletions.
@@ -408,16 +408,6 @@ case class DataSource(
 val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
 PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)

-// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
-// not need to have the query as child, to avoid to analyze an optimized query,
-// because InsertIntoHadoopFsRelationCommand will be optimized first.
-val partitionAttributes = partitionColumns.map { name =>
-val plan = data.logicalPlan
-plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
-throw new AnalysisException(
-s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
-}.asInstanceOf[Attribute]
-}
 val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
 sparkSession.table(tableIdent).queryExecution.analyzed.collect {
 case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
@@ -431,7 +421,7 @@
 outputPath = outputPath,
 staticPartitions = Map.empty,
 ifPartitionNotExists = false,
-partitionColumns = partitionAttributes,
+partitionColumns = partitionColumns,
 bucketSpec = bucketSpec,
 fileFormat = format,
 options = options,
@@ -188,15 +188,13 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 "Cannot overwrite a path that is also being read from.")
 }

-val partitionSchema = actualQuery.resolve(
-t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
 val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }

 InsertIntoHadoopFsRelationCommand(
 outputPath,
 staticPartitions,
 i.ifPartitionNotExists,
-partitionSchema,
+partitionColumns = t.partitionSchema.map(_.name),
 t.bucketSpec,
 t.fileFormat,
 t.options,
@@ -101,7 +101,7 @@ object FileFormatWriter extends Logging {
 committer: FileCommitProtocol,
 outputSpec: OutputSpec,
 hadoopConf: Configuration,
-partitionColumns: Seq[Attribute],
+partitionColumnNames: Seq[String],
 bucketSpec: Option[BucketSpec],
 refreshFunction: (Seq[TablePartitionSpec]) => Unit,
 options: Map[String, String]): Unit = {
@@ -111,9 +111,18 @@
 job.setOutputValueClass(classOf[InternalRow])
 FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

-val allColumns = queryExecution.logical.output
+val allColumns = queryExecution.executedPlan.output
+// Get the actual partition columns as attributes after matching them by name with
+// the given columns names.
+val partitionColumns = partitionColumnNames.map { col =>
+val nameEquality = sparkSession.sessionState.conf.resolver
+allColumns.find(f => nameEquality(f.name, col)).getOrElse {
+throw new RuntimeException(
+s"Partition column $col not found in schema ${queryExecution.executedPlan.schema}")
+}
+}
 val partitionSet = AttributeSet(partitionColumns)
-val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
+val dataColumns = allColumns.filterNot(partitionSet.contains)

 val bucketIdExpression = bucketSpec.map { spec =>
 val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
@@ -44,7 +44,7 @@ case class InsertIntoHadoopFsRelationCommand(
 outputPath: Path,
 staticPartitions: TablePartitionSpec,
 ifPartitionNotExists: Boolean,
-partitionColumns: Seq[Attribute],
+partitionColumns: Seq[String],
 bucketSpec: Option[BucketSpec],
 fileFormat: FileFormat,
 options: Map[String, String],
@@ -150,7 +150,7 @@ case class InsertIntoHadoopFsRelationCommand(
 outputSpec = FileFormatWriter.OutputSpec(
 qualifiedOutputPath.toString, customPartitionLocations),
 hadoopConf = hadoopConf,
-partitionColumns = partitionColumns,
+partitionColumnNames = partitionColumns,
 bucketSpec = bucketSpec,
 refreshFunction = refreshPartitionsCallback,
 options = options)
@@ -176,10 +176,10 @@
 customPartitionLocations: Map[TablePartitionSpec, String],
 committer: FileCommitProtocol): Unit = {
 val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
-"/" + partitionColumns.flatMap { p =>
-staticPartitions.get(p.name) match {
+"/" + partitionColumns.flatMap { col =>
+staticPartitions.get(col) match {
 case Some(value) =>
-Some(escapePathName(p.name) + "=" + escapePathName(value))
+Some(escapePathName(col) + "=" + escapePathName(value))
 case None =>
 None
 }
@@ -127,11 +127,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
 val resolver = sparkSession.sessionState.conf.resolver
 val tableCols = existingTable.schema.map(_.name)

-// As we are inserting into an existing table, we should respect the existing schema and
-// adjust the column order of the given dataframe according to it, or throw exception
-// if the column names do not match.
+// As we are inserting into an existing table, we should respect the existing schema, preserve
+// the case and adjust the column order of the given DataFrame according to it, or throw
+// an exception if the column names do not match.
 val adjustedColumns = tableCols.map { col =>
-query.resolve(Seq(col), resolver).getOrElse {
+query.resolve(Seq(col), resolver).map(Alias(_, col)()).getOrElse {
 val inputColumns = query.schema.map(_.name).mkString(", ")
 throw new AnalysisException(
 s"cannot resolve '$col' given input columns: [$inputColumns]")
@@ -168,15 +168,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
 """.stripMargin)
 }

-val newQuery = if (adjustedColumns != query.output) {
-Project(adjustedColumns, query)
-} else {
-query
-}
-
 c.copy(
 tableDesc = existingTable,
-query = Some(newQuery))
+query = Some(Project(adjustedColumns, query)))

 // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
 // config, and do various checks:
@@ -111,23 +111,14 @@ class FileStreamSink(
 case _ => // Do nothing
 }

-// Get the actual partition columns as attributes after matching them by name with
-// the given columns names.
-val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col =>
-val nameEquality = data.sparkSession.sessionState.conf.resolver
-data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
-throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
-}
-}
-
 FileFormatWriter.write(
 sparkSession = sparkSession,
 queryExecution = data.queryExecution,
 fileFormat = fileFormat,
 committer = committer,
 outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
 hadoopConf = hadoopConf,
-partitionColumns = partitionColumns,
+partitionColumnNames = partitionColumnNames,
 bucketSpec = None,
 refreshFunction = _ => (),
 options = options)
@@ -314,21 +314,14 @@ case class InsertIntoHiveTable(
 outputPath = tmpLocation.toString,
 isAppend = false)

-val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
-query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
-throw new AnalysisException(
-s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
-}.asInstanceOf[Attribute]
-}
-
 FileFormatWriter.write(
 sparkSession = sparkSession,
 queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
 fileFormat = new HiveFileFormat(fileSinkConf),
 committer = committer,
 outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
 hadoopConf = hadoopConf,
-partitionColumns = partitionAttributes,
+partitionColumnNames = partitionColumnNames.takeRight(numDynamicPartitions),
 bucketSpec = None,
 refreshFunction = _ => (),
 options = Map.empty)
@@ -468,6 +468,28 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
 }
 }

+test("SPARK-21165: the query schema of INSERT is changed after optimization") {
+withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+withTable("tab1", "tab2") {
+Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")
+
+spark.sql(
+"""
+|CREATE TABLE tab2 (word string, length int)
+|PARTITIONED BY (first string)
+""".stripMargin)
+
+spark.sql(
+"""
+|INSERT INTO TABLE tab2 PARTITION(first)
+|SELECT word, length, cast(first as string) as first FROM tab1
+""".stripMargin)
+
+checkAnswer(spark.table("tab2"), Row("a", 3, "b"))
+}
+}
+}
+
 testPartitionedTable("insertInto() should reject extra columns") {
 tableName =>
 sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
