From c864304ec593a02a62550f44a00372c5604a9477 Mon Sep 17 00:00:00 2001
From: Indhumathi27
Date: Tue, 24 Apr 2018 20:43:19 +0530
Subject: [PATCH] [CARBONDATA-2396] Create Table As Select Fix with 'using carbondata'

---
 .../sql/commands/UsingCarbondataSuite.scala   |  74 +++++++-
 .../org/apache/spark/sql/CarbonSource.scala   |  38 ++--
 .../sql/execution/strategy/DDLStrategy.scala  |  21 ++-
 ...eateCarbonSourceTableAsSelectCommand.scala | 167 ++++++++++++++++++
 ...eateCarbonSourceTableAsSelectCommand.scala | 122 +++++++++++++
 5 files changed, 406 insertions(+), 16 deletions(-)
 create mode 100644 integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
 create mode 100644 integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
index 37d65b471e6..097c9d91f3c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/UsingCarbondataSuite.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.sql.commands
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
@@ -28,11 +28,15 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach {
   override def beforeEach(): Unit = {
     sql("DROP TABLE IF EXISTS src_carbondata1")
+    sql("DROP TABLE IF EXISTS src_carbondata3")
+    sql("DROP TABLE IF EXISTS src_carbondata4")
     sql("DROP TABLE IF EXISTS tableSize3")
   }
 
   override def afterEach(): Unit = {
     sql("DROP TABLE IF EXISTS src_carbondata1")
+    sql("DROP TABLE IF EXISTS src_carbondata3")
+    sql("DROP TABLE IF EXISTS src_carbondata4")
     sql("DROP TABLE IF EXISTS tableSize3")
   }
 
@@ -69,4 +73,72 @@ class UsingCarbondataSuite extends QueryTest with BeforeAndAfterEach {
     res3.foreach(row => assert(row.getString(1).trim.toLong > 0))
   }
 
+  test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata'") {
+    sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING carbondata")
+    sql("INSERT INTO src_carbondata3 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source"))
+    sql("CREATE TABLE src_carbondata4 USING carbondata as select * from src_carbondata3")
+    checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source"))
+  }
+
+  test("CARBONDATA-2396 Support Create Table As Select with 'USING org.apache.spark.sql.CarbonSource'") {
+    sql("DROP TABLE IF EXISTS src_carbondata3")
+    sql("DROP TABLE IF EXISTS src_carbondata4")
+    sql("CREATE TABLE src_carbondata3(key INT, value STRING) USING org.apache.spark.sql.CarbonSource")
+    sql("INSERT INTO src_carbondata3 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata3"), Row(1, "source"))
+    sql("CREATE TABLE src_carbondata4 USING org.apache.spark.sql.CarbonSource as select * from src_carbondata3")
+    checkAnswer(sql("SELECT * FROM src_carbondata4"), Row(1, "source"))
+    sql("DROP TABLE IF EXISTS src_carbondata3")
+    sql("DROP TABLE IF EXISTS src_carbondata4")
+  }
+
+  test("CARBONDATA-2396 Support Create Table As Select [IF NOT EXISTS] with 'using carbondata'") {
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+    sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+    sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+    sql(
+      "CREATE TABLE IF NOT EXISTS src_carbondata6 USING carbondata as select * from " +
+      "src_carbondata5")
+    checkAnswer(sql("SELECT * FROM src_carbondata6"), Row(1, "source"))
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+  }
+
+  test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata' with Table properties") {
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+    sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+    sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+    sql("CREATE TABLE src_carbondata6 USING carbondata options('table_blocksize'='10'," +
+        "'sort_scope'='local_sort') as select * from src_carbondata5")
+    val result = sql("describe FORMATTED src_carbondata6")
+    checkExistence(result, true, "Table Block Size")
+    checkExistence(result, true, "10 MB")
+    checkAnswer(sql("SELECT * FROM src_carbondata6"), Row(1, "source"))
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+  }
+
+  test("CARBONDATA-2396 Support Create Table As Select with 'using carbondata' with Columns") {
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+    sql("CREATE TABLE src_carbondata5(key INT, value STRING) USING carbondata")
+    sql("INSERT INTO src_carbondata5 VALUES(1,'source')")
+    checkAnswer(sql("SELECT * FROM src_carbondata5"), Row(1, "source"))
+    val exception = intercept[AnalysisException](
+      sql(
+        "CREATE TABLE src_carbondata6(name String) USING carbondata as select * from " +
+        "src_carbondata5"))
+    assert(exception.getMessage
+      .contains(
+        "Operation not allowed: Schema may not be specified in a Create Table As Select (CTAS) " +
+        "statement"))
+    sql("DROP TABLE IF EXISTS src_carbondata5")
+    sql("DROP TABLE IF EXISTS src_carbondata6")
+  }
+
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 3600854f520..837613609a7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -26,6 +26,7 @@ import scala.language.implicitConversions
 import org.apache.commons.lang.StringUtils
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
 import org.apache.spark.sql.execution.streaming.Sink
@@ -87,12 +88,6 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       data: DataFrame): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
     var newParameters = CarbonScalaUtil.getDeserializedParameters(parameters)
-    // User should not specify path since only one store is supported in carbon currently,
-    // after we support multi-store, we can remove this limitation
-    require(!newParameters.contains("path"), "'path' should not be specified, " +
-      "the path to store carbon file is the 'storePath' " +
-      "specified when creating CarbonContext")
-
     val options = new CarbonOption(newParameters)
     val isExists = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.tableExists(
       options.tableName, options.dbName)(sqlContext.sparkSession)
@@ -181,7 +176,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
           dbName, tableName)
         val updatedParams = CarbonSource.updateAndCreateTable(
-          identifier, dataSchema, sparkSession, metaStore, parameters)
+          identifier, dataSchema, sparkSession, metaStore, parameters, None)
         (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
@@ -273,12 +268,26 @@ object CarbonSource {
   def createTableInfoFromParams(
       parameters: Map[String, String],
       dataSchema: StructType,
-      identifier: AbsoluteTableIdentifier): TableModel = {
+      identifier: AbsoluteTableIdentifier,
+      query: Option[LogicalPlan],
+      sparkSession: SparkSession): TableModel = {
     val sqlParser = new CarbonSpark2SqlParser
-    val fields = sqlParser.getFields(dataSchema)
     val map = scala.collection.mutable.Map[String, String]()
     parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
     val options = new CarbonOption(parameters)
+    val fields = query match {
+      case Some(q) =>
+        // if query is provided then it is a CTAS flow
+        if (sqlParser.getFields(dataSchema).nonEmpty) {
+          throw new AnalysisException(
+            "Schema cannot be specified in a Create Table As Select (CTAS) statement")
+        }
+        sqlParser
+          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
+            .getSchemaFromUnresolvedRelation(sparkSession, q))
+      case None =>
+        sqlParser.getFields(dataSchema)
+    }
     val bucketFields = sqlParser.getBucketFields(map, fields, options)
     sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
       identifier.getTableName, fields, Nil, map, bucketFields)
@@ -292,7 +301,8 @@ object CarbonSource {
    */
   def updateCatalogTableWithCarbonSchema(
       tableDesc: CatalogTable,
-      sparkSession: SparkSession): CatalogTable = {
+      sparkSession: SparkSession,
+      query: Option[LogicalPlan] = None): CatalogTable = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
@@ -306,7 +316,8 @@ object CarbonSource {
         tableDesc.schema,
         sparkSession,
         metaStore,
-        properties)
+        properties,
+        query)
       // updating params
       val updatedFormat = storageFormat.copy(properties = map)
       tableDesc.copy(storage = updatedFormat)
@@ -334,8 +345,9 @@ object CarbonSource {
       dataSchema: StructType,
       sparkSession: SparkSession,
       metaStore: CarbonMetaStore,
-      properties: Map[String, String]): Map[String, String] = {
-    val model = createTableInfoFromParams(properties, dataSchema, identifier)
+      properties: Map[String, String],
+      query: Option[LogicalPlan]): Map[String, String] = {
+    val model = createTableInfoFromParams(properties, dataSchema, identifier, query, sparkSession)
     val tableInfo: TableInfo = TableNewProcessor(model)
     val isExternal = properties.getOrElse("isExternal", "false")
     val isTransactionalTable = properties.getOrElse("isTransactional", "true")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index d7e202397e6..ef4d05c2ecb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedComm
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
@@ -228,11 +228,28 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val cmd = CreateDataSourceTableCommand(updatedCatalog,
           ignoreIfExists = mode == SaveMode.Ignore)
         ExecutedCommandExec(cmd) :: Nil
+      case cmd@CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+           && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
+               || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
+        val updatedCatalog = CarbonSource
+          .updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, Option(query))
+        val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query)
+        ExecutedCommandExec(cmd) :: Nil
+      case cmd@org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, query)
+        if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+           && (tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource")
+               || tableDesc.provider.get.equalsIgnoreCase("carbondata")) =>
+        val updatedCatalog = CarbonSource
+          .updateCatalogTableWithCarbonSchema(tableDesc, sparkSession, query)
+        val cmd = CreateCarbonSourceTableAsSelectCommand(updatedCatalog, SaveMode.Ignore, query.get)
+        ExecutedCommandExec(cmd) :: Nil
       case CreateDataSourceTableCommand(table, ignoreIfExists)
         if table.provider.get != DDLUtils.HIVE_PROVIDER
            && (table.provider.get.equals("org.apache.spark.sql.CarbonSource")
               || table.provider.get.equalsIgnoreCase("carbondata")) =>
-        val updatedCatalog = CarbonSource.updateCatalogTableWithCarbonSchema(table, sparkSession)
+        val updatedCatalog = CarbonSource
+          .updateCatalogTableWithCarbonSchema(table, sparkSession)
         val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists)
         ExecutedCommandExec(cmd) :: Nil
       case AlterTableSetPropertiesCommand(tableName, properties, isView)
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 00000000000..989b1d5d6b8
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,167 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{
+  CatalogRelation, CatalogTable, CatalogTableType,
+  SimpleCatalogRelation
+}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{
+  AlterTableRecoverPartitionsCommand, DDLUtils,
+  RunnableCommand
+}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Create table 'using carbondata' and insert the query result into it.
+ *
+ * @param table the Catalog Table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    assert(table.schema.isEmpty)
+
+    val provider = table.provider.get
+    val sessionState = sparkSession.sessionState
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+
+    var createMetastoreTable = false
+    var existingSchema = Option.empty[StructType]
+    if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
+      // Check if we need to throw an exception or just return.
+      mode match {
+        case SaveMode.ErrorIfExists =>
+          throw new AnalysisException(s"Table $tableName already exists. " +
+                                      s"If you are using saveAsTable, you can set SaveMode to " +
+                                      s"SaveMode.Append to " +
+                                      s"insert data into the table or set SaveMode to SaveMode" +
+                                      s".Overwrite to overwrite " +
+                                      s"the existing data. " +
+                                      s"Or, if you are using SQL CREATE TABLE, you need to drop " +
+                                      s"$tableName first.")
+        case SaveMode.Ignore =>
+          // Since the table already exists and the save mode is Ignore, we will just return.
+          return Seq.empty[Row]
+        case SaveMode.Append =>
+          // Check if the specified data source matches the data source of the existing table.
+          val existingProvider = DataSource.lookupDataSource(provider)
+          // TODO: Check that options from the resolved relation match the relation that we are
+          // inserting into (i.e. using the same compression).
+
+          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+          // views unexpectedly.
+          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+            case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+              // check if the file formats match
+              l.relation match {
+                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
+                  throw new AnalysisException(
+                    s"The file format of the existing table $tableName is " +
+                    s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " +
+                    s"format `$provider`")
+                case _ =>
+              }
+              if (query.schema.size != l.schema.size) {
+                throw new AnalysisException(
+                  s"The column number of the existing schema[${ l.schema }] " +
+                  s"doesn't match the data schema[${ query.schema }]'s")
+              }
+              existingSchema = Some(l.schema)
+            case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
+              existingSchema = Some(s.metadata.schema)
+            case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
+              throw new AnalysisException("Saving data in the Hive serde table " +
+                                          s"${ c.catalogTable.identifier } is not supported yet. " +
+                                          s"Please use the insertInto() API as an alternative.")
+            case o =>
+              throw new AnalysisException(s"Saving data in ${ o.toString } is not supported.")
+          }
+        case SaveMode.Overwrite =>
+          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
+          // Need to create the table again.
+          createMetastoreTable = true
+      }
+    } else {
+      // The table does not exist. We need to create it in metastore.
+      createMetastoreTable = true
+    }
+
+    val data = Dataset.ofRows(sparkSession, query)
+    val df = existingSchema match {
+      // If we are inserting into an existing table, just use the existing schema.
+      case Some(s) => data.selectExpr(s.fieldNames: _*)
+      case None => data
+    }
+
+    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+      Some(sessionState.catalog.defaultTablePath(table.identifier))
+    } else {
+      table.storage.locationUri
+    }
+
+    // Create the relation based on the data of df.
+    val pathOption = tableLocation.map("path" -> _)
+    val dataSource = DataSource(
+      sparkSession,
+      className = provider,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))
+
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table $tableName in $mode mode", ex)
+        throw ex
+    }
+    result match {
+      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+                                   sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+        // Need to recover partitions into the metastore so our saved data is visible.
+        sparkSession.sessionState.executePlan(
+          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+      case _ =>
+    }
+
+    // Refresh the cache of the table in the catalog.
+    sessionState.catalog.refreshTable(tableIdentWithDB)
+    Seq.empty[Row]
+  }
+}
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
new file mode 100644
index 00000000000..4e22d136bb2
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala
@@ -0,0 +1,122 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.net.URI
+
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, RunnableCommand}
+import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}
+import org.apache.spark.sql.sources.BaseRelation
+
+/**
+ * Create table 'using carbondata' and insert the query result into it.
+ *
+ * @param table the Catalog Table
+ * @param mode  SaveMode: Ignore, Overwrite, ErrorIfExists, Append
+ * @param query the query whose result will be inserted into the new relation
+ *
+ */
+
+case class CreateCarbonSourceTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan)
+  extends RunnableCommand {
+
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+
+    val sessionState = sparkSession.sessionState
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+
+    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
+
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
+      }
+
+      saveDataIntoTable(
+        sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
+    } else {
+      assert(table.schema.isEmpty)
+
+      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+        Some(sessionState.catalog.defaultTablePath(table.identifier))
+      } else {
+        table.storage.locationUri
+      }
+      val result = saveDataIntoTable(
+        sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
+
+      result match {
+        case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+                                     sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+          // Need to recover partitions into the metastore so our saved data is visible.
+          sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+        case _ =>
+      }
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def saveDataIntoTable(
+      session: SparkSession,
+      table: CatalogTable,
+      tableLocation: Option[URI],
+      data: LogicalPlan,
+      mode: SaveMode,
+      tableExists: Boolean): BaseRelation = {
+    // Create the relation based on the input logical plan: `data`.
+    val pathOption = tableLocation.map("path" -> CatalogUtils.URIToString(_))
+    val dataSource = DataSource(
+      session,
+      className = table.provider.get,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = if (tableExists) {
+        Some(table)
+      } else {
+        None
+      })
+
+    try {
+      dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${ table.identifier.unquotedString }", ex)
+        throw ex
+    }
+  }
+}
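
For quick reference, a minimal usage sketch of the CTAS syntax enabled by this change, written in the style of the new UsingCarbondataSuite tests above (sql() is the QueryTest helper; the ctas_source/ctas_target table names are illustrative only, not part of the patch):

    sql("CREATE TABLE ctas_source(key INT, value STRING) USING carbondata")
    sql("INSERT INTO ctas_source VALUES(1,'source')")
    // CTAS with the short provider name
    sql("CREATE TABLE ctas_target USING carbondata as select * from ctas_source")
    // CTAS with the fully qualified provider and table properties
    sql("CREATE TABLE ctas_target2 USING org.apache.spark.sql.CarbonSource " +
      "options('table_blocksize'='10') as select * from ctas_source")
    // Note: specifying a column list together with AS SELECT is rejected with an AnalysisException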