From a0ba4190a96e2698e9c485b0ccbb7298a360f1dd Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 13 Mar 2017 15:24:18 +0800
Subject: [PATCH 1/7] [SPARK-19724][SQL] Creating a managed table with an
 existing default table path should throw an exception

---
 .../catalyst/catalog/InMemoryCatalog.scala    | 52 ++++++++-----------
 .../sql/catalyst/catalog/SessionCatalog.scala | 25 ++++++++-
 .../catalog/ExternalCatalogSuite.scala        |  9 ----
 .../catalog/SessionCatalogSuite.scala         | 11 ++++
 .../command/createDataSourceTables.scala      |  7 ++-
 .../sql/execution/command/DDLSuite.scala      | 44 ++++++++++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala  |  4 --
 .../sql/hive/HiveExternalCatalogSuite.scala   | 11 ++++
 .../sql/hive/execution/HiveDDLSuite.scala     | 49 ++++++++++++++++-
 9 files changed, 165 insertions(+), 47 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 5cc6b0abc6fde..e4f7160882eac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -187,38 +187,32 @@ class InMemoryCatalog(
     val db = tableDefinition.identifier.database.get
     requireDbExists(db)
     val table = tableDefinition.identifier.table
-    if (tableExists(db, table)) {
-      if (!ignoreIfExists) {
-        throw new TableAlreadyExistsException(db = db, table = table)
+    // Set the default table location if this is a managed table and its location is not
+    // specified.
+    // Ideally we should not create a managed table with location, but Hive serde table can
+    // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
+    // to create the table directory and write out data before we create this table, to avoid
+    // exposing a partially written table.
+    val needDefaultTableLocation =
+      tableDefinition.tableType == CatalogTableType.MANAGED &&
+        tableDefinition.storage.locationUri.isEmpty
+
+    val tableWithLocation = if (needDefaultTableLocation) {
+      val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
+      try {
+        val fs = defaultTableLocation.getFileSystem(hadoopConfig)
+        fs.mkdirs(defaultTableLocation)
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to create table $table as failed " +
+            s"to create its directory $defaultTableLocation", e)
       }
+      tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
     } else {
-      // Set the default table location if this is a managed table and its location is not
-      // specified.
-      // Ideally we should not create a managed table with location, but Hive serde table can
-      // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
-      // to create the table directory and write out data before we create this table, to avoid
-      // exposing a partial written table.
-      val needDefaultTableLocation =
-        tableDefinition.tableType == CatalogTableType.MANAGED &&
-          tableDefinition.storage.locationUri.isEmpty
-
-      val tableWithLocation = if (needDefaultTableLocation) {
-        val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
-        try {
-          val fs = defaultTableLocation.getFileSystem(hadoopConfig)
-          fs.mkdirs(defaultTableLocation)
-        } catch {
-          case e: IOException =>
-            throw new SparkException(s"Unable to create table $table as failed " +
-              s"to create its directory $defaultTableLocation", e)
-        }
-        tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
-      } else {
-        tableDefinition
-      }
-
-      catalog(db).tables.put(table, new TableDesc(tableWithLocation))
+      tableDefinition
     }
+
+    catalog(db).tables.put(table, new TableDesc(tableWithLocation))
   }
 
   override def dropTable(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index bfcdb70fe47c1..a601dd1c7fb74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -238,6 +238,28 @@ class SessionCatalog(
     new Path(new Path(conf.warehousePath), database + ".db").toUri
   }
 
+  /**
+   * Check if the table exists, and check if the path exists for a managed table
+   */
+  def checkTableOrPathExists(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    if (!ignoreIfExists) {
+      val db = formatDatabaseName(table.identifier.database.getOrElse(getCurrentDatabase))
+      val tbl = formatTableName(table.identifier.table)
+      val tableIdentifier = TableIdentifier(tbl, Some(db))
+      if (tableExists(tableIdentifier)) {
+        throw new TableAlreadyExistsException(db = db, table = tbl)
+      }
+      // As discussed in SPARK-19583, the default location of a managed table should not exist
+      if (table.tableType == CatalogTableType.MANAGED) {
+        val tablePath = new Path(defaultTablePath(tableIdentifier))
+        val fs = tablePath.getFileSystem(hadoopConf)
+        if (fs.exists(tablePath)) {
+          throw new AnalysisException(s"the location('${tablePath.toString}') " +
+            s"of table('$tableIdentifier') already exists.")
+        }
+      }
+    }
+  }
   // ----------------------------------------------------------------------------
   // Tables
   // ----------------------------------------------------------------------------
@@ -259,6 +281,8 @@ class SessionCatalog(
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
     validateName(table)
+    requireDbExists(db)
+    checkTableOrPathExists(tableDefinition, ignoreIfExists)
 
     val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
       && !tableDefinition.storage.locationUri.get.isAbsolute) {
@@ -272,7 +296,6 @@ class SessionCatalog(
       tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     }
 
-    requireDbExists(db)
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 7820f39d96426..7df2663a823b6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -164,15 +164,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
-  test("create table when the table already exists") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    val table = newTable("tbl1", "db2")
-    intercept[TableAlreadyExistsException] {
-      catalog.createTable(table, ignoreIfExists = false)
-    }
-  }
-
   test("drop table") {
     val catalog = newBasicCatalog()
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 7e74dcdef0e27..5dcfeaca7112f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -64,6 +64,17 @@ class SessionCatalogSuite extends PlanTest {
     assert(!catalog.databaseExists("does_not_exist"))
   }
 
+  test("create table when the table already exists") {
+    val catalog = new SessionCatalog(newEmptyCatalog())
+    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+    catalog.createTable(newTable("tbl1", "db1"), ignoreIfExists = false)
+
+    val table = newTable("tbl1", "db1")
+    intercept[TableAlreadyExistsException] {
+      catalog.createTable(table, ignoreIfExists = false)
+    }.getMessage
+  }
+
   def testInvalidName(func: (String) => Unit) {
     // scalastyle:off
     // non ascii characters are not allowed in the source code, so we disable the scalastyle.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2d890118ae0a5..f5c8624c7e8b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import java.net.URI
 
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
@@ -155,6 +154,8 @@ case class CreateDataSourceTableAsSelectCommand(
       } else {
         table.storage.locationUri
       }
+
+      sparkSession.sessionState.catalog.checkTableOrPathExists(table, ignoreIfExists = false)
       val result = saveDataIntoTable(
         sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
       val newTable = table.copy(
@@ -163,7 +164,9 @@ case class CreateDataSourceTableAsSelectCommand(
         // the schema of df). It is important since the nullability may be changed by the relation
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
         schema = result.schema)
-      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+      // we have check the table/path exists above before saveDataIntoTable, here we
+      // set ignoreIfExists to true
+      sessionState.catalog.createTable(newTable, ignoreIfExists = true)
 
       result match {
         case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 0666f446f3b52..599d9cd17242b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2216,4 +2216,48 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("create table for managed datasource table with a created location throw an exception") {
+    withTable("t", "t1", "t2", "t3") {
+      val warehousePath = spark.sharedState.warehousePath
+      val qualifiedwarehousePath = CatalogUtils.URIToString(makeQualifiedPath(warehousePath))
+      val tPath = new Path(qualifiedwarehousePath, "t")
+      val fs = tPath.getFileSystem(spark.sessionState.newHadoopConf())
+      fs.mkdirs(tPath)
+      assert(fs.exists(tPath))
+      val e = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t(a string) USING parquet")
+      }.getMessage
+      assert(e.contains(s"the location('${tPath.toString}') of table" +
+        s"('`default`.`t`') already exists."))
+      // partition table(table path exists)
+      val t1Path = new Path(qualifiedwarehousePath, "t1")
+      fs.mkdirs(t1Path)
+      assert(fs.exists(t1Path))
+      val e1 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t1(a string, b string) USING parquet PARTITIONED BY(a)")
+      }.getMessage
+      assert(e1.contains(s"the location('${t1Path.toString}') of table" +
+        s"('`default`.`t1`') already exists."))
+
+      val t2Path = new Path(qualifiedwarehousePath, "t2")
+      fs.mkdirs(t2Path)
+      assert(fs.exists(t2Path))
+      val e2 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t2 USING parquet AS SELECT 1")
+      }.getMessage
+      assert(e2.contains(s"the location('${t2Path.toString}') of table" +
+        s"('`default`.`t2`') already exists."))
+
+      val t3Path = new Path(qualifiedwarehousePath, "t3")
+      val t3PartPath = new Path(t3Path, "a=1")
+      fs.mkdirs(t3PartPath)
+      assert(fs.exists(t3PartPath))
+      val e3 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t3 USING parquet PARTITIONED BY(a) AS SELECT 1 a, 2 b")
+      }.getMessage
+      assert(e3.contains(s"the location('${t3Path.toString}') of table" +
+        s"('`default`.`t3`') already exists."))
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 78aa2bd2494f3..33e17f3e4a760 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -195,10 +195,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
 
-    if (tableExists(db, table) && !ignoreIfExists) {
-      throw new TableAlreadyExistsException(db = db, table = table)
-    }
-
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
     } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 4349f1aa23be0..8046d186d45db 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -50,6 +51,16 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
 
   import utils._
 
+  test("create table when the table already exists") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    val table = newTable("tbl1", "db2")
+    val e = intercept[AnalysisException] {
+      catalog.createTable(table, ignoreIfExists = false)
+    }.getMessage
+    assert(e.contains("AlreadyExistsException(message:Table tbl1 already exists);"))
+  }
+
   test("list partitions by filter") {
     val catalog = newBasicCatalog()
     val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1), "GMT")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d29242bb47e36..65aacbe22b2c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1364,7 +1364,7 @@ class HiveDDLSuite
     import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX
     import org.apache.spark.sql.hive.HiveExternalCatalog.STATISTICS_PREFIX
 
-    withTable("tbl") {
+    withTable("tbl", "tbl1") {
       sql("CREATE TABLE tbl(a INT) STORED AS parquet")
 
       Seq(DATASOURCE_PREFIX, STATISTICS_PREFIX).foreach { forbiddenPrefix =>
@@ -1379,7 +1379,7 @@ class HiveDDLSuite
         assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
 
         val e3 = intercept[AnalysisException] {
-          sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+          sql(s"CREATE TABLE tbl1 (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
         }
         assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
       }
@@ -1861,4 +1861,49 @@ class HiveDDLSuite
       }
     }
   }
+
+
+  test("create table for managed hive table with a created location throw an exception") {
+    withTable("t", "t1", "t2", "t3") {
+      val warehousePath = spark.sharedState.warehousePath
+      val qualifiedwarehousePath = CatalogUtils.URIToString(makeQualifiedPath(warehousePath))
+      val tPath = new Path(qualifiedwarehousePath, "t")
+      val fs = tPath.getFileSystem(spark.sessionState.newHadoopConf())
+      fs.mkdirs(tPath)
+      assert(fs.exists(tPath))
+      val e = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t(a string) USING hive")
+      }.getMessage
+      assert(e.contains(s"the location('${tPath.toString}') of table" +
+        s"('`default`.`t`') already exists."))
+      // partition table(table path exists)
+      val t1Path = new Path(qualifiedwarehousePath, "t1")
+      fs.mkdirs(t1Path)
+      assert(fs.exists(t1Path))
+      val e1 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t1(a string, b string) USING hive PARTITIONED BY(a)")
+      }.getMessage
+      assert(e1.contains(s"the location('${t1Path.toString}') of table" +
+        s"('`default`.`t1`') already exists."))
+
+      val t2Path = new Path(qualifiedwarehousePath, "t2")
+      fs.mkdirs(t2Path)
+      assert(fs.exists(t2Path))
+      val e2 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t2 USING hive AS SELECT 1")
+      }.getMessage
+      assert(e2.contains(s"the location('${t2Path.toString}') of table" +
+        s"('`default`.`t2`') already exists."))
+
+      val t3Path = new Path(qualifiedwarehousePath, "t3")
+      val t3PartPath = new Path(t3Path, "a=1")
+      fs.mkdirs(t3PartPath)
+      assert(fs.exists(t3PartPath))
+      val e3 = intercept[AnalysisException] {
+        spark.sql("CREATE TABLE t3 USING hive PARTITIONED BY(a) AS SELECT 1 a, 2 b")
+      }.getMessage
+      assert(e3.contains(s"the location('${t3Path.toString}') of table" +
+        s"('`default`.`t3`') already exists."))
+    }
+  }
 }

