Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
Expand Down Expand Up @@ -519,6 +521,84 @@ public void testGetSlots() throws Exception {
}
}

@Test
public void testGetRegionGroupsByTime() throws Exception {
final String sg0 = "root.sg0";

try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {

// Get all region groups covering the full time range
TGetRegionGroupsByTimeReq req = new TGetRegionGroupsByTimeReq(sg0, 0L, Long.MAX_VALUE);
TGetRegionGroupsByTimeResp resp = client.getRegionGroupsByTime(req);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertNotNull(resp.getRegionReplicaSets());
Assert.assertFalse(resp.getRegionReplicaSets().isEmpty());
int allRegionGroupCount = resp.getRegionReplicaSetsSize();

// Each replica set should have testReplicationFactor replicas
resp.getRegionReplicaSets()
.forEach(
replicaSet -> {
Assert.assertEquals(testReplicationFactor, replicaSet.getDataNodeLocationsSize());
Assert.assertEquals(
TConsensusGroupType.DataRegion, replicaSet.getRegionId().getType());
});

// Query with a single time slot range should return a subset
TGetRegionGroupsByTimeReq singleSlotReq =
new TGetRegionGroupsByTimeReq(sg0, 0L, testTimePartitionInterval - 1);
TGetRegionGroupsByTimeResp singleSlotResp = client.getRegionGroupsByTime(singleSlotReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), singleSlotResp.getStatus().getCode());
Assert.assertNotNull(singleSlotResp.getRegionReplicaSets());
Assert.assertFalse(singleSlotResp.getRegionReplicaSets().isEmpty());
Assert.assertTrue(singleSlotResp.getRegionReplicaSetsSize() <= allRegionGroupCount);

// Query with a disjoint time range should return empty result
TGetRegionGroupsByTimeReq disjointReq =
new TGetRegionGroupsByTimeReq(
sg0,
testTimePartitionSlotsNum * testTimePartitionInterval * 2,
testTimePartitionSlotsNum * testTimePartitionInterval * 3);
TGetRegionGroupsByTimeResp disjointResp = client.getRegionGroupsByTime(disjointReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), disjointResp.getStatus().getCode());
Assert.assertTrue(
disjointResp.getRegionReplicaSets() == null
|| disjointResp.getRegionReplicaSets().isEmpty());

// Query non-existent database should return empty result
TGetRegionGroupsByTimeReq nonExistReq =
new TGetRegionGroupsByTimeReq("root.nonexistent", 0L, Long.MAX_VALUE);
TGetRegionGroupsByTimeResp nonExistResp = client.getRegionGroupsByTime(nonExistReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), nonExistResp.getStatus().getCode());
Assert.assertTrue(
nonExistResp.getRegionReplicaSets() == null
|| nonExistResp.getRegionReplicaSets().isEmpty());

// Verify consistency: union of per-slot queries should equal full-range query
Set<TConsensusGroupId> unionRegionIds = new HashSet<>();
for (long t = 0; t < testTimePartitionSlotsNum; t++) {
TGetRegionGroupsByTimeReq perSlotReq =
new TGetRegionGroupsByTimeReq(
sg0, t * testTimePartitionInterval, t * testTimePartitionInterval);
TGetRegionGroupsByTimeResp perSlotResp = client.getRegionGroupsByTime(perSlotReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), perSlotResp.getStatus().getCode());
if (perSlotResp.getRegionReplicaSets() != null) {
perSlotResp
.getRegionReplicaSets()
.forEach(replicaSet -> unionRegionIds.add(replicaSet.getRegionId()));
}
}
Set<TConsensusGroupId> allRegionIds = new HashSet<>();
resp.getRegionReplicaSets().forEach(replicaSet -> allRegionIds.add(replicaSet.getRegionId()));
Assert.assertEquals(allRegionIds, unionRegionIds);
}
}

@Test
public void testGetSchemaNodeManagementPartition() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
Expand Down Expand Up @@ -248,6 +249,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case RemoveRegionLocation:
plan = new RemoveRegionLocationPlan();
break;
case GetRegionGroupsByTime:
plan = new GetRegionGroupsByTimePlan();
break;
case OfferRegionMaintainTasks:
plan = new OfferRegionMaintainTasksPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
CountTimeSlotList((short) 310),
AddRegionLocation((short) 311),
RemoveRegionLocation((short) 312),
GetRegionGroupsByTime((short) 313),

Check failure on line 74 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this constant name to match the regular expression '^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ25CY5ATioQMx0i68Or&open=AZ25CY5ATioQMx0i68Or&pullRequest=17545

/** Partition. */
GetSchemaPartition((short) 400),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.read.region;

