Introduce a new file manifest for 'CONVERT TO DELTA' on catalog table

Currently, when the _spark_metadata log is missing, "CONVERT TO DELTA" always scans the whole table directory recursively for data files. However, this is not necessary in some cases: when the source table is a catalog table, we can use the partition metadata from the catalog to prune out-of-date files.

This PR introduces a new file manifest that fetches data files based on the partition information from the catalog, and it is used in the scenario described above. The feature is guarded by a SQL conf, which defaults to true.
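For illustration, a minimal usage sketch in Scala. It assumes the conf suffix defined in this diff resolves to the usual spark.databricks.delta. prefix (so the full key would be spark.databricks.delta.convert.useCatalogPartitions) and that a partitioned Parquet table default.events is already registered in the Metastore; both are assumptions, not part of this PR.

// Usage sketch only. The table name and the fully qualified conf key are
// assumptions for illustration; the diff below defines just the suffix
// "convert.useCatalogPartitions". Assumes the session is configured with the
// Delta Lake SQL extensions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("convert-to-delta-example").getOrCreate()

// Enabled by default; set explicitly here only to make the behavior visible.
spark.conf.set("spark.databricks.delta.convert.useCatalogPartitions", "true")

// Because default.events is a catalog table, the conversion can list only the
// directories of partitions known to the Metastore instead of recursively
// scanning the whole table directory.
spark.sql("CONVERT TO DELTA default.events")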

A unit test is added in this PR.

GitOrigin-RevId: 0eb87398bc65e95d54ebf32c1bec2d1c683dd327
mingdai-db authored and tdas committed Aug 11, 2022
1 parent 27797ec commit ebff299
Showing 3 changed files with 112 additions and 1 deletion.
@@ -625,6 +625,9 @@ class ParquetTable(
    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_USE_METADATA_LOG) &&
      FileStreamSink.hasMetadata(Seq(basePath), serializableConf.value, spark.sessionState.conf)) {
      new MetadataLogFileManifest(spark, basePath)
    } else if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_USE_CATALOG_PARTITIONS) &&
      catalogTable.isDefined) {
      new CatalogFileManifest(spark, basePath, catalogTable.get, serializableConf)
    } else {
      new ManualListingFileManifest(spark, basePath, serializableConf)
    }
@@ -663,6 +666,46 @@ class ManualListingFileManifest(
  override def close(): Unit = list.unpersist()
}

/** A file manifest generated through listing partition paths from Metastore catalog. */
class CatalogFileManifest(
    spark: SparkSession,
    override val basePath: String,
    catalogTable: CatalogTable,
    serializableConf: SerializableConfiguration)
  extends ManualListingFileManifest(spark, basePath, serializableConf) {

  private lazy val partitionList = {
    if (catalogTable.partitionSchema.isEmpty) {
      // Not a partitioned table.
      Seq(basePath)
    } else {
      val partitions = spark.sessionState.catalog.listPartitions(catalogTable.identifier)
      partitions.map { partition =>
        partition.storage.locationUri.map(_.toString())
          .getOrElse {
            val partitionDir =
              PartitionUtils.getPathFragment(partition.spec, catalogTable.partitionSchema)
            basePath.stripSuffix("/") + "/" + partitionDir
          }
      }
    }
  }

  override def doList(): Dataset[SerializableFileStatus] = {
    import spark.implicits._
    // Avoid the serialization of this CatalogFileManifest during distributed execution.
    val conf = spark.sparkContext.broadcast(serializableConf)
    val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism
    val allFiles = spark.sparkContext.parallelize(partitionList)
      .repartition(math.min(parallelism, partitionList.length))
      .mapPartitions { dirs =>
        DeltaFileOperations
          .localListDirs(conf.value.value, dirs.toSeq, recursive = false).filter(!_.isDir)
      }
    spark.createDataset(allFiles)
  }
}
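As an aside (not part of the diff): when the catalog does not record an explicit location for a partition, the directory is derived from the partition spec. A rough standalone sketch of that derivation, using a hypothetical helper that skips the value escaping and schema-ordered column layout handled by PartitionUtils.getPathFragment above:

// Illustrative approximation only: derive a partition directory such as
// "<basePath>/key1=2" from a partition spec. Real code must escape special
// characters in values and order the columns by the partition schema.
def derivedPartitionPath(basePath: String, spec: Map[String, String]): String = {
  val fragment = spec.map { case (col, value) => s"$col=$value" }.mkString("/")
  basePath.stripSuffix("/") + "/" + fragment
}

// e.g. derivedPartitionPath("/data/ppqtable", Map("key1" -> "2")) == "/data/ppqtable/key1=2"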

/** A file manifest generated from pre-existing parquet MetadataLog. */
class MetadataLogFileManifest(
    spark: SparkSession,
@@ -75,6 +75,16 @@ trait DeltaSQLConfBase {
      .booleanConf
      .createWithDefault(true)

  val DELTA_CONVERT_USE_CATALOG_PARTITIONS =
    buildConf("convert.useCatalogPartitions")
      .internal()
      .doc(
        """ When converting a catalog Parquet table, whether to use the partition information from
          | the Metastore catalog and only commit files under the directories of active partitions.
          |""".stripMargin)
      .booleanConf
      .createWithDefault(true)

  val DELTA_CONVERT_USE_CATALOG_SCHEMA =
    buildConf("convert.useCatalogSchema")
      .doc(
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -38,7 +39,6 @@ import org.apache.spark.util.Utils
 * extend the `SharedSparkSession`, therefore we keep this utility class as bare-bones as possible.
 */
trait ConvertToDeltaTestUtils extends QueryTest { self: SQLTestUtils =>
import org.apache.spark.sql.functions._

  protected def simpleDF = spark.range(100)
    .withColumn("key1", col("id") % 2)
@@ -692,6 +692,19 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
*/
trait ConvertToDeltaHiveTableTests extends ConvertToDeltaTestUtils with SQLTestUtils {

  // Test conversion with and without the new CatalogFileManifest.
  protected def testCatalogFileManifest(testName: String)(block: (Boolean) => Unit): Unit = {
    Seq(true, false).foreach { useCatalogFileManifest =>
      test(s"$testName - $useCatalogFileManifest") {
        withSQLConf(
          DeltaSQLConf.DELTA_CONVERT_USE_CATALOG_PARTITIONS.key
            -> useCatalogFileManifest.toString) {
          block(useCatalogFileManifest)
        }
      }
    }
  }

  protected def testCatalogSchema(testName: String)(testFn: (Boolean) => Unit): Unit = {
    Seq(true, false).foreach {
      useCatalogSchema =>
@@ -1136,6 +1149,51 @@ trait ConvertToDeltaHiveTableTests extends ConvertToDeltaTestUtils with SQLTestUtils
}
}

  testCatalogFileManifest("convert partitioned parquet table with catalog partitions") {
    useCatalogFileManifest => {
      val tableName = "ppqtable"
      withTable(tableName) {
        simpleDF.write.partitionBy("key1").format("parquet").saveAsTable(tableName)
        val path = getPathForTableName(tableName)

        // Create an orphan partition that is not registered in the catalog.
        val df = spark.range(100, 200)
          .withColumn("key1", lit(2))
          .withColumn("key2", col("id") % 4 cast "String")

        df.write.partitionBy("key1")
          .format("parquet")
          .mode("Append")
          .save(path)

        // The path should contain 3 partitions.
        val partitionDirs = new File(path).listFiles().filter(_.isDirectory)
        assert(partitionDirs.map(_.getName).sorted
          .sameElements(Array("key1=0", "key1=1", "key1=2")))

        // The catalog only contains 2 partitions.
        assert(spark.sessionState.catalog
          .listPartitions(TableIdentifier(tableName, Some("default"))).size == 2)

        // Convert the table to Delta.
        convertToDelta(tableName)

        // Verify that the table was converted to Delta.
        assert(spark.sessionState.catalog.getTableMetadata(
          TableIdentifier(tableName, Some("default"))).provider.contains("delta"))

        // Check the data in the converted Delta table.
        if (useCatalogFileManifest) {
          // Partition "key1=2" is pruned.
          checkAnswer(sql(s"SELECT DISTINCT key1 from default.${tableName}"), spark.range(2).toDF())
        } else {
          // All partitions are preserved.
          checkAnswer(sql(s"SELECT DISTINCT key1 from default.${tableName}"), spark.range(3).toDF())
        }
      }
    }
  }

test("external tables use correct path scheme") {
withTempDir { dir =>
withTable("externalTable") {