From 65d7ea9f9b2bcf02e5eab830208bfc889cf26d38 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 13 Mar 2017 15:31:37 +0800
Subject: [PATCH 2/7] remove empty lines

---
 .../spark/sql/execution/command/createDataSourceTables.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index f5c8624c7e8b9..f8b63e4aaa035 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import java.net.URI
 
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

From 0e753cc689bcee1e2a06d3ebad0e2829e7a5660c Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 13 Mar 2017 16:08:40 +0800
Subject: [PATCH 3/7] fix comment

---
 .../spark/sql/execution/command/createDataSourceTables.scala | 2 +-
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala   | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index f8b63e4aaa035..b5aba293174cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -163,7 +163,7 @@ case class CreateDataSourceTableAsSelectCommand(
         // the schema of df). It is important since the nullability may be changed by the relation
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
         schema = result.schema)
-      // we have check the table/path exists above before saveDataIntoTable, here we
+      // we have checked the table/path exists above before saveDataIntoTable, here we
       // set ignoreIfExists to true
       sessionState.catalog.createTable(newTable, ignoreIfExists = true)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 65aacbe22b2c9..8c6267a60754b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1862,7 +1862,6 @@ class HiveDDLSuite
     }
   }
 
-
   test("create table for managed hive table with a created location throw an exception") {
     withTable("t", "t1", "t2", "t3") {
       val warehousePath = spark.sharedState.warehousePath

From c8d9b7757feeb1e4a1e4a84a0445f9c45a604797 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 13 Mar 2017 19:19:06 +0800
Subject: [PATCH 4/7] fix test failed

---
 .../org/apache/spark/sql/hive/test/TestHiveSingleton.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
index 9bf84ab1fb7a2..53f2d50a31a22 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.hive.test
 
+import java.io.File
+
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.Utils
 
 
 trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
@@ -31,6 +34,7 @@ trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
     try {
       hiveContext.reset()
     } finally {
+      Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath))
      super.afterAll()
     }
   }

