Skip to content

Concurrent Streaming and Replace improvements#15813

Closed
AmatyaAvadhanula wants to merge 1 commit intoapache:masterfrom
AmatyaAvadhanula:segment_transactional_append_streaming
Closed

Concurrent Streaming and Replace improvements#15813
AmatyaAvadhanula wants to merge 1 commit intoapache:masterfrom
AmatyaAvadhanula:segment_transactional_append_streaming

Conversation

@AmatyaAvadhanula
Copy link
Contributor

@AmatyaAvadhanula AmatyaAvadhanula commented Jan 31, 2024

This PR fixes 2 issues with Streaming ingestion with concurrent replace.

  1. Temporary duplicate results in queries
    Suppose a streaming ingestion task has open segments V0-1 with a used segment V0-0, and a replace occurs concurrently with version V1. The replace job would convert the used segment to V1-0, and the open pending segment to V1-1.
    However, when the appending job commits segments using the transactional append action, the pending segment is committed as V1-2. This means that there could be a period where V1-1 has not been unannounced, and V1-2 has been loaded, during which both results are visible.
    This PR aims to remove that redundancy by committing V1-1 directly.

  2. Temporary data loss in queries
    Currently, handoff for streaming ingestion looks only at the original set of segments to the append action. This PR enhances the notifier to look at the upgraded segments as well. Without this change, an append that happens after a replace may miss data that relies on the handoff.

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

* @param persist If true, commit the newly created lock to the metadata store.
* Should be false when syncing from storage.
* @return A TaskLockPosse associating the lock for the request with the provided task.
*/
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated docs improvement

String dataSource,
Set<DataSegment> segmentsToAppend
Set<DataSegment> segmentsToAppend,
@Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradeVersions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradeVersions
@Nullable Map<String, Set<SegmentIdWithShardSpec>> segmentIdToUpgradedVersions

Comment on lines +1489 to +1517
for (Interval interval : segmentsToUpgrade.keySet()) {
filteredSegmentsToUpgrade.put(interval, new HashSet<>());
for (DataSegment segment : segmentsToUpgrade.get(interval)) {
boolean include = true;
if (segmentIdToUpgradeVersions.containsKey(segment.getId().toString())) {
for (SegmentIdWithShardSpec upgradedSegment : segmentIdToUpgradeVersions.get(segment.getId().toString())) {
if (upgradedSegment.getVersion().equals(upgradeVersion)) {
upgradedSegments.add(
new DataSegment(
upgradedSegment.asSegmentId(),
segment.getLoadSpec(),
segment.getDimensions(),
segment.getMetrics(),
upgradedSegment.getShardSpec(),
null,
segment.getBinaryVersion(),
segment.getSize()
)
);
include = false;
break;
}
}
}
if (include) {
filteredSegmentsToUpgrade.get(interval).add(segment);
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please some comments to describe what are you trying to do here? I don't follow.

@AmatyaAvadhanula AmatyaAvadhanula removed this from the 29.0.0 milestone Feb 7, 2024
@AmatyaAvadhanula
Copy link
Contributor Author

Problem 1 will be addressed as part of #16144 with a different approach.
#16162 has been raised for problem 2.
Closing this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants