Skip to content

Commit

Permalink
[DELTA-OSS-EXTERNAL] Added the SymlinkTextInputFormat manifest genera…
Browse files Browse the repository at this point in the history
…tion for Presto/Athena support

## What changes were proposed in this pull request?

This PR is the first in a sequence of PRs that add manifest file generation (SymlinkTextInputFormat) to OSS Delta for Presto/Athena read support (issue #76). Specifically, this PR adds the core functionality for manifest generation and rigorous tests to verify the contents of the manifests. Future PRs will add the public APIs for on-demand generation.

- Added post-commit hooks to run tasks after a successful commit.

- Added GenerateSymlinkManifest implementation of post-commit hook to generate the manifests.
  - Each manifest contains the names of the data files to read when querying the whole table or a single partition.
  - Non-partitioned table produces a single manifest file containing all the data files.
  - A partitioned table produces partitioned manifest files: the manifest directory mirrors the partition directory structure of the table, and each partition directory contains one manifest file listing the data files of that partition. This allows Presto/Athena partition-pruned queries to read only the manifest files of the necessary partitions.
  - Each attempt to generate the manifests atomically (as much as possible) overwrites the manifest files in the directories (if they exist) and also deletes the manifest files of partitions that have been deleted from the table.

Closes #250

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Co-authored-by: Rahul Mahadev <rahul.mahadev@databricks.com>

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Rahul Mahadev <rahul.mahadev@databricks.com>

#6910 is resolved by tdas/SC-25511.

GitOrigin-RevId: a3e04f2fcdafb6ac29c3adcfb791a3d0611583dc
  • Loading branch information
tdas authored and mukulmurthy committed Nov 18, 2019
1 parent 48f5185 commit b18ffba
Show file tree
Hide file tree
Showing 6 changed files with 920 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,11 @@ object DeltaConfigs extends DeltaLogging {
a => a >= -1,
"needs to be larger than or equal to -1.")

/**
 * Boolean table configuration stored under the key
 * `<GenerateSymlinkManifest.CONFIG_NAME_ROOT>.enabled`.
 *
 * The raw string value is converted with `String.toBoolean`; the "false" literal is presumably
 * the default used when the property is absent (confirm against `buildConfig`).
 */
val SYMLINK_FORMAT_MANIFEST_ENABLED = buildConfig[Boolean](
  s"${hooks.GenerateSymlinkManifest.CONFIG_NAME_ROOT}.enabled",
  "false",
  rawValue => rawValue.toBoolean,
  // Every parsed Boolean is accepted; no further constraint is placed on the value.
  _ => true,
  "needs to be a boolean.")

}
14 changes: 14 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.FileNotFoundException
import java.util.ConcurrentModificationException

import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata}
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{Invariant, InvariantViolationException}
import org.apache.spark.sql.delta.util.JsonUtils
Expand Down Expand Up @@ -673,6 +674,19 @@ object DeltaErrors
def describeViewHistory: Throwable = {
new AnalysisException("Cannot describe the history of a view.")
}

/**
 * Builds the exception reported when a commit succeeded but one of its post-commit hooks failed.
 *
 * @param failedHook the hook that failed; its `name` is included in the message
 * @param failedOnCommitVersion the table version whose commit had already succeeded
 * @param extraErrorMessage optional detail appended after a ": " separator; ignored when it is
 *                          null or empty
 * @param error the underlying failure, attached as the cause
 * @return a `RuntimeException` carrying the assembled message and `error` as its cause
 */
def postCommitHookFailedException(
    failedHook: PostCommitHook,
    failedOnCommitVersion: Long,
    extraErrorMessage: String,
    error: Throwable): Throwable = {
  val baseMessage = s"Committing to the Delta table version $failedOnCommitVersion succeeded" +
    s" but error while executing post-commit hook ${failedHook.name}"
  // Option(...) maps a null detail to None, so null and "" both contribute no suffix.
  val detailSuffix = Option(extraErrorMessage)
    .filter(_.nonEmpty)
    .map(detail => s": $detail")
    .getOrElse("")
  new RuntimeException(baseMessage + detailSuffix, error)
}
}

