From 2968b2c34f42f6b0bcb5e373a400377abfd09e86 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 4 Nov 2018 18:36:20 +0800
Subject: [PATCH] Fix InsertIntoDataSourceCommand does not use Cached Data

---
 .../datasources/DataSourceStrategy.scala      |  2 +-
 .../InsertIntoDataSourceCommand.scala         | 15 +++++-----
 .../spark/sql/sources/InsertSuite.scala       | 30 +++++++++++++++++++
 3 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c6000442fae76..5949b9cd28518 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -143,7 +143,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
     case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
         parts, query, overwrite, false) if parts.isEmpty =>
-      InsertIntoDataSourceCommand(l, query, overwrite)
+      InsertIntoDataSourceCommand(l, query, overwrite, query.output.map(_.name))
 
     case InsertIntoDir(_, storage, provider, query, overwrite)
       if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) != DDLUtils.HIVE_PROVIDER =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 80d7608a22891..fd2f9e190f3ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -18,9 +18,9 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.sources.InsertableRelation
 
@@ -30,14 +30,13 @@ import org.apache.spark.sql.sources.InsertableRelation
 case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
-    overwrite: Boolean)
-  extends RunnableCommand {
+    overwrite: Boolean,
+    outputColumnNames: Seq[String])
+  extends DataWritingCommand {
 
-  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
-    val data = Dataset.ofRows(sparkSession, query)
+    val data = sparkSession.internalCreateDataFrame(child.execute(), outputColumns.toStructType)
     // Data has been casted to the target relation's schema by the PreprocessTableInsertion rule.
     relation.insert(data, overwrite)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 0b6d93975daef..b24f34aa7c87e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
@@ -589,4 +590,33 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       sql("INSERT INTO TABLE test_table SELECT 2, null")
     }
   }
+
+  test("SPARK-25936 InsertIntoDataSourceCommand does not use Cached Data") {
+    withTable("test_table") {
+      withTempView("test_view") {
+        val schema = new StructType().add("id", LongType, false)
+        val newTable = CatalogTable(
+          identifier = TableIdentifier("test_table", None),
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat(
+            locationUri = None,
+            inputFormat = None,
+            outputFormat = None,
+            serde = None,
+            compressed = false,
+            properties = Map.empty),
+          schema = schema,
+          provider = Some(classOf[SimpleInsertSource].getName))
+        spark.sessionState.catalog.createTable(newTable, false)
+
+        spark.range(2).createTempView("test_view")
+        spark.catalog.cacheTable("test_view")
+
+        val sparkPlan =
+          sql("INSERT INTO TABLE test_table SELECT * FROM test_view").queryExecution.sparkPlan
+        assert(sparkPlan.collect { case i: InMemoryTableScanExec => i }.size === 1,
+          "expected to use cached table")
+      }
+    }
+  }
 }
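
Note (not part of the patch): before this change InsertIntoDataSourceCommand re-planned its input with Dataset.ofRows(sparkSession, query) inside run(); as a DataWritingCommand it now executes the already-planned physical child directly via child.execute(), so an InMemoryTableScanExec over a cached input shows up in the INSERT's own plan. A rough way to observe this outside the test suite, assuming an active session named spark and a data source table target_table whose relation implements InsertableRelation (placeholder names, similar to the SimpleInsertSource table used by the new test):

    import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

    // Cache a small view that will feed the INSERT.
    spark.range(2).createOrReplaceTempView("src_view")
    spark.catalog.cacheTable("src_view")

    // With this change the INSERT's planned query contains an InMemoryTableScanExec,
    // i.e. the cached view is scanned instead of being recomputed.
    val plan = spark.sql("INSERT INTO TABLE target_table SELECT * FROM src_view")
      .queryExecution.sparkPlan
    assert(plan.collect { case scan: InMemoryTableScanExec => scan }.nonEmpty,
      "expected the INSERT to read from the cached view")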