[CARMEL-4055] Overwrite previous data anyway when creating a delta table (d…
LantaoJin authored and GitHub Enterprise committed Nov 6, 2020
1 parent eab3180 commit d74298a
Showing 3 changed files with 63 additions and 8 deletions.
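
In effect, this fork makes CREATE TABLE ... USING delta wipe a non-empty table location with a recursive FileSystem.delete instead of rejecting it with an AnalysisException, because (per the code comment below) its users have no permission to clean data in HDFS themselves. A minimal sketch of the new behavior, assuming a local Spark session with this patched Delta build on the classpath; the object name, table name, and schema are illustrative, and the two builder configs are the standard Delta Lake settings for Spark 3:

    import org.apache.spark.sql.SparkSession

    object OverwriteOnCreateDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("overwrite-on-create-demo")
          .master("local[*]")
          .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
          .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
          .getOrCreate()

        // Suppose stray files already sit under the default location of `tab1`.
        // Stock Delta fails this CTAS with "Cannot create table ... The associated
        // location ... is not empty."; with this patch the location is recursively
        // deleted first, so the CTAS succeeds.
        spark.sql("CREATE TABLE tab1 USING delta AS SELECT 1 AS key, 'a' AS value")
        spark.sql("SELECT * FROM tab1").show()

        spark.stop()
      }
    }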
@@ -110,7 +110,7 @@ case class CreateDeltaTableCommand(
           // We may have failed a previous write. The retry should still succeed even if we have
           // garbage data
           if (txn.readVersion > -1 || !fs.exists(deltaLog.logPath)) {
-            assertPathEmpty(sparkSession, tableWithLocation)
+            makePathEmpty(sparkSession, tableWithLocation)
           }
         }
         // We are either appending/overwriting with saveAsTable or creating a new table with CTAS or
@@ -141,7 +141,7 @@ case class CreateDeltaTableCommand(
           // When creating a managed table, the table path should not exist or is empty, or
           // users would be surprised to see the data, or see the data directory being dropped
           // after the table is dropped.
-          assertPathEmpty(sparkSession, tableWithLocation)
+          makePathEmpty(sparkSession, tableWithLocation)
         }

         // This is either a new table, or, we never defined the schema of the table. While it is
@@ -150,7 +150,7 @@ case class CreateDeltaTableCommand(
         val noExistingMetadata = txn.readVersion == -1 || txn.metadata.schema.isEmpty
         if (noExistingMetadata) {
           assertTableSchemaDefined(fs, tableLocation, tableWithLocation, sparkSession)
-          assertPathEmpty(sparkSession, tableWithLocation)
+          makePathEmpty(sparkSession, tableWithLocation)
           // This is a user provided schema.
           // Doesn't come from a query, Follow nullability invariants.
           val newMetadata = getProvidedMetadata(table, table.schema.json)
@@ -216,7 +216,7 @@ case class CreateDeltaTableCommand(
       bucketSpec = table.bucketSpec)
   }

-  private def assertPathEmpty(
+  private def makePathEmpty(
       sparkSession: SparkSession,
       tableWithLocation: CatalogTable): Unit = {
     val path = new Path(tableWithLocation.location)
@@ -225,8 +225,10 @@ case class CreateDeltaTableCommand(
     // we intentionally diverge from this behavior w.r.t regular datasource tables (that silently
     // overwrite any previous data)
     if (fs.exists(path) && fs.listStatus(path).nonEmpty) {
-      throw new AnalysisException(s"Cannot create table ('${tableWithLocation.identifier}')." +
-        s" The associated location ('${tableWithLocation.location}') is not empty.")
+      // throw new AnalysisException(s"Cannot create table ('${tableWithLocation.identifier}')." +
+      //   s" The associated location ('${tableWithLocation.location}') is not empty.")
+      // we overwrite any previous data since our users have no permission to clean data in HDFS
+      fs.delete(path, true)
     }
   }

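For reference, the helper reads roughly as follows after the patch. This is reconstructed from the hunks above; the context line hidden at 225 is assumed to be the usual Hadoop FileSystem lookup, so treat that one line as an assumption rather than the file's verbatim content:

    private def makePathEmpty(
        sparkSession: SparkSession,
        tableWithLocation: CatalogTable): Unit = {
      val path = new Path(tableWithLocation.location)
      // Assumed hidden context: resolve the FileSystem for the table location.
      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
      // Delta normally refuses to create a table over a non-empty location; this
      // fork recursively deletes whatever is there instead, since its users have
      // no permission to clean data in HDFS themselves.
      if (fs.exists(path) && fs.listStatus(path).nonEmpty) {
        fs.delete(path, true)
      }
    }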
@@ -16,15 +16,17 @@

 package org.apache.spark.sql.delta

-import java.io.FileNotFoundException
+import java.io.{File, FileNotFoundException}

+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
 import org.apache.spark.sql.delta.catalog.DeltaCatalog
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession}
+import org.apache.spark.util.Utils

 class DeltaSQLQuerySuite extends QueryTest
   with SharedSparkSession with DeltaSQLCommandTest with SQLTestUtils {
@@ -231,4 +233,55 @@ class DeltaSQLQuerySuite extends QueryTest
       }
     }
   }
+
+  test("temporary view may break the rule of full scan") {
+    withTable("delta_table") {
+      sql(
+        s"""
+           |CREATE TABLE delta_table
+           |USING delta AS SELECT 1 AS key, 1 AS value
+         """.stripMargin)
+
+      sql("create temporary view temp_view as select key from delta_table")
+      val e1 = intercept[AnalysisException] (
+        sql("UPDATE temp_view v SET key=2")
+      ).getMessage
+      assert(e1.contains("Expect a full scan of Delta sources, but found a partial scan."))
+    }
+  }
+
+  test("CTAS a delta table with the existing non-empty directory") {
+    withTable("tab1") {
+      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+      try {
+        // create an empty hidden file
+        tableLoc.mkdir()
+        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
+        hiddenGarbageFile.createNewFile()
+        sql(s"CREATE TABLE tab1 USING DELTA AS SELECT 1, 'a'")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      } finally {
+        waitForTasksToFinish()
+        Utils.deleteRecursively(tableLoc)
+      }
+    }
+  }
+
+  test("create a managed table with the existing non-empty directory") {
+    withTable("tab1") {
+      val tableLoc = new File(spark.sessionState.catalog.defaultTablePath(TableIdentifier("tab1")))
+      try {
+        // create an empty hidden file
+        tableLoc.mkdir()
+        val hiddenGarbageFile = new File(tableLoc.getCanonicalPath, ".garbage")
+        hiddenGarbageFile.createNewFile()
+        sql(s"CREATE TABLE tab1 (col1 int, col2 string) USING DELTA")
+        sql("INSERT INTO tab1 VALUES (1, 'a')")
+        checkAnswer(spark.table("tab1"), Row(1, "a"))
+      } finally {
+        waitForTasksToFinish()
+        Utils.deleteRecursively(tableLoc)
+      }
+    }
+  }
 }
@@ -657,7 +657,7 @@ trait DeltaTableCreationTests
     }
   }

-  testQuietly("reject creating a delta table pointing to non-delta files") {
+  ignore("reject creating a delta table pointing to non-delta files") {
     withTempPath { dir =>
       withTable("delta_test") {
         val path = dir.getCanonicalPath
