Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
Expand All @@ -39,6 +40,8 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -81,13 +84,12 @@ public void setUp() {
public void testSingleEnv() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String sql =
String.format(
"create pipe a2b with source ('source'='iotdb-source') with processor ('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString());
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(sql);
statement.execute(
String.format(
"create pipe a2b with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString()));
} catch (final SQLException e) {
fail(e.getMessage());
}
Expand All @@ -104,5 +106,47 @@ public void testSingleEnv() throws Exception {
|| (Objects.equals(showPipeResult.get(1).id, "a2b_history")
&& Objects.equals(showPipeResult.get(0).id, "a2b_realtime")));
}

// Do not split for pipes without insertion or non-full
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"drop pipe a2b_history",
"drop pipe a2b_realtime",
String.format(
"create pipe a2b1 with source ('inclusion'='schema') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString()),
String.format(
"create pipe a2b2 with source ('realtime.enable'='false') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString()),
String.format(
"create pipe a2b3 with source ('history.enable'='false') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString())));

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final List<TShowPipeInfo> showPipeResult =
client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(3, showPipeResult.size());
}

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"drop pipe a2b1",
"drop pipe a2b2",
"drop pipe a2b3",
"insert into root.test.device(time, field) values(0,1),(1,2)",
"delete from root.test.device.* where time == 0",
String.format(
"create pipe a2b with source ('inclusion'='all') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString())));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.test.device",
"Time,root.test.device.field,",
Collections.singleton("1,2.0,"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ public boolean hasPipeReleaseRegionRelatedResource(final int consensusGroupId) {
}
}

public boolean isFullSync(final PipeParameters parameters) {
public boolean isFullSync(final PipeParameters parameters) throws IllegalPathException {
if (isSnapshotMode(parameters)) {
return false;
}
Expand All @@ -598,7 +598,10 @@ public boolean isFullSync(final PipeParameters parameters) {
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);

return isHistoryEnable && isRealtimeEnable;
return isHistoryEnable
&& isRealtimeEnable
&& DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
.getLeft();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
Expand Down Expand Up @@ -2205,10 +2206,10 @@
new PipeParameters(createPipeStatement.getSourceAttributes());
final PipeParameters sinkPipeParameters =
new PipeParameters(createPipeStatement.getSinkAttributes());
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
if (PipeConfig.getInstance().getPipeAutoSplitFullEnabled()
&& PipeDataNodeAgent.task().isFullSync(sourcePipeParameters)) {
// 1. Send request to create the real-time data synchronization pipeline
final TCreatePipeReq realtimeReq =
new TCreatePipeReq()
Expand Down Expand Up @@ -2253,11 +2254,17 @@
Boolean.toString(false),
PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
Boolean.toString(true),
// We force the historical pipe to transfer data only
// We force the historical pipe to transfer data (and maybe
// deletion) only
// Thus we can transfer schema only once
// And may drop the historical pipe on successfully transferred
PipeSourceConstant.SOURCE_INCLUSION_KEY,
PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
DataRegionListeningFilter
.parseInsertionDeletionListeningOptionPair(
sourcePipeParameters)
.getRight()

Check warning on line 2265 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a primitive boolean expression here.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0emyHKXe-QB8--UPga&open=AZ0emyHKXe-QB8--UPga&pullRequest=17346
? "data"
: PipeSourceConstant.EXTRACTOR_INCLUSION_DEFAULT_VALUE,
PipeSourceConstant.SOURCE_EXCLUSION_KEY,
PipeSourceConstant.EXTRACTOR_EXCLUSION_DEFAULT_VALUE)))
.getAttribute())
Expand All @@ -2280,27 +2287,20 @@

// 3. Set success status only if both pipelines are created successfully
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
} catch (final Exception e) {
// Catch any other exceptions (e.g., network issues)
future.setException(e);
}
return future;
}

try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCreatePipeReq req =
new TCreatePipeReq()
.setPipeName(pipeName)
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
final TSStatus tsStatus = configNodeClient.createPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
future.setException(new IoTDBException(tsStatus));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
final TCreatePipeReq req =
new TCreatePipeReq()
.setPipeName(pipeName)
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
.setExtractorAttributes(createPipeStatement.getSourceAttributes())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getSinkAttributes());
final TSStatus tsStatus = configNodeClient.createPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
future.setException(new IoTDBException(tsStatus));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
}
} catch (final Exception e) {
future.setException(e);
Expand All @@ -2309,7 +2309,7 @@
}

@Override
public SettableFuture<ConfigTaskResult> alterPipe(final AlterPipeStatement alterPipeStatement) {

Check warning on line 2312 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 178 to 64, Complexity from 21 to 14, Nesting Level from 4 to 2, Number of Variables from 22 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0emyHKXe-QB8--UPgb&open=AZ0emyHKXe-QB8--UPgb&pullRequest=17346
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();

// Validate pipe name
Expand Down
Loading