Skip to content

Pipe: Double-living Semantic Correction to Forward Events from Cluster A to B by Default Unless They Originate from B#15112

Closed
XNX02 wants to merge 32 commits intoapache:masterfrom
XNX02:double-living
Closed

Pipe: Double-living Semantic Correction to Forward Events from Cluster A to B by Default Unless They Originate from B#15112
XNX02 wants to merge 32 commits intoapache:masterfrom
XNX02:double-living

Conversation

@XNX02
Copy link
Contributor

@XNX02 XNX02 commented Mar 17, 2025

No description provided.

@SteveYurongSu SteveYurongSu self-assigned this Mar 18, 2025
private AbstractDeleteDataNode deleteDataNode;
private DeletionResource deletionResource;
private boolean isGeneratedByPipe;
private String originClusterId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment

ByteBuffer.allocate(
Byte.BYTES + planBuffer.limit() + computeOriginClusterIdBufferSize(originClusterId));
ReadWriteIOUtils.write(isGeneratedByPipe, result);
ReadWriteIOUtils.write(originClusterId, result);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Comment on lines +308 to +318
if (byteBuffer.hasRemaining()) {
boolean hasClusterId = byteBuffer.get() != 0;
if (hasClusterId) {
int strLength = byteBuffer.getShort();
byte[] bytes = new byte[strLength];
byteBuffer.get(bytes);
originClusterId = new String(bytes);
} else {
originClusterId = null;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check and add some UTs

: insertTabletNode.getOriginClusterId();
if (Objects.isNull(workMemTable.getCurrentOriginClusterId())) {
workMemTable.setCurrentOriginClusterId(originClusterId);
} else if (!Objects.equals(originClusterId, workMemTable.getCurrentOriginClusterId())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

originClusterId != workMemTable.getCurrentOriginClusterId()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make a static final cache map in the receiver and get the id string from the map

map <id, id> putIfAbsent

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment here if using the ref comparing

}
final String originClusterId =
insertTabletNode.getOriginClusterId() == null
? config.getClusterId()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generated or cached?

insertTabletNode.getOriginClusterId() == null
? config.getClusterId()
: insertTabletNode.getOriginClusterId();
if (Objects.isNull(workMemTable.getCurrentOriginClusterId())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if from the same cluster

@XNX02 XNX02 marked this pull request as ready for review March 23, 2025 16:00

@Override
public String toString() {
return "PipeEnrichedPlanV2{" + "innerPlan='" + innerPlan + "'}";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add "originClusterId" here

*/
public TSStatus operatePermission(AuthorPlan authorPlan, boolean isGeneratedByPipe) {
public TSStatus operatePermission(
AuthorPlan authorPlan, boolean isGeneratedByPipe, String originClusterIds) {
Copy link
Collaborator

@Caideyipi Caideyipi Apr 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

originClusterId... Same in other files

@@ -87,6 +87,19 @@ public AuthOperationProcedure(
this.timeoutMS = commonConfig.getDatanodeTokenTimeoutMS();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May remove the unused constructors... Same in other files

public void pipeEnrichedPlanTest() throws IOException {
final PipeEnrichedPlan plan =
new PipeEnrichedPlan(
final PipeEnrichedPlanV1 plan =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better use "V2".. Same in other tests

isGeneratedByPipe = ReadWriteIOUtils.readBool(buffer);
configPhysicalPlan = ConfigPhysicalPlan.Factory.create(buffer);

// There might be an ignoredChildrenSize 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size may not appear on configNodes...

}
}

public Set<String> getSinkClusterIds() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it used?

}

public void setSinkClusterIds(Set<String> sinkClusterIds) {
realtimeExtractor.setSinkClusterIds(sinkClusterIds);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I think it's better to put this in IoTDBExtractor...

@Override
public RegionExecutionResult visitPipeEnrichedWritePlanNode(
final PipeEnrichedWritePlanNode node, final WritePlanNodeExecutionContext context) {
node.setOriginClusterId(node.getOriginClusterId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemingly some extra operations are needed to pass the "originClusterId" to the state machine....

1: required list<common.TConsensusGroupId> schemaRegionIdList
2: required binary pathPatternTree
3: optional bool isGeneratedByPipe
4: optional string originCluster
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add "Id" here

struct TPipeTransferResp {
1:required common.TSStatus status
2:optional binary body
3:optional string clusterId
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better use "body" instead of adding a new field?

if (pipeConnector instanceof IoTDBDataNodeSyncConnector) {
attributeSortedStringToSinkClusterIdsMap.put(
attributeSortedString,
((IoTDBDataNodeSyncConnector) pipeConnector).getEndPointsClusterIds());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better put this in "IoTDBConnector"....

insertRowStatement.getValues(),
insertRowStatement.isNeedInferType());
insertNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
insertNode.setOriginClusterId(insertRowStatement.getOriginClusterId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this?


protected abstract void confineHistoricalEventTransferTypes(final PipeSnapshotEvent event);

public Set<String> getSinkClusterIds() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be useless

private long tabletConversionThresholdBytes = -1;
private boolean autoCreateDatabase = true;
private boolean isGeneratedByPipe = false;
private String originClusterId;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it used?


protected long ramBytesUsed = Long.MIN_VALUE;

protected String originClusterId;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it used?

protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;

// Used to store the clusterId for location comparison
public static final Map<String, String> CLUSTER_ID_MAP = new HashMap<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use a hashSet....

public static PipeEnrichedWritePlanNode deserialize(final ByteBuffer buffer) {
return new PipeEnrichedWritePlanNode((WritePlanNode) PlanNodeType.deserialize(buffer));
return new PipeEnrichedWritePlanNode(
(WritePlanNode) PlanNodeType.deserialize(buffer),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be some ignored children sizes..

@Override
public RegionExecutionResult visitPipeEnrichedWritePlanNode(
final PipeEnrichedWritePlanNode node, final WritePlanNodeExecutionContext context) {
node.setOriginClusterId(node.getOriginClusterId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemingly some extra operations are needed to pass the "originClusterId" to the state machine....

return new PipeEnrichedInsertNode((InsertNode) PlanNodeType.deserialize(buffer));
return new PipeEnrichedInsertNode(
(InsertNode) PlanNodeType.deserialize(buffer),
buffer.hasRemaining() ? ReadWriteIOUtils.readString(buffer) : null);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about ignore children size?

public static PipeEnrichedNonWritePlanNode deserialize(ByteBuffer buffer) {
return new PipeEnrichedNonWritePlanNode(PlanNodeType.deserialize(buffer));
return new PipeEnrichedNonWritePlanNode(
PlanNodeType.deserialize(buffer),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about ignored children size?

endTime);
}

public PipeTsFileInsertionEvent(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used?

private final LoadTsFileDataCacheMemoryBlock block;
private final String originClusterId;

public LoadTsFileScheduler(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better remove this?

@Caideyipi
Copy link
Collaborator

Will any similar mechanism be applied to "WriteBackSink" (i.e. write-back-sink won't transfer data written back once)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants