From d74298a31d1069f8cd4fefcc29a52ed17b462fe2 Mon Sep 17 00:00:00 2001
From: "Lantao, Jin(lajin)"
Date: Fri, 6 Nov 2020 08:26:56 +0800
Subject: [PATCH] [CARMEL-4055] Overwrite previous data anyway when creating a
 delta table (#11)

---
 .../commands/CreateDeltaTableCommand.scala    | 12 +++---
 .../spark/sql/delta/DeltaSQLQuerySuite.scala  | 55 ++++++++++++++++++-
 .../sql/delta/DeltaTableCreationTests.scala   |  2 +-
 3 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
index 4c2de87ee9a..1c380fc1ab5 100644
--- a/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -110,7 +110,7 @@ case class CreateDeltaTableCommand(
         // We may have failed a previous write. The retry should still succeed even if we have
         // garbage data
         if (txn.readVersion > -1 || !fs.exists(deltaLog.logPath)) {
-          assertPathEmpty(sparkSession, tableWithLocation)
+          makePathEmpty(sparkSession, tableWithLocation)
         }
       }
       // We are either appending/overwriting with saveAsTable or creating a new table with CTAS or
@@ -141,7 +141,7 @@ case class CreateDeltaTableCommand(
         // When creating a managed table, the table path should not exist or is empty, or
         // users would be surprised to see the data, or see the data directory being dropped
         // after the table is dropped.
-        assertPathEmpty(sparkSession, tableWithLocation)
+        makePathEmpty(sparkSession, tableWithLocation)
       }
 
       // This is either a new table, or, we never defined the schema of the table. While it is
@@ -150,7 +150,7 @@ case class CreateDeltaTableCommand(
       val noExistingMetadata = txn.readVersion == -1 || txn.metadata.schema.isEmpty
       if (noExistingMetadata) {
         assertTableSchemaDefined(fs, tableLocation, tableWithLocation, sparkSession)
-        assertPathEmpty(sparkSession, tableWithLocation)
+        makePathEmpty(sparkSession, tableWithLocation)
         // This is a user provided schema.
         // Doesn't come from a query, Follow nullability invariants.
         val newMetadata = getProvidedMetadata(table, table.schema.json)
@@ -216,7 +216,7 @@ case class CreateDeltaTableCommand(
       bucketSpec = table.bucketSpec)
   }
 
-  private def assertPathEmpty(
+  private def makePathEmpty(
       sparkSession: SparkSession,
       tableWithLocation: CatalogTable): Unit = {
     val path = new Path(tableWithLocation.location)
@@ -225,8 +225,8 @@
     // we intentionally diverge from this behavior w.r.t regular datasource tables (that silently
     // overwrite any previous data)
     if (fs.exists(path) && fs.listStatus(path).nonEmpty) {
-      throw new AnalysisException(s"Cannot create table ('${tableWithLocation.identifier}')." +
-        s" The associated location ('${tableWithLocation.location}') is not empty.")
+ +// s" The associated location ('${tableWithLocation.location}') is not empty.") + // we overwrite any previous data since our users have no permission to clean data in HDFS + fs.delete(path, true) } } diff --git a/src/test/scala/org/apache/spark/sql/delta/DeltaSQLQuerySuite.scala b/src/test/scala/org/apache/spark/sql/delta/DeltaSQLQuerySuite.scala index 4a0dba7b5b0..74eedb78f13 100644 --- a/src/test/scala/org/apache/spark/sql/delta/DeltaSQLQuerySuite.scala +++ b/src/test/scala/org/apache/spark/sql/delta/DeltaSQLQuerySuite.scala @@ -16,8 +16,9 @@ package org.apache.spark.sql.delta -import java.io.FileNotFoundException +import java.io.{File, FileNotFoundException} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform} @@ -25,6 +26,7 @@ import org.apache.spark.sql.delta.catalog.DeltaCatalog import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession} +import org.apache.spark.util.Utils class DeltaSQLQuerySuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest with SQLTestUtils { @@ -231,4 +233,55 @@ class DeltaSQLQuerySuite extends QueryTest } } } + + test("temporary view may break the rule of full scan") { + withTable("delta_table") { + sql( + s""" + |CREATE TABLE delta_table + |USING delta AS SELECT 1 AS key, 1 AS value + """.stripMargin) + + sql("create temporary view temp_view as select key from delta_table") + val e1 = intercept[AnalysisException] ( + sql("UPDATE temp_view v SET key=2") + ).getMessage + assert(e1.contains("Expect a full scan of Delta sources, but found a partial scan.")) + } + } + + test("CTAS a delta table with the existing non-empty directory") { + withTable("tab1") { + val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1"))) + try { + // create an empty hidden file + tableLoc.mkdir() + val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage") + hiddenGarbageFile.createNewFile() + sql(s"CREATE TABLE tab1 USING DELTA AS SELECT 1, 'a'") + checkAnswer(spark.table("tab1"), Row(1, "a")) + } finally { + waitForTasksToFinish() + Utils.deleteRecursively(tableLoc) + } + } + } + + test("create a managed table with the existing non-empty directory") { + withTable("tab1") { + val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1"))) + try { + // create an empty hidden file + tableLoc.mkdir() + val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage") + hiddenGarbageFile.createNewFile() + sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING DELTA") + sql("INSERT INTO tab1 VALUES (1, 'a')") + checkAnswer(spark.table("tab1"), Row(1, "a")) + } finally { + waitForTasksToFinish() + Utils.deleteRecursively(tableLoc) + } + } + } } diff --git a/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala b/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala index ebbd5251b5e..b55d3095a01 100644 --- a/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala +++ b/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala @@ -657,7 +657,7 @@ trait DeltaTableCreationTests } } - testQuietly("reject creating a delta table pointing to non-delta files") { + ignore("reject creating a 
     withTempPath { dir =>
       withTable("delta_test") {
         val path = dir.getCanonicalPath
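
Note (reviewer sketch, not part of the patch): after this change, CREATE TABLE ... USING delta
silently clears a non-empty table location instead of raising an AnalysisException. Below is a
minimal, self-contained Scala illustration of the post-patch semantics the new tests assert. The
SparkSession wiring, warehouse path, and table name are assumptions for the example only; they are
not taken from this repository.

    import java.io.File
    import org.apache.spark.sql.SparkSession

    object OverwriteOnCreateSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("overwrite-on-create-sketch")
          // Standard Delta Lake 0.7+ session configuration; adjust to your deployment.
          .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
          .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
          .config("spark.sql.warehouse.dir", "/tmp/warehouse")
          .getOrCreate()

        // Simulate a leftover, non-empty default table location,
        // just as the new tests do with their hidden ".garbage" file.
        val tableDir = new File("/tmp/warehouse/tab1")
        tableDir.mkdirs()
        new File(tableDir, ".garbage").createNewFile()

        // Pre-patch: fails with "Cannot create table (...). The associated location (...)
        // is not empty." Post-patch: makePathEmpty deletes the location first, so the
        // CTAS succeeds and only the new data remains.
        spark.sql("CREATE TABLE tab1 USING delta AS SELECT 1 AS id, 'a' AS name")
        spark.sql("SELECT * FROM tab1").show()

        spark.stop()
      }
    }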