/** The basic class for all Tahoe commit conflict exceptions. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.util.control.NonFatal
import com.databricks.spark.util.TagDefinitions.TAG_LOG_STORE_CLASS
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.{GenerateSymlinkManifest, PostCommitHook}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
Expand Down Expand Up @@ -172,6 +173,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
snapshot.metadata
}

/** Hooks registered on this transaction; they are run, in insertion order, after the commit. */
protected val postCommitHooks: ArrayBuffer[PostCommitHook] = ArrayBuffer.empty[PostCommitHook]

/** Returns the metadata at the current point in the log. */
def metadata: Metadata = newMetadata.getOrElse(snapshotMetadata)

Expand Down Expand Up @@ -261,6 +264,12 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
finalActions = commitInfo +: finalActions
}

// Register post-commit hooks if any
lazy val hasFileActions = finalActions.collect { case f: FileAction => f }.nonEmpty
if (DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.fromMetaData(metadata) && hasFileActions) {
registerPostCommitHook(GenerateSymlinkManifest)
}

val commitVersion = doCommit(snapshot.version + 1, finalActions, 0)
logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
postCommit(commitVersion, finalActions)
Expand All @@ -274,6 +283,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
throw e
}

runPostCommitHooks(version, actions)

version
}

Expand Down Expand Up @@ -451,4 +463,42 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
logInfo(s"No logical conflicts with deltas [$checkVersion, $nextAttempt), retrying.")
doCommit(nextAttempt, actions, attemptNumber + 1)
}

/** Register a hook that will be executed once a commit is successful. */
/**
 * Registers a hook to be executed once a commit is successful.
 *
 * Registration is de-duplicated: a hook that `postCommitHooks` already contains is not added a
 * second time.
 *
 * @param hook the post-commit hook to register
 */
def registerPostCommitHook(hook: PostCommitHook): Unit = {
  val alreadyRegistered = postCommitHooks.contains(hook)
  if (!alreadyRegistered) postCommitHooks.append(hook)
}

/** Executes the registered post commit hooks. */
/**
 * Executes the registered post-commit hooks in registration order.
 *
 * The active transaction is cleared while the hooks run and restored afterwards, even if a hook's
 * error handler throws. A non-fatal hook failure is logged, recorded as a
 * "delta.commit.hook.failure" event, and then passed to that hook's `handleError`.
 *
 * @param version the version that was just committed
 * @param committedActions the actions that were written by that commit
 */
protected def runPostCommitHooks(
    version: Long,
    committedActions: Seq[Action]): Unit = {
  assert(committed, "Can't call post commit hooks before committing")

  // Hooks may create more txns and overwrite the active one, so remember it and detach it first.
  val previouslyActive = OptimisticTransaction.getActive()
  OptimisticTransaction.clearActive()

  // Runs a single hook; on a non-fatal failure, reports it and delegates to the hook itself.
  def executeHook(hook: PostCommitHook): Unit = {
    try {
      hook.run(spark, this, committedActions)
    } catch {
      case NonFatal(failure) =>
        logWarning(
          s"Error when executing post-commit hook ${hook.name} " +
            s"for commit $version",
          failure)
        recordDeltaEvent(
          deltaLog,
          "delta.commit.hook.failure",
          data = Map(
            "hook" -> hook.name,
            "version" -> version,
            "exception" -> failure.toString
          ))
        hook.handleError(failure, version)
    }
  }

  try {
    for (hook <- postCommitHooks) executeHook(hook)
  } finally {
    // Put back whichever transaction (if any) was active before the hooks ran.
    previouslyActive.foreach(OptimisticTransaction.setActive)
  }
}
}
Loading

0 comments on commit b18ffba

Please sign in to comment.