-
Notifications
You must be signed in to change notification settings - Fork 29.1k
[SPARK-14684] [SPARK-15277] [SQL] Partition Spec Validation in SessionCatalog and Checking Partition Spec Existence Before Dropping #12801
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
01e4cdf
6835704
9180687
b38a21e
d2b84af
fda8025
ac0dccd
6e0018b
0546772
b37a64f
c2a872c
ab6dbd7
4276356
2dab708
0458770
1debdfa
763706d
4de6ec1
9422a4f
52bdf48
1e95df3
fab24cf
8b2e33b
2ee1876
b9f0090
ade6f7e
9fd63d2
5199d49
404214c
c001dd9
59daa48
41d5f64
472a6e3
0fba10a
cbf73b3
c08f561
474df88
3d9828d
72d2361
07afea5
8bf2007
87a165b
b9359cd
65bd090
babf2da
9e09469
50a8e4a
f3337fa
09cc36d
83a1915
0483145
236a5f4
08aaa4d
64f704e
006ea2d
0c0dc8a
ddd0b2e
980b51e
524d5a4
b0e19c4
6272398
3b7b5de
7c4b2f0
78a868b
38f3af9
fa15228
8089c6f
acbbf5c
8a4980c
a6c7518
546c1db
e2ece35
13c04be
ac88fc1
154d3df
412e88a
c570065
ac03674
f58b5f7
3ccd099
8e13da3
15f287f
9ca7621
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -510,6 +510,7 @@ class SessionCatalog( | |
| tableName: TableIdentifier, | ||
| parts: Seq[CatalogTablePartition], | ||
| ignoreIfExists: Boolean): Unit = { | ||
| requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName)) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
|
|
@@ -523,13 +524,14 @@ class SessionCatalog( | |
| */ | ||
| def dropPartitions( | ||
| tableName: TableIdentifier, | ||
| parts: Seq[TablePartitionSpec], | ||
| specs: Seq[TablePartitionSpec], | ||
| ignoreIfNotExists: Boolean): Unit = { | ||
| requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName)) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
| requireTableExists(TableIdentifier(table, Option(db))) | ||
| externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists) | ||
| externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -542,6 +544,9 @@ class SessionCatalog( | |
| tableName: TableIdentifier, | ||
| specs: Seq[TablePartitionSpec], | ||
| newSpecs: Seq[TablePartitionSpec]): Unit = { | ||
| val tableMetadata = getTableMetadata(tableName) | ||
| requireExactMatchedPartitionSpec(specs, tableMetadata) | ||
| requireExactMatchedPartitionSpec(newSpecs, tableMetadata) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
|
|
@@ -559,6 +564,7 @@ class SessionCatalog( | |
| * this becomes a no-op. | ||
| */ | ||
| def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = { | ||
| requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName)) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
|
|
@@ -571,6 +577,7 @@ class SessionCatalog( | |
| * If no database is specified, assume the table is in the current database. | ||
| */ | ||
| def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = { | ||
| requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName)) | ||
| val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) | ||
| val table = formatTableName(tableName.table) | ||
| requireDbExists(db) | ||
|
|
@@ -595,6 +602,42 @@ class SessionCatalog( | |
| externalCatalog.listPartitions(db, table, partialSpec) | ||
| } | ||
|
|
||
| /** | ||
| * Verify if the input partition spec exactly matches the existing defined partition spec. | ||
| * The columns must be the same but the orders could be different. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it consistent with hive? Sorry I don't remember it clearly, but sometimes the partition order matters in hive?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, Hive supports it. To external users, the order does not matter. This is just for usability, I think. |
||
| */ | ||
| private def requireExactMatchedPartitionSpec( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the interface here could be better — maybe something like the following [code suggestion omitted in this rendering]. This would allow you to reuse more code.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure, will do it.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The latest code changes incorporate this great idea. Thanks! |
||
| specs: Seq[TablePartitionSpec], | ||
| table: CatalogTable): Unit = { | ||
| val defined = table.partitionColumnNames.sorted | ||
| specs.foreach { s => | ||
| if (s.keys.toSeq.sorted != defined) { | ||
| throw new AnalysisException( | ||
| s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must match " + | ||
| s"the partition spec (${table.partitionColumnNames.mkString(", ")}) defined in " + | ||
| s"table '${table.identifier}'") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Verify if the input partition spec partially matches the existing defined partition spec. | ||
| * That is, the columns of partition spec should be part of the defined partition spec. | ||
| */ | ||
| private def requirePartialMatchedPartitionSpec( | ||
| specs: Seq[TablePartitionSpec], | ||
| table: CatalogTable): Unit = { | ||
| val defined = table.partitionColumnNames | ||
| specs.foreach { s => | ||
| if (!s.keys.forall(defined.contains)) { | ||
| throw new AnalysisException( | ||
| s"Partition spec is invalid. The spec (${s.keys.mkString(", ")}) must be contained " + | ||
| s"within the partition spec (${table.partitionColumnNames.mkString(", ")}) defined " + | ||
| s"in table '${table.identifier}'") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // ---------------------------------------------------------------------------- | ||
| // Functions | ||
| // ---------------------------------------------------------------------------- | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -482,8 +482,10 @@ class SessionCatalogSuite extends SparkFunSuite { | |
| assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2))) | ||
| // Create partitions without explicitly specifying database | ||
| sessionCatalog.setCurrentDatabase("mydb") | ||
| sessionCatalog.createPartitions(TableIdentifier("tbl"), Seq(part3), ignoreIfExists = false) | ||
| assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2, part3))) | ||
| sessionCatalog.createPartitions( | ||
| TableIdentifier("tbl"), Seq(partWithMixedOrder), ignoreIfExists = false) | ||
| assert(catalogPartitionsEqual( | ||
| externalCatalog, "mydb", "tbl", Seq(part1, part2, partWithMixedOrder))) | ||
| } | ||
|
|
||
| test("create partitions when database/table does not exist") { | ||
|
|
@@ -508,6 +510,31 @@ class SessionCatalogSuite extends SparkFunSuite { | |
| TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true) | ||
| } | ||
|
|
||
| test("create partitions with invalid part spec") { | ||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| var e = intercept[AnalysisException] { | ||
| catalog.createPartitions( | ||
| TableIdentifier("tbl2", Some("db2")), | ||
| Seq(part1, partWithLessColumns), ignoreIfExists = false) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.createPartitions( | ||
| TableIdentifier("tbl2", Some("db2")), | ||
| Seq(part1, partWithMoreColumns), ignoreIfExists = true) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.createPartitions( | ||
| TableIdentifier("tbl2", Some("db2")), | ||
| Seq(partWithUnknownColumns, part1), ignoreIfExists = true) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) | ||
| } | ||
|
|
||
| test("drop partitions") { | ||
| val externalCatalog = newBasicCatalog() | ||
| val sessionCatalog = new SessionCatalog(externalCatalog) | ||
|
|
@@ -565,6 +592,28 @@ class SessionCatalogSuite extends SparkFunSuite { | |
| ignoreIfNotExists = true) | ||
| } | ||
|
|
||
| test("drop partitions with invalid partition spec") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we test drop partitions with [a partial partition spec]?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since the In-Memory Catalog does not support it, the test will fail. Thus, I did not add it. |
||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| var e = intercept[AnalysisException] { | ||
| catalog.dropPartitions( | ||
| TableIdentifier("tbl2", Some("db2")), | ||
| Seq(partWithMoreColumns.spec), | ||
| ignoreIfNotExists = false) | ||
| } | ||
| assert(e.getMessage.contains( | ||
| "Partition spec is invalid. The spec (a, b, c) must be contained within " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.dropPartitions( | ||
| TableIdentifier("tbl2", Some("db2")), | ||
| Seq(partWithUnknownColumns.spec), | ||
| ignoreIfNotExists = false) | ||
| } | ||
| assert(e.getMessage.contains( | ||
| "Partition spec is invalid. The spec (a, unknown) must be contained within " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) | ||
| } | ||
|
|
||
| test("get partition") { | ||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| assert(catalog.getPartition( | ||
|
|
@@ -591,6 +640,25 @@ class SessionCatalogSuite extends SparkFunSuite { | |
| } | ||
| } | ||
|
|
||
| test("get partition with invalid partition spec") { | ||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| var e = intercept[AnalysisException] { | ||
| catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithLessColumns.spec) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithMoreColumns.spec) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithUnknownColumns.spec) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| } | ||
|
|
||
| test("rename partitions") { | ||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101")) | ||
|
|
@@ -633,6 +701,31 @@ class SessionCatalogSuite extends SparkFunSuite { | |
| } | ||
| } | ||
|
|
||
| test("rename partition with invalid partition spec") { | ||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| var e = intercept[AnalysisException] { | ||
| catalog.renamePartitions( | ||
| TableIdentifier("tbl1", Some("db2")), | ||
| Seq(part1.spec), Seq(partWithLessColumns.spec)) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.renamePartitions( | ||
| TableIdentifier("tbl1", Some("db2")), | ||
| Seq(part1.spec), Seq(partWithMoreColumns.spec)) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.renamePartitions( | ||
| TableIdentifier("tbl1", Some("db2")), | ||
| Seq(part1.spec), Seq(partWithUnknownColumns.spec)) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| } | ||
|
|
||
| test("alter partitions") { | ||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| val newLocation = newUriForDatabase() | ||
|
|
@@ -673,6 +766,25 @@ class SessionCatalogSuite extends SparkFunSuite { | |
| } | ||
| } | ||
|
|
||
| test("alter partition with invalid partition spec") { | ||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| var e = intercept[AnalysisException] { | ||
| catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithLessColumns)) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithMoreColumns)) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| e = intercept[AnalysisException] { | ||
| catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithUnknownColumns)) | ||
| } | ||
| assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " + | ||
| "the partition spec (a, b) defined in table '`db2`.`tbl1`'")) | ||
| } | ||
|
|
||
| test("list partitions") { | ||
| val catalog = new SessionCatalog(newBasicCatalog()) | ||
| assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2)) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, I thought the intention of this patch was to ban partial matching, which is buggy in Hive? Looks like this just ensures that the user doesn't specify a column that is not a partitioned column here. Is that sufficient to bypass the bug in Hive that you were talking about?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, if we do not specify an invalid partition spec, Hive can return a correct set of qualified partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thus, this PR resolves the issue by preventing users from specifying an invalid partition spec.
Below is the test case:
https://github.com/gatorsmile/spark/blob/9ca76217defe5e0154b1d26a61baf4831d19df3a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala#L189-L194
Without this PR fix, it will drop all the partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand now. The earlier PR descriptions were kind of confusing.