import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class GetRegionGroupsByTimePlan extends ConfigPhysicalReadPlan {

private String database;

private TTimePartitionSlot startTimeSlot;

private TTimePartitionSlot endTimeSlot;

public GetRegionGroupsByTimePlan() {
super(ConfigPhysicalPlanType.GetRegionGroupsByTime);
}

public GetRegionGroupsByTimePlan(
final String database, final long startTime, final long endTime) {
super(ConfigPhysicalPlanType.GetRegionGroupsByTime);
this.database = database;
this.startTimeSlot = TimePartitionUtils.getTimePartitionSlot(startTime);
this.endTimeSlot = TimePartitionUtils.getTimePartitionSlot(endTime);
}

public String getDatabase() {
return database;
}

public TTimePartitionSlot getStartTimeSlot() {
return startTimeSlot;
}

public TTimePartitionSlot getEndTimeSlot() {
return endTimeSlot;
}

@Override
protected void serializeImpl(final DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
BasicStructureSerDeUtil.write(database, stream);
stream.writeLong(startTimeSlot.getStartTime());
stream.writeLong(endTimeSlot.getStartTime());
}

@Override
protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
database = BasicStructureSerDeUtil.readString(buffer);
startTimeSlot = new TTimePartitionSlot(buffer.getLong());
endTimeSlot = new TTimePartitionSlot(buffer.getLong());
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final GetRegionGroupsByTimePlan that = (GetRegionGroupsByTimePlan) o;
return Objects.equals(database, that.database)
&& Objects.equals(startTimeSlot, that.startTimeSlot)
&& Objects.equals(endTimeSlot, that.endTimeSlot);
}

@Override
public int hashCode() {
return Objects.hash(database, startTimeSlot, endTimeSlot);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.response.partition;

import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;

import java.util.Set;

public class GetRegionGroupsByTimeResp implements DataSet {

private final TSStatus status;

private final Set<TRegionReplicaSet> regionReplicaSets;

public GetRegionGroupsByTimeResp(
final TSStatus status, final Set<TRegionReplicaSet> regionReplicaSets) {
this.status = status;
this.regionReplicaSets = regionReplicaSets;
}

public TSStatus getStatus() {
return status;
}

public TGetRegionGroupsByTimeResp convertToRpcResp() {
TGetRegionGroupsByTimeResp resp = new TGetRegionGroupsByTimeResp();
resp.setStatus(status);

if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
resp.setRegionReplicaSets(regionReplicaSets);
}

return resp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
Expand Down Expand Up @@ -2569,6 +2571,14 @@ public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
: new TGetSeriesSlotListResp(status);
}

@Override
public TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? partitionManager.getRegionGroupsByTime(req).convertToRpcResp()
: new TGetRegionGroupsByTimeResp(status);
}

@Override
public TSStatus migrateRegion(TMigrateRegionReq req) {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
Expand Down Expand Up @@ -854,6 +856,13 @@ TPermissionInfoResp login(
*/
TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req);

/**
* Get DataRegion groups that overlap a time range for the given database.
*
* @return TGetRegionGroupsByTimeResp.
*/
TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req);

TSStatus migrateRegion(TMigrateRegionReq req);

TSStatus reconstructRegion(TReconstructRegionReq req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
Expand All @@ -60,6 +61,7 @@
import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
import org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp;
import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.partition.GetRegionGroupsByTimeResp;
import org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp;
import org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp;
import org.apache.iotdb.confignode.consensus.response.partition.GetTimeSlotListResp;
Expand All @@ -82,6 +84,7 @@
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
Expand Down Expand Up @@ -1180,6 +1183,19 @@ public GetRegionIdResp getRegionId(final TGetRegionIdReq req) {
}
}

public GetRegionGroupsByTimeResp getRegionGroupsByTime(final TGetRegionGroupsByTimeReq req) {
final GetRegionGroupsByTimePlan plan =
new GetRegionGroupsByTimePlan(req.getDatabase(), req.getStartTime(), req.getEndTime());
try {
return (GetRegionGroupsByTimeResp) getConsensusManager().read(plan);
} catch (final ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return new GetRegionGroupsByTimeResp(res, Collections.emptySet());
}
}

public GetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
long startTime = req.isSetStartTime() ? req.getStartTime() : Long.MIN_VALUE;
long endTime = req.isSetEndTime() ? req.getEndTime() : Long.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.table.DescTablePlan;
Expand Down Expand Up @@ -355,6 +356,8 @@ public DataSet executeQueryPlan(final ConfigPhysicalReadPlan req)
return partitionInfo.countTimeSlotList((CountTimeSlotListPlan) req);
case GetSeriesSlotList:
return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
case GetRegionGroupsByTime:
return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan) req);
case SHOW_CQ:
return cqInfo.showCQ();
case ShowExternalService:
Expand Down
Loading
Loading