Pipe: refact pipe remaining metrics calculation#13640
Draft
VGalaxies wants to merge 11 commits intoapache:masterfrom
Draft
Pipe: refact pipe remaining metrics calculation#13640VGalaxies wants to merge 11 commits intoapache:masterfrom
VGalaxies wants to merge 11 commits intoapache:masterfrom
Conversation
VGalaxies
commented
Sep 26, 2024
| return inputPendingQueue.isEmpty(); | ||
| } | ||
|
|
||
| /** DO NOT FORGET to set eventCounter to new value after invoking this method. */ |
needToReport set to false when calculating commit rate to avoid extremely low remaining time
Contributor
There was a problem hiding this comment.
Pull Request Overview
Refactors the pipe remaining metrics calculation to distinguish historical vs. realtime events and unify event‐count filtering.
- Introduces
CommitRateMarkerand updates commit logic to include a realtime flag. - Replaces many
getEventCount()methods with predicate-based filtering onEnrichedEvent. - Overhauls
PipeDataNodeRemainingEventAndTimeOperatorto separately compute historical and realtime remaining‐time metrics.
Reviewed Changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| EnrichedEvent.java | Added isDataRegionRealtimeEvent and needToCommitRate defaults. |
| PipeEventCommitManager.java | Switched to CommitRateMarker, updated commit calls. |
| CommitRateMarker.java | New interface for commit‐rate reporting with realtime flag. |
| SubscriptionConnectorSubtask.java | Changed getEventCount to accept a Predicate<EnrichedEvent>. |
| SubscriptionPipeTsFilePlainEvent.java | Adapted getPipeEventCount to predicate filter. |
| SubscriptionPipeTsFileBatchEvents.java | Predicate‐based event counting in batch. |
| SubscriptionPipeTabletBatchEvents.java | Predicate‐based event counting in tablet batch. |
| SubscriptionPipeEvents.java | Updated interface signature to use predicates. |
| SubscriptionPipeEmptyEvent.java | Predicate‐based empty count. |
| SubscriptionPipeTsFileEventBatch.java | Removed old enriched‐event size metrics. |
| SubscriptionPipeTabletEventBatch.java | Removed outdated enriched‐events list and count. |
| SubscriptionPipeEventBatch.java | Added enrichedEvents list and predicate filtering. |
| SubscriptionEvent.java | Switched to predicate‐based getPipeEventCount. |
| SubscriptionPrefetchingQueue.java | Predicate version of getPipeEventCount. |
| SubscriptionBroker.java | Propagated predicate to getPipeEventCount. |
| SubscriptionBlockingPendingQueue.java | Added forEach helper. |
| SubscriptionBrokerAgent.java | Predicate‐aware broker event count. |
| PipeDataNodeRemainingEventAndTimeOperator.java | Major refactor separating historical vs. realtime metrics, added filtering predicates. |
| PipeDataNodeRemainingEventAndTimeMetrics.java | Updated markRegionCommit to include realtime flag. |
| PipeRealtimeDataRegionExtractor.java | Predicate‐based getEventCount on realtime extractor. |
| IoTDBDataRegionExtractor.java | Renamed to getRealtimeEventCount with predicate. |
| PipeTsFileInsertionEvent.java | Implemented isDataRegionRealtimeEvent. |
| PipeRawTabletInsertionEvent.java | Overrode realtime/event‐rate logic. |
| IoTDBDataRegionAsyncConnector.java | Changed getRetryEventCount to predicate signature. |
| PipeProcessorSubtask.java | Changed getEventCount to predicate signature. |
| PipeConnectorSubtask.java | Predicate‐based getEventCount and retry counts. |
| pom.xml | Added metrics-core dependency. |
| PipeConfigNodeRemainingTimeMetrics.java | Updated markRegionCommit to include realtime flag. |
Comments suppressed due to low confidence (1)
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java:331
- Defaulting
isDataRegionRealtimeEvent()to always return true means all events are treated as realtime. Consider returning false by default and only overriding in subclasses that truly represent realtime events.
}
| : totalDataRegionWriteEventCount / lastDataRegionCommitSmoothingValue; | ||
| dataRegionRealtimeEventRemainingTime = | ||
| lastDataRegionRealtimeEventCommitSmoothingValue <= 0 | ||
| ? 0 // NOTE HERE |
There was a problem hiding this comment.
Returning 0 when the realtime commit‐rate smoothing value is ≤ 0 underestimates remaining time. For symmetry with historical events, consider returning Double.MAX_VALUE to indicate unbounded remaining time.
Suggested change
| ? 0 // NOTE HERE | |
| ? Double.MAX_VALUE // Indicate unbounded remaining time |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
TBD.