[HUDI-8647] Support Clustering Timeline Instant Metrics in HoodieMetrics#12466
[HUDI-8647] Support Clustering Timeline Instant Metrics in HoodieMetrics#12466zhangyue19921010 merged 13 commits intoapache:masterfrom
Conversation
| import org.apache.flink.runtime.operators.coordination.CoordinatorStore; | ||
| import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; | ||
|
|
||
| public class ContextAdapter implements OperatorCoordinator.Context { |
There was a problem hiding this comment.
So the coordinator always reference this class?
There was a problem hiding this comment.
OperatorCoordinator.Context is OK for flink version 1.18、1.19、1.20, define ContextAdapter for adapting flink version 1.14、1.15、1.16、1.17.
| metricGroup.gauge(PENDING_CLUSTERING_COUNT, () -> clusteringPendingClusteringCount); | ||
| } | ||
|
|
||
| public void updateTimeLineMetrics(HoodieActiveTimeline activeTimeline) { |
There was a problem hiding this comment.
so when the metrics is updated?
There was a problem hiding this comment.
Thanks for Teacher Danny's reminding. forget to call update metrics in StreamWriteOperatorCoordinator, metrics will be updated after do commit success.
| reset(); | ||
| this.ckpMetadata.commitInstant(instant); | ||
| LOG.info("Commit instant [{}] success!", instant); | ||
| this.flinkTimeLineMetrics.updateTimeLineMetrics(this.writeClient.getHoodieTable().getActiveTimeline()); |
There was a problem hiding this comment.
The BaseHoodieClient already got a HoodieMetrics instance, we can just enhance it with new items or add the FlinkTimeLineMetrics in HoodieFlinkWriteClient, and update it in HoodieFlinkWriteClient.postCommit, you need to override the #postCommit method so that we can eliminate the redundant timeline loading(file listing) for each commit.
There was a problem hiding this comment.
Thanks for advising, finally enhanced in HoodieMetrics . @danny0405
| public static final String FINALIZE_ACTION = "finalize"; | ||
| public static final String INDEX_ACTION = "index"; | ||
| public static final String SOURCE_READ_AND_INDEX_ACTION = "source_read_and_index"; | ||
| public static final String CLUSTERTING_INSTANT_ACTION = "replacecommit"; |
There was a problem hiding this comment.
maybe we need to take care of clustering commit instead of replace commit
| // Delete the marker directory for the instant. | ||
| WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) | ||
| .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); | ||
| if (metrics.getClusteringInstantTimerCtx() != null) { |
There was a problem hiding this comment.
Check Config config.isMetricsOn() instead of trying to create a Timer.
What we want is to monitor clustering and compaction instant here rat
| Set<String> validActions = CollectionUtils.createSet(HoodieMetrics.CLUSTERTING_INSTANT_ACTION); | ||
| HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); | ||
| HoodieTimeline inflightAndRequested = activeTimeline.filterInflightsAndRequested() | ||
| .filter(instant -> validActions.contains(instant.getAction())); | ||
| long pendingClusteringInstantCount = Long.valueOf(inflightAndRequested.countInstants()); | ||
| long earliestInflightClusteringInstant = 0L; | ||
| Option<HoodieInstant> firstInstant = inflightAndRequested.firstInstant(); | ||
| if (firstInstant.isPresent()) { | ||
| earliestInflightClusteringInstant = Long.valueOf(firstInstant.get().requestedTime()); | ||
| } | ||
|
|
||
| HoodieTimeline completed = activeTimeline.filterCompletedInstants() | ||
| .filter(instant -> validActions.contains(instant.getAction())); | ||
| long latestCompletedClusteringInstant = 0L; | ||
| Option<HoodieInstant> lastInstant = completed.lastInstant(); | ||
| if (lastInstant.isPresent()) { | ||
| latestCompletedClusteringInstant = Long.valueOf(lastInstant.get().requestedTime()); | ||
| } |
There was a problem hiding this comment.
Can we move this logic into metrics.updateTimeLineClusteringInstantMetrics ? it's a little confusing that we do metrics computing in post commit here.
There was a problem hiding this comment.
Thanks for advising, updated according to all above comments.
|
Teacher Danny, PTAL @danny0405 |
|
@fhan688 Can you resolve the conflicts. |
|
@hudi-bot run azure |
Change Logs
add hudi clustering timeline instant related metrics:
earliest_inflight_clustering_instant
latest_completed_clustering_instant
clustering_pendingClusteringCount
Impact
hudi-flink-datasource
Risk level (write none, low medium or high below)
Low
Documentation Update
None
Contributor's checklist