Hudi uniform support #2333
Changes from 6 commits
@@ -0,0 +1,363 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.icebergShaded

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hooks.HudiConverterHook
import org.apache.spark.sql.delta.icebergShaded.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta._

import java.io.{IOException, UncheckedIOException}
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

object HudiConverter {

  /**
   * Property to be set in the extra metadata of translated Hudi commits.
   * Indicates the delta commit version # that it corresponds to.
   */
  val DELTA_VERSION_PROPERTY = "delta-version"

  /**
   * Property to be set in the extra metadata of translated Hudi commits.
   * Indicates the timestamp (milliseconds) of the delta commit that it corresponds to.
   */
  val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp"
}

/**
 * This class manages the transformation of delta snapshots into their Hudi equivalent.
 */
class HudiConverter(spark: SparkSession)
  extends UniversalFormatConverter(spark)
  with DeltaLogging {

  // Save an atomic reference of the snapshot being converted, and the txn that
  // resulted in the specified snapshot
  protected val currentConversion =
    new AtomicReference[(Snapshot, OptimisticTransactionImpl)]()
  protected val standbyConversion =
    new AtomicReference[(Snapshot, OptimisticTransactionImpl)]()

  // Whether our async converter thread is active. We may already have an alive thread that is
  // about to shut down, but in such cases this value should return false.
  @GuardedBy("asyncThreadLock")
  private var asyncConverterThreadActive: Boolean = false
  private val asyncThreadLock = new Object

  /**
   * Enqueue the specified snapshot to be converted to Hudi. This will start an async
   * job to run the conversion, unless there already is an async conversion running for
   * this table. In that case, it will queue up the provided snapshot to be run after
   * the existing job completes.
   * Note that if there is another snapshot already queued, the previous snapshot will get
   * removed from the wait queue. Only one snapshot is queued at any point in time.
   */
  override def enqueueSnapshotForConversion(
      snapshotToConvert: Snapshot,
      txn: OptimisticTransactionImpl): Unit = {
    if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) {
      return
    }
    val log = snapshotToConvert.deltaLog
    // Replace any previously queued snapshot
    val previouslyQueued = standbyConversion.getAndSet((snapshotToConvert, txn))
    asyncThreadLock.synchronized {
      if (!asyncConverterThreadActive) {
        val threadName = HudiConverterHook.ASYNC_HUDI_CONVERTER_THREAD_NAME +
          s" [id=${snapshotToConvert.metadata.id}]"
        val asyncConverterThread: Thread = new Thread(threadName) {
          setDaemon(true)

          override def run(): Unit =
            try {
              var snapshotAndTxn = getNextSnapshot
              while (snapshotAndTxn != null) {
                val snapshotVal = snapshotAndTxn._1
                val prevTxn = snapshotAndTxn._2
                try {
                  logInfo(s"Converting Delta table [path=${log.logPath}, " +
                    s"tableId=${log.tableId}, version=${snapshotVal.version}] into Hudi")
                  convertSnapshot(snapshotVal, prevTxn)
                } catch {
                  case NonFatal(e) =>
                    logWarning(s"Error when writing Hudi metadata asynchronously", e)
                    recordDeltaEvent(
                      log,
                      "delta.hudi.conversion.async.error",
                      data = Map(
                        "exception" -> ExceptionUtils.getMessage(e),
                        "stackTrace" -> ExceptionUtils.getStackTrace(e)
                      )
                    )
                }
                currentConversion.set(null)
                // Pick next snapshot to convert if there's a new one
                snapshotAndTxn = getNextSnapshot
              }
            } finally {
              // Shutting down the thread
              asyncThreadLock.synchronized {
                asyncConverterThreadActive = false
              }
            }

          // Get a snapshot to convert from the standbyConversion queue. Sets the queue to
          // null after.
          private def getNextSnapshot: (Snapshot, OptimisticTransactionImpl) =
            asyncThreadLock.synchronized {
              val potentialSnapshotAndTxn = standbyConversion.get()
              currentConversion.set(potentialSnapshotAndTxn)
              standbyConversion.compareAndSet(potentialSnapshotAndTxn, null)
              if (potentialSnapshotAndTxn == null) {
                asyncConverterThreadActive = false
              }
              potentialSnapshotAndTxn
            }
        }
        asyncConverterThread.start()
        asyncConverterThreadActive = true
      }
    }

    // If there already was a snapshot waiting to be converted, log that snapshot info.
    if (previouslyQueued != null) {
      recordDeltaEvent(
        snapshotToConvert.deltaLog,
        "delta.hudi.conversion.async.backlog",
        data = Map(
          "newVersion" -> snapshotToConvert.version,
          "replacedVersion" -> previouslyQueued._1.version)
      )
    }
  }

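Note (not part of the PR): the single-slot standby queue above relies on AtomicReference.getAndSet to replace any previously queued snapshot, and compareAndSet to drain the slot only if no newer item arrived in the meantime. A minimal, self-contained sketch of those semantics, with illustrative names only:

import java.util.concurrent.atomic.AtomicReference

// Single-slot standby queue demo: enqueue replaces whatever is waiting;
// drain empties the slot unless a newer item was enqueued concurrently.
object StandbySlotDemo {
  private val standby = new AtomicReference[Integer]()

  def enqueue(version: Integer): Unit = {
    val replaced = standby.getAndSet(version)
    if (replaced != null) println(s"version $replaced replaced by $version")
  }

  def drain(): Integer = {
    val next = standby.get()
    standby.compareAndSet(next, null) // no-op if a newer version arrived
    next
  }

  def main(args: Array[String]): Unit = {
    enqueue(1)
    enqueue(2)                        // version 1 is dropped, never converted
    println(s"converting ${drain()}") // converting 2
    println(s"next: ${drain()}")      // next: null, the worker would shut down
  }
}
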
  /**
   * Convert the specified snapshot into Hudi for the given catalogTable.
   * @param snapshotToConvert the snapshot that needs to be converted to Hudi
   * @param catalogTable the catalogTable this conversion targets
   * @return Converted Delta version and commit timestamp
   */
  override def convertSnapshot(
      snapshotToConvert: Snapshot, catalogTable: CatalogTable): Option[(Long, Long)] = {
    if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) {
      return None
    }
    convertSnapshot(snapshotToConvert, None, catalogTable)
  }

  /**
   * Convert the specified snapshot into Hudi when performing an OptimisticTransaction
   * on a delta table.
   * @param snapshotToConvert the snapshot that needs to be converted to Hudi
   * @param txn the transaction that triggers the conversion. It must
   *            contain the catalogTable this conversion targets.
   * @return Converted Delta version and commit timestamp
   */
  override def convertSnapshot(
      snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)] = {
    if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) {
      return None
    }
    txn.catalogTable match {
      case Some(table) => convertSnapshot(snapshotToConvert, Some(txn), table)
      case _ =>
        logWarning(s"CatalogTable for table ${snapshotToConvert.deltaLog.tableId} " +
          s"is empty in txn. Skipping Hudi conversion.")
        recordDeltaEvent(
          snapshotToConvert.deltaLog,
          "delta.hudi.conversion.skipped.emptyCatalogTable",
          data = Map(
            "version" -> snapshotToConvert.version
          )
        )
        None
    }
  }

Review comment: Iceberg almost always requires a catalog, so we had this requirement. If that's not true for Hudi, I would suggest relaxing this constraint; otherwise it won't work when we access or write the Delta table by path. On the other hand, I get that we need to pass the tableName to the Hudi table creator. Can we add a TODO here saying that requiring a catalogTable is a constraint we can relax in the future?
Reply: We don't require a catalog for Hudi. I just figured if there is a name present we can use it when creating the table.

  /**
   * Convert the specified snapshot into Hudi. NOTE: This operation is blocking. Call
   * enqueueSnapshotForConversion to run the operation asynchronously.
   * @param snapshotToConvert the snapshot that needs to be converted to Hudi
   * @param txnOpt the OptimisticTransaction that created snapshotToConvert.
   *               Used as a hint to avoid recomputing old metadata.
   * @param catalogTable the catalogTable this conversion targets
   * @return Converted Delta version and commit timestamp
   */
  private def convertSnapshot(
      snapshotToConvert: Snapshot,
      txnOpt: Option[OptimisticTransactionImpl],
      catalogTable: CatalogTable): Option[(Long, Long)] =
    recordFrameProfile("Delta", "HudiConverter.convertSnapshot") {
      val log = snapshotToConvert.deltaLog
      val metaClient = loadTableMetaClient(snapshotToConvert.deltaLog.dataPath.toString,
        catalogTable.identifier.table, snapshotToConvert.metadata.partitionColumns,
        log.newDeltaHadoopConf())
      val lastDeltaVersionConverted: Option[Long] = loadLastDeltaVersionConverted(metaClient)
      val maxCommitsToConvert =
        spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_COMMITS_TO_CONVERT)

      // Nothing to convert if this version has already been converted
      if (lastDeltaVersionConverted.exists(_ == snapshotToConvert.version)) {
        return None
      }

      // Get the most recently converted delta snapshot, if applicable
      val prevConvertedSnapshotOpt = (lastDeltaVersionConverted, txnOpt) match {
        case (Some(version), Some(txn)) if version == txn.snapshot.version =>
          Some(txn.snapshot)
        // Check how long it has been since we last converted to Hudi. If outside the threshold,
        // fall back to state reconstruction to get the actions, to protect driver from OOMing.
        case (Some(version), _) if snapshotToConvert.version - version <= maxCommitsToConvert =>
          try {
            // TODO: We can optimize this by providing a checkpointHint to getSnapshotAt. Check if
            // txn.snapshot.version < version. If true, use txn.snapshot's checkpoint as a hint.
            Some(log.getSnapshotAt(version))
          } catch {
            // If we can't load the file since the last time Hudi was converted, it's likely that
            // the commit file expired. Treat this like a new Hudi table conversion.
            case _: DeltaFileNotFoundException => None
          }
        case (_, _) => None
      }

      val tableOp = (lastDeltaVersionConverted, prevConvertedSnapshotOpt) match {
        case (Some(_), Some(_)) => WRITE_TABLE
        case (Some(_), None) => REPLACE_TABLE
        case (None, None) => CREATE_TABLE
      }

      UniversalFormat.enforceSupportInCatalog(catalogTable, snapshotToConvert.metadata) match {
        case Some(updatedTable) => spark.sessionState.catalog.alterTable(updatedTable)
        case _ =>
      }

Review comment: Please remove this part for now; this is specifically to update HMS with Iceberg props.
Reply: Removed

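Note (not part of the PR): a simplified sketch of the catch-up decision above. It collapses prevConvertedSnapshotOpt and the tableOp match into one function and omits the DeltaFileNotFoundException fallback; all names below are illustrative:

// Decide how to bring the Hudi table up to date with the Delta snapshot.
object CatchUpDecisionDemo {
  sealed trait TableOp
  case object CREATE_TABLE extends TableOp   // first conversion ever
  case object WRITE_TABLE extends TableOp    // incremental append of delta commits
  case object REPLACE_TABLE extends TableOp  // gap too large: full state reconstruction

  def decide(lastConverted: Option[Long], target: Long, maxCommits: Long): TableOp =
    lastConverted match {
      case None => CREATE_TABLE
      case Some(v) if target - v <= maxCommits => WRITE_TABLE
      case Some(_) => REPLACE_TABLE
    }

  def main(args: Array[String]): Unit = {
    println(decide(None, 10L, 100L))      // CREATE_TABLE
    println(decide(Some(8L), 10L, 100L))  // WRITE_TABLE
    println(decide(Some(8L), 500L, 100L)) // REPLACE_TABLE
  }
}
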
      val hudiTxn = new HudiConversionTransaction(log.newDeltaHadoopConf(),
        snapshotToConvert, tableOp, metaClient, lastDeltaVersionConverted)

      // Write out the actions taken since the last conversion (or since table creation).
      // This is done in batches, with each batch corresponding either to one delta file,
      // or to the specified batch size.
      val actionBatchSize =
        spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_ACTIONS_TO_CONVERT)
      prevConvertedSnapshotOpt match {
        case Some(prevSnapshot) =>
          // Read the actions directly from the delta json files.
          // TODO: Run this as a spark job on executors
          val deltaFiles = DeltaFileProviderUtils.getDeltaFilesInVersionRange(
            spark, log, prevSnapshot.version + 1, snapshotToConvert.version)

          recordDeltaEvent(
            snapshotToConvert.deltaLog,
            "delta.hudi.conversion.deltaCommitRange",
            data = Map(
              "fromVersion" -> (prevSnapshot.version + 1),
              "toVersion" -> snapshotToConvert.version,
              "numDeltaFiles" -> deltaFiles.length
            )
          )

          val actionsToConvert = DeltaFileProviderUtils.parallelReadAndParseDeltaFilesAsIterator(
            log, spark, deltaFiles)
          actionsToConvert.foreach { actionsIter =>
            try {
              actionsIter.grouped(actionBatchSize).foreach { actionStrs =>
                runHudiConversionForActions(
                  hudiTxn,
                  actionStrs.map(Action.fromJson))
              }
            } finally {
              actionsIter.close()
            }
          }

        // If we don't have a snapshot of the last converted version, get all the table addFiles
        // (via state reconstruction).
        case None =>
          val actionsToConvert = snapshotToConvert.allFiles.toLocalIterator().asScala

          recordDeltaEvent(
            snapshotToConvert.deltaLog,
            "delta.hudi.conversion.batch",
            data = Map(
              "version" -> snapshotToConvert.version,
              "numDeltaFiles" -> snapshotToConvert.numOfFiles
            )
          )

          actionsToConvert.grouped(actionBatchSize)
            .foreach { actions =>
              runHudiConversionForActions(hudiTxn, actions)
            }
      }
      hudiTxn.commit()
      Some(snapshotToConvert.version, snapshotToConvert.timestamp)
    }

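Note (not part of the PR): the batching described in the comments above reduces to Iterator.grouped, so only one batch of parsed actions is materialized at a time. A tiny sketch, with the batch size standing in for ICEBERG_MAX_ACTIONS_TO_CONVERT and plain strings standing in for serialized actions:

object ActionBatchingDemo {
  def main(args: Array[String]): Unit = {
    val actionBatchSize = 2
    val actionStrs = Iterator("""{"add": 1}""", """{"add": 2}""", """{"remove": 1}""")
    // In the real converter each group is parsed with Action.fromJson and
    // handed to runHudiConversionForActions.
    actionStrs.grouped(actionBatchSize).foreach { batch =>
      println(s"committing batch of ${batch.size}: ${batch.mkString(", ")}")
    }
  }
}
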
  def loadLastDeltaVersionConverted(snapshot: Snapshot, table: CatalogTable): Option[Long] = {
    val metaClient = loadTableMetaClient(snapshot.deltaLog.dataPath.toString,
      table.identifier.table, snapshot.metadata.partitionColumns,
      snapshot.deltaLog.newDeltaHadoopConf())
    loadLastDeltaVersionConverted(metaClient)
  }

  private def loadLastDeltaVersionConverted(metaClient: HoodieTableMetaClient): Option[Long] = {
    val lastCompletedCommit = metaClient.getCommitsTimeline.filterCompletedInstants.lastInstant
    if (!lastCompletedCommit.isPresent) {
      return None
    }
    val extraMetadata = parseCommitExtraMetadata(lastCompletedCommit.get(), metaClient)
    extraMetadata.get(HudiConverter.DELTA_VERSION_PROPERTY).map(_.toLong)
  }

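Note (not part of the PR): the loaders above make conversion resumable and idempotent by reading DELTA_VERSION_PROPERTY back out of the last completed Hudi commit. A minimal sketch of that round-trip, using a plain Map in place of Hudi's commit extra metadata:

object DeltaVersionRoundTripDemo {
  val DELTA_VERSION_PROPERTY = "delta-version"

  // Stands in for parseCommitExtraMetadata + loadLastDeltaVersionConverted.
  def lastConverted(extraMetadata: Map[String, String]): Option[Long] =
    extraMetadata.get(DELTA_VERSION_PROPERTY).map(_.toLong)

  def main(args: Array[String]): Unit = {
    val extraMetadata = Map(DELTA_VERSION_PROPERTY -> "42", "delta-timestamp" -> "1700000000000")
    println(lastConverted(extraMetadata)) // Some(42): version 42 is skipped if re-enqueued
    println(lastConverted(Map.empty))     // None: treated as a new table (CREATE_TABLE)
  }
}
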
  private def parseCommitExtraMetadata(
      instant: HoodieInstant,
      metaClient: HoodieTableMetaClient): Map[String, String] = {
    try {
      if (instant.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
        HoodieReplaceCommitMetadata.fromBytes(
          metaClient.getActiveTimeline.getInstantDetails(instant).get,
          classOf[HoodieReplaceCommitMetadata]).getExtraMetadata.asScala.toMap
      } else {
        HoodieCommitMetadata.fromBytes(
          metaClient.getActiveTimeline.getInstantDetails(instant).get,
          classOf[HoodieCommitMetadata]).getExtraMetadata.asScala.toMap
      }
    } catch {
      case ex: IOException =>
        throw new UncheckedIOException("Unable to read Hudi commit metadata", ex)
    }
  }

  private[delta] def runHudiConversionForActions(
      hudiTxn: HudiConversionTransaction,
      actionsToCommit: Seq[Action]): Unit = {
    hudiTxn.setCommitFileUpdates(actionsToCommit)
  }
}
Review comment: nit: remove empty line
Reply: Removed
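For anyone trying this out end to end, a hedged usage sketch. It assumes Hudi is toggled through the same delta.universalFormat.enabledFormats table property that UniForm already uses for Iceberg; the 'hudi' value is an assumption here, not something this diff shows:

import org.apache.spark.sql.SparkSession

object EnableHudiUniformDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("uniform-demo").getOrCreate()
    // Hypothetical: 'hudi' mirrors the existing 'iceberg' convention; verify
    // against the released UniForm docs before relying on it.
    spark.sql(
      """CREATE TABLE t (id INT) USING DELTA
        |TBLPROPERTIES ('delta.universalFormat.enabledFormats' = 'hudi')""".stripMargin)
    spark.sql("INSERT INTO t VALUES (1)") // the commit triggers the async HudiConverterHook
  }
}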