[HUDI-5105] Add Call show_commit_extra_metadata for spark sql (apache#7091)

* [HUDI-5105] Add Call show_commit_extra_metadata for spark sql

(cherry picked from commit 79ad357)
XuQianJin-Stars authored and XuQianJin-Stars committed Nov 2, 2022
1 parent 97ce2b7 commit 90c0905
Showing 3 changed files with 187 additions and 6 deletions.
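
For orientation, here is a minimal usage sketch of the procedure this commit adds, driven through spark.sql. The sketch is illustrative only: the table name hudi_table and the instant timestamp are placeholders, while the parameter names (table, limit, instant_time, metadata_key) and the output columns (instant_time, action, metadata_key, metadata_value) are taken from the procedure definition in the diff below.

// Sketch only; assumes an existing Hudi table named "hudi_table".
// Extra metadata of the latest completed commit:
spark.sql("call show_commit_extra_metadata(table => 'hudi_table')").show(false)

// Extra metadata of one specific instant, filtered to a single key
// ('schema' is the key also exercised in the new test); the instant timestamp is a placeholder:
spark.sql("call show_commit_extra_metadata(table => 'hudi_table', instant_time => '20221102120000000', metadata_key => 'schema')").show(false)
// Each returned row has: instant_time, action, metadata_key, metadata_value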
@@ -82,6 +82,7 @@ object HoodieProcedures {
      ,(HiveSyncProcedure.NAME, HiveSyncProcedure.builder)
      ,(BackupInvalidParquetProcedure.NAME, BackupInvalidParquetProcedure.builder)
      ,(CopyToTempView.NAME, CopyToTempView.builder)
      ,(ShowCommitExtraMetadataProcedure.NAME, ShowCommitExtraMetadataProcedure.builder)
    )
  }
}
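
The newly added ShowCommitExtraMetadataProcedure entry above is the only registration needed: HoodieProcedures pairs each procedure name with a Supplier[ProcedureBuilder], and that name is what a Spark SQL call statement resolves against. Below is a rough sketch of the lookup pattern; the map and lookup code are paraphrased for illustration, not copied from HoodieProcedures.

import java.util.function.Supplier
import org.apache.spark.sql.hudi.command.procedures.{Procedure, ProcedureBuilder, ShowCommitExtraMetadataProcedure}

// Paraphrased registry: procedure name -> Supplier[ProcedureBuilder].
val mapping: Map[String, Supplier[ProcedureBuilder]] = Map(
  ShowCommitExtraMetadataProcedure.NAME -> ShowCommitExtraMetadataProcedure.builder
)

// Resolving the name yields a fresh Procedure instance via builder.get().build.
val procedure: Option[Procedure] = mapping.get("show_commit_extra_metadata").map(_.get().build)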
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util
import java.util.function.Supplier
import scala.collection.JavaConversions._

class ShowCommitExtraMetadataProcedure() extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
    ProcedureParameter.optional(2, "instant_time", DataTypes.StringType, None),
    ProcedureParameter.optional(3, "metadata_key", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("metadata_key", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("metadata_value", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
    val instantTime = getArgValueOrDefault(args, PARAMETERS(2))
    val metadataKey = getArgValueOrDefault(args, PARAMETERS(3))

    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
    val basePath = hoodieCatalogTable.tableLocation
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val activeTimeline = metaClient.getActiveTimeline
    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants

    val hoodieInstantOption: Option[HoodieInstant] = if (instantTime.isEmpty) {
      getCommitForLastInstant(timeline)
    } else {
      getCommitForInstant(timeline, instantTime.get.asInstanceOf[String])
    }

    if (hoodieInstantOption.isEmpty) {
      throw new HoodieException(s"Commit $instantTime not found in Commits $timeline.")
    }

    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)

    if (commitMetadataOptional.isEmpty) {
      throw new HoodieException(s"Commit $instantTime not found commitMetadata in Commits $timeline.")
    }

    val meta = commitMetadataOptional.get
    val timestamp: String = hoodieInstantOption.get.getTimestamp
    val action: String = hoodieInstantOption.get.getAction
    val metadatas: util.Map[String, String] = if (metadataKey.isEmpty) {
      meta.getExtraMetadata
    } else {
      meta.getExtraMetadata.filter(r => r._1.equals(metadataKey.get.asInstanceOf[String].trim))
    }

    val rows = new util.ArrayList[Row]
    metadatas.foreach(r => rows.add(Row(timestamp, action, r._1, r._2)))
    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
  }

  override def build: Procedure = new ShowCommitExtraMetadataProcedure()

  private def getCommitForLastInstant(timeline: HoodieTimeline): Option[HoodieInstant] = {
    val instantOptional = timeline.getReverseOrderedInstants
      .findFirst
    if (instantOptional.isPresent) {
      Option.apply(instantOptional.get())
    } else {
      Option.empty
    }
  }

  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
    val instants: util.List[HoodieInstant] = util.Arrays.asList(
      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))

    val hoodieInstant: Option[HoodieInstant] = instants.find((i: HoodieInstant) => timeline.containsInstant(i))
    hoodieInstant
  }

  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
    if (hoodieInstant.isDefined) {
      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
          classOf[HoodieReplaceCommitMetadata]))
      } else {
        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
          classOf[HoodieCommitMetadata]))
      }
    } else {
      Option.empty
    }
  }
}

object ShowCommitExtraMetadataProcedure {
  val NAME = "show_commit_extra_metadata"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new ShowCommitExtraMetadataProcedure()
  }
}
@@ -61,9 +61,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
      // collect archived commits for table
      val endTs = commits(0).get(0).toString
      val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
      assertResult(4) {
        archivedCommits.length
      }
      assertResult(4){archivedCommits.length}
    }
  }

@@ -109,9 +107,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
      // collect archived commits for table
      val endTs = commits(0).get(0).toString
      val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect()
      assertResult(4) {
        archivedCommits.length
      }
      assertResult(4){archivedCommits.length}
    }
  }

@@ -288,4 +284,50 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
      assertResult(1){result.length}
    }
  }

test("Test Call show_commit_extra_metadata Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

// Check required fields
checkExceptionContain(s"""call show_commit_extra_metadata()""")(
s"arguments is empty")

// collect commits for table
val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
assertResult(2){commits.length}

val instant_time = commits(0).get(0).toString
// get specify instantTime's extraMetadatas
val metadatas1 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName', instant_time => '$instant_time')""").collect()
assertResult(true){metadatas1.length > 0}

// get last instantTime's extraMetadatas
val metadatas2 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName')""").collect()
assertResult(true){metadatas2.length > 0}

// get last instantTime's extraMetadatas and filter extraMetadatas with metadata_key
val metadatas3 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName', metadata_key => 'schema')""").collect()
assertResult(1){metadatas3.length}
}
}
}
