[SPARK-19724][SQL] Creating a managed table with an existing default location should throw an exception #17272

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
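For context, here is a minimal reproduction sketch of the behavior this patch introduces. It is not part of the diff: the local-mode session, the table name `t`, and the printed message are illustrative assumptions based on the tests added below.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, SparkSession}

object Spark19724Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-19724-repro")
      .getOrCreate()

    // Pre-create the default location of the managed table `t`,
    // i.e. <warehouse>/t, before CREATE TABLE runs.
    val warehouse = spark.conf.get("spark.sql.warehouse.dir")
    val tPath = new Path(warehouse, "t")
    val fs = tPath.getFileSystem(spark.sessionState.newHadoopConf())
    fs.mkdirs(tPath)

    // With this patch, CREATE TABLE now fails fast instead of silently
    // adopting the pre-existing directory.
    try {
      spark.sql("CREATE TABLE t(a STRING) USING parquet")
    } catch {
      case e: AnalysisException =>
        // Expected: the location('...') of table('`default`.`t`') already exists.
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}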
python/pyspark/sql/tests.py — 12 changes: 7 additions & 5 deletions

@@ -197,6 +197,8 @@ def setUpClass(cls):
         cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
         os.unlink(cls.tempdir.name)
         cls.spark = SparkSession(cls.sc)
+        cls.spark.catalog._reset()
+
         cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
         cls.df = cls.spark.createDataFrame(cls.testData)

@@ -1892,7 +1894,7 @@ def test_list_tables(self):
         self.assertEquals(spark.catalog.listTables(), [])
         self.assertEquals(spark.catalog.listTables("some_db"), [])
         spark.createDataFrame([(1, 1)]).createOrReplaceTempView("temp_tab")
-        spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
+        spark.sql("CREATE TABLE xxxxx1 (name STRING, age INT) USING parquet")
         spark.sql("CREATE TABLE some_db.tab2 (name STRING, age INT) USING parquet")
         tables = sorted(spark.catalog.listTables(), key=lambda t: t.name)
         tablesDefault = sorted(spark.catalog.listTables("default"), key=lambda t: t.name)

@@ -1901,7 +1903,7 @@
         self.assertEquals(len(tables), 2)
         self.assertEquals(len(tablesSomeDb), 2)
         self.assertEquals(tables[0], Table(
-            name="tab1",
+            name="xxxxx1",
             database="default",
             description=None,
             tableType="MANAGED",

@@ -1973,10 +1975,10 @@ def test_list_columns(self):
         spark = self.spark
         spark.catalog._reset()
         spark.sql("CREATE DATABASE some_db")
-        spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
+        spark.sql("CREATE TABLE xxxxx1 (name STRING, age INT) USING parquet")
         spark.sql("CREATE TABLE some_db.tab2 (nickname STRING, tolerance FLOAT) USING parquet")
-        columns = sorted(spark.catalog.listColumns("tab1"), key=lambda c: c.name)
-        columnsDefault = sorted(spark.catalog.listColumns("tab1", "default"), key=lambda c: c.name)
+        columns = sorted(spark.catalog.listColumns("xxxxx1"), key=lambda c: c.name)
+        columnsDefault = sorted(spark.catalog.listColumns("xxxxx1", "default"), key=lambda c: c.name)
         self.assertEquals(columns, columnsDefault)
         self.assertEquals(len(columns), 2)
         self.assertEquals(columns[0], Column(
InMemoryCatalog.scala

@@ -187,38 +187,32 @@ class InMemoryCatalog(
     val db = tableDefinition.identifier.database.get
     requireDbExists(db)
     val table = tableDefinition.identifier.table
-    if (tableExists(db, table)) {
-      if (!ignoreIfExists) {
-        throw new TableAlreadyExistsException(db = db, table = table)
-      }
-    } else {
-      // Set the default table location if this is a managed table and its location is not
-      // specified.
-      // Ideally we should not create a managed table with location, but Hive serde table can
-      // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
-      // to create the table directory and write out data before we create this table, to avoid
-      // exposing a partial written table.
-      val needDefaultTableLocation =
-        tableDefinition.tableType == CatalogTableType.MANAGED &&
-          tableDefinition.storage.locationUri.isEmpty
-
-      val tableWithLocation = if (needDefaultTableLocation) {
-        val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
-        try {
-          val fs = defaultTableLocation.getFileSystem(hadoopConfig)
-          fs.mkdirs(defaultTableLocation)
-        } catch {
-          case e: IOException =>
-            throw new SparkException(s"Unable to create table $table as failed " +
-              s"to create its directory $defaultTableLocation", e)
-        }
-        tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
-      } else {
-        tableDefinition
-      }
-
-      catalog(db).tables.put(table, new TableDesc(tableWithLocation))
-    }
+    // Set the default table location if this is a managed table and its location is not
+    // specified.
+    // Ideally we should not create a managed table with location, but Hive serde table can
+    // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
+    // to create the table directory and write out data before we create this table, to avoid
+    // exposing a partial written table.
+    val needDefaultTableLocation =
+      tableDefinition.tableType == CatalogTableType.MANAGED &&
+        tableDefinition.storage.locationUri.isEmpty
+
+    val tableWithLocation = if (needDefaultTableLocation) {
+      val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
+      try {
+        val fs = defaultTableLocation.getFileSystem(hadoopConfig)
+        fs.mkdirs(defaultTableLocation)
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to create table $table as failed " +
+            s"to create its directory $defaultTableLocation", e)
+      }
+      tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
+    } else {
+      tableDefinition
+    }
+
+    catalog(db).tables.put(table, new TableDesc(tableWithLocation))
   }
 
   override def dropTable(

Contributor Author commented on the removed check: we have checked if the table exists in SessionCatalog.createTable.
SessionCatalog.scala

@@ -238,6 +238,28 @@ class SessionCatalog(
     new Path(new Path(conf.warehousePath), database + ".db").toUri
   }
 
+  /**
+   * Check if the table exists, and, for a managed table, check if its default path exists.
+   */
+  def checkTableOrPathExists(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    if (!ignoreIfExists) {
+      val db = formatDatabaseName(table.identifier.database.getOrElse(getCurrentDatabase))
+      val tbl = formatTableName(table.identifier.table)
+      val tableIdentifier = TableIdentifier(tbl, Some(db))
+      if (tableExists(tableIdentifier)) {
+        throw new TableAlreadyExistsException(db = db, table = tbl)
+      }
+      // As discussed in SPARK-19583, the default location of a managed table should not exist
+      if (table.tableType == CatalogTableType.MANAGED) {
+        val tablePath = new Path(defaultTablePath(tableIdentifier))
+        val fs = tablePath.getFileSystem(hadoopConf)
+        if (fs.exists(tablePath)) {
+          throw new AnalysisException(s"the location('${tablePath.toString}') " +
+            s"of table('$tableIdentifier') already exists.")
+        }
+      }
+    }
+  }
   // ----------------------------------------------------------------------------
   // Tables
   // ----------------------------------------------------------------------------

@@ -259,6 +281,8 @@
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
     validateName(table)
+    requireDbExists(db)
+    checkTableOrPathExists(tableDefinition, ignoreIfExists)
 
     val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
       && !tableDefinition.storage.locationUri.get.isAbsolute) {

@@ -272,7 +296,6 @@
       tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     }
 
-    requireDbExists(db)
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
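A quick sketch of the new contract, assuming the SessionCatalogSuite test utilities that appear below (newEmptyCatalog, newDb, newTable) and ScalaTest's intercept; it is illustrative, not part of the patch:

// Illustrative sketch of checkTableOrPathExists semantics (assumes the
// catalyst test utilities newEmptyCatalog/newDb/newTable and ScalaTest).
val catalog = new SessionCatalog(newEmptyCatalog())
catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false)

// Already-registered table: the check throws before the external catalog
// is ever touched.
intercept[TableAlreadyExistsException] {
  catalog.checkTableOrPathExists(newTable("tbl1", "db1"), ignoreIfExists = false)
}

// With ignoreIfExists = true the same call is a no-op. For a MANAGED table
// that is absent from the catalog, the check instead probes the default
// location <warehouse>/<db>.db/<table> and raises an AnalysisException if
// that directory already exists.
catalog.checkTableOrPathExists(newTable("tbl1", "db1"), ignoreIfExists = true)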
ExternalCatalogSuite.scala

@@ -164,15 +164,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
-  test("create table when the table already exists") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    val table = newTable("tbl1", "db2")
-    intercept[TableAlreadyExistsException] {
-      catalog.createTable(table, ignoreIfExists = false)
-    }
-  }
-
   test("drop table") {
     val catalog = newBasicCatalog()
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))

@windpiger (Contributor Author, Mar 13, 2017) commented on the removed test: since InMemoryCatalog and HiveExternalCatalog no longer check whether the table exists, the test case moves to SessionCatalogSuite (which uses InMemoryCatalog) and to HiveExternalCatalogSuite.
SessionCatalogSuite.scala

@@ -64,6 +64,17 @@ class SessionCatalogSuite extends PlanTest {
     assert(!catalog.databaseExists("does_not_exist"))
   }
 
+  test("create table when the table already exists") {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+    catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false)
+
+    val table = newTable("tbl1", "db1")
+    intercept[TableAlreadyExistsException] {
+      catalog.createTable(table, ignoreIfExists = false)
+    }.getMessage
+  }
+
   def testInvalidName(func: (String) => Unit) {
     // scalastyle:off
     // non ascii characters are not allowed in the source code, so we disable the scalastyle.
createDataSourceTables.scala

@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import java.net.URI
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

@@ -155,6 +153,8 @@ case class CreateDataSourceTableAsSelectCommand(
     } else {
       table.storage.locationUri
     }
+
+    sparkSession.sessionState.catalog.checkTableOrPathExists(table, ignoreIfExists = false)
     val result = saveDataIntoTable(
       sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
     val newTable = table.copy(

Contributor Author commented on the added check: because the following saveDataIntoTable will create the table path and store data in it, we must not check the path in the later createTable; we check it here, before saveDataIntoTable.

@@ -163,7 +163,9 @@
       // the schema of df). It is important since the nullability may be changed by the relation
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
       schema = result.schema)
-    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+    // we have checked above, before saveDataIntoTable, that the table/path does not
+    // exist, so here we set ignoreIfExists to true
+    sessionState.catalog.createTable(newTable, ignoreIfExists = true)
 
     result match {
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
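Taken together, the two comments above describe the CTAS ordering. A condensed, hypothetical sketch of CreateDataSourceTableAsSelectCommand.run after this patch — `runCtas` and the simplified signatures are illustrative, only the three calls come from the diff:

// Hypothetical condensation of the CTAS write path after this patch.
def runCtas(): Unit = {
  // 1. Fail fast while nothing has been written: the table must not be
  //    registered yet, and a managed table's default path must not exist.
  sparkSession.sessionState.catalog.checkTableOrPathExists(table, ignoreIfExists = false)

  // 2. Write the query result into the table directory first, so readers
  //    never see a catalog entry that points at a half-written location.
  val result = saveDataIntoTable(
    sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)

  // 3. Only now register the table. ignoreIfExists = true, because step 1
  //    already performed the existence checks and step 2 created the table
  //    directory, which would otherwise trip the managed-location check again.
  sessionState.catalog.createTable(table.copy(schema = result.schema), ignoreIfExists = true)
}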
DDLSuite.scala

@@ -2216,4 +2216,48 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("create table for managed datasource table with a created location should throw an exception") {
+    withTable("t", "t1", "t2", "t3") {
+      val warehousePath = spark.sharedState.warehousePath
+      val qualifiedwarehousePath = CatalogUtils.URIToString(makeQualifiedPath(warehousePath))
+      val tPath = new Path(qualifiedwarehousePath, "t")
+      val fs = tPath.getFileSystem(spark.sessionState.newHadoopConf())
+      fs.mkdirs(tPath)
+      assert(fs.exists(tPath))
+      val e = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t(a string) USING parquet")
+      }.getMessage
+      assert(e.contains(s"the location('${tPath.toString}') of table" +
+        s"('`default`.`t`') already exists."))
+      // partitioned table (table path exists)
+      val t1Path = new Path(qualifiedwarehousePath, "t1")
+      fs.mkdirs(t1Path)
+      assert(fs.exists(t1Path))
+      val e1 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t1(a string, b string) USING parquet PARTITIONED BY(a)")
+      }.getMessage
+      assert(e1.contains(s"the location('${t1Path.toString}') of table" +
+        s"('`default`.`t1`') already exists."))
+
+      val t2Path = new Path(qualifiedwarehousePath, "t2")
+      fs.mkdirs(t2Path)
+      assert(fs.exists(t2Path))
+      val e2 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t2 USING parquet AS SELECT 1")
+      }.getMessage
+      assert(e2.contains(s"the location('${t2Path.toString}') of table" +
+        s"('`default`.`t2`') already exists."))
+
+      val t3Path = new Path(qualifiedwarehousePath, "t3")
+      val t3PartPath = new Path(t3Path, "a=1")
+      fs.mkdirs(t3PartPath)
+      assert(fs.exists(t3PartPath))
+      val e3 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t3 USING parquet PARTITIONED BY(a) AS SELECT 1 a, 2 b")
+      }.getMessage
+      assert(e3.contains(s"the location('${t3Path.toString}') of table" +
+        s"('`default`.`t3`') already exists."))
+    }
+  }
 }
SharedSQLContext.scala

@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.test
 
+import java.io.File
+
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 
 /**

@@ -71,6 +73,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
    */
   protected override def afterAll(): Unit = {
     super.afterAll()
+    Utils.deleteRecursively(new File(_spark.sessionState.conf.warehousePath))
     if (_spark != null) {
       _spark.stop()
       _spark = null
HiveExternalCatalog.scala

@@ -195,10 +195,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
 
-    if (tableExists(db, table) && !ignoreIfExists) {
-      throw new TableAlreadyExistsException(db = db, table = table)
-    }
-
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
     } else {
TestHiveSingleton.scala

@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.hive.test
 
+import java.io.File
+
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
 
 
 trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {

@@ -31,6 +34,7 @@
     try {
       hiveContext.reset()
     } finally {
+      Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath))
       super.afterAll()
     }
   }
HiveExternalCatalogSuite.scala

@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._

@@ -50,6 +51,16 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
 
   import utils._
 
+  test("create table when the table already exists") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    val table = newTable("tbl1", "db2")
+    val e = intercept[AnalysisException] {
+      catalog.createTable(table, ignoreIfExists = false)
+    }.getMessage
+    assert(e.contains("AlreadyExistsException(message:Table tbl1 already exists);"))
+  }
+
   test("list partitions by filter") {
     val catalog = newBasicCatalog()
     val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1), "GMT")