From cd4a0912bc934ca475f2d9097ab1f684351b7bf8 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Tue, 14 Mar 2017 08:29:43 +0800
Subject: [PATCH 5/7] fix test failed

---
 .../scala/org/apache/spark/sql/test/SharedSQLContext.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e122b39f6fc40..8050858bdfa6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.test
 
+import java.io.File
+
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 
 /**
@@ -71,6 +73,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
    */
   protected override def afterAll(): Unit = {
     super.afterAll()
+    Utils.deleteRecursively(new File(_spark.sessionState.conf.warehousePath))
     if (_spark != null) {
       _spark.stop()
       _spark = null

From 739f207b5a79055c90266202b6df21a74307a7b3 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Wed, 15 Mar 2017 00:00:20 +0800
Subject: [PATCH 6/7] reset catalog in pyspark tests

---
 python/pyspark/sql/tests.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index f0a9a0400e392..d0b2bb6be63de 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -197,6 +197,8 @@ def setUpClass(cls):
         cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
         os.unlink(cls.tempdir.name)
         cls.spark = SparkSession(cls.sc)
+        cls.spark.catalog._reset()
+
         cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
         cls.df = cls.spark.createDataFrame(cls.testData)
 

From 4351cd7e9d1cd3a858b55dd21739ec362da43173 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Wed, 15 Mar 2017 22:28:18 +0800
Subject: [PATCH 7/7] to find why jenkins failed

---
 python/pyspark/sql/tests.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d0b2bb6be63de..e89105d42fc7c 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1894,7 +1894,7 @@ def test_list_tables(self):
         self.assertEquals(spark.catalog.listTables(), [])
         self.assertEquals(spark.catalog.listTables("some_db"), [])
         spark.createDataFrame([(1, 1)]).createOrReplaceTempView("temp_tab")
-        spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
+        spark.sql("CREATE TABLE xxxxx1 (name STRING, age INT) USING parquet")
         spark.sql("CREATE TABLE some_db.tab2 (name STRING, age INT) USING parquet")
         tables = sorted(spark.catalog.listTables(), key=lambda t: t.name)
         tablesDefault = sorted(spark.catalog.listTables("default"), key=lambda t: t.name)
@@ -1903,7 +1903,7 @@ def test_list_tables(self):
         self.assertEquals(len(tables), 2)
         self.assertEquals(len(tablesSomeDb), 2)
         self.assertEquals(tables[0], Table(
-            name="tab1",
+            name="xxxxx1",
             database="default",
             description=None,
             tableType="MANAGED",
@@ -1975,10 +1975,10 @@ def test_list_columns(self):
         spark = self.spark
         spark.catalog._reset()
         spark.sql("CREATE DATABASE some_db")
-        spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
+        spark.sql("CREATE TABLE xxxxx1 (name STRING, age INT) USING parquet")
         spark.sql("CREATE TABLE some_db.tab2 (nickname STRING, tolerance FLOAT) USING parquet")
-        columns = sorted(spark.catalog.listColumns("tab1"), key=lambda c: c.name)
-        columnsDefault = sorted(spark.catalog.listColumns("tab1", "default"), key=lambda c: c.name)
+        columns = sorted(spark.catalog.listColumns("xxxxx1"), key=lambda c: c.name)
+        columnsDefault = sorted(spark.catalog.listColumns("xxxxx1", "default"), key=lambda c: c.name)
         self.assertEquals(columns, columnsDefault)
         self.assertEquals(len(columns), 2)
         self.assertEquals(columns[0], Column(