Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

Expand Down Expand Up @@ -146,15 +144,6 @@ static boolean shouldPlanBeListened(ConfigPhysicalPlan plan) {
return false;
}

// Do not transfer system database plan
if (type.equals(ConfigPhysicalPlanType.CreateDatabase)
&& ((DatabaseSchemaPlan) plan)
.getSchema()
.getName()
.equals(SchemaConstant.SYSTEM_DATABASE)) {
return false;
}

// PipeEnriched & UnsetTemplate are not listened directly,
// but their inner plan or converted plan are listened.
return type.equals(ConfigPhysicalPlanType.PipeEnriched)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
Expand Down Expand Up @@ -181,13 +180,6 @@ private TSStatus executePlanAndClassifyExceptions(ConfigPhysicalPlan plan) {
private TSStatus executePlan(ConfigPhysicalPlan plan) throws ConsensusException {
switch (plan.getType()) {
case CreateDatabase:
if (((DatabaseSchemaPlan) plan)
.getSchema()
.getName()
.equals(SchemaConstant.SYSTEM_DATABASE)) {
// System database doesn't need transferring
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
// Here we only reserve database name and substitute the sender's local information
// with the receiver's default configurations
TDatabaseSchema schema = ((DatabaseSchemaPlan) plan).getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public TSStatus visitCreateDatabase(DatabaseSchemaPlan plan, TSStatus context) {
// Lower or higher level database has been created
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
Expand Down Expand Up @@ -89,9 +90,12 @@ public TSStatus visitBatchActivateTemplate(
return visitGeneralActivateTemplate(batchActivateTemplateStatement, context);
}

// InternalBatchActivateTemplateNode is converted to BatchActivateTemplateStatement
// No need to handle InternalBatchActivateTemplateStatement

private TSStatus visitGeneralActivateTemplate(
Statement activateTemplateStatement, Exception context) {
if (context instanceof MetadataException) {
if (context instanceof MetadataException || context instanceof StatementAnalyzeException) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ private TSStatus visitInsertBase(InsertBaseStatement insertBaseStatement, TSStat

@Override
public TSStatus visitCreateTimeseries(CreateTimeSeriesStatement statement, TSStatus context) {
if (context.getCode() == TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return visitGeneralCreateTimeSeries(statement, context);
}

Expand All @@ -99,7 +95,9 @@ private TSStatus visitGeneralCreateTimeSeries(Statement statement, TSStatus cont
|| context.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
} else if (context.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
|| context.getCode() == TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()
|| context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
Expand All @@ -125,8 +123,7 @@ public TSStatus visitInternalCreateMultiTimeSeries(
return visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, context);
}

private TSStatus visitGeneralCreateMultiTimeseries(
Statement internalCreateTimeSeriesStatement, TSStatus context) {
private TSStatus visitGeneralCreateMultiTimeseries(Statement statement, TSStatus context) {
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
Expand All @@ -135,13 +132,16 @@ private TSStatus visitGeneralCreateMultiTimeseries(
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return visitStatement(internalCreateTimeSeriesStatement, context);
return visitStatement(statement, context);
}
}
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return visitStatement(internalCreateTimeSeriesStatement, context);
return visitStatement(statement, context);
}

@Override
Expand All @@ -166,7 +166,10 @@ public TSStatus visitAlterTimeseries(
@Override
public TSStatus visitCreateLogicalView(
CreateLogicalViewStatement createLogicalViewStatement, TSStatus context) {
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
Expand Down