feat: [HUDI-9775] Support for show_metadata_table_history with appropriate matching data table info #14303
PavithranRick wants to merge 4 commits into apache:master
…data table info matched
@CTTY could you help review this PR?
CTTY left a comment
Hi @PavithranRick, thanks for the contribution! I've left some comments. I haven't had a chance to check the unit tests yet, but I will do so later.
| private val PARAMETERS = Array[ProcedureParameter]( | ||
| ProcedureParameter.optional(0, "table", DataTypes.StringType), | ||
| ProcedureParameter.optional(1, "path", DataTypes.StringType, ""), |
| ProcedureParameter.optional(1, "path", DataTypes.StringType, ""), | |
| ProcedureParameter.optional(1, "path", DataTypes.StringType), |
path should not have a default value.
Should table and path be required rather than optional?
| ProcedureParameter.optional(4, "filter", DataTypes.StringType, ""), | ||
| ProcedureParameter.optional(5, "startTime", DataTypes.StringType, ""), | ||
| ProcedureParameter.optional(6, "endTime", DataTypes.StringType, "") |
nit: We do not have to assign default values for these
| val filteredEntries = applyFilter(entries, filter, outputType) | ||
| val finalEntries = if (startTime.nonEmpty && endTime.nonEmpty) { |
Is it possible that only startTime is empty and endTime is not?
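One way to handle a one-sided range is sketched below. It assumes that an empty string means "no bound" and that instant times compare correctly as strings, which the comparison in this PR already relies on. The object and method names are hypothetical and not part of the PR.

```scala
// Hypothetical helper, not part of the PR: an empty bound is treated as "unbounded".
object InstantRangeFilter {
  def inRange(instantTime: String, startTime: String, endTime: String): Boolean = {
    // Each bound is checked independently, so either one may be omitted.
    val afterStart = startTime.isEmpty || instantTime >= startTime
    val beforeEnd = endTime.isEmpty || instantTime <= endTime
    afterStart && beforeEnd
  }
}
```

With this shape, passing only endTime keeps every instant up to that time instead of silently returning the full timeline.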
| val filteredInstantTimes = if (startTime.nonEmpty && endTime.nonEmpty) { | ||
| allInstantTimes.filter { instantTime => | ||
| val timeMatches = (instantTime >= startTime && instantTime <= endTime) | ||
| timeMatches | ||
| } | ||
| } else { | ||
| allInstantTimes | ||
| } | ||
| val sortedInstantTimes = filteredInstantTimes.toSeq.sorted(Ordering[String].reverse) | ||
| val entries = sortedInstantTimes.map { instantTime => | ||
| createTimelineEntry( | ||
| instantTime, | ||
| dataActiveTimeline, | ||
| dataArchivedTimeline, | ||
| metadataActiveTimeline, | ||
| metadataArchivedTimeline, | ||
| dataInstantInfoMap, | ||
| metadataInstantInfoMap | ||
| ) | ||
| } |
I think this is kinda verbose, maybe
| val filteredInstantTimes = if (startTime.nonEmpty && endTime.nonEmpty) { | |
| allInstantTimes.filter { instantTime => | |
| val timeMatches = (instantTime >= startTime && instantTime <= endTime) | |
| timeMatches | |
| } | |
| } else { | |
| allInstantTimes | |
| } | |
| val sortedInstantTimes = filteredInstantTimes.toSeq.sorted(Ordering[String].reverse) | |
| val entries = sortedInstantTimes.map { instantTime => | |
| createTimelineEntry( | |
| instantTime, | |
| dataActiveTimeline, | |
| dataArchivedTimeline, | |
| metadataActiveTimeline, | |
| metadataArchivedTimeline, | |
| dataInstantInfoMap, | |
| metadataInstantInfoMap | |
| ) | |
| } | |
| val entries = | |
| allInstantTimes | |
| .filter(t => startTime.isEmpty || endTime.isEmpty || (t >= startTime && t <= endTime)) | |
| .toSeq | |
| .sorted(Ordering.String.reverse) | |
| .map { instantTime => | |
| createTimelineEntry( | |
| instantTime, | |
| dataActiveTimeline, | |
| dataArchivedTimeline, | |
| metadataActiveTimeline, | |
| metadataArchivedTimeline, | |
| dataInstantInfoMap, | |
| metadataInstantInfoMap | |
| ) | |
| } |
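A small self-contained check of the condensed form against the original if/else, using plain strings in place of timeline entries. For the two to agree, the guard has to pass every instant when either bound is empty (isEmpty) and apply the range only when both are set. The object name and the Set[String] input type are assumptions made for this sketch.

```scala
// Hypothetical sketch: compares the verbose and condensed filter shapes on plain strings.
object CondensedFilterSketch {
  // Original shape: filter only when both bounds are set, then sort descending.
  def verbose(times: Set[String], startTime: String, endTime: String): Seq[String] = {
    val filtered =
      if (startTime.nonEmpty && endTime.nonEmpty) times.filter(t => t >= startTime && t <= endTime)
      else times
    filtered.toSeq.sorted(Ordering[String].reverse)
  }

  // Condensed shape: a single predicate that passes everything unless both bounds are set.
  def condensed(times: Set[String], startTime: String, endTime: String): Seq[String] =
    times
      .filter(t => startTime.isEmpty || endTime.isEmpty || (t >= startTime && t <= endTime))
      .toSeq
      .sorted(Ordering[String].reverse)
}
```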
| completionTime: String, | ||
| modificationTimeMs: Long | ||
| ) { | ||
| def getModificationTime: Long = modificationTimeMs |
iirc, Scala case class fields can be accessed directly, e.g. instantWithModTime.modificationTimeMs, so we don't need an extra getter
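A minimal illustration of the point, assuming the class in the diff is a case class. InstantWithModTime is a hypothetical stand-in that models only the two fields visible in the hunk.

```scala
// Hypothetical stand-in for the PR's class; only the fields visible in the diff are modeled.
final case class InstantWithModTime(completionTime: String, modificationTimeMs: Long)

object CaseClassAccessSketch {
  val instant: InstantWithModTime = InstantWithModTime("20240101120000000", 1704110400000L)
  // Constructor parameters of a case class are public vals, so no getter is needed.
  val modTime: Long = instant.modificationTimeMs
}
```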
| instantMap.getOrElseUpdate(instant.requestedTime(), scala.collection.mutable.Map[HoodieInstant.State, HoodieInstantWithModTime]()) | ||
| .put(instant.getState, instantWithModTime) | ||
| } catch { | ||
| case _: Exception => // Skip invalid files |
| timestamp -> stateMap.toMap | ||
| }.toMap | ||
| } catch { | ||
| case _: Exception => Map.empty[String, Map[HoodieInstant.State, HoodieInstantWithModTime]] |
| val stateMap = instantInfoMap.get(instantTimestamp) | ||
| if (stateMap.isDefined) { | ||
| val stateInfo = stateMap.get.get(state) | ||
| if (stateInfo.isDefined) { | ||
| val modificationTime = stateInfo.get.getModificationTime | ||
| if (modificationTime > 0) { | ||
| val date = new java.util.Date(modificationTime) | ||
| val formatter = new java.text.SimpleDateFormat("MM-dd HH:mm:ss") | ||
| formatter.format(date) | ||
| } else { | ||
| instantTimestamp | ||
| } | ||
| } else { | ||
| null | ||
| } | ||
| } else { | ||
| null | ||
| } | ||
| } |
We can simplify this to
| val stateMap = instantInfoMap.get(instantTimestamp) | |
| if (stateMap.isDefined) { | |
| val stateInfo = stateMap.get.get(state) | |
| if (stateInfo.isDefined) { | |
| val modificationTime = stateInfo.get.getModificationTime | |
| if (modificationTime > 0) { | |
| val date = new java.util.Date(modificationTime) | |
| val formatter = new java.text.SimpleDateFormat("MM-dd HH:mm:ss") | |
| formatter.format(date) | |
| } else { | |
| instantTimestamp | |
| } | |
| } else { | |
| null | |
| } | |
| } else { | |
| null | |
| } | |
| } | |
| val formatter = new java.text.SimpleDateFormat("MM-dd HH:mm:ss") | |
| instantInfoMap.get(instantTimestamp) | |
| .flatMap(_.get(state)) | |
| .map(_.modificationTimeMs) | |
| .filter(_ > 0) | |
| .map(time => formatter.format(new java.util.Date(time))) | |
| .getOrElse(instantTimestamp) | |
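One difference worth noting: the chained version falls back to instantTimestamp whenever the lookup misses, whereas the nested if/else returns null when the instant or state is absent. If that null is meaningful to callers (an assumption, since the diff does not show how the result is consumed), a variant that keeps it could look like the sketch below. The object name and the simplified map type are hypothetical stand-ins for the PR's classes.

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical sketch with a simplified map: instant timestamp -> (state name -> mod time in ms).
object ModTimeFormatSketch {
  type InfoMap = Map[String, Map[String, Long]]

  // Mirrors the nested if/else: null when the instant or state is missing,
  // the raw instant timestamp when the modification time is not positive.
  def formatPreservingNull(info: InfoMap, instantTimestamp: String, state: String): String = {
    val formatter = new SimpleDateFormat("MM-dd HH:mm:ss")
    info.get(instantTimestamp)
      .flatMap(_.get(state))
      .map(ms => if (ms > 0) formatter.format(new Date(ms)) else instantTimestamp)
      .orNull
  }
}
```

If null was never intended, the getOrElse form is simpler and avoids handing null to callers.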
Describe the issue this Pull Request addresses
This PR introduces the ShowMetadataTableHistoryProcedure, a new Spark SQL procedure that displays timeline information for both the data table and metadata table side-by-side, enabling analysis of metadata table synchronization and evolution.
Summary and Changelog
- MM-dd HH:mm:ss formatting for requested/inflight/completed times
- showArchived parameter
- startTime/endTime
Impact
show_metadata_table_history(table, path, limit, showArchived, filter, startTime, endTime)
Risk Level
Low
Verification performed:
Documentation Update
Update Hudi Spark SQL procedures documentation to include show_metadata_table_history usage examples and parameter descriptions.
Contributor's checklist