[SPARK-34312][SQL] Support partition(s) truncation by `Supports(Atomic)PartitionManagement`

### What changes were proposed in this pull request?
1. Add a new method `truncatePartition()` to the `SupportsPartitionManagement` interface.
2. Add a new method `truncatePartitions()` to the `SupportsAtomicPartitionManagement` interface.
3. Implement the new methods in `InMemoryPartitionTable`/`InMemoryAtomicPartitionTable` (a caller-side sketch of the new API follows this list).
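
For orientation, here is a minimal caller-side sketch of how the two variants relate. The `truncate` helper is hypothetical (the real dispatch will be wired into Spark's planner by the follow-up v2 `TRUNCATE TABLE .. PARTITION` work); it only illustrates the new API surface:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}

// Hypothetical dispatch helper, for illustration only.
def truncate(table: Table, idents: Array[InternalRow]): Boolean = table match {
  // SupportsAtomicPartitionManagement extends SupportsPartitionManagement,
  // so the atomic case must be matched first.
  case t: SupportsAtomicPartitionManagement => t.truncatePartitions(idents)
  // Non-atomic fallback: truncate one partition at a time.
  case t: SupportsPartitionManagement => idents.forall(t.truncatePartition)
  case _ =>
    throw new UnsupportedOperationException("Partition truncation is not supported")
}
```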

### Why are the changes needed?
This is the first step towards supporting `TRUNCATE TABLE .. PARTITION` for v2 tables.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running new tests:
```
$ build/sbt "test:testOnly *SupportsPartitionManagementSuite"
$ build/sbt "test:testOnly *SupportsAtomicPartitionManagementSuite"
```

Closes #31420 from MaxGekk/dsv2-truncate-table-partitions.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
MaxGekk authored and cloud-fan committed Feb 2, 2021
1 parent f024d30 commit 6d3674b
Showing 7 changed files with 118 additions and 4 deletions.

`SupportsAtomicPartitionManagement.java`
@@ -37,6 +37,8 @@
* ${@link #purgePartitions}:
* remove an array of partitions and any data they contain from the table by skipping
* the trash even if it is supported
* ${@link #truncatePartitions}:
* truncate an array of partitions by removing partition data
*
* @since 3.1.0
*/
@@ -105,4 +107,22 @@ default boolean purgePartitions(InternalRow[] idents)
throws NoSuchPartitionException, UnsupportedOperationException {
throw new UnsupportedOperationException("Partition purge is not supported");
}

/**
* Truncate an array of partitions atomically from the table, and completely remove partition data.
* <p>
* If any partition doesn't exist, the whole
* truncatePartitions operation must be safely rolled back.
*
* @param idents an array of partition identifiers
* @return true if partitions were truncated successfully, otherwise false
* @throws NoSuchPartitionException If any partition identifier to truncate doesn't exist
* @throws UnsupportedOperationException If partition truncate is not supported
*
* @since 3.2.0
*/
default boolean truncatePartitions(InternalRow[] idents)
throws NoSuchPartitionException, UnsupportedOperationException {
throw new UnsupportedOperationException("Partitions truncate is not supported");
}
}
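
Like the other operations on this interface, the new method ships as a default that throws `UnsupportedOperationException`, so existing connectors stay source- and binary-compatible and only override it when they can truncate a set of partitions atomically.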

`SupportsPartitionManagement.java`
@@ -41,6 +41,8 @@
* even if it is supported.
* ${@link #replacePartitionMetadata}:
* point a partition to a new location, which will swap one location's data for the other
* ${@link #truncatePartition}:
* remove partition data from the table
*
* @since 3.1.0
*/
@@ -158,4 +160,19 @@ default boolean renamePartition(InternalRow from, InternalRow to)
NoSuchPartitionException {
throw new UnsupportedOperationException("Partition renaming is not supported");
}

/**
* Truncate a partition in the table by completely removing partition data.
*
* @param ident a partition identifier
* @return true if the partition was truncated successfully, otherwise false
* @throws NoSuchPartitionException If the partition identifier to truncate doesn't exist
* @throws UnsupportedOperationException If partition truncation is not supported
*
* @since 3.2.0
*/
default boolean truncatePartition(InternalRow ident)
throws NoSuchPartitionException, UnsupportedOperationException {
throw new UnsupportedOperationException("Partition truncate is not supported");
}
}
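
To make the contract concrete, here is a hedged sketch of what an implementation could look like for a connector that stores each partition's data under its own directory. `fileStore`, `deleteContents`, and `locationOf` are hypothetical helpers, not part of this API:

```scala
// Hypothetical sketch only: delete the partition's data files while keeping
// the partition registered, as the truncatePartition contract requires.
override def truncatePartition(ident: InternalRow): Boolean = {
  if (!partitionExists(ident)) {
    throw new NoSuchPartitionException(name, ident, partitionSchema)
  }
  // Remove the data, not the partition metadata.
  fileStore.deleteContents(locationOf(ident))
  true
}
```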

`InMemoryAtomicPartitionTable.scala`
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
import java.util

import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{PartitionAlreadyExistsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
@@ -75,4 +75,14 @@ class InMemoryAtomicPartitionTable (
}
idents.forall(dropPartition)
}

override def truncatePartitions(idents: Array[InternalRow]): Boolean = {
val nonExistent = idents.filterNot(partitionExists)
if (nonExistent.isEmpty) {
idents.foreach(truncatePartition)
true
} else {
throw new NoSuchPartitionException(name, nonExistent.head, partitionSchema)
}
}
}
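
The check-then-act shape above is what makes this implementation trivially atomic: every identifier is validated before any data is touched, so a failed call leaves all partitions intact and no explicit rollback is needed.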

`InMemoryPartitionTable.scala`
@@ -122,4 +122,13 @@ class InMemoryPartitionTable(
renamePartitionKey(partitionSchema, from.toSeq(schema), to.toSeq(schema))
}
}

override def truncatePartition(ident: InternalRow): Boolean = {
if (memoryTablePartitions.containsKey(ident)) {
clearPartition(ident.toSeq(schema))
true
} else {
throw new NoSuchPartitionException(name, ident, partitionSchema)
}
}
}

`InMemoryTable.scala`
@@ -204,6 +204,11 @@ class InMemoryTable(
}
}

protected def clearPartition(key: Seq[Any]): Unit = dataMap.synchronized {
assert(dataMap.contains(key))
dataMap(key).clear()
}

def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
data.foreach(_.rows.foreach { row =>
val key = getKey(row)
@@ -464,6 +469,8 @@ class BufferedRows(
rows.append(row)
this
}

def clear(): Unit = rows.clear()
}

private class BufferedRowsReaderFactory(
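
The two helpers above are the primitives the partition tables' truncate implementations rely on: `clearPartition` drops all rows buffered for a partition key, synchronizing on `dataMap` like the other mutators in `InMemoryTable`, and `BufferedRows.clear` empties a single row buffer.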

`SupportsAtomicPartitionManagementSuite.scala`
@@ -21,8 +21,8 @@ import java.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
-import org.apache.spark.sql.connector.{InMemoryAtomicPartitionTable, InMemoryTableCatalog}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.connector.{BufferedRows, InMemoryAtomicPartitionTable, InMemoryTableCatalog}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -141,4 +141,33 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
partTable.dropPartition(partIdent)
assert(!hasPartitions(partTable))
}

test("truncatePartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))

partTable.createPartitions(
Array(InternalRow("3"), InternalRow("4"), InternalRow("5")),
Array.tabulate(3)(_ => new util.HashMap[String, String]()))
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 3)

partTable.withData(Array(
new BufferedRows("3").withRow(InternalRow(0, "abc", "3")),
new BufferedRows("4").withRow(InternalRow(1, "def", "4")),
new BufferedRows("5").withRow(InternalRow(2, "zyx", "5"))
))

partTable.truncatePartitions(Array(InternalRow("3"), InternalRow("4")))
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 3)
assert(partTable.rows === InternalRow(2, "zyx", "5") :: Nil)

// Truncate non-existing partition
val errMsg = intercept[NoSuchPartitionException] {
partTable.truncatePartitions(Array(InternalRow("5"), InternalRow("6")))
}.getMessage
assert(errMsg.contains("Partition not found in table test.ns.test_table: 6 -> dt"))
assert(partTable.rows === InternalRow(2, "zyx", "5") :: Nil)
}
}

`SupportsPartitionManagementSuite.scala`
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
-import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.{BufferedRows, InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -233,4 +233,26 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
assert(partTable.renamePartition(InternalRow(0, "abc"), newPart))
assert(partTable.partitionExists(newPart))
}

test("truncatePartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
table.name(), table.schema(), table.partitioning(), table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
val partIdent1 = InternalRow.apply("4")
partTable.createPartition(partIdent, new util.HashMap[String, String]())
partTable.createPartition(partIdent1, new util.HashMap[String, String]())
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)

partTable.withData(Array(
new BufferedRows("3").withRow(InternalRow(0, "abc", "3")),
new BufferedRows("4").withRow(InternalRow(1, "def", "4"))
))

partTable.truncatePartition(partIdent)
assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2)
assert(partTable.rows === InternalRow(1, "def", "4") :: Nil)
}
}
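
In both suites, `listPartitionIdentifiers` reports the same partition count after truncation as before: truncating clears a partition's rows but leaves the partition itself registered, which is exactly the contract the new Javadoc spells out.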
