Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -379,4 +379,55 @@ public void testAutoManualCreateRace() throws Exception {
"root.ln.wf01.wt01.status,null,root.ln,BOOLEAN,RLE,LZ4,{\"tag3\":\"v3\"},{\"attr4\":\"v4\"},null,null,BASE,"));
}
}

@Test
public void testHistoricalActivationRace() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"create database root.sg_aligned",
"create device template aligned_template aligned (s0 int32, s1 int64, s2 float, s3 double, s4 boolean, s5 text)",
"set device template aligned_template to root.sg_aligned.device_aligned",
"create timeseries using device template on root.sg_aligned.device_aligned.d10",
"create timeseries using device template on root.sg_aligned.device_aligned.d12",
"insert into root.sg_aligned.device_aligned.d10(time, s0, s1, s2,s3,s4,s5) values (1706659200,1706659200,10,20.245,25.24555,true,''),(1706662800,null,1706662800,20.241,25.24111,false,'2'),(1706666400,3,null,20.242,25.24222,true,'3'),(1706670000,4,40,null,35.5474,true,'4'),(1706670600,5,1706670600000,20.246,null,false,'5'),(1706671200,6,60,20.248,25.24888,null,'6'),(1706671800,7,1706671800,20.249,25.24999,false,null),(1706672400,8,80,1245.392,75.51234,false,'8'),(1706672600,9,90,2345.397,2285.58734,false,'9'),(1706673000,10,100,20.241,25.24555,false,'10'),(1706673600,11,110,3345.394,4105.544,false,'11'),(1706674200,12,1706674200,30.245,35.24555,false,'12'),(1706674800,13,130,5.39,125.51234,false,'13'),(1706675400,14,1706675400,5.39,135.51234,false,'14'),(1706676000,15,150,5.39,145.51234,false,'15'),(1706676600,16,160,5.39,155.51234,false,'16'),(1706677200,17,170,5.39,165.51234,false,'17'),(1706677600,18,180,5.39,175.51234,false,'18'),(1706677800,19,190,5.39,185.51234,false,'19'),(1706678000,20,200,5.39,195.51234,false,'20'),(1706678200,21,210,5.39,null,false,'21')",
"insert into root.sg_aligned.device_aligned.d10(time, s0, s1, s2,s3,s4,s5) values (-1,1,10,5.39,5.51234,false,'negative')",
"insert into root.sg_aligned.device_aligned.d11(time, s0, s1, s2,s3,s4,s5) values (-1,-11,-110,-5.39,-5.51234,false,'activate:1')",
"insert into root.sg_aligned.device_aligned.d10(time, s0, s1, s2,s3,s4,s5,s6) values(1706678800,1,1706678800,5.39,5.51234,false,'add:s6',32);"))) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "all");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count devices", "count(devices),", Collections.singleton("3,"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ public TSStatus visitActivateTemplate(
@Override
public TSStatus visitBatchActivateTemplate(
final BatchActivateTemplateNode node, final ISchemaRegion schemaRegion) {
final List<TSStatus> statusList = new ArrayList<>();
for (final Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
node.getTemplateActivationMap().entrySet()) {
final Template template =
Expand All @@ -455,10 +456,10 @@ public TSStatus visitBatchActivateTemplate(
template);
} catch (final MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
statusList.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
return statusList.isEmpty() ? RpcUtils.SUCCESS_STATUS : RpcUtils.getStatus(statusList);
}

@Override
Expand Down