
Commit

Merge branch 'main' into dataset-api
cugni committed Jul 10, 2024
2 parents a0461ac + bb4a999 commit 08819d7
Showing 18 changed files with 625 additions and 87 deletions.
47 changes: 47 additions & 0 deletions docs/AdvancedConfiguration.md
@@ -265,3 +265,50 @@ We can empty the staging area with a given write by setting the staging size to
```scala
--conf spark.qbeast.index.stagingSizeInBytes=0
```

## Pre-commit Hooks
**Pre-commit hooks** enable the execution of custom code just before a write or optimization is committed.

To implement such a hook, extend `io.qbeast.spark.delta.hook.PreCommitHook` and implement its `run` method, which receives the sequence of `Action`s created by the operation.
The `run` method returns a `Map[String, String]`, which is used as the `tags` of the transaction's `CommitInfo`:

```json
{
"commitInfo": {
"timestamp": 1718787341410,
"operation": "WRITE",
...
"tags": {
"HookOutputKey": "HookOutputValue"
},
...
}
}
```
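
For illustration, a minimal hook implementation could look like the sketch below. The exact signature of `run` and any additional members `PreCommitHook` may require, as well as the way the optional `String` argument reaches a hook (here, through the constructor), are assumptions; `SimpleHook` and `StatefulHook` mirror the names used in the examples further down.

```scala
import io.qbeast.spark.delta.hook.PreCommitHook
import io.qbeast.spark.delta.hook.PreCommitHook.PreCommitHookOutput
import org.apache.spark.sql.delta.actions.{Action, AddFile}

// Stateless sketch: tags the commit with the number of AddFile actions it contains.
class SimpleHook extends PreCommitHook {
  override def run(actions: Seq[Action]): PreCommitHookOutput =
    Map("simpleHookAddedFiles" -> actions.count(_.isInstanceOf[AddFile]).toString)
}

// Sketch of a hook that takes the single String initialization argument
// (assumed to be passed via the constructor) and echoes it back as a tag.
class StatefulHook(arg: String) extends PreCommitHook {
  override def run(actions: Seq[Action]): PreCommitHookOutput =
    Map("statefulHookArg" -> arg)
}
```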


1. You can use more than one hook, as shown below with `myHook1` and `myHook2`.
2. For each hook you want to use, provide its class name with the option `qbeastPreCommitHook.<custom-hook-name>`.
3. For hooks that take an initialization argument, add an option named `qbeastPreCommitHook.<custom-hook-name>.arg`. Currently, only one `String` argument is allowed per hook.

```scala
// Hooks for Writes
df
.write
.format("qbeast")
.option("qbeastPreCommitHook.myHook1", classOf[SimpleHook].getCanonicalName)
.option("qbeastPreCommitHook.myHook2", classOf[StatefulHook].getCanonicalName)
.option("qbeastPreCommitHook.myHook2.arg", myStringHookArg)
.save(pathToTable)
```

```scala
// Hooks for Optimizations
val qt = QbeastTable.forPath(spark, tablePath)
val options = Map(
"qbeastPreCommitHook.myHook1" -> classOf[SimpleHook].getCanonicalName,
"qbeastPreCommitHook.myHook2" -> classOf[StatefulHook].getCanonicalName,
"qbeastPreCommitHook.myHook2.arg" -> "myStringHookArg"
)
qt.optimize(filesToOptimize, options)
```
19 changes: 10 additions & 9 deletions src/main/scala/io/qbeast/spark/QbeastTable.scala
@@ -74,17 +74,17 @@ class QbeastTable private (
* the identifier of the revision to optimize. If it does not exist or none is specified, the
* latest available revision is used
*/
def optimize(revisionID: RevisionID): Unit = {
def optimize(revisionID: RevisionID, options: Map[String, String]): Unit = {
if (!isStaging(revisionID)) {
checkRevisionAvailable(revisionID)
OptimizeTableCommand(revisionID, indexedTable)
OptimizeTableCommand(revisionID, indexedTable, options)
.run(sparkSession)
}
}

def optimize(): Unit = {
def optimize(options: Map[String, String] = Map.empty): Unit = {
if (!isStaging(latestRevisionAvailableID)) {
optimize(latestRevisionAvailableID)
optimize(latestRevisionAvailableID, options)
}
}

@@ -95,7 +95,8 @@ class QbeastTable private (
* @param files
* the index files to optimize
*/
def optimize(files: Seq[String]): Unit = indexedTable.optimize(files)
def optimize(files: Seq[String], options: Map[String, String]): Unit =
indexedTable.optimize(files, options)

/**
* The analyze operation should analyze the index structure and find the cubes that need
@@ -127,15 +128,15 @@ class QbeastTable private (
* the identifier of the revision to optimize. If it does not exist or none is specified, the
* latest available revision is used
*/
def compact(revisionID: RevisionID): Unit = {
def compact(revisionID: RevisionID, options: Map[String, String]): Unit = {
checkRevisionAvailable(revisionID)
CompactTableCommand(revisionID, indexedTable)
CompactTableCommand(revisionID, indexedTable, options)
.run(sparkSession)
.map(_.getString(0))
}

def compact(): Unit = {
compact(latestRevisionAvailableID)
def compact(options: Map[String, String] = Map.empty): Unit = {
compact(latestRevisionAvailableID, options)
}

/**
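A hedged usage sketch of the new `optimize`/`compact` signatures above; the Spark session, table path, and revision id are placeholders, `RevisionID` is assumed to be a `Long`, and `SimpleHook` is the hypothetical hook from the documentation section:

```scala
import io.qbeast.spark.QbeastTable

val qt = QbeastTable.forPath(spark, "/tmp/qbeast-table") // placeholder path

// Optimize the latest revision; pre-commit hook options are forwarded through the
// new parameter (qt.optimize() with no arguments still works via the default value).
qt.optimize(Map("qbeastPreCommitHook.myHook1" -> classOf[SimpleHook].getCanonicalName))

// Compact a specific revision; this overload now requires an options map.
qt.compact(1L, Map.empty[String, String])
```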
83 changes: 74 additions & 9 deletions src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -18,7 +18,11 @@ package io.qbeast.spark.delta
import io.qbeast.core.model.QTableID
import io.qbeast.core.model.RevisionID
import io.qbeast.core.model.TableChanges
import io.qbeast.spark.delta.hook.PreCommitHook
import io.qbeast.spark.delta.hook.PreCommitHook.PreCommitHookOutput
import io.qbeast.spark.delta.hook.QbeastHookLoader
import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
import io.qbeast.spark.utils.TagColumns
import org.apache.spark.internal.Logging
@@ -49,7 +53,7 @@ import scala.collection.mutable.ListBuffer
* SaveMode of the writeMetadata
* @param deltaLog
* deltaLog associated to the table
* @param options
* @param qbeastOptions
* options for writeMetadata operation
* @param schema
* the schema of the table
@@ -58,12 +62,17 @@ private[delta] case class DeltaMetadataWriter(
tableID: QTableID,
mode: SaveMode,
deltaLog: DeltaLog,
options: DeltaOptions,
qbeastOptions: QbeastOptions,
schema: StructType)
extends QbeastMetadataOperation
with DeltaCommand
with Logging {

private val options = {
val optionsMap = qbeastOptions.toMap ++ Map("path" -> tableID.id)
new DeltaOptions(optionsMap, SparkSession.active.sessionState.conf)
}

private def isOverwriteOperation: Boolean = mode == SaveMode.Overwrite

override protected val canMergeSchema: Boolean = options.canMergeSchema
@@ -73,13 +82,10 @@ private[delta] case class DeltaMetadataWriter(

private val sparkSession = SparkSession.active

private val deltaOperation = {
DeltaOperations.Write(mode, None, options.replaceWhere, options.userMetadata)
}

/**
* Creates an instance of basic stats tracker on the desired transaction
* @param txn
* the transaction
* @return
*/
private def createStatsTrackers(txn: OptimisticTransaction): Seq[WriteJobStatsTracker] = {
@@ -94,12 +100,65 @@ private[delta] case class DeltaMetadataWriter(
statsTrackers
}

private val preCommitHooks = new ListBuffer[PreCommitHook]()

// Load the pre-commit hooks
loadPreCommitHooks().foreach(registerPreCommitHooks)

/**
* Register a pre-commit hook
* @param preCommitHook
* the hook to register
*/
private def registerPreCommitHooks(preCommitHook: PreCommitHook): Unit = {
if (!preCommitHooks.contains(preCommitHook)) {
preCommitHooks.append(preCommitHook)
}
}

/**
* Load the pre-commit hooks from the options
* @return
* the loaded hooks
*/
private def loadPreCommitHooks(): Seq[PreCommitHook] =
qbeastOptions.hookInfo.map(QbeastHookLoader.loadHook)

/**
* Executes all registered pre-commit hooks.
*
* This function iterates over all pre-commit hooks registered in the `preCommitHooks`
* ListBuffer. For each hook, it calls the `run` method of the hook, passing the provided
* actions as an argument. The `run` method of a hook is expected to return a Map[String,
* String] which represents the output of the hook. The outputs of all hooks are combined into a
* single Map[String, String] which is returned as the result of this function.
*
* It's important to note that if two or more hooks return a map with the same key, the value of
* the key in the resulting map will be the value from the last hook that returned that key.
* This is because the `++` operation on maps in Scala is a right-biased union operation, which
* means that if there are duplicate keys, the value from the right operand (in this case, the
* later hook) will overwrite the value from the left operand.
*
* Therefore, to avoid unexpected behavior, it's crucial to ensure that the outputs of different
* hooks have unique keys. If there's a possibility of key overlap, the hooks should be designed
* to handle this appropriately, for example by prefixing each key with a unique identifier for
* the hook.
*
* @param actions
* The actions to be passed to the `run` method of each hook.
* @return
* A Map[String, String] representing the combined outputs of all hooks.
*/
private def runPreCommitHooks(actions: Seq[Action]): PreCommitHookOutput = {
preCommitHooks.foldLeft(Map.empty[String, String]) { (acc, hook) => acc ++ hook.run(actions) }
}
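
A tiny hypothetical example of the right-biased `++` merge described in the comment above:

```scala
val earlierHookOutput = Map("rows" -> "10", "status" -> "ok")
val laterHookOutput   = Map("status" -> "skipped")

// Right-biased union: on duplicate keys, the later hook's value wins.
val combined = earlierHookOutput ++ laterHookOutput
// combined == Map("rows" -> "10", "status" -> "skipped")
```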

def writeWithTransaction(writer: => (TableChanges, Seq[FileAction])): Unit = {
val oldTransactions = deltaLog.unsafeVolatileSnapshot.setTransactions
// If the transaction was completed before then no operation
for (txn <- oldTransactions; version <- options.txnVersion; appId <- options.txnAppId) {
if (txn.appId == appId && version <= txn.version) {
val message = s"Transaction ${version} from application ${appId} is already completed," +
val message = s"Transaction $version from application $appId is already completed," +
" the requested write is ignored"
logWarning(message)
return
@@ -117,8 +176,14 @@ private[delta] case class DeltaMetadataWriter(
for (txnVersion <- options.txnVersion; txnAppId <- options.txnAppId) {
actions +:= SetTransaction(txnAppId, txnVersion, Some(System.currentTimeMillis()))
}

// Run pre-commit hooks
val tags = runPreCommitHooks(actions)

// Commit the information to the DeltaLog
txn.commit(actions, deltaOperation)
val op =
DeltaOperations.Write(mode, None, options.replaceWhere, options.userMetadata)
txn.commit(actions = actions, op = op, tags = tags)
}
}

@@ -151,7 +216,7 @@ private[delta] case class DeltaMetadataWriter(
.collect()
.map(IndexFiles.fromAddFile(dimensionCount))
.flatMap(_.tryReplicateBlocks(deltaReplicatedSet))
.map(IndexFiles.toAddFile(false))
.map(IndexFiles.toAddFile(dataChange = false))
.toSeq
}

Expand Down
src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala
@@ -20,7 +20,6 @@ import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.IISeq
import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.DeltaOptions
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
@@ -38,23 +37,20 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction,

val deltaLog = loadDeltaQbeastLog(tableID).deltaLog
val mode = if (append) SaveMode.Append else SaveMode.Overwrite
val optionsMap = options.toMap() ++ Map("path" -> tableID.id)
val deltaOptions = new DeltaOptions(optionsMap, SparkSession.active.sessionState.conf)

val metadataWriter =
DeltaMetadataWriter(tableID, mode, deltaLog, deltaOptions, schema)
DeltaMetadataWriter(tableID, mode, deltaLog, options, schema)
metadataWriter.writeWithTransaction(writer)
}

override def updateMetadataWithTransaction(tableID: QTableID, schema: StructType)(
update: => Configuration): Unit = {
val deltaLog = loadDeltaQbeastLog(tableID).deltaLog
val options =
new DeltaOptions(Map("path" -> tableID.id), SparkSession.active.sessionState.conf)

val metadataWriter =
DeltaMetadataWriter(tableID, mode = SaveMode.Append, deltaLog, options, schema)
DeltaMetadataWriter(tableID, mode = SaveMode.Append, deltaLog, QbeastOptions.empty, schema)

metadataWriter.updateMetadataWithTransaction(update)

}

override def loadSnapshot(tableID: QTableID): DeltaQbeastSnapshot = {
