Skip to content

Commit

Permalink
[SPARK-33569][SQL] Remove getting partitions by an identifier prefix
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. Remove the method `listPartitionIdentifiers()` from the `SupportsPartitionManagement` interface. The method lists partitions by ident prefix.
2. Rename `listPartitionByNames()` to `listPartitionIdentifiers()`.
3. Re-implement the default method `partitionExists()` using new method.

### Why are the changes needed?
Getting partitions by ident prefix only is not used, and it can be removed to improve code maintenance. Also this makes the `SupportsPartitionManagement` interface cleaner.

### Does this PR introduce _any_ user-facing change?
Should not.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly org.apache.spark.sql.connector.catalog.*"
```

Closes #30514 from MaxGekk/remove-listPartitionIdentifiers.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
MaxGekk authored and cloud-fan committed Nov 30, 2020
1 parent 0a612b6 commit 6fd148f
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.connector.catalog;

import java.util.Arrays;
import java.util.Map;

import org.apache.spark.annotation.Experimental;
Expand Down Expand Up @@ -79,7 +80,9 @@ void createPartition(
* @return true if the partition exists, false otherwise
*/
default boolean partitionExists(InternalRow ident) {
return listPartitionIdentifiers(ident).length > 0;
String[] partitionNames = partitionSchema().names();
String[] requiredNames = Arrays.copyOfRange(partitionNames, 0, ident.numFields());
return listPartitionIdentifiers(requiredNames, ident).length > 0;
}

/**
Expand All @@ -105,20 +108,12 @@ void replacePartitionMetadata(
Map<String, String> loadPartitionMetadata(InternalRow ident)
throws UnsupportedOperationException;

/**
* List the identifiers of all partitions that have the ident prefix in a table.
*
* @param ident a prefix of partition identifier
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionIdentifiers(InternalRow ident);

/**
* List the identifiers of all partitions that match to the ident by names.
*
* @param names the names of partition values in the identifier.
* @param ident a partition identifier values.
* @return an array of Identifiers for the partitions
*/
InternalRow[] listPartitionByNames(String[] names, InternalRow ident);
InternalRow[] listPartitionIdentifiers(String[] names, InternalRow ident);
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,14 @@ class InMemoryPartitionTable(
}
}

def listPartitionIdentifiers(ident: InternalRow): Array[InternalRow] = {
val prefixPartCols =
new StructType(partitionSchema.dropRight(partitionSchema.length - ident.numFields).toArray)
val prefixPart = ident.toSeq(prefixPartCols)
memoryTablePartitions.keySet().asScala
.filter(_.toSeq(partitionSchema).startsWith(prefixPart)).toArray
}

override def partitionExists(ident: InternalRow): Boolean =
memoryTablePartitions.containsKey(ident)

override protected def addPartitionKey(key: Seq[Any]): Unit = {
memoryTablePartitions.put(InternalRow.fromSeq(key), Map.empty[String, String].asJava)
}

override def listPartitionByNames(
override def listPartitionIdentifiers(
names: Array[String],
ident: InternalRow): Array[InternalRow] = {
assert(names.length == ident.numFields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,38 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
newCatalog
}

private def hasPartitions(table: SupportsPartitionManagement): Boolean = {
!table.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty
}

test("createPartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
partTable.createPartitions(
partIdents,
Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(InternalRow.apply("3")))
assert(partTable.partitionExists(InternalRow.apply("4")))

partTable.dropPartition(InternalRow.apply("3"))
partTable.dropPartition(InternalRow.apply("4"))
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("createPartitions failed if partition already exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("4")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
Expand All @@ -85,42 +89,42 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
assert(!partTable.partitionExists(InternalRow.apply("3")))

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("dropPartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
partTable.createPartitions(
partIdents,
Array(new util.HashMap[String, String](), new util.HashMap[String, String]()))
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(InternalRow.apply("3")))
assert(partTable.partitionExists(InternalRow.apply("4")))

partTable.dropPartitions(partIdents)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("dropPartitions failed if partition not exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("4")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 1)

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
assert(!partTable.dropPartitions(partIdents))
assert(partTable.partitionExists(partIdent))

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,97 +48,101 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
newCatalog
}

private def hasPartitions(table: SupportsPartitionManagement): Boolean = {
!table.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty
}

test("createPartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("dropPartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
val partIdent1 = InternalRow.apply("4")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
partTable.createPartition(partIdent1, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 2)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 1)
partTable.dropPartition(partIdent1)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("replacePartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))
assert(partTable.loadPartitionMetadata(partIdent).isEmpty)

partTable.replacePartitionMetadata(partIdent, Map("paramKey" -> "paramValue").asJava)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))
assert(!partTable.loadPartitionMetadata(partIdent).isEmpty)
assert(partTable.loadPartitionMetadata(partIdent).get("paramKey") == "paramValue")

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("loadPartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
partTable.createPartition(partIdent, Map("paramKey" -> "paramValue").asJava)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).nonEmpty)
assert(hasPartitions(partTable))
assert(partTable.partitionExists(partIdent))
assert(!partTable.loadPartitionMetadata(partIdent).isEmpty)
assert(partTable.loadPartitionMetadata(partIdent).get("paramKey") == "paramValue")

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("listPartitionIdentifiers") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 1)

val partIdent1 = InternalRow.apply("4")
partTable.createPartition(partIdent1, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 2)
assert(partTable.listPartitionIdentifiers(partIdent1).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)
assert(partTable.listPartitionIdentifiers(Array("dt"), partIdent1).length == 1)

partTable.dropPartition(partIdent)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).length == 1)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 1)
partTable.dropPartition(partIdent1)
assert(partTable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(!hasPartitions(partTable))
}

test("listPartitionByNames") {
Expand Down Expand Up @@ -170,15 +174,15 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
(Array("part0", "part1"), InternalRow(3, "xyz")) -> Set(),
(Array("part1"), InternalRow(3.14f)) -> Set()
).foreach { case ((names, idents), expected) =>
assert(partTable.listPartitionByNames(names, idents).toSet === expected)
assert(partTable.listPartitionIdentifiers(names, idents).toSet === expected)
}
// Check invalid parameters
Seq(
(Array("part0", "part1"), InternalRow(0)),
(Array("col0", "part1"), InternalRow(0, 1)),
(Array("wrong"), InternalRow("invalid"))
).foreach { case (names, idents) =>
intercept[AssertionError](partTable.listPartitionByNames(names, idents))
intercept[AssertionError](partTable.listPartitionIdentifiers(names, idents))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
catalog("testpart").asTableCatalog.loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(1))))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(2))))
assert(partTable.asPartitionable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(
partTable.asPartitionable.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty)
}
}

Expand All @@ -161,7 +162,8 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
spark.sql(s"ALTER TABLE $t DROP IF EXISTS PARTITION (id=1), PARTITION (id=2)")
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(1))))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(2))))
assert(partTable.asPartitionable.listPartitionIdentifiers(InternalRow.empty).isEmpty)
assert(
partTable.asPartitionable.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty)
}
}

Expand Down

0 comments on commit 6fd148f

Please sign in to comment.