[Spark] Support UniForm to use Hive Metastore
UniForm will use HMS as the catalog instead of the file system.

Closes #2120

GitOrigin-RevId: f2d863c6e91e4d5d8c2e0f373f4c0c4ad9956fb6
harperjiang authored and vkorukanti committed Oct 2, 2023
1 parent 789ea30 commit 01fee68
Showing 9 changed files with 233 additions and 108 deletions.
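As an orientation before the diff, here is a minimal sketch of the switch described in the commit message, assuming `conf` and `catalogTable` are in scope as they are inside IcebergConversionTransaction: the converter no longer resolves the Iceberg table by file-system path through HadoopTables, but through an Iceberg HiveCatalog keyed by the Spark table identifier.

    // Before (removed lines below): path-based lookup through the file system.
    // val hadoopTables = new HadoopTables(conf)
    // val tableExists  = hadoopTables.exists(tablePath.toString)

    // After (added lines below): catalog-based lookup through the Hive Metastore.
    val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(conf)
    val icebergTableId = IcebergTransactionUtils
      .convertSparkTableIdentifierToIcebergHive(catalogTable.identifier)
    val tableExists = hiveCatalog.tableExists(icebergTableId)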
@@ -31,6 +31,8 @@ import org.apache.hadoop.conf.Configuration
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables

import org.apache.spark.sql.catalyst.catalog.CatalogTable

sealed trait IcebergTableOp
case object CREATE_TABLE extends IcebergTableOp
case object WRITE_TABLE extends IcebergTableOp
@@ -46,6 +48,7 @@ case object REPLACE_TABLE extends IcebergTableOp
* @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE.
*/
class IcebergConversionTransaction(
protected val catalogTable: CatalogTable,
protected val conf: Configuration,
protected val postCommitSnapshot: Snapshot,
protected val tableOp: IcebergTableOp = WRITE_TABLE,
@@ -316,16 +319,19 @@ class IcebergConversionTransaction(
///////////////////////

protected def createIcebergTxn(): IcebergTransaction = {
val hadoopTables = new HadoopTables(conf)
val tableExists = hadoopTables.exists(tablePath.toString)
val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(conf)
val icebergTableId = IcebergTransactionUtils
.convertSparkTableIdentifierToIcebergHive(catalogTable.identifier)

val tableExists = hiveCatalog.tableExists(icebergTableId)

def tableBuilder = {
val properties = getIcebergPropertiesFromDeltaProperties(
postCommitSnapshot.metadata.configuration
)

hadoopTables
.buildTable(tablePath.toString, icebergSchema)
hiveCatalog
.buildTable(icebergTableId, icebergSchema)
.withPartitionSpec(partitionSpec)
.withProperties(properties.asJava)
}
@@ -334,7 +340,7 @@ class IcebergConversionTransaction(
case WRITE_TABLE =>
if (tableExists) {
recordFrameProfile("IcebergConversionTransaction", "loadTable") {
hadoopTables.load(tablePath.toString).newTransaction()
hiveCatalog.loadTable(icebergTableId).newTransaction()
}
} else {
throw new IllegalStateException(s"Cannot write to table $tablePath. Table doesn't exist.")
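The hunk above shows only the WRITE_TABLE branch of createIcebergTxn. As a hedged sketch of the full dispatch, assuming the CREATE_TABLE and REPLACE_TABLE branches (collapsed in this diff) go through the standard Iceberg TableBuilder transaction methods, the method presumably ends with something like:

    tableOp match {
      case CREATE_TABLE =>
        // Assumed: registers a brand-new Iceberg table in HMS via the builder above.
        tableBuilder.createTransaction()
      case WRITE_TABLE =>
        if (tableExists) {
          hiveCatalog.loadTable(icebergTableId).newTransaction()
        } else {
          throw new IllegalStateException(s"Cannot write to table $tablePath. Table doesn't exist.")
        }
      case REPLACE_TABLE =>
        // Assumed: replaces the metadata of an existing Iceberg table in HMS.
        tableBuilder.replaceTransaction()
    }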
@@ -23,16 +23,17 @@ import scala.collection.JavaConverters._
import scala.util.control.Breaks._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormatConverter}
import org.apache.spark.sql.delta.{DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormat, UniversalFormatConverter}
import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, RemoveFile}
import org.apache.spark.sql.delta.hooks.IcebergConverterHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.Path
import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables
import shadedForDelta.org.apache.iceberg.hive.{HiveCatalog, HiveTableOperations}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

object IcebergConverter {

@@ -60,9 +61,9 @@ class IcebergConverter(spark: SparkSession)
// Save an atomic reference of the snapshot being converted, and the txn that resulted in
// the specified snapshot
protected val currentConversion =
new AtomicReference[(Snapshot, Option[OptimisticTransactionImpl])]()
new AtomicReference[(Snapshot, OptimisticTransactionImpl)]()
protected val standbyConversion =
new AtomicReference[(Snapshot, Option[OptimisticTransactionImpl])]()
new AtomicReference[(Snapshot, OptimisticTransactionImpl)]()

// Whether our async converter thread is active. We may already have an alive thread that is
// about to shutdown, but in such cases this value should return false.
@@ -81,7 +82,10 @@ class IcebergConverter(spark: SparkSession)
*/
override def enqueueSnapshotForConversion(
snapshotToConvert: Snapshot,
txn: Option[OptimisticTransactionImpl]): Unit = {
txn: OptimisticTransactionImpl): Unit = {
if (!UniversalFormat.icebergEnabled(snapshotToConvert.metadata)) {
return
}
val log = snapshotToConvert.deltaLog
// Replace any previously queued snapshot
val previouslyQueued = standbyConversion.getAndSet((snapshotToConvert, txn))
@@ -126,7 +130,7 @@ class IcebergConverter(spark: SparkSession)
}

// Get a snapshot to convert from the icebergQueue. Sets the queue to null after.
private def getNextSnapshot: (Snapshot, Option[OptimisticTransactionImpl]) =
private def getNextSnapshot: (Snapshot, OptimisticTransactionImpl) =
asyncThreadLock.synchronized {
val potentialSnapshotAndTxn = standbyConversion.get()
currentConversion.set(potentialSnapshotAndTxn)
@@ -155,21 +159,66 @@
}
}

/**
* Convert the specified snapshot into Iceberg for the given catalogTable
* @param snapshotToConvert the snapshot that needs to be converted to Iceberg
* @param catalogTable the catalogTable this conversion targets.
* @return Converted Delta version and commit timestamp
*/
override def convertSnapshot(
snapshotToConvert: Snapshot, catalogTable: CatalogTable): Option[(Long, Long)] = {
if (!UniversalFormat.icebergEnabled(snapshotToConvert.metadata)) {
return None
}
convertSnapshot(snapshotToConvert, None, catalogTable)
}

/**
* Convert the specified snapshot into Iceberg when performing an OptimisticTransaction
* on a delta table.
* @param snapshotToConvert the snapshot that needs to be converted to Iceberg
* @param txn the transaction that triggers the conversion. It must
* contain the catalogTable this conversion targets.
* @return Converted Delta version and commit timestamp
*/
override def convertSnapshot(
snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)] = {
if (!UniversalFormat.icebergEnabled(snapshotToConvert.metadata)) {
return None
}
txn.catalogTable match {
case Some(table) => convertSnapshot(snapshotToConvert, Some(txn), table)
case _ =>
logWarning(s"CatalogTable for table ${snapshotToConvert.deltaLog.tableId} " +
s"is empty in txn. Skip iceberg conversion.")
recordDeltaEvent(
snapshotToConvert.deltaLog,
"delta.iceberg.conversion.skipped.emptyCatalogTable",
data = Map(
"version" -> snapshotToConvert.version
)
)
None
}
}

/**
* Convert the specified snapshot into Iceberg. NOTE: This operation is blocking. Call
* enqueueSnapshotForConversion to run the operation asynchronously.
* @param snapshotToConvert the snapshot that needs to be converted to Iceberg
* @param txnOpt the OptimisticTransaction that created snapshotToConvert.
* Used as a hint to avoid recomputing old metadata.
* @param catalogTable the catalogTable this conversion targets
* @return Converted Delta version and commit timestamp
*/
override def convertSnapshot(
private def convertSnapshot(
snapshotToConvert: Snapshot,
txnOpt: Option[OptimisticTransactionImpl]): Option[(Long, Long)] =
txnOpt: Option[OptimisticTransactionImpl],
catalogTable: CatalogTable): Option[(Long, Long)] =
recordFrameProfile("Delta", "IcebergConverter.convertSnapshot") {
val log = snapshotToConvert.deltaLog
val lastDeltaVersionConverted: Option[Long] =
loadLastDeltaVersionConverted(snapshotToConvert)
loadLastDeltaVersionConverted(snapshotToConvert, catalogTable)
val maxCommitsToConvert =
spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_COMMITS_TO_CONVERT)

@@ -202,8 +251,14 @@ class IcebergConverter(spark: SparkSession)
case (Some(_), None) => REPLACE_TABLE
case (None, None) => CREATE_TABLE
}

UniversalFormat.enforceSupportInCatalog(catalogTable, snapshotToConvert.metadata) match {
case Some(updatedTable) => spark.sessionState.catalog.alterTable(updatedTable)
case _ =>
}

val icebergTxn = new IcebergConversionTransaction(
log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted)
catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted)

// Write out the actions taken since the last conversion (or since table creation).
// This is done in batches, with each batch corresponding either to one delta file,
@@ -268,18 +323,10 @@ class IcebergConverter(spark: SparkSession)
Some(snapshotToConvert.version, snapshotToConvert.timestamp)
}

override def loadLastDeltaVersionConverted(snapshot: Snapshot): Option[Long] =
override def loadLastDeltaVersionConverted(
snapshot: Snapshot, catalogTable: CatalogTable): Option[Long] =
recordFrameProfile("Delta", "IcebergConverter.loadLastDeltaVersionConverted") {
val deltaLog = snapshot.deltaLog
val hadoopTables = new HadoopTables(deltaLog.newDeltaHadoopConf())
if (hadoopTables.exists(deltaLog.dataPath.toString)) {
hadoopTables
.load(deltaLog.dataPath.toString)
.properties()
.asScala
.get(IcebergConverter.DELTA_VERSION_PROPERTY)
.map(_.toLong)
} else None
catalogTable.properties.get(IcebergConverter.DELTA_VERSION_PROPERTY).map(_.toLong)
}

/**
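A hypothetical caller-side sketch of the reworked IcebergConverter entry points (the `converter`, `snapshot`, `table`, and `txn` names are illustrative; the signatures come from the diff above). Conversion is now keyed by a CatalogTable, and the transaction-based overload skips conversion with a warning if txn.catalogTable is empty:

    val converter = new IcebergConverter(spark)

    // Blocking conversion for an explicitly supplied catalog table.
    converter.convertSnapshot(snapshot, table)

    // Post-commit conversion driven by an OptimisticTransactionImpl; the catalog table is
    // taken from txn.catalogTable, and the call is a no-op unless UniForm (Iceberg) is
    // enabled in the snapshot's metadata.
    converter.convertSnapshot(snapshot, txn)

    // Asynchronous variant: enqueue the snapshot and return immediately.
    converter.enqueueSnapshotForConversion(snapshot, txn)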
@@ -22,11 +22,16 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfigs, DeltaLog}
import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema}
// scalastyle:off import.ordering.noEmptyLine
import shadedForDelta.org.apache.iceberg.catalog.{Namespace, TableIdentifier => IcebergTableIdentifier}
// scalastyle:on import.ordering.noEmptyLine
import shadedForDelta.org.apache.iceberg.hive.HiveCatalog

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier => SparkTableIdentifier}
import org.apache.spark.sql.types.StructType

object IcebergTransactionUtils
extends DeltaLogging
@@ -172,4 +177,30 @@ object IcebergTransactionUtils

builder
}

/**
* Create an Iceberg HiveCatalog
* @param conf Hadoop Configuration
* @return the initialized HiveCatalog
*/
def createHiveCatalog(conf : Configuration) : HiveCatalog = {
val catalog = new HiveCatalog()
catalog.setConf(conf)
catalog.initialize("spark_catalog", Map.empty[String, String].asJava)
catalog
}

/**
* Encode a Spark table identifier as an Iceberg table identifier by mapping
* only the Spark "database" to the Iceberg "namespace".
* See [[HiveCatalog.isValidateNamespace]]
*/
def convertSparkTableIdentifierToIcebergHive(
identifier: SparkTableIdentifier): IcebergTableIdentifier = {
val namespace = (identifier.database) match {
case Some(database) => Namespace.of(database)
case _ => Namespace.empty()
}
IcebergTableIdentifier.of(namespace, identifier.table)
}
}
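Taken together, a hedged end-to-end sketch of how the two new helpers combine (the database and table names are illustrative; within Delta the Iceberg classes are the shadedForDelta variants imported above):

    val conf = new Configuration()  // in practice, DeltaLog.newDeltaHadoopConf()
    val catalog = IcebergTransactionUtils.createHiveCatalog(conf)
    val icebergId = IcebergTransactionUtils.convertSparkTableIdentifierToIcebergHive(
      SparkTableIdentifier("events", Some("analytics")))  // illustrative db.table

    if (catalog.tableExists(icebergId)) {
      // Same pattern as createIcebergTxn: resolve through HMS, then open an Iceberg transaction.
      val txn = catalog.loadTable(icebergId).newTransaction()
      // ... stage appends/deletes, then txn.commitTransaction()
    }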
