Skip to content

Pipe: Implement sink.data-distribution-strategy feature where a pipe can send data to all sink nodes#14922

Closed
XNX02 wants to merge 16 commits intoapache:masterfrom
XNX02:data-distribution
Closed

Pipe: Implement sink.data-distribution-strategy feature where a pipe can send data to all sink nodes#14922
XNX02 wants to merge 16 commits intoapache:masterfrom
XNX02:data-distribution

Conversation

@XNX02
Copy link
Copy Markdown
Contributor

@XNX02 XNX02 commented Feb 21, 2025

No description provided.

@XNX02 XNX02 marked this pull request as draft February 21, 2025 07:15
@SteveYurongSu SteveYurongSu self-assigned this Feb 21, 2025
@XNX02 XNX02 marked this pull request as ready for review February 24, 2025 12:54
Comment on lines +174 to +175
PipeTransferConfigPlanReq.toTPipeTransferBytes(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this byte[] only needs to be constructed once, which can reduce a lot of unnecessary overhead

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appreciate it. I've made some changes about this problem. Please take a look when you have time and let me know if you think it's working as expected.

Comment on lines +235 to +249
PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
// The pattern is surely Non-null
pipeConfigRegionSnapshotEvent.getTreePatternString(),
pipeConfigRegionSnapshotEvent.getTablePattern().getDatabasePattern(),
pipeConfigRegionSnapshotEvent.getTablePattern().getTablePattern(),
pipeConfigRegionSnapshotEvent.getTreePattern().isTreeModelDataAllowedToBeCaptured(),
pipeConfigRegionSnapshotEvent
.getTablePattern()
.isTableModelDataAllowedToBeCaptured(),
snapshot.getName(),
snapshot.length(),
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
pipeConfigRegionSnapshotEvent.getFileType(),
pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Comment on lines +156 to +159
final TPipeTransferReq req =
compressIfNeeded(
PipeTransferConfigPlanReq.toTPipeTransferReq(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()));
Copy link
Copy Markdown
Member

@luoluoyuyu luoluoyuyu Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the byte[] here can also be reused, the sending performance can be greatly improved. It should also be noted here that the internal ByteBuffer needs to copy the internal array

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify what you meant by 'internal ByteBuffer needs to copy the internal array'? It sounds that simply place the req outside the loop is not enough in this situation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
Because ByteBuffer has capacity, limit, position and mark to record the corresponding progress. But I looked at the code and found that a new ByteBuffer will be created when writing. You can ignore my words.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it

.trim()
.toLowerCase();
validator.validate(
arg -> arg.equals("any") || arg.equals("all"),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to define two constants for Any and All?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Done.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants