Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cf53a0f
clusterId
XNX02 Mar 9, 2025
1b0ad1e
update
XNX02 Mar 10, 2025
9af0445
update
XNX02 Mar 10, 2025
03bbeb4
Merge branch 'master' of https://github.com/apache/iotdb into double-…
XNX02 Mar 10, 2025
690caa0
schema
XNX02 Mar 10, 2025
23029c9
config
XNX02 Mar 11, 2025
8fdca08
config procudure
XNX02 Mar 11, 2025
897d41a
update originclusteid
XNX02 Mar 11, 2025
1a1cb5e
update configregionxtratctor
XNX02 Mar 11, 2025
998f491
fix clusterId
XNX02 Mar 11, 2025
3d828a3
update
XNX02 Mar 14, 2025
1efd28e
update
XNX02 Mar 15, 2025
3584efd
Merge branch 'master' of https://github.com/apache/iotdb into double-…
XNX02 Mar 15, 2025
37aaba1
deletedata
XNX02 Mar 16, 2025
edee9e8
table model
XNX02 Mar 16, 2025
1f0c9b5
tsfile simple
XNX02 Mar 16, 2025
02fbfe9
update
XNX02 Mar 17, 2025
ec94f2f
Merge branch 'master' of https://github.com/apache/iotdb into double-…
XNX02 Mar 17, 2025
4df0f9d
improve
XNX02 Mar 17, 2025
13eff8b
update
XNX02 Mar 17, 2025
7b7abbb
add UT for PipeEnrichedProcedureTest
XNX02 Mar 19, 2025
4d00afa
Merge branch 'master' of https://github.com/apache/iotdb into double-…
XNX02 Mar 19, 2025
6a3284f
update
XNX02 Mar 19, 2025
f735818
serialize&deserialize
XNX02 Mar 20, 2025
0c0d90e
update
XNX02 Mar 20, 2025
3b4e37e
Merge branch 'master' of https://github.com/apache/iotdb into double-…
XNX02 Mar 21, 2025
af22cab
fix
XNX02 Mar 21, 2025
0c4b22c
update
XNX02 Mar 23, 2025
62b00d4
update
XNX02 Mar 23, 2025
4779f7c
pipeenrichedplanV2
XNX02 Mar 25, 2025
2e3feb7
update
XNX02 Mar 27, 2025
2b1df48
update
XNX02 Mar 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlanV1;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
Expand Down Expand Up @@ -457,8 +458,11 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case PipeHandleMetaChange:
plan = new PipeHandleMetaChangePlan();
break;
case PipeEnriched:
plan = new PipeEnrichedPlan();
case PipeEnrichedV1:
plan = new PipeEnrichedPlanV1();
break;
case PipeEnrichedV2:
plan = new PipeEnrichedPlanV2();
break;
case CreateTopic:
plan = new CreateTopicPlan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,15 @@ public enum ConfigPhysicalPlanType {
PipeHandleMetaChange((short) 1601),

/** Pipe PayLoad. */
PipeEnriched((short) 1700),
PipeEnrichedV1((short) 1700),
PipeUnsetTemplate((short) 1701),
PipeDeleteTimeSeries((short) 1702),
PipeDeleteLogicalView((short) 1703),
PipeDeactivateTemplate((short) 1704),
PipeSetTTL((short) 1705),
PipeCreateTable((short) 1706),
PipeDeleteDevices((short) 1707),
PipeEnrichedV2((short) 1708),

/** Subscription */
CreateTopic((short) 1800),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,23 @@
import java.nio.ByteBuffer;
import java.util.Objects;

public class PipeEnrichedPlan extends ConfigPhysicalPlan {
public class PipeEnrichedPlanV1 extends ConfigPhysicalPlan {

private ConfigPhysicalPlan innerPlan;
protected ConfigPhysicalPlan innerPlan;

public PipeEnrichedPlan() {
super(ConfigPhysicalPlanType.PipeEnriched);
public PipeEnrichedPlanV1() {
super(ConfigPhysicalPlanType.PipeEnrichedV1);
}

public PipeEnrichedPlan(ConfigPhysicalPlan innerPlan) {
super(ConfigPhysicalPlanType.PipeEnriched);
public PipeEnrichedPlanV1(ConfigPhysicalPlan innerPlan) {
super(ConfigPhysicalPlanType.PipeEnrichedV1);
this.innerPlan = innerPlan;
}

protected PipeEnrichedPlanV1(ConfigPhysicalPlanType type) {
super(type);
}

public ConfigPhysicalPlan getInnerPlan() {
return innerPlan;
}
Expand All @@ -64,7 +68,7 @@ public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
PipeEnrichedPlan that = (PipeEnrichedPlan) obj;
PipeEnrichedPlanV1 that = (PipeEnrichedPlanV1) obj;
return innerPlan.equals(that.innerPlan);
}

Expand All @@ -75,6 +79,6 @@ public int hashCode() {

@Override
public String toString() {
return "PipeEnrichedPlan{" + "innerPlan='" + innerPlan + "'}";
return "PipeEnrichedPlanV1{" + "innerPlan='" + innerPlan + "'}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.write.pipe.payload;

import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class PipeEnrichedPlanV2 extends PipeEnrichedPlanV1 {

private String originClusterId;

public PipeEnrichedPlanV2() {
super(ConfigPhysicalPlanType.PipeEnrichedV2);
}

public PipeEnrichedPlanV2(ConfigPhysicalPlan innerPlan, String originClusterId) {
super(ConfigPhysicalPlanType.PipeEnrichedV2);
this.innerPlan = innerPlan;
this.originClusterId = originClusterId;
}

public String getOriginClusterId() {
return originClusterId;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
super.serializeImpl(stream);
ReadWriteIOUtils.write(originClusterId, stream);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
super.deserializeImpl(buffer);
originClusterId = ReadWriteIOUtils.readString(buffer);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
PipeEnrichedPlanV2 that = (PipeEnrichedPlanV2) obj;
return innerPlan.equals(that.innerPlan)
&& Objects.equals(originClusterId, that.originClusterId);
}

@Override
public int hashCode() {
return Objects.hash(innerPlan, originClusterId);
}

@Override
public String toString() {
return "PipeEnrichedPlanV2{" + "innerPlan='" + innerPlan + "'}";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add "originClusterId" here

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected TSStatus write(ConfigPhysicalPlan plan) {
}

if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false);
PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false, null);
}

return result;
Expand Down Expand Up @@ -428,7 +428,7 @@ private void initStandAloneConfigNode() {
// Recover the linked queue.
// Note that the "nextPlan"s may contain create and drop pipe operations
// and will affect whether the queue listen to the plans.
PipeConfigNodeAgent.runtime().listener().tryListenToPlan(nextPlan, false);
PipeConfigNodeAgent.runtime().listener().tryListenToPlan(nextPlan, false, null);
}
} catch (UnknownPhysicalPlanTypeException e) {
LOGGER.error("Try listen to plan failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,9 @@ public TSStatus setTTL(SetTTLPlan setTTLPlan) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (setTTLPlan.getTTL() == TTLCache.NULL_TTL) {
return ttlManager.unsetTTL(setTTLPlan, false);
return ttlManager.unsetTTL(setTTLPlan, false, null);
} else {
return ttlManager.setTTL(setTTLPlan, false);
return ttlManager.setTTL(setTTLPlan, false, null);
}
} else {
return status;
Expand Down Expand Up @@ -733,7 +733,7 @@ public DataSet showTTL(ShowTTLPlan showTTLPlan) {
public TSStatus setDatabase(final DatabaseSchemaPlan databaseSchemaPlan) {
final TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return clusterSchemaManager.setDatabase(databaseSchemaPlan, false);
return clusterSchemaManager.setDatabase(databaseSchemaPlan, false, null);
} else {
return status;
}
Expand All @@ -743,7 +743,7 @@ public TSStatus setDatabase(final DatabaseSchemaPlan databaseSchemaPlan) {
public TSStatus alterDatabase(final DatabaseSchemaPlan databaseSchemaPlan) {
final TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return clusterSchemaManager.alterDatabase(databaseSchemaPlan, false);
return clusterSchemaManager.alterDatabase(databaseSchemaPlan, false, null);
} else {
return status;
}
Expand All @@ -768,7 +768,8 @@ public synchronized TSStatus deleteDatabases(final TDeleteDatabasesReq tDeleteRe

return procedureManager.deleteDatabases(
new ArrayList<>(deleteDatabaseSchemaMap.values()),
tDeleteReq.isSetIsGeneratedByPipe() && tDeleteReq.isIsGeneratedByPipe());
tDeleteReq.isSetIsGeneratedByPipe() && tDeleteReq.isIsGeneratedByPipe(),
tDeleteReq.isSetOriginClusterId() ? tDeleteReq.getOriginClusterId() : null);
} else {
return status;
}
Expand Down Expand Up @@ -1255,7 +1256,7 @@ public SubscriptionManager getSubscriptionManager() {
public TSStatus operatePermission(final AuthorPlan authorPlan) {
final TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return permissionManager.operatePermission(authorPlan, false);
return permissionManager.operatePermission(authorPlan, false, null);
} else {
return status;
}
Expand Down Expand Up @@ -1970,7 +1971,8 @@ public synchronized TSStatus setSchemaTemplate(TSetSchemaTemplateReq req) {
req.getQueryId(),
req.getName(),
req.getPath(),
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe());
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe(),
req.getOriginClusterId());
} else {
return status;
}
Expand Down Expand Up @@ -2041,7 +2043,8 @@ public TSStatus deactivateSchemaTemplate(final TDeactivateSchemaTemplateReq req)
return procedureManager.deactivateTemplate(
req.getQueryId(),
templateSetInfo,
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe());
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe(),
req.getOriginClusterId());
}

@Override
Expand All @@ -2058,7 +2061,8 @@ public synchronized TSStatus unsetSchemaTemplate(TUnsetSchemaTemplateReq req) {
req.getQueryId(),
checkResult.right,
new PartialPath(req.getPath()),
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe());
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe(),
req.getOriginClusterId());
} catch (IllegalPathException e) {
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
Expand Down Expand Up @@ -2086,7 +2090,7 @@ public TSStatus alterSchemaTemplate(TAlterSchemaTemplateReq req) {
TemplateAlterOperationUtil.parseOperationType(buffer);
if (operationType.equals(TemplateAlterOperationType.EXTEND_TEMPLATE)) {
return clusterSchemaManager.extendSchemaTemplate(
TemplateAlterOperationUtil.parseTemplateExtendInfo(buffer), false);
TemplateAlterOperationUtil.parseTemplateExtendInfo(buffer), false, null);
}
return RpcUtils.getStatus(TSStatusCode.UNSUPPORTED_OPERATION);
} else {
Expand All @@ -2102,6 +2106,7 @@ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
PathPatternTree rawPatternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
boolean isGeneratedByPipe = req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe();
String originClusterId = req.getOriginClusterId();
/**
* If delete pattern is prefix path (such as root.db.**), it may be optimized to delete
* database plus create database. We need to determine two conditions: whether the pattern
Expand All @@ -2126,7 +2131,8 @@ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
deleteTimeSeriesPatternPaths.add(path);
}
if (!canOptimize) {
return procedureManager.deleteTimeSeries(queryId, rawPatternTree, isGeneratedByPipe);
return procedureManager.deleteTimeSeries(
queryId, rawPatternTree, isGeneratedByPipe, originClusterId);
}
// check if the database is using template
try {
Expand All @@ -2143,14 +2149,14 @@ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
deleteTimeSeriesPatternTree.constructTree();
status =
procedureManager.deleteTimeSeries(
queryId, deleteTimeSeriesPatternTree, isGeneratedByPipe);
queryId, deleteTimeSeriesPatternTree, isGeneratedByPipe, originClusterId);
}
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// 2. delete database
List<TSStatus> failedStatus = new ArrayList<>();
status =
procedureManager.deleteDatabases(
new ArrayList<>(deleteDatabaseSchemas), isGeneratedByPipe);
new ArrayList<>(deleteDatabaseSchemas), isGeneratedByPipe, originClusterId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failedStatus.add(status);
}
Expand All @@ -2159,7 +2165,8 @@ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
status =
clusterSchemaManager.setDatabase(
new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, databaseSchema),
isGeneratedByPipe);
isGeneratedByPipe,
originClusterId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failedStatus.add(status);
}
Expand Down Expand Up @@ -2347,7 +2354,9 @@ public TPipeConfigTransferResp handleTransferConfigPlan(TPipeConfigTransferReq r
.setType(req.type)
.setBody(req.body)
: new TPipeTransferReq(req.version, req.type, req.body));
return new TPipeConfigTransferResp(result.status).setBody(result.body);
return new TPipeConfigTransferResp(result.status)
.setBody(result.body)
.setClusterId(result.clusterId);
}

@Override
Expand Down Expand Up @@ -2676,14 +2685,16 @@ public TSStatus alterOrDropTable(final TAlterOrDropTableReq req) {
req.getDatabase(),
req.getTableName(),
ReadWriteIOUtils.readString(req.updateInfo),
false);
false,
null);
case COMMENT_COLUMN:
return clusterSchemaManager.setTableColumnComment(
req.getDatabase(),
req.getTableName(),
ReadWriteIOUtils.readString(req.updateInfo),
ReadWriteIOUtils.readString(req.updateInfo),
false);
false,
null);
default:
throw new IllegalArgumentException();
}
Expand All @@ -2696,7 +2707,7 @@ public TSStatus alterOrDropTable(final TAlterOrDropTableReq req) {
public TDeleteTableDeviceResp deleteDevice(final TDeleteTableDeviceReq req) {
final TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? procedureManager.deleteDevices(req, false)
? procedureManager.deleteDevices(req, false, null)
: new TDeleteTableDeviceResp(status);
}

Expand Down
Loading